pathway.io.debezium package


pathway.io.debezium.read(rdkafka_settings, topic_name, value_columns, primary_key=None, debug_data=None)

Reads a Kafka topic containing change events in the Debezium format and maintains a corresponding table in Pathway, on which all the usual table operations can be performed. This requires a running Debezium connector that streams the changes of the source database into the topic.

  • Parameters
    • rdkafka_settings (dict) – Connection settings in the format of librdkafka.
    • topic_name (str) – Name of topic in Kafka to which the updates are streamed.
    • value_columns (List[str]) – Columns to extract into the table.
    • primary_key (Optional[List[str]]) – If the table's primary key should be derived from a subset of its columns, list those columns here. Otherwise, the primary key is generated as a uuid4.
    • debug_data – Static data that replaces the original data when debug mode is active.
  • Returns
    The table read.
  • Return type
    Table
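The description of primary_key implies a useful property. When the key is derived from columns, every version of a row (its snapshot, later updates, its deletion) maps to the same key, as long as the key columns themselves do not change, so a change can replace the earlier version. A random uuid4 carries no such link between versions. The plain-Python sketch below only illustrates this property: the helper `row_key` is hypothetical, and SHA-256 hashing is a stand-in, not Pathway's internal key derivation.

```python
import hashlib
import json
import uuid
from typing import Any, Dict, List, Optional


def row_key(row: Dict[str, Any], primary_key: Optional[List[str]] = None) -> str:
    """Hypothetical illustration of the two key modes described for primary_key.

    With primary_key, the key depends only on the listed columns, so two
    versions of the same row (e.g. before and after an update) get the same key.
    Without it, every call returns a fresh random uuid4.
    """
    if primary_key is None:
        return str(uuid.uuid4())
    # Serialize only the key columns, in the given order, deterministically.
    key_values = [row[column] for column in primary_key]
    payload = json.dumps(key_values, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


before = {"id": 1, "age": 3, "owner": "Alice", "pet": "dog"}
after = {"id": 1, "age": 4, "owner": "Alice", "pet": "dog"}

# Same id -> same key, so the update can replace the old row.
assert row_key(before, ["id"]) == row_key(after, ["id"])
# No primary key -> unrelated random keys for the two versions.
assert row_key(before) != row_key(after)
```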

Example:

Suppose you need to stream a database table, along with its changes, directly into the Pathway engine. A well-known standard solution for table streaming is Debezium: it supports streaming data from MySQL, Postgres, MongoDB and several other databases directly into a Kafka topic. It first sends a snapshot of the existing data and then streams the individual changes, namely the rows that are inserted, updated or removed.
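The snapshot-then-changes stream described above can be modelled as additions and removals of rows. The plain-Python sketch below (not part of Pathway) converts one Debezium change event, encoded with the JSON converter, into (row, diff) pairs. It relies on the Debezium event envelope fields op, before and after, where op is "r" for snapshot reads, "c" for inserts, "u" for updates and "d" for deletes. The function name `debezium_to_diffs` is hypothetical, and the sketch assumes events carry a full before image (in Postgres this depends on the table's REPLICA IDENTITY setting).

```python
import json
from typing import Any, Dict, Iterator, Optional, Tuple

Row = Dict[str, Any]


def debezium_to_diffs(raw_value: Optional[bytes]) -> Iterator[Tuple[Row, int]]:
    """Turn one Debezium JSON change event into (row, diff) pairs.

    diff = +1 means "row added" and diff = -1 means "row removed"; an update
    becomes a removal of the old version followed by an addition of the new one.
    Handles both the {"schema": ..., "payload": ...} wrapper and bare payloads.
    """
    if not raw_value:
        # Tombstone message (null value) sent after a delete: nothing to emit.
        return
    event = json.loads(raw_value)
    payload = event.get("payload", event)
    op = payload["op"]
    before: Optional[Row] = payload.get("before")
    after: Optional[Row] = payload.get("after")

    if op in ("r", "c"):  # snapshot read or insert
        yield after, 1
    elif op == "u":  # update: retract the old version (if present), add the new one
        if before is not None:
            yield before, -1
        yield after, 1
    elif op == "d":  # delete: retract the old version
        if before is not None:
            yield before, -1
    # Other ops (e.g. "t" for truncate) are ignored in this sketch.


update_event = json.dumps({
    "payload": {
        "op": "u",
        "before": {"id": 1, "age": 3, "owner": "Alice", "pet": "dog"},
        "after": {"id": 1, "age": 4, "owner": "Alice", "pet": "dog"},
    }
}).encode("utf-8")
print(list(debezium_to_diffs(update_event)))
```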

Consider a table in Postgres created with the following schema:

CREATE TABLE pets (
    id SERIAL PRIMARY KEY,
    age INTEGER,
    owner TEXT,
    pet TEXT
);

This table is streamed to a Kafka topic named pets. To read it, we need to provide settings for rdkafka. For the sake of demonstration, let's reuse those from the Kafka connector example:

>>> import os
>>> rdkafka_settings = {
...     "bootstrap.servers": "localhost:9092",
...     "security.protocol": "sasl_ssl",
...     "sasl.mechanism": "SCRAM-SHA-256",
...     "group.id": "$GROUP_NAME",
...     "session.timeout.ms": "60000",
...     "sasl.username": os.environ["KAFKA_USERNAME"],
...     "sasl.password": os.environ["KAFKA_PASSWORD"],
... }
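If the broker used for testing is a local instance without authentication, the SASL settings above do not apply. The hypothetical helper `make_rdkafka_settings` below builds either variant from environment variables; the variable name KAFKA_BOOTSTRAP_SERVERS is an assumption, not something the connector reads. All keys are standard librdkafka configuration properties. Setting auto.offset.reset to earliest makes a consumer group without committed offsets start at the beginning of the topic, so the initial snapshot is not skipped. Which properties your broker actually needs is deployment-specific.

```python
import os
from typing import Dict


def make_rdkafka_settings(group_id: str) -> Dict[str, str]:
    """Hypothetical helper: build librdkafka settings for the connector.

    Uses SASL/SCRAM when KAFKA_USERNAME and KAFKA_PASSWORD are set,
    otherwise falls back to an unauthenticated (plaintext) local broker.
    """
    settings = {
        # KAFKA_BOOTSTRAP_SERVERS is an assumed variable name for this sketch.
        "bootstrap.servers": os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        "group.id": group_id,
        "session.timeout.ms": "60000",
        # Start from the beginning of the topic if the group has no committed
        # offset, so the initial Debezium snapshot is read.
        "auto.offset.reset": "earliest",
    }
    username = os.environ.get("KAFKA_USERNAME")
    password = os.environ.get("KAFKA_PASSWORD")
    if username and password:
        settings.update({
            "security.protocol": "sasl_ssl",
            "sasl.mechanism": "SCRAM-SHA-256",
            "sasl.username": username,
            "sasl.password": password,
        })
    else:
        settings["security.protocol"] = "plaintext"
    return settings
```

For example, the result of make_rdkafka_settings("pets-reader") could be passed as the rdkafka_settings argument of the connector.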

Setting up the connector is then as simple as:

>>> import pathway as pw
>>> t = pw.io.debezium.read(
...     rdkafka_settings,
...     topic_name="pets",
...     value_columns=["age", "owner", "pet"],
...     primary_key=["id"],  # a list of column names, per the signature
... )

As a result, when it starts, the connector loads the full snapshot of the table pets into the table t in Pathway. The table t can then be operated on as usual. While the program runs, rows in the table pets may change; these changes propagate through the computation and are delivered to the output connectors by the Stream of Updates mechanism.
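To make the Stream of Updates idea concrete, the following plain-Python model (not Pathway code) replays a stream of (key, row, diff) updates, as an output connector might receive them, into the current contents of the table. The function `materialize` and the sample data are illustrative; real Pathway output also carries extra information such as the time of each change, which is omitted here.

```python
from typing import Any, Dict, Iterable, Tuple

Row = Dict[str, Any]


def materialize(updates: Iterable[Tuple[Any, Row, int]]) -> Dict[Any, Row]:
    """Replay (key, row, diff) updates into the current state of a table.

    diff = +1 stores the row under its key and diff = -1 removes it; an update
    arrives as a removal of the old row plus an insertion of the new one.
    """
    state: Dict[Any, Row] = {}
    for key, row, diff in updates:
        if diff == 1:
            state[key] = row
        elif diff == -1:
            # Remove only if the retracted row is the one currently stored, so
            # the result is the same whichever half of an update comes first.
            if state.get(key) == row:
                del state[key]
        else:
            raise ValueError(f"unexpected diff {diff!r}")
    return state


updates = [
    # Initial snapshot of the table.
    (1, {"age": 3, "owner": "Alice", "pet": "dog"}, 1),
    (2, {"age": 5, "owner": "Bob", "pet": "cat"}, 1),
    # Later changes: row 1 is updated, row 2 is deleted.
    (1, {"age": 3, "owner": "Alice", "pet": "dog"}, -1),
    (1, {"age": 4, "owner": "Alice", "pet": "dog"}, 1),
    (2, {"age": 5, "owner": "Bob", "pet": "cat"}, -1),
]
print(materialize(updates))  # {1: {'age': 4, 'owner': 'Alice', 'pet': 'dog'}}
```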