pathway.io.kafka package


pathway.io.kafka.read(rdkafka_settings, topic_names, format='raw', value_columns=None, primary_key=None, debug_data=None)

Reads a table from a set of topics in Kafka. Three formats are currently supported: “raw”, “csv”, and “json”.

  • Parameters
    • rdkafka_settings (dict) – Connection settings in the format of librdkafka.
    • topic_names (List[str]) – Names of the Kafka topics from which the data should be read.
    • format – Format of the input data: “raw”, “csv”, or “json”.
    • value_columns (Optional[List[str]]) – Columns to extract for the table; required for formats other than “raw”.
    • primary_key (Optional[List[str]]) – If the primary key should be generated from a subset of the columns, specify that subset here. Otherwise, the primary key is generated as uuid4. See the sketch after this list.
    • debug_data – Static data replacing the original one when debug mode is active.
  • Returns
    The table read.
  • Return type
    Table
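
For instance, if the values of the owner column were known to be unique, the primary key could be derived from that column alone (a minimal sketch; rdkafka_settings as defined in the example below):

>>> import pathway as pw
>>> t = pw.kafka.read(
...     rdkafka_settings,
...     topic_names=["animals"],
...     format="json",
...     value_columns=["owner", "pet"],
...     primary_key=["owner"],
... )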

When using the “raw” format, the connector produces a single-column table: all the data is saved into a column named data. For other formats, the argument value_columns is required and defines the columns.

Example:

Consider a Kafka queue running locally on port 9092 and using SASL-SSL authentication with the SCRAM-SHA-256 mechanism. You can set up a queue with similar parameters in Upstash. The settings for rdkafka will look as follows:

>>> import os
>>> rdkafka_settings = {
...     "bootstrap.servers": "localhost:9092",
...     "security.protocol": "sasl_ssl",
...     "sasl.mechanism": "SCRAM-SHA-256",
...     "group.id": "$GROUP_NAME",
...     "session.timeout.ms": "60000",
...     "sasl.username": os.environ["KAFKA_USERNAME"],
...     "sasl.password": os.environ["KAFKA_PASSWORD"]
... }

To connect to the topic “animals” and read messages, the connector must be used as follows, depending on the format:

Raw version:

>>> import pathway as pw
>>> t = pw.kafka.read(
...     rdkafka_settings,
...     topic_names=["animals"],
...     format="raw",
... )

All the data will be accessible in the column data.
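
The data column can then be used like any other Pathway column; for instance, to carry it over to a derived table under a different name (a minimal sketch; the name message is an illustrative choice):

>>> messages = t.select(message=t.data)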

CSV version:

>>> import pathway as pw
>>> t = pw.kafka.read(
...     rdkafka_settings,
...     topic_names=["animals"],
...     format="csv",
...     value_columns=["owner", "pet"],
... )

With the CSV format, the first message must be the header:

owner,pet

Then, simple data rows are expected. For example:

Alice,cat
Bob,dog

This way, we acquire a table which looks as follows:

>>> pw.debug.compute_and_print(t, include_id=False)
owner pet
Alice cat
  Bob dog

JSON version:

>>> import pathway as pw
>>> t = pw.kafka.read(
...     rdkafka_settings,
...     topic_names=["animals"],
...     format="json",
...     value_columns=["owner", "pet"],
... )

For the JSON connector, we can send these two messages:

{"owner": "Alice", "pet": "cat"}{"owner": "Bob", "pet": "dog"}

This way, we acquire a table which looks as follows:

>>> pw.debug.compute_and_print(t, include_id=False)
owner pet
Alice cat
  Bob dog

pathway.io.kafka.write(table, rdkafka_settings, topic_name, format='json', commit_frequency_in_messages=None, commit_frequency_ms=None)

Writes a table to a given topic on a Kafka instance.

  • Parameters
    • table (Table) – The table to output.
    • rdkafka_settings (dict) – Connection settings in the format of librdkafka.
    • topic_name (str) – Name of the Kafka topic to which the data should be sent.
    • format (str) – Format of the output data; only “json” is currently supported.
    • commit_frequency_in_messages (Optional[int]) – Maximum number of commits contained in a single message sent to the Kafka instance. Note that messages that grow too large are sent automatically, so some messages may contain fewer than commit_frequency_in_messages commits.
    • commit_frequency_ms (Optional[int]) – Maximum number of milliseconds (ms) before a message is sent to the Kafka instance. Note that messages that grow too large are sent automatically, so some messages may be sent before commit_frequency_ms is reached. See the sketch after this list.
  • Returns
    None
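
For example, to flush pending output at least once per second, one could set commit_frequency_ms (a minimal sketch; t and rdkafka_settings as defined in the example below):

>>> import pathway as pw
>>> pw.kafka.write(
...     t,
...     rdkafka_settings,
...     topic_name="animals",
...     format="json",
...     commit_frequency_ms=1000,
... )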

Limitations:

For future-proofing, the format is configurable, but for now only JSON is available.

Example:

Consider a Kafka queue running locally on port 9092 and using SASL-SSL authentication with the SCRAM-SHA-256 mechanism. You can set up a queue with similar parameters in Upstash. The settings for rdkafka will look as follows:

>>> import os
>>> rdkafka_settings = {
...     "bootstrap.servers": "localhost:9092",
...     "security.protocol": "sasl_ssl",
...     "sasl.mechanism": "SCRAM-SHA-256",
...     "sasl.username": os.environ["KAFKA_USERNAME"],
...     "sasl.password": os.environ["KAFKA_PASSWORD"]
... }

We want to send a Pathway table t to our Kafka instance. To connect to the topic “animals” and send messages, the connector must be used as follows:

JSON version:

>>> import pathway as pw
>>> pw.kafka.write(
...     t,
...     rdkafka_settings,
...     "animals",
...     format="json",
... )

All the updates of table t will be sent to the Kafka instance.
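
Putting the two connectors together, a minimal end-to-end sketch could read from one topic and mirror it to another; the topic name "animals_mirror" is an illustrative choice, and we assume pw.run is the entry point that starts the computation:

>>> import pathway as pw
>>> t = pw.kafka.read(
...     rdkafka_settings,
...     topic_names=["animals"],
...     format="json",
...     value_columns=["owner", "pet"],
... )
>>> pw.kafka.write(t, rdkafka_settings, "animals_mirror", format="json")
>>> pw.run()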