pw.io.debezium
read(rdkafka_settings, topic_name, *, db_type=<pathway.engine.DebeziumDBType object>, schema, debug_data=None, autocommit_duration_ms=1500, name=None, **kwargs)
Source connector that takes a topic in the Debezium format and maintains a corresponding table in Pathway, on which you can perform all the usual table operations. To use it, you will need a Debezium connector configured on the database side.
- Parameters
  - rdkafka_settings (dict) – Connection settings in the format of librdkafka.
  - topic_name (str) – Name of the topic in Kafka to which the updates are streamed.
  - db_type (DebeziumDBType) – Type of the database from which events are streamed.
  - schema (type[Schema]) – Schema of the resulting table.
  - debug_data – Static data replacing the original one when debug mode is active.
  - autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. Additionally, if persistence is enabled, it will be used as the name for the snapshot that stores the connector’s progress.
- Returns
  Table – The table read.
Example:
Suppose you need to stream a database table along with its changes directly into the Pathway engine. One of the standard, well-known solutions for table streaming is Debezium: it supports streaming data from MySQL, Postgres, MongoDB, and a few more databases directly to a topic in Kafka. The streaming first sends a snapshot of the data and then streams the changes made to individual rows (namely: insertions, updates, and removals).
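To make the format concrete, here is a simplified sketch of what a single Debezium change event carries. The exact envelope depends on the connector configuration, but the before/after/op fields below follow Debezium's documented event structure; the row contents are purely illustrative:
# A simplified Debezium change event, shown as a Python dict.
# Real events also wrap this payload with "source" metadata and schema information.
event = {
    "before": None,                       # no prior state: this event is an insertion
    "after": {"id": 1, "name": "Alice"},  # row state after the change
    "op": "c",                            # "c" create, "u" update, "d" delete, "r" snapshot read
}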
Consider a table in Postgres, created according to the following schema:
CREATE TABLE pets (
    id SERIAL PRIMARY KEY,
    age INTEGER,
    owner TEXT,
    pet TEXT
);
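For completeness, here is a minimal sketch of how such a table could be hooked up to Debezium through the Kafka Connect REST API. The endpoint, credentials, connector name, and topic prefix are assumptions for illustration, not part of the Pathway API:
import os
import requests

# Hypothetical Kafka Connect endpoint and Postgres credentials -- adjust to your setup.
connector_config = {
    "name": "pets-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": os.environ["PG_PASSWORD"],
        "database.dbname": "postgres",
        "topic.prefix": "pets-db",  # Debezium uses this value to prefix topic names
    },
}
requests.post("http://localhost:8083/connectors", json=connector_config)
Note that, depending on the Debezium version and configuration, the resulting topic name may include the prefix and schema; adjust the topic_name passed to the Pathway connector accordingly.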
This table, by default, will be streamed to the topic with the same name. In order to read it, you need to set the settings for rdkafka. For the sake of demonstration, let's take those from the example of the Kafka connector:
import os

rdkafka_settings = {
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "60000",
    "sasl.username": os.environ["KAFKA_USERNAME"],
    "sasl.password": os.environ["KAFKA_PASSWORD"],
}
Now, using these settings, you can set up the connector. It is as simple as:
import pathway as pw

class InputSchema(pw.Schema):
    id: str = pw.column_definition(primary_key=True)
    age: int
    owner: str
    pet: str

t = pw.io.debezium.read(
    rdkafka_settings,
    topic_name="pets",
    schema=InputSchema,
)
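Reading the topic does not by itself start the computation. As a minimal sketch (the choice of sink and the file name are assumptions for illustration), you can attach any output connector to t and run the pipeline:
pw.io.jsonlines.write(t, "pets.jsonl")  # stream the resulting table to a JSON Lines file
pw.run()  # start the computation; the connector keeps consuming changes from Kafka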
As a result, upon its start, the connector provides the full snapshot of the table pets into the table t in Pathway. The table t can then be operated on as usual. Throughout the runtime, the rows in the table pets can change. In this case, the changes in the result will be propagated to the output connectors by the Stream of Updates mechanism.
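As a hedged illustration of that mechanism, assuming t is written out with pw.io.jsonlines.write as sketched above: if the age of the pet with id "1" changed from 9 to 10, the output could contain a pair of entries roughly like the ones below, where diff = -1 retracts the old row, diff = 1 asserts the new one, and time marks the minibatch that carried the change (all values are illustrative):
{"id": "1", "age": 9, "owner": "Alice", "pet": "dog", "diff": -1, "time": 1700000000000}
{"id": "1", "age": 10, "owner": "Alice", "pet": "dog", "diff": 1, "time": 1700000000000}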