Using Kafka connectors

Pathway provides a pw.io.kafka module with connectors to read messages from and send messages to a Kafka instance.

In Pathway, you can read messages from and send messages to a Kafka topic using pw.io.kafka.read and pw.io.kafka.write. Kafka connectors only work in streaming mode.

⚠️ Note there also exist connectors for Redpanda. They work the same; you only need to replace kafka with redpanda: pw.io.redpanda.read and pw.io.redpanda.write.
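
For instance, a minimal sketch of the Redpanda version of the input connector used in this article (reusing the rdkafka_settings and InputSchema defined in the example below; only the module name changes):

t = pw.io.redpanda.read(
    rdkafka_settings,
    topic="connector_example",
    schema=InputSchema,
    format="csv",
    autocommit_duration_ms=1000,
)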

Short version

Consider a simple scenario: messages are sent to a Kafka instance on a topic connector_example, each message containing one row of a table with a single column value, in CSV format. We want to compute the sum of these values and send the resulting output stream to the same Kafka instance on a sum topic. You can do it as follows in Pathway:

realtime_sum.py
import pathway as pw

# Kafka settings
rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}

# We define a schema for the table
# It sets all the columns and their types
class InputSchema(pw.Schema):
    value: int


# We use the Kafka connector to listen to the "connector_example" topic
t = pw.io.kafka.read(
    rdkafka_settings,
    topic="connector_example",
    schema=InputSchema,
    format="csv",
    autocommit_duration_ms=1000
)

# We compute the sum (this part is independent of the connectors).
t = t.reduce(sum=pw.reducers.sum(t.value))

# We use the Kafka connector to send the resulting output stream containing the sum
pw.io.kafka.write(t, rdkafka_settings, topic_name="sum", format="json")

# We launch the computation.
pw.run()

Input connector

Data stream: Consider a stream in the form of Kafka messages received on given topics. An update is a set of messages: the update is triggered by a commit. Commits ensure the atomicity of each update and are generated periodically.

Usage: the Kafka input connector pw.io.kafka.read takes several arguments:

  • rdkafka_settings: the settings used to connect to Kafka; they follow the format of librdkafka.
  • topic: the topic which is listened to.
  • format: format of messages among raw, csv, and json.
  • schema: if the format is not raw, the schema of the table. It defines the columns' names and their types. It also defines the primary keys (see the sketch after the snippet below).
  • autocommit_duration_ms: the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway's computation graph.
class InputSchema(pw.Schema):
    value: int


t = pw.io.kafka.read(
    rdkafka_settings,
    topic="connector_example",
    format="csv",
    schema=InputSchema,
    autocommit_duration_ms=1000
)
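
As mentioned in the list above, the schema can also mark primary keys. A minimal sketch, assuming Pathway's pw.column_definition helper and a hypothetical KeyedInputSchema:

class KeyedInputSchema(pw.Schema):
    # The key column is used to compute the primary key of each row.
    key: str = pw.column_definition(primary_key=True)
    value: int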

The way the input connector behaves depends on the format of the input data.

  • raw: for raw data, there is only one column, data, in which each message is dumped as-is; see the sketch after this list.
  • csv and json: the data is parsed according to the corresponding format. In this case, the schema parameter is expected.
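
As an illustration, a minimal sketch of reading the same topic in the raw format; no schema is needed, and each message lands as-is in the single column data:

t_raw = pw.io.kafka.read(
    rdkafka_settings,
    topic="connector_example",
    format="raw",
    autocommit_duration_ms=1000,
)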

⚠️ For the csv format: the first message should start with a header containing the column names, in the correct order, and separated by commas. The connector will not work properly without this message. However, the header must be sent only once: if sent twice, the second message will be treated like a normal row.
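
Concretely, for the single-column table of our example, the sequence of messages sent on the topic should look like this (header first, then the rows):

value
0
1
2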

Output connector

The output connector pw.io.kafka.write sends the updates made to a table t to a given Kafka instance on a single Kafka topic. The messages are formatted according to the format parameter.

Usage: the output connector takes the following arguments:

  • table: the Pathway table to send to Kafka,
  • rdkafka_settings: the settings used to connect to Kafka; they follow the format of librdkafka.
  • topic_name: topic on which the messages are sent,
  • format: binary, json, and dsv (a generalization of CSV) are currently supported (more are coming).
pw.io.kafka.write(t, rdkafka_settings, topic_name="sum", format="json")
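
With the json format, each message contains the values of a row of the table together with bookkeeping fields. For the running sum, the stream of updates might look roughly like this (times are illustrative; diff is +1 for an inserted row and -1 for a deleted one):

{"sum": 0, "diff": 1, "time": 1671098839}
{"sum": 0, "diff": -1, "time": 1671098840}
{"sum": 1, "diff": 1, "time": 1671098840}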

Complete example

Let's go back to our example on how to compute a sum over the values of the column value received on a Kafka topic connector_example in CSV format. The final version of our project contains two files: realtime_sum.py, which processes the stream using Pathway, and generating_kafka_stream.py, which generates the stream.

Here is realtime_sum.py:

realtime_sum.py
import pathway as pw

# Kafka settings
rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}

# We define a schema for the table
# It sets all the columns and their types
class InputSchema(pw.Schema):
    value: int


# We use the Kafka connector to listen to the "connector_example" topic
t = pw.io.kafka.read(
    rdkafka_settings,
    topic="connector_example",
    format="csv",
    schema=InputSchema,
    autocommit_duration_ms=1000
)

# We compute the sum (this part is independent of the connectors).
t = t.reduce(sum=pw.reducers.sum(t.value))

# We use the Kafka connector to send the resulting output stream containing the sum
pw.io.kafka.write(t, rdkafka_settings, topic_name="sum", format="json")

# We launch the computation.
pw.run()

Don't forget the pw.run(), otherwise no computation will be done! Once pw.run() is called, the computation runs forever until it gets killed. If you need some reminders on Pathway operations, don't hesitate to take a look at our First-steps guide.

You can use the KafkaProducer API provided by the kafka-python package to send messages to Kafka with Python in a generating_kafka_stream.py script:

generating_kafka_stream.py
from kafka import KafkaProducer
import time

topic = "connector_example"

producer = KafkaProducer(
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
)
# The first message is the CSV header: it must be sent once, before any data row.
producer.send(topic, ("value").encode("utf-8"), partition=0)

time.sleep(5)
# Send the values 0 to 9, one message per second.
for i in range(10):
    time.sleep(1)
    producer.send(
        topic, (str(i)).encode("utf-8"), partition=0
    )

producer.close()

Note that, depending on your version of Kafka, you may need to specify the API version for this code to work, by adding api_version=(0,10,2):

producer = KafkaProducer(
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
    api_version=(0,10,2),
)
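
To check that everything works, you can read the sum topic back with a small consumer script, for instance using the same kafka-python package (a sketch assuming the same connection settings as above):

from kafka import KafkaConsumer

# Read back the updates sent by Pathway on the "sum" topic.
consumer = KafkaConsumer(
    "sum",
    bootstrap_servers=["server-address:9092"],
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="username",
    sasl_plain_password="********",
    auto_offset_reset="earliest",
)
for message in consumer:
    print(message.value.decode("utf-8"))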