March 24, 2023TUTORIAL Β· CONNECTORS

Switching from Kafka to Redpanda

Not a fan of the JVM and ZooKeeper? In this article, you will learn how to switch from Kafka to Redpanda and how to adapt your Pathway project to Redpanda. The change is easier than you might think: your Pathway code remains the same!

Thumbnail

When Kafka was first released in 2011, it was a game-changer for realtime data processing. Its innovative distributed streaming platform brought powerful features that allowed organizations to build scalable and fault-tolerant data pipelines, process streams of data in real time, and integrate with various data sources and sinks.

However, times have changed, and the world of data processing and streaming has evolved. Some of Kafka's once cutting-edge features are now seen as hindrances by some developers. For instance, Kafka can be complex to set up and manage, relies on ZooKeeper, and uses a JVM.

This is where Redpanda comes in. Redpanda is "a kafka" rebuilt in C++ as a, high-performance, Kafka-compatible streaming platform designed for modern applications. It offers improved performance, lower latency, and is much more efficient with resource utilization than Kafka. One of the significant benefits of Redpanda is its simplicity, making it easier to deploy and manage, saving time and resources.

Redpanda claims to be fully Kafka compatible but how easy is it really to switch from Kafka to Redpanda? Specifically, we want to know, how much of a lift is it to modify our Pathway Kafka integration?

If you already have your Pathway project deployed and want to switch to an existing Redpanda instance, you can jump directly to the Redpanda section. For both Kafka and Redpanda, we provide the sources of the respective projects.

You can also create a new project from our project template. The creator will ask you to choose between Kafka and Redpanda and will set everything up for you.

Best-rated movies problem

You just have been hired by a trendy VOD online platform. As a new team member, your first task is to identify the most popular movies in the catalog; Specifically, you want to find the K movies with the highest scores and how many ratings those movies have received.

For example, this is what the expected table for K=3 could be:

MovieIDAverageRatingNumber
2184.97510
454.89123
74564.81240

The ratings are received as a data stream through a Kafka instance, and you output the table to a CSV file.

Solution with Kafka

With Kafka, we need the following four components:

  • ZooKeeper
  • Kafka
  • Pathway
  • a stream producer

Each component will be hosted in a different docker container.

Ratings will be sent by the stream producer to Kafka on the topic ratings. Pathway listens to the topic, processes the stream, and outputs the ranking in a CSV file best_rating.csv.

Kafka architecture

Pathway and the stream generator will have their own Dockerfile to install all the required dependencies. The stream will be created by streaming the lines of a static data dataset.csv.

Our project will have the following structure:

.
β”œβ”€β”€ pathway-src/
β”‚   β”œβ”€β”€ Dockerfile
β”‚   └── process-stream.py
β”œβ”€β”€ producer-src/
β”‚   β”œβ”€β”€ create-stream.py
β”‚   β”œβ”€β”€ dataset.csv
β”‚   └── Dockerfile
β”œβ”€β”€ docker-compose.yml
└── Makefile

Kafka and ZooKeeper

Kafka and ZooKeeper are configurable in the docker-compose.yml file. To keep things simple, no security mechanisms are used.

docker-compose.yml
version: "3.7"
name: tuto-switch-to-redpanda
networks:
  tutorial_network:
    driver: bridge
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    networks:
      - tutorial_network
  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.3
    depends_on: [zookeeper]
    environment:
      KAFKA_AUTO_CREATE_TOPICS: true
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CONFLUENT_SUPPORT_METRICS_ENABLE: false
    ports:
    - 9092:9092
    command: sh -c "((sleep 15 && kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic ratings)&) && /etc/confluent/docker/run "
    networks:
      - tutorial_network

Here we are sending the messages to a topic called ratings, created in the command setting.

Generating the stream

To generate the stream, we start with a CSV dataset with the following columns: userId (int), movieId (int), rating (float), and timestamp (int). This is the schema chosen by GroupLens for their MovieLens25M dataset: we provide a toy dataset as an example, but the project will work with the whole MovieLens25M dataset.

To generate the stream, you can use a simple Python script to read the CSV file line by line, and each rating will be sent to Kafka using the kafka-python package.

./producer-src/create-stream.py
import csv
import json
import time

from kafka import KafkaProducer

topic = "ratings"

#We wait for Kafka and ZooKeeper to be ready
time.sleep(30)

producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    security_protocol="PLAINTEXT",
    api_version=(0, 10, 2),
)

