pathway.io.kafka package
pathway.io.kafka.read(rdkafka_settings, topic_names, format='raw', value_columns=None, primary_key=None, debug_data=None)
Reads a table from a set of topics in Kafka. Three formats are currently supported: “raw”, “csv”, and “json”.
- Parameters
  - rdkafka_settings (dict) – Connection settings in the format of librdkafka.
  - topic_names (List[str]) – Names of the topics in Kafka from which the data should be read.
  - format – Format of the input data: “raw”, “csv”, or “json”.
  - value_columns (Optional[List[str]]) – Columns to extract for the table; required for formats other than “raw”.
  - primary_key (Optional[List[str]]) – In case the table should have a primary key generated according to a subset of its columns, the set of columns should be specified in this field. Otherwise, the primary key is generated as uuid4.
  - debug_data – Static data replacing the original one when debug mode is active.
- Returns
  The table read.
- Return type
  Table
When using the format “raw”, the connector produces a single-column table: all the data is saved into a column named data. For other formats, the value_columns argument is required and defines the columns of the table.
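For instance, a primary key derived from a subset of the columns can be requested at read time. The snippet below is a minimal sketch using only the parameters documented above; the topic and column names are purely illustrative:

>>> import pathway as pw
>>> t = pw.kafka.read(
...     rdkafka_settings,  # connection settings, as in the example below
...     topic_names=["animals"],
...     format="json",
...     value_columns=["owner", "pet"],
...     primary_key=["owner"],  # ids derived from the "owner" column instead of uuid4
... )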
Example:
Consider there is a queue in Kafka, running locally on port 9092. Suppose the queue uses SASL-SSL authentication with the SCRAM-SHA-256 mechanism. You can set up a queue with similar parameters in Upstash. The settings for rdkafka will look as follows:
>>> import os
>>> rdkafka_settings = {
...     "bootstrap.servers": "localhost:9092",
...     "security.protocol": "sasl_ssl",
...     "sasl.mechanism": "SCRAM-SHA-256",
...     "group.id": "$GROUP_NAME",
...     "session.timeout.ms": "60000",
...     "sasl.username": os.environ["KAFKA_USERNAME"],
...     "sasl.password": os.environ["KAFKA_PASSWORD"],
... }
To connect to the topic “animals” and read messages from it, use the connector as follows, depending on the format:
Raw version:
>>> import pathway as pw
>>> t = pw.kafka.read(
...     rdkafka_settings,
...     topic_names=["animals"],
...     format="raw",
... )
All the data will be accessible in the column data.
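The payload can then be parsed downstream. As a sketch, assuming the pw.apply helper is available, a per-row transformation over the raw column might look like this:

>>> import pathway as pw
>>> # a sketch: compute the length of each raw payload; pw.apply maps a Python
>>> # function over column values row by row (assumption: pw.apply is available)
>>> lengths = t.select(length=pw.apply(len, t.data))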
CSV version:
>>> import pathway as pw
>>> t = pw.kafka.read(
...     rdkafka_settings,
...     topic_names=["animals"],
...     format="csv",
...     value_columns=["owner", "pet"],
... )
For the CSV format, the first message must be the header:
owner,pet
Then, simple data rows are expected. For example:
Alice,cat
Bob,dog
This way, we acquire a table which looks as follows:
>>> pw.debug.compute_and_print(t, include_id=False)
owner pet
Alice cat
Bob   dog
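For testing, such header and data messages can be sent with any Kafka client. Below is a sketch using the confluent_kafka producer, which is an assumption here and not part of Pathway:

>>> from confluent_kafka import Producer  # assumption: an external Kafka client
>>> producer = Producer({"bootstrap.servers": "localhost:9092"})
>>> for line in ["owner,pet", "Alice,cat", "Bob,dog"]:  # header first, then rows
...     producer.produce("animals", value=line)
>>> producer.flush()  # block until all messages are delivered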
JSON version:
>>> import pathway as pw
>>> t = pw.kafka.read(
...     rdkafka_settings,
...     topic_names=["animals"],
...     format="json",
...     value_columns=["owner", "pet"],
... )
For the JSON connector, we can send these two messages:
{"owner": "Alice", "pet": "cat"}{"owner": "Bob", "pet": "dog"}
This way, we acquire a table which looks as follows:
>>> pw.debug.compute_and_print(t, include_id=False)
owner pet
Alice cat
Bob   dog
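Analogously, the JSON messages above could be produced with the same client (a sketch under the same assumption):

>>> import json
>>> from confluent_kafka import Producer  # assumption: an external Kafka client
>>> producer = Producer({"bootstrap.servers": "localhost:9092"})
>>> for record in [{"owner": "Alice", "pet": "cat"}, {"owner": "Bob", "pet": "dog"}]:
...     producer.produce("animals", value=json.dumps(record))
>>> producer.flush()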
pathway.io.kafka.write(table, rdkafka_settings, topic_name, format='json', commit_frequency_in_messages=None, commit_frequency_ms=None)
Writes a table to a given topic on a Kafka instance.
- Parameters
  - table (Table) – The table to output.
  - rdkafka_settings (dict) – Connection settings in the format of librdkafka.
  - topic_name (str) – Name of the topic in Kafka to which the data should be sent.
  - format (str) – Format of the output data; only “json” is currently supported.
  - commit_frequency_in_messages (Optional[int]) – Maximum number of commits contained in a single message sent to the Kafka instance. Note that overly large messages are sent automatically, so some messages may contain fewer than commit_frequency_in_messages commits.
  - commit_frequency_ms (Optional[int]) – Maximum number of milliseconds (ms) before a message is sent to the Kafka instance. Note that overly large messages are sent automatically, so some messages may be sent before commit_frequency_ms is reached.
- Returns
  None
- Return type
  None
Limitations:
For future-proofing, the format is configurable, but (for now) only JSON is available.
Example:
Consider there is a queue in Kafka, running locally on port 9092. Suppose the queue uses SASL-SSL authentication with the SCRAM-SHA-256 mechanism. You can set up a queue with similar parameters in Upstash. The settings for rdkafka will look as follows:
>>> import os
>>> rdkafka_settings = {
...     "bootstrap.servers": "localhost:9092",
...     "security.protocol": "sasl_ssl",
...     "sasl.mechanism": "SCRAM-SHA-256",
...     "sasl.username": os.environ["KAFKA_USERNAME"],
...     "sasl.password": os.environ["KAFKA_PASSWORD"],
... }
We want to send a Pathway table t to our Kafka instance. To connect to the topic “animals” and send messages to it, use the connector as follows, depending on the format:
JSON version:
>>> import pathway as pw
>>> pw.kafka.write(
...     t,
...     rdkafka_settings,
...     "animals",
...     format="json",
... )
All the updates of table t will be sent to the Kafka instance.
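If batching needs to be tuned, the commit frequency parameters documented above can be passed as well. A sketch with an illustrative value:

>>> pw.kafka.write(
...     t,
...     rdkafka_settings,
...     "animals",
...     format="json",
...     commit_frequency_ms=1000,  # send accumulated commits at least every 1000 ms
... )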