pathway.io.debezium package
pathway.io.debezium.read(rdkafka_settings, topic_name, value_columns, primary_key=None, debug_data=None, autocommit_duration_ms=None, types=None, persistent_id=None)
Connector that reads a topic in the Debezium format and maintains a corresponding table in Pathway, on which you can perform all of the provided table operations. To use it, you will need a Debezium connector.
- Parameters
  - rdkafka_settings (dict) – Connection settings in the format of librdkafka.
  - topic_name (str) – Name of the Kafka topic to which the updates are streamed.
  - value_columns (List[str]) – Columns to extract for the table.
  - primary_key (Optional[List[str]]) – In case the table should have a primary key generated from a subset of its columns, specify that set of columns in this field. Otherwise, the primary key will be generated randomly.
  - debug_data – Static data replacing the original one when debug mode is active.
  - autocommit_duration_ms (Optional[int]) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
  - types (Optional[Dict[str, PathwayType]]) – Dictionary containing the mapping between the columns and the data types (pw.Type) of the values of those columns. This parameter is optional, and if not provided the default type is pw.Type.ANY.
  - persistent_id (Optional[int]) – (unstable) An identifier under which the state of the table will be persisted, or None if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This way it’s possible to resume computations from the moment they were terminated last time.
- Returns
  The table read.
- Return type
  Table
Example:
Suppose there is a need to stream a database table, along with its changes, directly into the Pathway engine. One of the standard, well-known solutions for table streaming is Debezium: it supports streaming data from MySQL, Postgres, MongoDB and a few more databases directly to a topic in Kafka. The stream first delivers a snapshot of the data and then streams the changed rows (namely: the inserted, updated or removed ones).
Consider a table in Postgres created according to the following schema:
CREATE TABLE pets (
    id SERIAL PRIMARY KEY,
    age INTEGER,
    owner TEXT,
    pet TEXT
);
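Before Pathway can read the topic, the table has to be captured by Debezium itself. As a hedged, illustrative sketch (not part of the Pathway API), a Debezium Postgres connector can be registered through Kafka Connect’s REST API; the host, credentials and exact property values below are assumptions and may differ for your Debezium and Kafka Connect setup:
import json
import requests

# Illustrative Debezium Postgres connector configuration (assumed values):
# adjust hosts, credentials and captured tables to your own deployment.
debezium_config = {
    "name": "pets-connector",  # hypothetical connector name
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname": "postgres",
        "table.include.list": "public.pets",  # capture only the pets table
    },
}

# Register the connector with Kafka Connect (assumed to listen on localhost:8083).
response = requests.post(
    "http://localhost:8083/connectors",
    data=json.dumps(debezium_config),
    headers={"Content-Type": "application/json"},
)
response.raise_for_status()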
This table, by default, will be streamed to the topic with the same name. In order to read it, you need to set up the rdkafka settings. For the sake of demonstration, let’s take those from the example of the Kafka connector:
import os

rdkafka_settings = {
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "60000",
    "sasl.username": os.environ["KAFKA_USERNAME"],
    "sasl.password": os.environ["KAFKA_PASSWORD"],
}
Now, using these settings, you can set up the connector. It is as simple as:
import pathway as pw

t = pw.io.debezium.read(
    rdkafka_settings,
    topic_name="pets",
    value_columns=["age", "owner", "pet"],
    primary_key=["id"],
)
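If you would rather fix the column types than rely on the default pw.Type.ANY, the types parameter can be passed as well. A hedged variant, assuming pw.Type.INT and pw.Type.STRING are the type constants available in your Pathway version:
t = pw.io.debezium.read(
    rdkafka_settings,
    topic_name="pets",
    value_columns=["age", "owner", "pet"],
    primary_key=["id"],
    # Assumed type constants; consult your Pathway version for the exact names.
    types={
        "age": pw.Type.INT,
        "owner": pw.Type.STRING,
        "pet": pw.Type.STRING,
    },
    autocommit_duration_ms=1000,  # commit the received updates every second
)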
As a result, upon its start, the connector provides the full snapshot of the table pets into the Pathway table t. The table t can then be operated on as usual. Throughout the runtime, rows of the table pets can change; in that case, the changes in the result will be delivered to the output connectors by the Stream of Updates mechanism.
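To actually observe those updates, the table t can be forwarded to any output connector. A minimal sketch, assuming the CSV output connector pw.io.csv.write and pw.run are available as in the other Pathway examples:
# Hedged sketch: write the change stream of t to a CSV file and start the computation.
pw.io.csv.write(t, "pets_output.csv")
pw.run()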