Using Kafka connectors
Pathway provides connectors to read messages from and send messages to a Kafka instance.
Kafka connectors only work in streaming mode.
Short example
Consider a simple scenario: messages are sent to a Kafka instance on a topic connector_example, each message containing a table with a single column value in CSV format. We want to compute the sum of these values and send the resulting output stream to the same Kafka instance on a sum topic.
You can do it as follows in Pathway:
import pathway as pw

# Kafka settings
rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}

# We use the Kafka connector to listen to the "connector_example" topic
t = pw.kafka.read(
    rdkafka_settings,
    topic_names=["connector_example"],
    value_columns=["value"],
    format="csv",
)

# We compute the sum (this part is independent of the connectors).
t = t.select(value=pw.apply(lambda x: int(x), t.value))
t = t.reduce(sum=pw.reducers.sum(t.value))

# We use the Kafka connector to send the resulting output stream containing the sum
pw.kafka.write(t, rdkafka_settings, topic_name="sum", format="json")

# We launch the computation.
pw.run()
Input connector
Data stream: we consider a stream in the form of Kafka messages received on given topics. An update is a set of messages: the update is triggered by the reception of a message containing *COMMIT*. Commit messages ensure the atomicity of each update.
Usage: the Kafka input connector pw.kafka.read takes several arguments:
- rdkafka_settings: the settings used to connect to Kafka; they follow the format of librdkafka.
- topic_names: the list of topics to listen to.
- format: the format of the messages, among raw, csv, and json.
- value_columns: if the format is not raw, the list of columns which should be expected.
- primary_key: if the primary key of the table should be generated from a subset of its columns, that set of columns should be specified in this field. Otherwise, the primary key is generated as a uuid4.
t = pw.kafka.read(
    rdkafka_settings,
    topic_names=["connector_example"],
    value_columns=["value"],
    format="csv",
)
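For instance, primary_key could be used as in the following sketch, where keyed_topic and its key and value columns are hypothetical:

# A sketch (hypothetical topic and columns): the primary key is generated
# from the "key" column instead of a random uuid4.
t_keyed = pw.kafka.read(
    rdkafka_settings,
    topic_names=["keyed_topic"],
    value_columns=["key", "value"],
    primary_key=["key"],
    format="csv",
)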
The way the input connector behaves depends on the format of the input data.
- raw: for raw data, there is a single column data in which each entire message is dumped.
- csv and json: the data is formatted according to the pattern; in this case, value_columns is expected.
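As an illustration, the same topic could also be read in raw format; this is a sketch, and the resulting table then has a single column data holding each message unparsed:

# A sketch: reading the topic in "raw" format.
# The resulting table has a single column "data" containing each message as-is.
t_raw = pw.kafka.read(
    rdkafka_settings,
    topic_names=["connector_example"],
    format="raw",
)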
⚠️ For the csv format: the first message should be a header containing the column names, in the correct order and separated by commas. The connector will not work properly without this message. However, it must be sent only once: if sent twice, the second message will be treated as a normal row.
⚠️ To be taken into account, updates must be followed by a *COMMIT* message.
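To make this concrete, here is a hypothetical sequence of payloads for the connector_example topic (one Kafka message per entry), illustrating both the header and the commit message:

# Hypothetical payloads sent to the "connector_example" topic, in order:
messages = [
    "value",     # CSV header with the column names (sent only once)
    "0",         # a row
    "1",         # another row
    "*COMMIT*",  # closes the update: the rows above become visible atomically
]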
Output connector
The output connector sends the updates made to a table t to a given Kafka instance, on a single Kafka topic. Currently, messages are sent following the JSON format.
Usage: the output connector takes the following arguments:
- table: the Pathway table to send to Kafka,
- rdkafka_settings: the settings used to connect to Kafka; they follow the format of librdkafka,
- topic_name: the topic on which the messages are sent,
- format: currently, only json is supported (more are coming),
- commit_frequency_in_messages: the maximum number of commits contained in a single message sent to the Kafka instance. Note that messages which grow too large are sent automatically, so some messages may contain fewer than commit_frequency_in_messages commits.
pw.kafka.write(t, rdkafka_settings, topic_name="sum", format="json")
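For instance, commit_frequency_in_messages could be set as follows; this is a sketch with an arbitrary value:

# A sketch: allow up to 10 commits to be batched into each message sent to Kafka.
pw.kafka.write(
    t,
    rdkafka_settings,
    topic_name="sum",
    format="json",
    commit_frequency_in_messages=10,
)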
Full example
Let's go back to our example on how to compute a sum over the values of the column value received on a Kafka topic connector_example in CSV format.
The final version of our project contains two files: realtime_kafka.py, which processes the stream using Pathway, and generating_kafka_stream.py, which generates the stream.
Here is realtime_kafka.py:
import pathway as pw

# Kafka settings
rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}

# We use the Kafka connector to listen to the "connector_example" topic
t = pw.kafka.read(
    rdkafka_settings,
    topic_names=["connector_example"],
    value_columns=["value"],
    format="csv",
)

# We compute the sum (this part is independent of the connectors).
t = t.select(value=pw.apply(lambda x: int(x), t.value))
t = t.reduce(sum=pw.reducers.sum(t.value))

# We use the Kafka connector to send the resulting output stream containing the sum
pw.kafka.write(t, rdkafka_settings, topic_name="sum", format="json")

# We launch the computation.
pw.run()
Don't forget the pw.run(), otherwise no computation will be done!
Once pw.run() is called, the computation runs forever until it is killed.
If you need some reminder on Pathway operations, don't hesitate to take a look at our survival guide.
As previously said, the Kafka connector expects the first message to contain the names of the columns, and updates must be followed by a *COMMIT* message.
We can use the KafkaProducer API provided by Kafka to send messages to Kafka in Python, in a generating_kafka_stream.py script:
from kafka import KafkaProducer
import time

topic = "connector_example"

producer = KafkaProducer(
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
)

# The first message is the CSV header containing the column names.
producer.send(topic, ("value").encode("utf-8"), partition=0)
time.sleep(5)

for i in range(10):
    time.sleep(1)
    # Send one row, then a *COMMIT* message to close the update.
    producer.send(topic, (str(i)).encode("utf-8"), partition=0)
    producer.send(topic, "*COMMIT*".encode("utf-8"), partition=0)

producer.close()
Note that, depending on your version of Kafka, you may need to specify the API version to make this code work, e.g. api_version=(0,10,2):
producer = KafkaProducer(
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
    api_version=(0, 10, 2),
)