pathway.io.kafka package
pathway.io.kafka.read(rdkafka_settings, topic_names, format='raw', value_columns=None, primary_key=None, types=None, debug_data=None, autocommit_duration_ms=None, json_field_paths=None, parallel_readers=None, persistent_id=None)
Reads a table from a set of topics in Kafka. Three formats are currently supported: “raw”, “csv”, and “json”.
- Parameters
  - rdkafka_settings (dict) – Connection settings in the format of librdkafka.
  - topic_names (List[str]) – Names of the topics in Kafka from which the data should be read.
  - format – Format of the input data: “raw”, “csv”, or “json”.
  - value_columns (Optional[List[str]]) – Columns to extract for the table; required for formats other than “raw”.
  - primary_key (Optional[List[str]]) – In case the table should have a primary key generated according to a subset of its columns, the set of columns should be specified in this field. Otherwise, the primary key will be generated randomly.
  - debug_data – Static data replacing the original one when debug mode is active.
  - autocommit_duration_ms (Optional[int]) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
  - types (Optional[Dict[str, PathwayType]]) – Dictionary containing the mapping between the columns and the data types (pw.Type) of the values of those columns. This parameter is optional, and if not provided the default type is pw.Type.ANY.
  - json_field_paths (Optional[Dict[str, str]]) – If the format is JSON, this field allows mapping field names to paths within the JSON document. For each field that requires such a mapping, an entry should be given in the format <field_name>: <path to be mapped>, where the path to be mapped needs to be a JSON Pointer (RFC 6901).
  - parallel_readers (Optional[int]) – Number of copies of the reader to work in parallel. If not specified, min{pathway_threads, total number of partitions} will be used. This number cannot be greater than the number of Pathway engine threads and will be reduced to the number of engine threads if it exceeds it.
  - persistent_id (Optional[int]) – (unstable) An identifier under which the state of the table will be persisted, or None if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This way it’s possible to resume computations from the moment they were terminated last time.
- Returns
  The table read.
- Return type
  Table
When using the format “raw”, the connector will produce a single-column table: all the data is saved into a column named data.
For other formats, the argument value_columns is required and defines the columns.
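To illustrate how the optional parameters compose, here is a hedged sketch of a CSV read with explicit types, a primary key, and a custom commit interval. The topic and column names are hypothetical, pw.Type.STRING and pw.Type.INT are assumed to exist alongside the pw.Type.ANY mentioned above, and rdkafka_settings is assumed to be defined as in the example below:

import pathway as pw

t = pw.io.kafka.read(
    rdkafka_settings,                # connection settings, as defined in the example below
    topic_names=["pet_ages"],        # hypothetical topic
    format="csv",
    value_columns=["owner", "age"],  # columns extracted from each CSV row
    primary_key=["owner"],           # derive row ids from the "owner" column
    types={
        "owner": pw.Type.STRING,     # assumed pw.Type members; the default is pw.Type.ANY
        "age": pw.Type.INT,
    },
    autocommit_duration_ms=100,      # commit at most every 100 ms
)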
Example:
Consider a Kafka queue running locally on port 9092 that uses SASL-SSL authentication with the SCRAM-SHA-256 mechanism. You can set up a queue with similar parameters in Upstash. The settings for rdkafka will look as follows:
import os

rdkafka_settings = {
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "60000",
    "sasl.username": os.environ["KAFKA_USERNAME"],
    "sasl.password": os.environ["KAFKA_PASSWORD"],
}
To connect to the topic “animals” and accept messages, the connector must be used as follows, depending on the format:
Raw version:
import pathway as pw

t = pw.io.kafka.read(
    rdkafka_settings,
    topic_names=["animals"],
    format="raw",
)
All the data will be accessible in the column data.
CSV version:
import pathway as pw

t = pw.io.kafka.read(
    rdkafka_settings,
    topic_names=["animals"],
    format="csv",
    value_columns=["owner", "pet"],
)
In the case of the CSV format, the first message must be the header:
owner,pet
Then, simple data rows are expected. For example:
Alice,cat
Bob,dog
This way, you get a table which looks as follows:
pw.debug.compute_and_print(t, include_id=False)
owner pet
Alice cat
Bob dog
JSON version:
import pathway as pw

t = pw.io.kafka.read(
    rdkafka_settings,
    topic_names=["animals"],
    format="json",
    value_columns=["owner", "pet"],
)
For the JSON connector, you can send these two messages:
{"owner": "Alice", "pet": "cat"}{"owner": "Bob", "pet": "dog"}
This way, you get a table which looks as follows:
pw.debug.compute_and_print(t, include_id=False)
owner pet
Alice cat
Bob dog
Now consider that the data about pets comes in a more sophisticated way. For instance, you have the owner, kind, and name of an animal, along with some physical measurements.
The JSON payload in this case may look as follows:
{ "name": "Jack", "pet": { "animal": "cat", "name": "Bob", "measurements": [100, 200, 300] }}
Suppose you need to extract the name of the pet and its height, which is the 2nd (1-based), i.e. the 1st (0-based), element in the array of measurements. Then you can use JSON Pointers and configure the connector as follows:
import pathway as pw

t = pw.io.kafka.read(
    rdkafka_settings,
    topic_names=["animals"],
    format="json",
    value_columns=["pet_name", "pet_height"],
    json_field_paths={
        "pet_name": "/pet/name",
        "pet_height": "/pet/measurements/1",
    },
)
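For the payload above, the pointer /pet/name resolves to “Bob” and /pet/measurements/1 resolves to 200, so the resulting table would look as follows:

pw.debug.compute_and_print(t, include_id=False)
pet_name pet_height
Bob 200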
pathway.io.kafka.write(table, rdkafka_settings, topic_name, *, format='json', delimiter=',', **kwargs)
Write a table to a given topic on a Kafka instance.
- Parameters
  - table (Table) – The table to output.
  - rdkafka_settings (dict) – Connection settings in the format of librdkafka.
  - topic_name (str) – Name of the topic in Kafka to which the data should be sent.
  - format (str) – Format of the output data; currently “json” and “dsv” are supported.
  - delimiter (str) – Field delimiter to be used in case of the delimiter-separated values format.
- Returns
  None
Limitations:
For future-proofing, the format is configurable, but (for now) only JSON is available.
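If the “dsv” format is enabled in your version (the parameter list above mentions it), a hedged sketch of a delimiter-separated write might look as follows; the topic name is hypothetical:

import pathway as pw

# Sketch only: assumes the "dsv" format is available and that a table t exists.
pw.io.kafka.write(
    t,
    rdkafka_settings,
    "animals_dsv",   # hypothetical topic
    format="dsv",
    delimiter=";",   # field separator for the delimiter-separated output
)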
Example:
Consider a Kafka queue running locally on port 9092 that uses SASL-SSL authentication with the SCRAM-SHA-256 mechanism. You can set up a queue with similar parameters in Upstash. The settings for rdkafka will look as follows:
import os

rdkafka_settings = {
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "sasl.username": os.environ["KAFKA_USERNAME"],
    "sasl.password": os.environ["KAFKA_PASSWORD"],
}
You want to send a Pathway table t to the Kafka instance. To connect to the topic “animals” and send messages, the connector must be used as follows, depending on the format:
JSON version:
import pathway as pw

pw.io.kafka.write(
    t,
    rdkafka_settings,
    "animals",
    format="json",
)
All the updates of table t will be sent to the Kafka instance.
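A Pathway program only starts processing once the computation is run. As a minimal end-to-end sketch, reading from the “animals” topic and forwarding every update to a second topic (the output topic name “animals_out” is hypothetical):

import pathway as pw

# Read the "animals" topic as JSON with two columns.
t = pw.io.kafka.read(
    rdkafka_settings,
    topic_names=["animals"],
    format="json",
    value_columns=["owner", "pet"],
)

# Forward every update of t to a (hypothetical) output topic.
pw.io.kafka.write(
    t,
    rdkafka_settings,
    "animals_out",
    format="json",
)

# Launch the computation; without this, nothing is read or written.
pw.run()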