pw.io.redpanda
read(rdkafka_settings, topic=None, *, schema=None, format='raw', debug_data=None, autocommit_duration_ms=1500, json_field_paths=None, parallel_readers=None, name=None, **kwargs)
Reads a table from a set of topics in Redpanda. Three formats are currently supported: “raw”, “csv”, and “json”.
- Parameters
  - rdkafka_settings (dict) – Connection settings in the format of librdkafka.
  - topic (str | list[str] | None) – Name of the topic in Redpanda from which the data should be read.
  - schema (type[Schema] | None) – Schema of the resulting table.
  - format (str) – Format of the input data: “raw”, “csv”, or “json”.
  - debug_data – Static data that replaces the original data when debug mode is active.
  - autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
  - json_field_paths (dict[str, str] | None) – If the format is “json”, this parameter maps field names to paths within the message. Each field that requires such a mapping should be given in the format <field_name>: <path to be mapped>, where the path to be mapped must be a JSON Pointer (RFC 6901).
  - parallel_readers (int | None) – Number of copies of the reader to run in parallel. If the number is not specified, min{pathway_threads, total number of partitions} is used. The number can’t be greater than the number of Pathway engine threads; if it is, it is reduced to the number of engine threads.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. Additionally, if persistence is enabled, it will be used as the name for the snapshot that stores the connector’s progress.
- Returns
Table – The table read.
When using the format “raw”, the connector produces a single-column table: all the data is saved into a column named data. For the other formats, the schema argument is required and defines the columns.
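The rule for parallel_readers described in the parameter list can be illustrated with a small, library-independent sketch. The function effective_parallel_readers and its argument names are hypothetical and are not part of Pathway; the sketch only restates the documented rule and does not reproduce the engine's internals.

```python
from typing import Optional


def effective_parallel_readers(
    requested: Optional[int],
    engine_threads: int,
    total_partitions: int,
) -> int:
    """Hypothetical helper mirroring the documented parallel_readers rule."""
    if requested is None:
        # Default: min{pathway_threads, total number of partitions}.
        return min(engine_threads, total_partitions)
    # An explicit value is reduced to the number of engine threads if it exceeds it.
    return min(requested, engine_threads)


print(effective_parallel_readers(None, engine_threads=4, total_partitions=8))
print(effective_parallel_readers(16, engine_threads=4, total_partitions=8))
print(effective_parallel_readers(2, engine_threads=4, total_partitions=8))
```

With 4 engine threads and 8 partitions, both the default and an oversized request of 16 resolve to 4 readers, while a request of 2 is kept as is.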
Example:
Consider a simple instance of Redpanda without authentication. Settings for rdkafka will look as follows:
import os
rdkafka_settings = {
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "plaintext",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "60000"
}
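The value "$GROUP_NAME" above is a placeholder for the consumer group name. One possible way to fill it in, sketched here under the assumption that the name is supplied through an environment variable called GROUP_NAME (a hypothetical name, as is the fallback value), is:

```python
import os

# GROUP_NAME is a hypothetical environment variable; the fallback is illustrative only.
group_name = os.environ.get("GROUP_NAME", "pathway-demo-group")

rdkafka_settings = {
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "plaintext",
    "group.id": group_name,
    "session.timeout.ms": "60000",
}
```

Any other way of producing the group name string works equally well; the connector only receives the resulting dictionary.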
To connect to the topic “animals” and accept messages, the connector must be used as follows, depending on the format:
Raw version:
import pathway as pw
t = pw.io.redpanda.read(
    rdkafka_settings,
    topic="animals",
    format="raw",
)
All the data will be accessible in the column data.
CSV version:
import pathway as pw
class InputSchema(pw.Schema):
    owner: str
    pet: str
t = pw.io.redpanda.read(
    rdkafka_settings,
    topic="animals",
    format="csv",
    schema=InputSchema,
)
In case of CSV format, the first message must be the header:
owner,pet
Then, simple data rows are expected. For example:
Alice,cat
Bob,dog
This way, you get a table which looks as follows:
pw.debug.compute_and_print(t, include_id=False)
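As a library-independent illustration of how the header message and the data rows above combine into records, the following sketch uses only Python's standard csv module. It does not reproduce Pathway's parser; the list messages simply stands in for the messages in the order they arrive on the topic.

```python
import csv

# Messages in the order they arrive on the topic: header first, then data rows.
messages = ["owner,pet", "Alice,cat", "Bob,dog"]

# csv.DictReader takes the first line as the field names.
rows = list(csv.DictReader(messages))

for row in rows:
    print(row["owner"], row["pet"])
```

Each data message becomes one record whose keys are the column names from the header, matching the fields declared in InputSchema.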
JSON version:
import pathway as pw
t = pw.io.redpanda.read(
    rdkafka_settings,
    topic="animals",
    format="json",
    schema=InputSchema,
)
For the JSON connector, you can send these two messages:
{"owner": "Alice", "pet": "cat"}
{"owner": "Bob", "pet": "dog"}
This way, you get a table which looks as follows:
pw.debug.compute_and_print(t, include_id=False)
Now suppose the data about pets comes in a more complex structure. For instance, each message contains the owner, the kind and name of the animal, and some physical measurements.
The JSON payload in this case may look as follows:
{
    "name": "Jack",
    "pet": {
        "animal": "cat",
        "name": "Bob",
        "measurements": [100, 200, 300]
    }
}
Suppose you need to extract the name of the pet and its height, which is the second element of the measurements array (index 1, since array indices in JSON Pointer are 0-based). You can then use JSON Pointer paths and set up the connector as follows:
import pathway as pw
class InputSchema(pw.Schema):
    pet_name: str
    pet_height: int
t = pw.io.redpanda.read(
    rdkafka_settings,
    topic="animals",
    format="json",
    schema=InputSchema,
    json_field_paths={
        "pet_name": "/pet/name",
        "pet_height": "/pet/measurements/1"
    },
)
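To see what the two JSON Pointer paths select from the payload above, here is a minimal standalone sketch of RFC 6901 resolution written with the standard library only. The function resolve_pointer is a hypothetical helper for illustration and is not how Pathway resolves paths internally.

```python
import json


def resolve_pointer(document, pointer: str):
    """Resolve an RFC 6901 JSON Pointer against a parsed JSON document."""
    if pointer == "":
        return document  # the empty pointer refers to the whole document
    if not pointer.startswith("/"):
        raise ValueError(f"invalid JSON Pointer: {pointer!r}")
    current = document
    for token in pointer[1:].split("/"):
        # Unescape in the order required by RFC 6901: "~1" first, then "~0".
        token = token.replace("~1", "/").replace("~0", "~")
        if isinstance(current, list):
            current = current[int(token)]  # array indices are 0-based
        else:
            current = current[token]
    return current


payload = json.loads("""
{
    "name": "Jack",
    "pet": {
        "animal": "cat",
        "name": "Bob",
        "measurements": [100, 200, 300]
    }
}
""")

print(resolve_pointer(payload, "/pet/name"))            # Bob
print(resolve_pointer(payload, "/pet/measurements/1"))  # 200
```

The path "/pet/name" selects the pet's name rather than the top-level "name" field, and "/pet/measurements/1" selects the second array element.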
write(table, rdkafka_settings, topic_name, *, format='json', name=None, sort_by=None, **kwargs)
Writes a table to a given topic on a Redpanda instance.
- Parameters
  - table (Table) – The table to output.
  - rdkafka_settings (dict) – Connection settings in the format of librdkafka.
  - topic_name (str) – Name of the topic in Redpanda to which the data should be sent.
  - format (str) – Format of the output data; only “json” is currently supported.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  - sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
- Returns
None
Limitations:
For future proofing, the format is configurable, but (for now) only JSON is available.
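The ordering described for sort_by (ascending, with lexicographic comparison of value tuples when several columns are given) can be illustrated outside Pathway with plain Python. The rows and the choice of the columns owner and age are illustrative assumptions; the sketch shows only the comparison rule, not the connector's implementation.

```python
# Illustrative rows standing in for the contents of one minibatch.
minibatch = [
    {"age": 10, "owner": "Alice", "pet": "dog"},
    {"age": 9, "owner": "Bob", "pet": "cat"},
    {"age": 8, "owner": "Alice", "pet": "cat"},
]

# Sorting by (owner, age): Python compares the tuples lexicographically,
# so rows are ordered by owner first and by age within the same owner.
ordered = sorted(minibatch, key=lambda row: (row["owner"], row["age"]))

for row in ordered:
    print(row)
```

Here both rows for Alice come before the row for Bob, and among Alice's rows the one with the smaller age comes first.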
Example:
Consider a queue in Redpanda running locally on port 9092 that uses SASL-SSL authentication with the SCRAM-SHA-256 mechanism. You can set up a queue with similar parameters in Upstash. Settings for rdkafka will look as follows:
import os
rdkafka_settings = {
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "sasl.username": os.environ["KAFKA_USERNAME"],
    "sasl.password": os.environ["KAFKA_PASSWORD"]
}
You want to send a Pathway table t to the Redpanda instance.
import pathway as pw
t = pw.debug.table_from_markdown("age owner pet \n 1 10 Alice dog \n 2 9 Bob cat \n 3 8 Alice cat")
To connect to the topic “animals” and send messages, the connector must be used as follows, depending on the format:
JSON version:
pw.io.redpanda.write(
    t,
    rdkafka_settings,
    "animals",
    format="json",
)
All the updates of table t will be sent to the Redpanda instance.