with open("./dataset.csv", newline="") as csvfile:
    dataset_reader = csv.reader(csvfile, delimiter=",")
    first_line = True
    for row in dataset_reader:
        # We skip the header
        if first_line:
            first_line = False
            continue
        message_json = {
            "userId": int(row[0]),
            "movieId": int(row[1]),
            "rating": float(row[2]),
            "timestamp": int(row[3]),
        }
        producer.send(topic, (json.dumps(message_json)).encode("utf-8"))
        time.sleep(0.1)

producer.send(topic, "*COMMIT*".encode("utf-8"))
time.sleep(2)
producer.close()

Note that we connect to kafka:9092 and not localhost.

This script will have its own container:

docker-compose.yml
  stream-producer:
    build:
      context: .
      dockerfile: ./producer-src/Dockerfile
    depends_on: [kafka]
    networks:
      - tutorial_network

You only need to use a Python image and install the associated package:

./producer-src/Dockerfile
FROM python:3.10

RUN pip install kafka-python
COPY ./producer-src/create-stream.py create-stream.py
COPY ./producer-src/dataset.csv dataset.csv

CMD ["python", "-u", "create-stream.py"]

Pathway

Now you have a stream generated from the dataset and sent to Kafka, so at this point you simply need to connect Pathway to Kafka and process the data. To connect to Kafka, configure the connection:

./pathway-src/process-stream.py
rdkafka_settings = {
    "bootstrap.servers": "kafka:9092",
    "security.protocol": "plaintext",
    "group.id": "0",
    "session.timeout.ms": "6000",
}

If you want to establish a more secure connection using a SASL-SSL authentication over a SCRAM-SHA-256 mechanism, you can do it as follows:

./pathway-src/process-stream.py
rdkafka_settings = {
    "bootstrap.servers": "server:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********"
}

Let's connect to the ratings topic using a Kafka connector:

./pathway-src/process-stream.py
class InputSchema(pw.Schema):
  movieId: int
  rating: float

t_ratings = pw.io.kafka.read(
    rdkafka_settings,
    topic="ratings",
    format="json",
    schema=InputSchema,
    autocommit_duration_ms=100,
)

You are only interested in the movieId and rating columns, so there is no need to include the others.

You can now define a function to find the best-rated movies:

def compute_best(t_ratings, K):
  t_best_ratings = t_ratings.groupby(pw.this.movieId).reduce(
      pw.this.movieId,
      sum_ratings=pw.reducers.sum(pw.this.rating),
      number_ratings=pw.reducers.count(pw.this.rating),
  )
  t_best_ratings = t_best_ratings.select(
      pw.this.movieId,
      pw.this.number_ratings,
      average_rating=pw.apply(
          lambda x, y: (x / y) if y != 0 else 0,
          pw.this.sum_ratings,
          pw.this.number_ratings,
      ),
  )
  t_best_ratings = t_best_ratings.select(
      movie_tuple=pw.apply(
          lambda x, y, z: (x, y, z),
          pw.this.average_rating,
          pw.this.number_ratings,
          pw.this.movieId,
      )
  )
  t_best_ratings = t_best_ratings.reduce(
      total_tuple=pw.reducers.sorted_tuple(pw.this.movie_tuple)
  )
  t_best_ratings = t_best_ratings.select(
      K_best=pw.apply(lambda my_tuple: (list(my_tuple))[-K:], pw.this.total_tuple)
  )
  t_best_ratings = t_best_ratings.flatten(pw.this.K_best).select(
      pw.this.K_best
  )
  t_best_ratings = t_best_ratings.select(
      movieId=pw.apply(lambda rating_tuple: rating_tuple[2], pw.this.K_best),
      average_rating=pw.apply(lambda rating_tuple: rating_tuple[0], pw.this.K_best),
      views=pw.apply(lambda rating_tuple: rating_tuple[1], pw.this.K_best),
  )
  return t_best_ratings

Using the function, your final file will look like this:

./pathway-src/process-stream.py
import pathway as pw
import time

rdkafka_settings = {
    "bootstrap.servers": "kafka:9092",
    "security.protocol": "plaintext",
    "group.id": "0",
    "session.timeout.ms": "6000",
}

class InputSchema(pw.Schema):
  movieId: int
  rating: float


t_ratings = pw.io.kafka.read(
    rdkafka_settings,
    topic="ratings",
    format="json",
    schema=InputSchema,
    autocommit_duration_ms=100,
)

t_best_ratings = compute_best(t_ratings, 3)

# We output the results in a dedicated CSV file
pw.io.csv.write(t_best_ratings, "./best_ratings.csv")

# We wait for Kafka and ZooKeeper to be ready
time.sleep(20)
# We launch the computation
pw.run()

You can set up a dedicated container:

