Pathway provides connectors to read messages from and send messages to a Kafka instance.
Kafka connectors only work in streaming mode.
⚠️ Note that connectors also exist for Redpanda. They work the same way; you only need to replace kafka with redpanda: pw.io.redpanda.read and pw.io.redpanda.write.
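For instance, a minimal sketch of the same input call using the Redpanda connector; the settings, topic, and schema are the ones defined in the example below:
import pathway as pw
# Same arguments as pw.io.kafka.read, only the connector changes.
t = pw.io.redpanda.read(
    rdkafka_settings,
    topic="connector_example",
    schema=InputSchema,
    format="csv",
    autocommit_duration_ms=1000,
)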
Consider a simple scenario: messages are sent to a Kafka instance on a topic connector_example, each message containing a table with a single column value in a CSV format. We want to compute the sum of these values and send the resulting output stream to the same Kafka instance on a sum topic.
You can do it as follows in Pathway:
import pathway as pw
# Kafka settings
rdkafka_settings = {
"bootstrap.servers": "server-address:9092",
"security.protocol": "sasl_ssl",
"sasl.mechanism": "SCRAM-SHA-256",
"group.id": "$GROUP_NAME",
"session.timeout.ms": "6000",
"sasl.username": "username",
"sasl.password": "********",
}
# We define a schema for the table
# It sets all the columns and their types
class InputSchema(pw.Schema):
value: int
# We use the Kafka connector to listen to the "connector_example" topic
t = pw.io.kafka.read(
rdkafka_settings,
topic="connector_example",
schema=InputSchema,
format="csv",
autocommit_duration_ms=1000
)
# We compute the sum (this part is independent of the connectors).
t = t.reduce(sum=pw.reducers.sum(t.value))
# We use the Kafka connector to send the resulting output stream containing the sum
pw.io.kafka.write(t, rdkafka_settings, topic_name="sum", format="json")
# We launch the computation.
pw.run()
Data stream: Consider a stream in the form of Kafka messages received on given topics. An update is a set of messages: the update is triggered by a commit. Commits ensure the atomicity of each update and are generated periodically.
Usage: the Kafka input connector pw.io.kafka.read takes several arguments:
- rdkafka_settings: the settings used to connect to Kafka; they follow the format of librdkafka.
- topic: the topic to listen to.
- format: the format of the messages, among raw, csv, and json.
- schema: if the format is not raw, the schema of the table. It defines the column names and their types. It also defines the primary keys.
- autocommit_duration_ms: the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway's computation graph.
class InputSchema(pw.Schema):
value: int
t = pw.io.kafka.read(
rdkafka_settings,
topic="connector_example",
format="csv",
schema=InputSchema,
autocommit_duration_ms=1000
)
The way the input connector behaves depends on the format of the input data.
- raw: for raw data, there is only one column, data, into which each entry is dumped (see the sketch after this note).
- csv and json: the data is formatted according to the schema, which is expected in this case.
⚠️ For the csv format: the first message should start with a header containing the column names, in the correct order and separated by commas.
The connector will not work properly without this message; however, it must be sent only once. If it is sent twice, the second message will be treated like a normal row.
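As an illustration, here is a minimal sketch of reading the same topic with the raw format; the settings and topic are the ones from the example above, and no schema is needed since every message is dumped as a single entry of the data column:
# Raw read: each Kafka message becomes one row with a single "data" column.
t_raw = pw.io.kafka.read(
    rdkafka_settings,
    topic="connector_example",
    format="raw",
    autocommit_duration_ms=1000,
)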
The output connector sends the updates made to a table t to a given Kafka instance and on a single Kafka topic.
Currently, messages are sent in the JSON format.
Usage: the output connector takes the following arguments:
- table: the Pathway table to send to Kafka,
- rdkafka_settings: the settings used to connect to Kafka; they follow the format of librdkafka,
- topic_name: the topic on which the messages are sent,
- format: currently, only json is supported (more are coming),
- commit_frequency_in_messages: the maximum number of commits contained in a single message sent to the Kafka instance. Note that messages that grow too large are sent automatically, so some messages may contain fewer than commit_frequency_in_messages commits.
pw.io.kafka.write(t, rdkafka_settings, topic_name="sum", format="json")
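As a rough illustration (this is an assumption about Pathway's default JSON output, and the exact field set may vary between versions), each update sent to the sum topic is a JSON object containing the table columns together with metadata added by Pathway, such as a diff field (1 for an insertion, -1 for a deletion) and a time field, along the lines of:
{"sum": 45, "diff": 1, "time": 1675861157539}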
Let's go back to our example on how to compute a sum over the values of the column value received on a Kafka topic connector_example in a CSV format.
The final version of our project contains two files: realtime_sum.py, which processes the stream using Pathway, and generating_kafka_stream.py, which generates the stream.
Here is realtime_sum.py:
import pathway as pw
# Kafka settings
rdkafka_settings = {
"bootstrap.servers": "server-address:9092",
"security.protocol": "sasl_ssl",
"sasl.mechanism": "SCRAM-SHA-256",
"group.id": "$GROUP_NAME",
"session.timeout.ms": "6000",
"sasl.username": "username",
"sasl.password": "********",
}
class InputSchema(pw.Schema):
value: int
# We use the Kafka connector to listen to the "connector_example" topic
t = pw.io.kafka.read(
rdkafka_settings,
topic="connector_example",
format="csv",
schema=InputSchema,
autocommit_duration_ms=1000
)
# We compute the sum (this part is independent of the connectors).
t = t.reduce(sum=pw.reducers.sum(t.value))
# We use the Kafka connector to send the resulting output stream containing the sum
pw.io.kafka.write(t, rdkafka_settings, topic_name="sum", format="json")
# We launch the computation.
pw.run()
Don't forget the pw.run(), otherwise no computation will be done!
Once pw.run() is called, the computation runs forever until it gets killed.
If you need some reminder on Pathway operations, don't hesitate to take a look at our First-steps guide.
You can use the KafkaProducer API provided by the kafka-python package to send messages to Kafka in a generating_kafka_stream.py script:
from kafka import KafkaProducer
import time

topic = "connector_example"

producer = KafkaProducer(
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
)

# The first message is the CSV header expected by the csv format; it must be sent only once.
producer.send(topic, "value".encode("utf-8"), partition=0)
time.sleep(5)

# Then send the values 0, 1, ..., 9, one per second.
for i in range(10):
    time.sleep(1)
    producer.send(topic, str(i).encode("utf-8"), partition=0)

producer.close()
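To check that the sum is correctly computed, you can read back the messages Pathway sends on the sum topic. Here is a minimal consumer sketch based on the KafkaConsumer API of the same kafka-python package, reusing the placeholder credentials from above:
from kafka import KafkaConsumer

# Listen to the "sum" topic, starting from the earliest available message.
consumer = KafkaConsumer(
    "sum",
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
    auto_offset_reset="earliest",
)

# Print each JSON update produced by pw.io.kafka.write.
for message in consumer:
    print(message.value.decode("utf-8"))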
Note that, depending on your version of Kafka, you may need to specify the API version to make this code work, by adding api_version=(0,10,2):
producer = KafkaProducer(
bootstrap_servers=["server-address:9092"],
sasl_mechanism="SCRAM-SHA-256",
security_protocol="SASL_SSL",
sasl_plain_username="username",
sasl_plain_password="********",
api_version=(0,10,2),
)