Using Kafka connectors
Pathway provides a pw.io.kafka module with connectors to read messages from and send messages to a Kafka instance. In Pathway, you read and send messages to a Kafka topic using pw.io.kafka.read and pw.io.kafka.write.
Kafka connectors only work in streaming mode.
⚠️ Note that connectors also exist for Redpanda. They work the same way; you only need to replace kafka with redpanda: pw.io.redpanda.read and pw.io.redpanda.write.
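For instance, a minimal sketch of the same read with Redpanda, reusing the rdkafka_settings and InputSchema defined in the example below:

t = pw.io.redpanda.read(
    rdkafka_settings,
    topic="connector_example",
    schema=InputSchema,
    format="csv",
    autocommit_duration_ms=1000,
)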
Short version
Consider a simple scenario: messages are sent to a Kafka instance on a topic connector_example, each one containing an entry of a table with a single column value, in CSV format. We want to compute the sum of these values and send the resulting output stream to the same Kafka instance on a sum topic.
You can do it as follows in Pathway:
import pathway as pw
# Kafka settings
rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}


# We define a schema for the table.
# It sets all the columns and their types.
class InputSchema(pw.Schema):
    value: int


# We use the Kafka connector to listen to the "connector_example" topic
t = pw.io.kafka.read(
    rdkafka_settings,
    topic="connector_example",
    schema=InputSchema,
    format="csv",
    autocommit_duration_ms=1000,
)
# We compute the sum (this part is independent of the connectors).
t = t.reduce(sum=pw.reducers.sum(t.value))
# We use the Kafka connector to send the resulting output stream containing the sum
pw.io.kafka.write(t, rdkafka_settings, topic_name="sum", format="json")
# We launch the computation.
pw.run()
Input connector
Data stream: Consider a stream in the form of Kafka messages received on given topics. An update is a set of messages: the update is triggered by a commit. Commits ensure the atomicity of each update and are generated periodically.
Usage: the Kafka input connector pw.io.kafka.read takes several arguments:
- rdkafka_settings: the settings used to connect to Kafka; they follow the format of librdkafka.
- topic: the topic which is listened to.
- format: the format of the messages, among raw, csv, and json.
- schema: if the format is not raw, the schema of the table. It defines the columns' names and their types. It also defines the primary keys.
- autocommit_duration_ms: the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway's dataflow.
class InputSchema(pw.Schema):
    value: int


t = pw.io.kafka.read(
    rdkafka_settings,
    topic="connector_example",
    format="csv",
    schema=InputSchema,
    autocommit_duration_ms=1000,
)
The way the input connector behaves depends on the format of the input data.
- raw: for raw data, there is only one column data, in which each message is dumped.
- csv and json: the data is formatted according to the pattern. In this case, a schema is expected.
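For instance, a minimal sketch of reading the same topic in the raw format; no schema is needed, and each message lands as-is in the single data column:

t_raw = pw.io.kafka.read(
    rdkafka_settings,
    topic="connector_example",
    format="raw",
    autocommit_duration_ms=1000,
)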
⚠️ For the csv format: the first message should be a header containing the column names, in the correct order and separated by commas. The connector will not work properly without this message; however, it must be sent only once: if sent twice, the second message will be treated as a normal row.
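For example, for our connector_example topic and its single value column, the stream of messages could look as follows, the first message being the header:

value
0
1
2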
Output connector
The output connector pw.io.kafka.write sends the updates made to a table t to a given Kafka instance on a single Kafka topic. Messages are serialized according to the chosen format; in this article, we use the JSON format.
Usage: the output connector takes the following arguments:
- table: the Pathway table to send to Kafka.
- rdkafka_settings: the settings used to connect to Kafka; they follow the format of librdkafka.
- topic_name: the topic on which the messages are sent.
- format: binary, json, and dsv (a generalization of CSV) are currently supported (more are coming).
pw.io.kafka.write(t, rdkafka_settings, topic_name="sum", format="json")
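To give an idea of what the output looks like: in the JSON format, each update is sent as one message containing the table's columns plus two extra fields, time (the logical time of the update) and diff (1 for an insertion, -1 for a deletion). For our running sum, the stream of messages should resemble the following (the time values are illustrative):

{"sum": 0, "time": 1701283422762, "diff": 1}
{"sum": 0, "time": 1701283423762, "diff": -1}
{"sum": 1, "time": 1701283423762, "diff": 1}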
Complete example
Let's go back to our example on how to compute a sum over the values of the column value received on a Kafka topic connector_example in CSV format.
The final version of our project contains two files: realtime_sum.py, which processes the stream using Pathway, and generate_stream.py, which generates the stream.
Here is realtime_sum.py:
import pathway as pw
# Kafka settings
rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}


class InputSchema(pw.Schema):
    value: int


# We use the Kafka connector to listen to the "connector_example" topic
t = pw.io.kafka.read(
    rdkafka_settings,
    topic="connector_example",
    format="csv",
    schema=InputSchema,
    autocommit_duration_ms=1000,
)
# We compute the sum (this part is independent of the connectors).
t = t.reduce(sum=pw.reducers.sum(t.value))
# We use the Kafka connector to send the resulting output stream containing the sum
pw.io.kafka.write(t, rdkafka_settings, topic_name="sum", format="json")
# We launch the computation.
pw.run()
Don't forget the pw.run(), otherwise no computation will be done!
Once pw.run() is called, the computation runs forever until it gets killed.
If you need some reminders on Pathway operations, don't hesitate to take a look at our First-steps guide.
You can use the KafkaProducer API provided by the kafka-python package to send messages to Kafka from a generate_stream.py script:
from kafka import KafkaProducer
import time

topic = "connector_example"

producer = KafkaProducer(
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
)
# The first message is the CSV header, declaring the single column "value".
producer.send(topic, "value".encode("utf-8"), partition=0)
time.sleep(5)
# Then we send the values 0 to 9, one per second, each as a CSV row.
for i in range(10):
    time.sleep(1)
    producer.send(topic, str(i).encode("utf-8"), partition=0)
producer.close()
Note that, depending on your version of Kafka, you may need to specify the API version, api_version=(0,10,2), to make this code work:
producer = KafkaProducer(
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
    api_version=(0, 10, 2),
)
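To check the results, you can read the sum topic back with the KafkaConsumer API from the same kafka-python package. A minimal sketch, assuming the same connection settings as above:

from kafka import KafkaConsumer

# Listen to the "sum" topic from the beginning and print each JSON update.
consumer = KafkaConsumer(
    "sum",
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
    auto_offset_reset="earliest",
)
for message in consumer:
    print(message.value.decode("utf-8"))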