docker-compose.yml
  pathway:
    build:
      context: .
      dockerfile: ./pathway-src/Dockerfile
    depends_on: [kafka]
    networks:
      - tutorial_network
./pathway-src/Dockerfile
FROM python:3.10

RUN pip install -U pathway
COPY ./pathway-src/process-stream.py process-stream.py

CMD ["python", "-u", "process-stream.py"]

Results

We provide the sources of the Kafka project.

Let's use the following toy dataset:

./producer-src/dataset.csv
userId,movieId,rating,timestamp
1,296,5.0,1147880044
1,306,3.5,1147868817
1,307,5.0,1147868828
1,665,5.0,1147878820
1,899,3.5,1147868510
1,1088,4.0,1147868495
2,296,4.0,1147880044
2,306,2.5,1147868817
2,307,3.0,1147868828
2,665,2.0,1147878820
2,899,4.5,1147868510
2,1088,2.0,1147868495
3,296,1.0,1147880044
3,306,2.5,1147868817
3,307,4.0,1147868828
3,665,2.0,1147878820
3,899,1.5,1147868510
3,1088,5.0,1147868495

You obtain the following results:

movieId,average_rating,views,time,diff
296,5,1,1680008702067,1
306,3.5,1,1680008702167,1
[...]
296,3.3333333333333335,3,1680008703767,-1
1088,3.6666666666666665,3,1680008703767,1
296,3.3333333333333335,3,1680008703767,1
899,3.1666666666666665,3,1680008703767,-1

As expected, the top 3 get updated whenever the ranking changes due to a new rating.

Switching to Redpanda

Congratulations! You can now find the K best-rated movies on your VOD platform; However, your team has discovered a new alternative to Kafka: Redpanda. It is totally Kafka-compatible, does not rely on ZooKeeper, and much easier to manage, is more durable (meaning no data loss, as there is no page cache) and according to their published benchmarks is faster than Kafka! Suppose your team is excited about this and proposes your next task: switch from Kafka to Redpanda.

With Redpanda, the project is simpler, all you have is:

  • Redpanda
  • Pathway
  • the stream producer

Kafka architecture

Let's see how to deploy Redpanda in docker and how it impacts your project.

Docker

(You can skip this section if you already have an existing Redpanda instance)

First, remove the two services kafka and zookeeper, and replace them with a redpanda service:

./docker-compose.yml
services:
  redpanda:
    command:
      - redpanda
      - start
      - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
      - --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092
      - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
      - --advertise-pandaproxy-addr internal://redpanda:8082,external://localhost:18082
      - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
      - --rpc-addr redpanda:33145
      - --advertise-rpc-addr redpanda:33145
      - --smp 1
      - --memory 1G
      - --mode dev-container
      - --default-log-level=debug
      - --set redpanda.enable_transactions=true
      - --set redpanda.enable_idempotence=true
      - --set redpanda.auto_create_topics_enabled=true
    image: docker.redpanda.com/redpandadata/redpanda:v23.1.2
    container_name: redpanda
    volumes:
      - redpanda:/var/lib/redpanda/data
    networks:
      - tutorial_network

You must now connect Pathway and the stream producer to redpanda instead of kafka. This could have been avoided by naming the Kafka container differently or by naming the Redpanda container kafka.

Pathway

As previously mentioned, we need to update the server's address in the settings:

./pathway-src/process-stream.py
producer = KafkaProducer(
    bootstrap_servers=["redpanda:9092"],
    security_protocol="PLAINTEXT",
    api_version=(0, 10, 2),
)

This is it! The setting will work exactly the same as with Kafka; However, for consistency, there are also dedicated Redpanda connectors. There is no difference between Kafka and Redpanda connectors, as the same connector is used under the hood.

With the Redpanda connector, here is what your ./pathway-src/process-stream.py file looks like:

./pathway-src/process-stream.py
import pathway as pw
import time

rdkafka_settings = {
    "bootstrap.servers": "redpanda:9092",
    "security.protocol": "plaintext",
    "group.id": "0",
    "session.timeout.ms": "6000",
}

class InputSchema(pw.Schema):
  movieId: int
  rating: float


t_ratings = pw.io.redpanda.read(
    rdkafka_settings,
    topic="ratings",
    format="json",
    schema=InputSchema,
    autocommit_duration_ms=100,
)

t_best_ratings = compute_best(t_ratings, 3)

# We output the results in a dedicated CSV file
pw.io.csv.write(t_best_ratings, "./best_ratings.csv")

# We wait for Kafka and ZooKeeper to be ready
time.sleep(20)
# We launch the computation
pw.run()

If you don't care about the names of the connector and server, you don't have to change the file at all.

