Using Kafka connectors

Pathway provides connectors to read and send messages from a Kafka instance.

Kafka connectors only work in streaming mode.

Short example

Consider a simple scenario: messages are sent to a Kafka instance on a topic connector_example, each message containing a single-column table value in CSV format. We want to compute the sum of these values and send the resulting output stream to the same Kafka instance, on a sum topic. In Pathway, you can do it as follows:

```python
# We import Pathway.
import pathway as pw

# Kafka settings
rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}

# We use the Kafka connector to listen to the "connector_example" topic.
t = pw.kafka.read(
    rdkafka_settings,
    topic_names=["connector_example"],
    value_columns=["value"],
    format="csv",
)

# We compute the sum (this part is independent of the connectors).
t = t.select(value=pw.apply(lambda x: int(x), t.value))
t = t.reduce(sum=pw.reducers.sum(t.value))

# We use the Kafka connector to send the resulting output stream containing the sum.
pw.kafka.write(t, rdkafka_settings, topic_name="sum", format="json")

# We launch the computation.
pw.run()
```

Input connector

Data stream: we consider a stream in the form of Kafka messages received on given topics. An update is a set of messages: the update is triggered by the reception of a message containing *COMMIT*. Commit messages ensure the atomicity of each update.

Usage: the Kafka input connector pw.kafka.read takes several arguments:

  • rdkafka_settings: the settings used to connect to Kafka; they follow the format of librdkafka.
  • topic_names: the list of the topics which are listened to.
  • format: format of messages among raw, csv, and json.
  • value_columns: if the format is not raw, it is the list of the columns which should be expected.
  • primary_key: if the primary key of the table should be generated from a subset of its columns, specify that subset here. Otherwise, the primary key is generated as a uuid4.
```python
t = pw.kafka.read(
    rdkafka_settings,
    topic_names=["connector_example"],
    value_columns=["value"],
    format="csv",
)
```
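For instance, to derive the primary key from the value column instead of a uuid4, the same call can be extended with the primary_key parameter (a sketch reusing the settings and column names from the example above):

```python
t = pw.kafka.read(
    rdkafka_settings,
    topic_names=["connector_example"],
    value_columns=["value"],
    # The primary key is generated from the "value" column
    # instead of a uuid4.
    primary_key=["value"],
    format="csv",
)
```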

The way the input connector behaves depends on the format of the input data.

  • raw: for raw data, there is a single column data into which the whole message is dumped.
  • csv and json: the data is parsed according to the given format. In this case, value_columns is expected.

⚠️ For the csv format: the first message must be a header containing the column names, in the correct order and separated by commas. The connector will not work properly without this message; however, it must be sent only once: if sent twice, the second occurrence will be treated as a normal row.

⚠️ To be taken into account, updates must be followed by a *COMMIT* message.
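Putting the two warnings together, the stream of messages on the connector_example topic could look like this (a sketch for the csv format; each line is one Kafka message):

```
value        <- header with the column names, sent exactly once
1
2
*COMMIT*     <- closes the first update (containing 1 and 2)
3
*COMMIT*     <- closes the second update (containing 3)
```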

Output connector

The output connector sends the updates made to a table t to a given Kafka instance, on a single Kafka topic. Currently, messages are sent in the JSON format.

Usage: the output connector takes the following arguments:

  • table: the Pathway table to send to Kafka,
  • rdkafka_settings: the settings used to connect to Kafka; they follow the format of librdkafka.
  • topic_name: topic on which the messages are sent,
  • format: currently, only json is supported (more are coming),
  • commit_frequency_in_messages: maximum number of commits contained in a single message sent to the Kafka instance. Note that too-large messages are sent automatically, so some messages may contain fewer than commit_frequency_in_messages commits.
```python
pw.kafka.write(t, rdkafka_settings, topic_name="sum", format="json")
```
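For instance, to batch commits into larger messages, the same call can be extended with the commit_frequency_in_messages parameter described above (the value 100 is an arbitrary choice for illustration):

```python
pw.kafka.write(
    t,
    rdkafka_settings,
    topic_name="sum",
    format="json",
    # At most 100 commits per message; large messages may be
    # sent earlier, with fewer commits.
    commit_frequency_in_messages=100,
)
```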

Full example

Let's go back to our example on how to compute a sum over the values of the column value received on a Kafka topic connector_example in CSV format. The final version of our project contains two files: realtime_kafka.py, which processes the stream using Pathway, and generating_kafka_stream.py, which generates the stream.

Here is realtime_kafka.py:

```python
# We import Pathway.
import pathway as pw

# Kafka settings
rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}

# We use the Kafka connector to listen to the "connector_example" topic.
t = pw.kafka.read(
    rdkafka_settings,
    topic_names=["connector_example"],
    value_columns=["value"],
    format="csv",
)

# We compute the sum (this part is independent of the connectors).
t = t.select(value=pw.apply(lambda x: int(x), t.value))
t = t.reduce(sum=pw.reducers.sum(t.value))

# We use the Kafka connector to send the resulting output stream containing the sum.
pw.kafka.write(t, rdkafka_settings, topic_name="sum", format="json")

# We launch the computation.
pw.run()
```

Don't forget the pw.run(): otherwise, no computation will be done! Once pw.run() is called, the computation runs forever, until it is killed. If you need a reminder on Pathway operations, don't hesitate to take a look at our survival guide.

As previously said, the Kafka connector expects the first message to contain the names of the columns, and each update must be followed by a *COMMIT* message.

We can use the KafkaProducer API provided by the kafka-python package to send messages to Kafka in a generating_kafka_stream.py script:

```python
from kafka import KafkaProducer
import time

topic = "connector_example"

producer = KafkaProducer(
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
)

# The first message is the header containing the column names.
producer.send(topic, ("value").encode("utf-8"), partition=0)
time.sleep(5)

# We send the values 0, 1, ..., 9, each followed by a *COMMIT* message.
for i in range(10):
    time.sleep(1)
    producer.send(topic, (str(i)).encode("utf-8"), partition=0)
    producer.send(topic, "*COMMIT*".encode("utf-8"), partition=0)

producer.close()
```

Note that, depending on your version of Kafka, you may need to specify the API version to make this code work: api_version=(0,10,2):

```python
producer = KafkaProducer(
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
    api_version=(0, 10, 2),
)
```

Olivier Ruas

Algorithm and Data Processing Magician