pw.io.kafka

pw.io.kafka.read(rdkafka_settings, topic=None, *, schema=None, format='raw', debug_data=None, autocommit_duration_ms=1500, json_field_paths=None, parallel_readers=None, persistent_id=None, value_columns=None, primary_key=None, types=None, default_values=None, **kwargs)

Generalized method to read the data from the given topic in Kafka.

There are three formats currently supported: “raw”, “csv”, and “json”.

  • Parameters
    • rdkafka_settings (dict) – Connection settings in the format of librdkafka.
    • topic (str | list[str] | None) – Name of topic in Kafka from which the data should be read.
    • schema (type[Schema] | None) – Schema of the resulting table.
    • format – format of the input data, “raw”, “csv”, or “json”.
    • debug_data – Static data replacing original one when debug mode is active.
    • autocommit_duration_ms (int | None) – the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph (see the sketch below).
    • json_field_paths (dict[str, str] | None) – If the format is JSON, this field allows mapping field names to paths inside the messages. Each field which requires such a mapping should be given in the format <field_name>: <path to be mapped>, where the path to be mapped needs to be a JSON Pointer (RFC 6901).
    • parallel_readers (int | None) – number of copies of the reader to work in parallel. If the number is not specified, min{pathway_threads, total number of partitions} is used. This number also can’t be greater than the number of Pathway engine threads, and is reduced to the number of engine threads if it exceeds them.
    • persistent_id (str | None) – (unstable) An identifier under which the state of the table will be persisted, or None if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This makes it possible to resume computations from the moment they were last terminated.
    • value_columns (list[str] | None) – Columns to extract for a table, required for formats other than “raw”. [will be deprecated soon]
    • primary_key (list[str] | None) – In case the table should have a primary key generated according to a subset of its columns, the set of columns should be specified in this field. Otherwise, the primary key will be generated randomly. [will be deprecated soon]
    • types (dict[str, PathwayType] | None) – Dictionary containing the mapping between the columns and the data types (pw.Type) of the values of those columns. This parameter is optional; if not provided, the default type is pw.Type.ANY. [will be deprecated soon]
    • default_values (dict[str, Any] | None) – dictionary containing default values for columns, replacing blank entries. The default value of a column must be specified explicitly; otherwise there will be no default value. [will be deprecated soon]
  • Returns
    Table – The table read.

When using the format “raw”, the connector will produce a single-column table: all the data is saved into a column named data. For other formats, the argument value_columns is required and defines the columns.
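
For instance, committing every 100 ms makes updates enter the computation graph more frequently, at some throughput cost. A minimal sketch, assuming the rdkafka_settings dictionary defined in the example below:

import pathway as pw

# Sketch: push the received updates into the computation graph every 100 ms.
t = pw.io.kafka.read(
    rdkafka_settings,
    topic="animals",
    format="raw",
    autocommit_duration_ms=100,
)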

Example:

Consider there is a queue in Kafka, running locally on port 9092. Our queue uses SASL-SSL authentication over the SCRAM-SHA-256 mechanism. You can set up a queue with similar parameters in Upstash. The settings for rdkafka will look as follows:

import os
rdkafka_settings = {
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "60000",
    "sasl.username": os.environ["KAFKA_USERNAME"],
    "sasl.password": os.environ["KAFKA_PASSWORD"],
}

To connect to the topic “animals” and accept messages, the connector must be used as follows, depending on the format:

Raw version:

import pathway as pw
t = pw.io.kafka.read(
    rdkafka_settings,
    topic="animals",
    format="raw",
)

All the data will be accessible in the column data.
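
The column can then be transformed like any other Pathway column. A minimal sketch, assuming the messages arrive as plain strings (the derived column name is arbitrary):

# Sketch: derive an upper-cased copy of each raw message.
upper = t.select(data_upper=pw.apply(str.upper, pw.this.data))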

CSV version:

import pathway as pw
class InputSchema(pw.Schema):
    owner: str
    pet: str

t = pw.io.kafka.read(
    rdkafka_settings,
    topic="animals",
    format="csv",
    schema=InputSchema,
)

In the case of the CSV format, the first message must be the header:

owner,pet

Then, simple data rows are expected. For example:

Alice,cat
Bob,dog

This way, you get a table which looks as follows:

pw.debug.compute_and_print(t, include_id=False)
owner pet
Alice cat
  Bob dog
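
For reference, the header and the rows above could be published, for example, with the kafka-python package; this is a sketch outside of Pathway, and the broker address and the absence of authentication are assumptions:

# Hypothetical producer-side sketch (kafka-python, not part of Pathway).
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("animals", b"owner,pet")  # the header must be the first message
producer.send("animals", b"Alice,cat")
producer.send("animals", b"Bob,dog")
producer.flush()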

JSON version:

import pathway as pw
t = pw.io.kafka.read(
    rdkafka_settings,
    topic="animals",
    format="json",
    schema=InputSchema,
)

For the JSON connector, you can send these two messages:

{"owner": "Alice", "pet": "cat"}
{"owner": "Bob", "pet": "dog"}

This way, you get a table which looks as follows:

pw.debug.compute_and_print(t, include_id=False)
owner pet
Alice cat
  Bob dog

Now consider that the data about pets comes in a more sophisticated way. For instance, you have the owner, the kind, and the name of an animal, along with some physical measurements.

The JSON payload in this case may look as follows:

{
    "name": "Jack",
    "pet": {
        "animal": "cat",
        "name": "Bob",
        "measurements": [100, 200, 300]
    }
}

Suppose you need to extract the name of the pet and its height, which is the 2nd (1-based), i.e. the 1st (0-based), element in the array of measurements. Then you can use JSON Pointers and create the connector as follows:

import pathway as pw
class InputSchema(pw.Schema):
    pet_name: str
    pet_height: int

t = pw.io.kafka.read(
    rdkafka_settings,
    topic="animals",
    format="json",
    schema=InputSchema,
    json_field_paths={
        "pet_name": "/pet/name",
        "pet_height": "/pet/measurements/1",
    },
)
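
If some messages may lack one of the mapped fields, the default_values parameter described above can fill the blank entries. A hedged sketch, where the -1 sentinel is an arbitrary choice:

# Sketch: fall back to -1 when /pet/measurements/1 is absent from a message.
t = pw.io.kafka.read(
    rdkafka_settings,
    topic="animals",
    format="json",
    schema=InputSchema,
    json_field_paths={
        "pet_name": "/pet/name",
        "pet_height": "/pet/measurements/1",
    },
    default_values={"pet_height": -1},
)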

pw.io.kafka.read_from_upstash(endpoint, username, password, topic, *, read_only_new=False, schema=None, format='raw', debug_data=None, autocommit_duration_ms=1500, json_field_paths=None, parallel_readers=None, persistent_id=None)

Simplified method to read data from a Kafka instance hosted in Upstash. It requires the endpoint address and the topic, along with credentials.

Read starts from the beginning of the topic, unless the read_only_new parameter is set to True.

There are three formats currently supported: “raw”, “csv”, and “json”.

  • Parameters
    • endpoint (str) – Upstash endpoint for the sought queue, which can be found on the “Details” page.
    • username (str) – Username generated for this queue.
    • password (str) – Password generated for this queue. These credentials are also available on the “Details” page.
    • topic (str) – Name of topic in Kafka from which the data should be read.
    • read_only_new (bool) – If set to True, only the entries which appear after the start of the program will be read. Otherwise, the read will be done from the beginning of the topic.
    • schema (type[Schema] | None) – Schema of the resulting table.
    • format – format of the input data, “raw”, “csv”, or “json”.
    • debug_data – Static data replacing original one when debug mode is active.
    • autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
    • json_field_paths (dict[str, str] | None) – If the format is JSON, this field allows mapping field names to paths inside the messages. Each field which requires such a mapping should be given in the format <field_name>: <path to be mapped>, where the path to be mapped needs to be a JSON Pointer (RFC 6901).
    • parallel_readers (int | None) – number of copies of the reader to work in parallel. If the number is not specified, min{pathway_threads, total number of partitions} is used. This number also can’t be greater than the number of Pathway engine threads, and is reduced to the number of engine threads if it exceeds them.
    • persistent_id (str | None) – (unstable) An identifier under which the state of the table will be persisted, or None if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This makes it possible to resume computations from the moment they were last terminated.
  • Returns
    Table – The table read.

When using the format “raw”, the connector will produce a single-column table: all the data is saved into a column named data.

Example:

Consider that there is a queue running in Upstash. Let’s say the endpoint is “https://example-endpoint.com:19092”, the topic is “test-topic”, and the credentials are stored in environment variables.

Suppose that we just need to read the raw messages for further processing. Then it can be done in the following way:

import os
import pathway as pw
t = pw.io.kafka.read_from_upstash(
    endpoint="https://example-endpoint.com:19092",
    topic="test-topic",
    username=os.environ["KAFKA_USERNAME"],
    password=os.environ["KAFKA_PASSWORD"],
)
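
As with any Pathway input connector, nothing is read until the computation is started. A minimal sketch, where the output connector and the file name are arbitrary choices:

# Sketch: attach an output and run the pipeline.
pw.io.csv.write(t, "upstash_messages.csv")
pw.run()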

pw.io.kafka.simple_read(server, topic, *, read_only_new=False, schema=None, format='raw', debug_data=None, autocommit_duration_ms=1500, json_field_paths=None, parallel_readers=None, persistent_id=None)

Simplified method to read data from Kafka. It only requires the server address and the topic name. If you have any kind of authentication or require fine-tuning of the parameters, please use the read method.

Read starts from the beginning of the topic, unless the read_only_new parameter is set to True.

There are three formats currently supported: “raw”, “csv”, and “json”.

  • Parameters
    • server (str) – Address of the server.
    • topic (str) – Name of topic in Kafka from which the data should be read.
    • read_only_new (bool) – If set to True, only the entries which appear after the start of the program will be read. Otherwise, the read will be done from the beginning of the topic.
    • schema (type[Schema] | None) – Schema of the resulting table.
    • format – format of the input data, “raw”, “csv”, or “json”.
    • debug_data – Static data replacing original one when debug mode is active.
    • autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
    • json_field_paths (dict[str, str] | None) – If the format is JSON, this field allows mapping field names to paths inside the messages. Each field which requires such a mapping should be given in the format <field_name>: <path to be mapped>, where the path to be mapped needs to be a JSON Pointer (RFC 6901).
    • parallel_readers (int | None) – number of copies of the reader to work in parallel. If the number is not specified, min{pathway_threads, total number of partitions} is used. This number also can’t be greater than the number of Pathway engine threads, and is reduced to the number of engine threads if it exceeds them.
    • persistent_id (str | None) – (unstable) An identifier under which the state of the table will be persisted, or None if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This makes it possible to resume computations from the moment they were last terminated.
  • Returns
    Table – The table read.

When using the format “raw”, the connector will produce a single-column table: all the data is saved into a column named data.

For other formats, the schema argument is required and defines the columns.

Example:

Consider that there’s a Kafka queue running locally on the port 9092 and we need to read raw messages from the topic “test-topic”. Then, it can be done in the following way:

import pathway as pw
t = pw.io.kafka.simple_read("localhost:9092", "test-topic")
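
To skip the backlog already stored in the topic and only consume new entries, the read_only_new flag described above can be set. For example:

t = pw.io.kafka.simple_read(
    "localhost:9092",
    "test-topic",
    read_only_new=True,
)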

pw.io.kafka.write(table, rdkafka_settings, topic_name, *, format='json', delimiter=',', **kwargs)

Write a table to a given topic on a Kafka instance.

  • Parameters
    • table (Table) – the table to output.
    • rdkafka_settings (dict) – Connection settings in the format of librdkafka.
    • topic_name (str) – name of topic in Kafka to which the data should be sent.
    • format (str) – format of the output data, currently “json” and “dsv” are supported.
    • delimiter (str) – field delimiter to be used in case of delimiter-separated values format.
  • Returns
    None

Limitations:

For future-proofing, the format is configurable, but (for now) only JSON is available.

Example:

Consider there is a queue in Kafka, running locally on port 9092. Our queue uses SASL-SSL authentication over the SCRAM-SHA-256 mechanism. You can set up a queue with similar parameters in Upstash. The settings for rdkafka will look as follows:

import os
rdkafka_settings = {
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "sasl.username": os.environ["KAFKA_USERNAME"],
    "sasl.password": os.environ["KAFKA_PASSWORD"],
}

You want to send a Pathway table t to the Kafka instance.

import pathway as pw
t = pw.debug.table_from_markdown(
    """
    age owner pet
    1 10 Alice dog
    2 9 Bob cat
    3 8 Alice cat
    """
)

To connect to the topic “animals” and send messages, the connector must be used as follows, depending on the format:

JSON version:

pw.io.kafka.write(
    t,
    rdkafka_settings,
    "animals",
    format="json",
)

All the updates of table t will be sent to the Kafka instance.
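
To inspect what the connector publishes, any Kafka consumer can be attached to the topic. A sketch using the kafka-python package, which is an assumption and not part of Pathway (authentication settings omitted for brevity):

# Hypothetical consumer-side sketch printing each update sent to the topic.
from kafka import KafkaConsumer

consumer = KafkaConsumer("animals", bootstrap_servers="localhost:9092")
for message in consumer:
    print(message.value)  # one JSON document per row update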