pathway.io.kafka package


pathway.io.kafka.read(rdkafka_settings, topic_names, format='raw', value_columns=None, primary_key=None, types=None, debug_data=None, autocommit_duration_ms=None, json_field_paths=None, parallel_readers=None, persistent_id=None)

Reads table from a set of topics in Kafka. There are three formats currently supported: “raw”, “csv”, and “json”.

  • Parameters
    • rdkafka_settings (dict) – Connection settings in the format of librdkafka.
    • topic_names (List[str]) – Names of the Kafka topics from which the data should be read.
    • format – Format of the input data: “raw”, “csv”, or “json”.
    • value_columns (Optional[List[str]]) – Columns to extract for the table; required for formats other than “raw”.
    • primary_key (Optional[List[str]]) – In case the table should have a primary key generated according to a subset of its columns, the set of columns should be specified in this field. Otherwise, the primary key will be generated randomly.
    • debug_data – Static data replacing original one when debug mode is active.
    • autocommit_duration_ms (Optional[int]) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
    • types (Optional[Dict[str, PathwayType]]) – Dictionary containing the mapping between the columns and the data types (pw.Type) of the values of those columns. This parameter is optional, and if not provided, the default type is pw.Type.ANY.
    • json_field_paths (Optional[Dict[str, str]]) – If the format is “json”, this field maps column names to paths within the JSON payload. Each mapping should be given in the format <field_name>: <path to be mapped>, where the path is a JSON Pointer (RFC 6901).
    • parallel_readers (Optional[int]) – Number of copies of the reader to work in parallel. If not specified, min{pathway_threads, total number of partitions} is used. This value cannot exceed the number of Pathway engine threads and will be reduced to that number if it does.
    • persistent_id (Optional[int]) – (unstable) An identifier under which the state of the table will be persisted, or None if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This way, it is possible to resume computations from the moment they were last terminated.
  • Returns
    The table read.
  • Return type
    Table
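The rule used for the parallel_readers default can be sketched in plain Python (effective_readers is an illustrative helper, not a real Pathway API; engine_threads and partitions are example values):

```python
# Illustrative computation of the effective number of Kafka readers,
# mirroring the rule described for parallel_readers above.
def effective_readers(requested, engine_threads, partitions):
    # If not specified, use min(engine threads, total partition count).
    if requested is None:
        return min(engine_threads, partitions)
    # An explicit value is capped at the number of engine threads.
    return min(requested, engine_threads)

print(effective_readers(None, 4, 12))  # -> 4
print(effective_readers(8, 4, 12))     # -> 4
print(effective_readers(2, 4, 12))     # -> 2
```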

When using the “raw” format, the connector produces a single-column table: all the data is stored in a column named data. For other formats, the value_columns argument is required and defines the columns of the table.

Example:

Consider there is a queue in Kafka, running locally on port 9092. The queue uses SASL-SSL authentication with the SCRAM-SHA-256 mechanism. You can set up a queue with similar parameters in Upstash. The settings for rdkafka will look as follows:

import os
rdkafka_settings = {
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "60000",
    "sasl.username": os.environ["KAFKA_USERNAME"],
    "sasl.password": os.environ["KAFKA_PASSWORD"],
}

To connect to the topic “animals” and read messages from it, the connector is used as follows, depending on the format:

Raw version:

import pathway as pw
t = pw.io.kafka.read(
    rdkafka_settings,
    topic_names=["animals"],
    format="raw",
)

All the data will be accessible in the column data.

CSV version:

import pathway as pw
t = pw.io.kafka.read(
    rdkafka_settings,
    topic_names=["animals"],
    format="csv",
    value_columns=["owner", "pet"],
)

In the case of the CSV format, the first message must be the header:

owner,pet

Then, simple data rows are expected. For example:

Alice,cat
Bob,dog

This way, you get a table which looks as follows:

pw.debug.compute_and_print(t, include_id=False)
owner pet
Alice cat
Bob dog
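Conceptually, the connector treats the first message as the header and subsequent messages as data rows, much as the standard csv module would (a plain-Python illustration, not the connector’s actual implementation):

```python
import csv
import io

# Messages as they would arrive on the topic: the header first, then rows.
messages = ["owner,pet", "Alice,cat", "Bob,dog"]

# DictReader uses the first line as the header, yielding one dict per row.
reader = csv.DictReader(io.StringIO("\n".join(messages)))
rows = list(reader)
print(rows)  # [{'owner': 'Alice', 'pet': 'cat'}, {'owner': 'Bob', 'pet': 'dog'}]
```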

JSON version:

import pathway as pw
t = pw.io.kafka.read(
    rdkafka_settings,
    topic_names=["animals"],
    format="json",
    value_columns=["owner", "pet"],
)

For the JSON connector, you can send these two messages:

{"owner": "Alice", "pet": "cat"}
{"owner": "Bob", "pet": "dog"}

This way, you get a table which looks as follows:

pw.debug.compute_and_print(t, include_id=False)
owner pet
Alice cat
Bob dog

Now consider that the data about pets arrives in a more complex form. For instance, you have the owner, the kind, and the name of an animal, along with some physical measurements.

The JSON payload in this case may look as follows:

{
    "name": "Jack",
    "pet": {
        "animal": "cat",
        "name": "Bob",
        "measurements": [100, 200, 300]
    }
}

Suppose you need to extract the pet’s name and its height, which is the 2nd (1-based), or equivalently the 1st (0-based), element of the measurements array. Then you can use JSON Pointers and configure the connector as follows:

import pathway as pw
t = pw.io.kafka.read(
    rdkafka_settings,
    topic_names=["animals"],
    format="json",
    value_columns=["pet_name", "pet_height"],
    json_field_paths={
        "pet_name": "/pet/name",
        "pet_height": "/pet/measurements/1",
    },
)
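To preview what a JSON Pointer selects before wiring it into the connector, a small stdlib-only resolver can help (a simplified subset of RFC 6901; it does not handle the ~0/~1 escape sequences):

```python
import json

def resolve_pointer(doc, pointer):
    # Walk a JSON document along a pointer such as "/pet/measurements/1".
    # Simplified RFC 6901: no handling of the ~0 / ~1 escape sequences.
    for token in pointer.lstrip("/").split("/"):
        if isinstance(doc, list):
            doc = doc[int(token)]  # list indices are numeric tokens
        else:
            doc = doc[token]       # object members are string keys
    return doc

payload = json.loads("""
{
    "name": "Jack",
    "pet": {"animal": "cat", "name": "Bob", "measurements": [100, 200, 300]}
}
""")

print(resolve_pointer(payload, "/pet/name"))            # -> Bob
print(resolve_pointer(payload, "/pet/measurements/1"))  # -> 200
```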

pathway.io.kafka.write(table, rdkafka_settings, topic_name, *, format='json', delimiter=',', **kwargs)

Write a table to a given topic on a Kafka instance.

  • Parameters
    • table (Table) – the table to output.
    • rdkafka_settings (dict) – Connection settings in the format of librdkafka.
    • topic_name (str) – name of topic in Kafka to which the data should be sent.
    • format (str) – format of the output data; currently “json” and “dsv” are supported.
    • delimiter (str) – field delimiter to be used in the case of a delimiter-separated values format.
  • Returns
    None
  • Return type
    None

Limitations:

For future-proofing, the format is configurable, but for now only JSON is available.

Example:

Consider there is a queue in Kafka, running locally on port 9092. The queue uses SASL-SSL authentication with the SCRAM-SHA-256 mechanism. You can set up a queue with similar parameters in Upstash. The settings for rdkafka will look as follows:

import os
rdkafka_settings = {
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "sasl.username": os.environ["KAFKA_USERNAME"],
    "sasl.password": os.environ["KAFKA_PASSWORD"],
}

You want to send a Pathway table t to the Kafka instance. To connect to the topic “animals” and send messages, the connector must be used as follows, depending on the format:

JSON version:

import pathway as pw
pw.io.kafka.write(
    t,
    rdkafka_settings,
    "animals",
    format="json",
)

All the updates of table t will be sent to the Kafka instance.
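The payloads produced for format="json" are plain JSON objects, one per row update; conceptually, a row is serialized as below (a stdlib illustration, not Pathway’s internal serializer):

```python
import json

# A table row represented as a column-name -> value mapping.
row = {"owner": "Alice", "pet": "cat"}

# Each update is sent as one JSON-encoded message to the topic.
message = json.dumps(row)
print(message)  # {"owner": "Alice", "pet": "cat"}
```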