The stream generator

As with Pathway, you need to update the server name (if required) for the stream generator:

./producer-src/create-stream.py
producer = KafkaProducer(
    bootstrap_servers=["redpanda:9092"],
    security_protocol="PLAINTEXT",
    api_version=(0, 10, 2),
)

Small problem, if you look at the results now, we have an empty best_ratings.csv.

This comes from the creation of the ratings topic. While the topic was already ready with Kafka, it was created at the reception of the first message with Redpanda. Creating a topic makes Redpanda discard the messages until it is ready. Sending a message at the beginning of the computation should solve this:

./producer-src/create-stream.py
producer.send(topic, "*COMMIT*".encode("utf-8"))
time.sleep(2)

Note that this difference comes from Redpanda and not Pathway. Pathway connects to Redpanda and Kafka totally transparently. Pathway will receive and process the data the same way whether Kafka or Redpanda is used. Kafka and Redpanda are responsible for handling messages: Redpanda discards the incoming messages while the topic is created.

Despite the fix, the final file is very similar to the one for the Kafka version:

./producer-src/create-stream.py
from kafka import KafkaProducer
import csv
import time
import json

topic = "ratings"

#We wait for Kafka and ZooKeeper to be ready
time.sleep(30)

producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    security_protocol="PLAINTEXT",
    api_version=(0, 10, 2),
)
producer.send(topic, "*COMMIT*".encode("utf-8"))
time.sleep(2)

with open("./dataset.csv", newline="") as csvfile:
    dataset_reader = csv.reader(csvfile, delimiter=",")
    first_line = True
    for row in dataset_reader:
        # We skip the header
        if first_line:
            first_line = False
            continue
        message_json = {
            "userId": int(row[0]),
            "movieId": int(row[1]),
            "rating": float(row[2]),
            "timestamp": int(row[3]),
        }
        producer.send(topic, (json.dumps(message_json)).encode("utf-8"))
        time.sleep(0.1)

producer.send(topic, "*COMMIT*".encode("utf-8"))
time.sleep(2)
producer.close()

You can also take a look at the sources of the Redpanda project.

With this, the results are the same as with Kafka:

movieId,average_rating,views,time,diff
296,5,1,1680008702067,1
306,3.5,1,1680008702167,1
[...]
296,3.3333333333333335,3,1680008703767,-1
1088,3.6666666666666665,3,1680008703767,1
296,3.3333333333333335,3,1680008703767,1
899,3.1666666666666665,3,1680008703767,-1

Bonus: sending your results to Redpanda

You've successfully computed the K best-rated movies using Redpanda, and your ranking is automatically updated thanks to Pathway; However, your team still isn't satisfied with the outcome. After taking a closer look, you realize the issue: you've been sending your results in a CSV file, which isn't the most suitable for handling a data stream. Not only that, but the file stays on your local computer, preventing others in the organization from accessing the data in real time.

Your team suggests sending the results back to Redpanda into a best_ratings topic instead. Redpanda is optimized for handling data streams, making it a more efficient and effective solution than sending data in a CSV file. By doing this, you can ensure the data is accessible to everyone who needs it in real time.

Connecting to Redpanda with Pathway is as easy as connecting to Kafka. You need to use a Redpanda connector, which is exactly the same as the Kafka connector:

rdkafka_settings = {
  "bootstrap.servers": "redpanda:9092",
  "security.protocol": "plaintext",
  "group.id": "$GROUP_NAME",
  "session.timeout.ms": "6000",
}
pw.io.redpanda.write(
  t_best_ratings,
  rdkafka_settings,
  topic_name="best_ratings",
  format="json"
)

As previously mentioned, you can also establish a more secure connection using a SASL-SSL authentication over a SCRAM-SHA-256 mechanism as (e.g. connecting to Redpanda Cloud) follows:

rdkafka_settings = {
  "bootstrap.servers": "redpanda:9092",
  "security.protocol": "sasl_ssl",
  "sasl.mechanism": "SCRAM-SHA-256",
  "group.id": "$GROUP_NAME",
  "session.timeout.ms": "6000",
  "sasl.username": "username",
  "sasl.password": "********",
}

Conclusions

Congratulations! πŸŽ‰ You have built a K-best-rated application in Pathway and made it work with Redpanda. Being fully Kafka API-compatible, Redpanda just works: if the server name remains the same, you have nothing to do with your Pathway code! For consistency, you can use Pathway's Redpanda connectors which work exactly the same as Kafka connectors.

Olivier Ruas

Algorithm and Data Processing Magician

connectorRedpandaKafkaZookeeperDocker