
pw.io.redpanda

pw.io.redpanda.read(rdkafka_settings, topic=None, *, schema=None, format='raw', debug_data=None, autocommit_duration_ms=1500, json_field_paths=None, parallel_readers=None, persistent_id=None, value_columns=None, primary_key=None, types=None, default_values=None, **kwargs)

Reads a table from a set of topics in Redpanda. Three formats are currently supported: “raw”, “csv”, and “json”.

  • Parameters
    • rdkafka_settings (dict) – Connection settings in the format of librdkafka.
    • topic (str | list[str] | None) – Name of the topic, or list of topic names, in Redpanda from which the data should be read.
    • schema (type[Schema] | None) – Schema of the resulting table.
    • format (str) – format of the input data, “raw”, “csv”, or “json”
    • debug_data – Static data that replaces the original input when debug mode is active.
    • autocommit_duration_ms (int | None) – the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
    • json_field_paths (dict[str, str] | None) – If the format is JSON, this field allows you to map field names to paths within the JSON message. For each field that requires such a mapping, give it in the format <field_name>: <path to be mapped>, where the path to be mapped must be a JSON Pointer (RFC 6901).
    • parallel_readers (int | None) – number of copies of the reader to work in parallel. If the number is not specified, min{pathway_threads, total number of partitions} is used. The number can’t be greater than the number of Pathway engine threads; if it is, it is reduced to the number of engine threads.
    • persistent_id (str | None) – (unstable) An identifier, under which the state of the table will be persisted or None, if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This way it’s possible to configure the start of computations from the moment they were terminated last time.
    • value_columns (list[str] | None) – Columns to extract for a table, required for formats other than “raw”. [will be deprecated soon]
    • primary_key (list[str] | None) – In case the table should have a primary key generated according to a subset of its columns, the set of columns should be specified in this field. Otherwise, the primary key will be generated as uuid4. [will be deprecated soon]
    • types (dict[str, PathwayType] | None) – Dictionary containing the mapping between the columns and the data types (pw.Type) of the values of those columns. This parameter is optional, and if not provided the default type is pw.Type.ANY. [will be deprecated soon]
    • default_values (dict[str, Any] | None) – dictionary containing default values for columns replacing blank entries. The default value of the column must be specified explicitly, otherwise there will be no default value. [will be deprecated soon]
  • Returns
    Table – The table read.
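
The rule for parallel_readers can be read as a simple clamp. The sketch below only illustrates the rule as described in the parameter list; it is not Pathway's implementation, and the function name effective_parallel_readers and its arguments are hypothetical.

```python
from typing import Optional


def effective_parallel_readers(
    requested: Optional[int],
    engine_threads: int,
    total_partitions: int,
) -> int:
    """Illustrative only: number of reader copies per the documented rule."""
    if requested is None:
        # Not specified: min{pathway_threads, total number of partitions}.
        return min(engine_threads, total_partitions)
    # Specified: reduced to the number of engine threads if it exceeds it.
    return min(requested, engine_threads)
```

For instance, under these assumptions a request for 16 readers on 4 engine threads would be reduced to 4.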

When using the format “raw”, the connector produces a single-column table: all the data is saved into a column named data. For other formats, the columns must be defined, either with the argument schema, as in the examples below, or with value_columns.

Example:

Consider a simple instance of Redpanda without authentication. Settings for rdkafka will look as follows:

rdkafka_settings = {
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "plaintext",
    "group.id": "$GROUP_NAME",  # placeholder: replace with your consumer group name
    "session.timeout.ms": "60000",
}

To connect to the topic “animals” and accept messages, the connector must be used as follows, depending on the format:

Raw version:

import pathway as pw
t = pw.io.redpanda.read(
   rdkafka_settings,
   topic="animals",
   format="raw",
)

All the data will be accessible in the column data.

CSV version:

import pathway as pw

class InputSchema(pw.Schema):
    owner: str
    pet: str

t = pw.io.redpanda.read(
    rdkafka_settings,
    topic="animals",
    format="csv",
    schema=InputSchema,
)

With the CSV format, the first message must be the header:

owner,pet

Then, simple data rows are expected. For example:

Alice,cat
Bob,dog

This way, you get a table with the columns owner and pet and one row per data message. You can print it as follows:

pw.debug.compute_and_print(t, include_id=False)
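
To make the relationship between the header message and the data rows concrete, here is a minimal standard-library sketch. It does not use Pathway and is not how the connector is implemented; the messages list simply stands in for the messages on the topic.

```python
import csv
import io

# Messages as they would arrive on the topic: header first, then data rows.
messages = ["owner,pet", "Alice,cat", "Bob,dog"]

# The first message names the columns; every later message is one row.
header = next(csv.reader(io.StringIO(messages[0])))
rows = [
    dict(zip(header, next(csv.reader(io.StringIO(message)))))
    for message in messages[1:]
]

print(rows)  # [{'owner': 'Alice', 'pet': 'cat'}, {'owner': 'Bob', 'pet': 'dog'}]
```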

JSON version:

import pathway as pw

# InputSchema is the class defined in the CSV example above.
t = pw.io.redpanda.read(
    rdkafka_settings,
    topic="animals",
    format="json",
    schema=InputSchema,
)

For the JSON connector, you can send these two messages:

{"owner": "Alice", "pet": "cat"}
{"owner": "Bob", "pet": "dog"}

This way, you get a table with the columns owner and pet and one row per message. You can print it as follows:

pw.debug.compute_and_print(t, include_id=False)

Now suppose the pet data arrives in a nested structure. For instance, each message holds the owner, the kind and name of the animal, and some physical measurements.

The JSON payload in this case may look as follows:

{
    "name": "Jack",
    "pet": {
        "animal": "cat",
        "name": "Bob",
        "measurements": [100, 200, 300]
    }
}

Suppose you need to extract the name of the pet and its height, which is the 2nd element of the measurements array (index 1, since JSON Pointer indices are 0-based). You can then use JSON Pointers in json_field_paths and configure the connector as follows:

import pathway as pw

class InputSchema(pw.Schema):
    pet_name: str
    pet_height: int

t = pw.io.redpanda.read(
    rdkafka_settings,
    topic="animals",
    format="json",
    schema=InputSchema,
    json_field_paths={
        "pet_name": "/pet/name",
        "pet_height": "/pet/measurements/1",
    },
)
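
To see what these two pointers select, the following standard-library sketch resolves RFC 6901 JSON Pointers against the payload above. It is a minimal illustration and not Pathway's implementation: the helper resolve_json_pointer is hypothetical, and it skips parts of the RFC such as the "-" array token.

```python
import json
from typing import Any


def resolve_json_pointer(document: Any, pointer: str) -> Any:
    """Resolve an RFC 6901 JSON Pointer against parsed JSON (minimal sketch)."""
    if pointer == "":
        return document  # the empty pointer refers to the whole document
    if not pointer.startswith("/"):
        raise ValueError(f"invalid JSON Pointer: {pointer!r}")
    current = document
    for token in pointer[1:].split("/"):
        # Unescape per RFC 6901: "~1" -> "/" first, then "~0" -> "~".
        token = token.replace("~1", "/").replace("~0", "~")
        if isinstance(current, list):
            current = current[int(token)]  # array indices are 0-based
        else:
            current = current[token]
    return current


payload = json.loads(
    '{"name": "Jack", "pet": {"animal": "cat", "name": "Bob",'
    ' "measurements": [100, 200, 300]}}'
)
json_field_paths = {"pet_name": "/pet/name", "pet_height": "/pet/measurements/1"}
row = {
    field: resolve_json_pointer(payload, path)
    for field, path in json_field_paths.items()
}
print(row)  # {'pet_name': 'Bob', 'pet_height': 200}
```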

pw.io.redpanda.write(table, rdkafka_settings, topic_name, *, format='json', **kwargs)

Writes a table to a given topic on a Redpanda instance.

  • Parameters
    • table (Table) – the table to output.
    • rdkafka_settings (dict) – Connection settings in the format of librdkafka.
    • topic_name (str) – name of topic in Redpanda to which the data should be sent.
    • format (str) – format of the output data, only “json” is currently supported.
  • Returns
    None

Limitations:

For future proofing, the format is configurable, but (for now) only JSON is available.

Example:

Consider a queue in Redpanda running locally on port 9092 that uses SASL-SSL authentication with the SCRAM-SHA-256 mechanism. You can set up a queue with similar parameters in Upstash. The settings for rdkafka look as follows:

import os
rdkafka_settings = {
   "bootstrap.servers": "localhost:9092",
   "security.protocol": "sasl_ssl",
   "sasl.mechanism": "SCRAM-SHA-256",
   "sasl.username": os.environ["KAFKA_USERNAME"],
   "sasl.password": os.environ["KAFKA_PASSWORD"]
}
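
Because os.environ[...] raises a bare KeyError when a variable is unset, it can help to check the credentials before building the dictionary. The sketch below is one possible way to do that; the helper build_rdkafka_settings is hypothetical and not part of Pathway, and it takes the environment as an argument so it can be exercised without real credentials.

```python
import os
from typing import Mapping


def build_rdkafka_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Build the settings dict, failing early if a credential is missing."""
    required = ("KAFKA_USERNAME", "KAFKA_PASSWORD")
    missing = [name for name in required if name not in env]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return {
        "bootstrap.servers": "localhost:9092",
        "security.protocol": "sasl_ssl",
        "sasl.mechanism": "SCRAM-SHA-256",
        "sasl.username": env["KAFKA_USERNAME"],
        "sasl.password": env["KAFKA_PASSWORD"],
    }
```

Calling build_rdkafka_settings() with no argument reads from the process environment, so the result can be passed wherever rdkafka_settings is expected.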

You want to send a Pathway table t to the Redpanda instance.

import pathway as pw
t = pw.debug.table_from_markdown("age owner pet \n 1 10 Alice dog \n 2 9 Bob cat \n 3 8 Alice cat")

To connect to the topic “animals” and send messages, the connector must be used as follows, depending on the format:

JSON version:

pw.io.redpanda.write(
   t,
   rdkafka_settings,
   "animals",
   format="json",
)

All the updates of table t will be sent to the Redpanda instance.
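
On the receiving side, a consumer gets each update as a JSON message. The sketch below shows how such messages might be decoded. It assumes, which is not stated above, that each message value is a UTF-8 JSON object keyed by column name; the actual payload may carry additional fields, which this sketch leaves untouched. The helper decode_update and the sample payloads are hypothetical.

```python
import json


def decode_update(message: bytes) -> dict:
    """Decode one message value into a dict (assumes a UTF-8 JSON object)."""
    record = json.loads(message.decode("utf-8"))
    if not isinstance(record, dict):
        raise ValueError("expected a JSON object")
    return record


# Hypothetical sample payloads mirroring two rows of table t.
samples = [
    b'{"age": 10, "owner": "Alice", "pet": "dog"}',
    b'{"age": 9, "owner": "Bob", "pet": "cat"}',
]
updates = [decode_update(sample) for sample in samples]
```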