Not a fan of the JVM and ZooKeeper? In this article, you will learn how to switch from Kafka to Redpanda and how to adapt your Pathway project to Redpanda. The change is easier than you might think: your Pathway code remains the same!
When Kafka was first released in 2011, it was a game-changer for realtime data processing. Its innovative distributed streaming platform brought powerful features that allowed organizations to build scalable and fault-tolerant data pipelines, process streams of data in real time, and integrate with various data sources and sinks.
However, times have changed, and the world of data processing and streaming has evolved. Some of Kafka's once cutting-edge features are now seen as hindrances by some developers. For instance, Kafka can be complex to set up and manage, relies on ZooKeeper, and uses a JVM.
This is where Redpanda comes in. Redpanda is "a kafka" rebuilt in C++ as a, high-performance, Kafka-compatible streaming platform designed for modern applications. It offers improved performance, lower latency, and is much more efficient with resource utilization than Kafka. One of the significant benefits of Redpanda is its simplicity, making it easier to deploy and manage, saving time and resources.
Redpanda claims to be fully Kafka compatible but how easy is it really to switch from Kafka to Redpanda? Specifically, we want to know, how much of a lift is it to modify our Pathway Kafka integration?
If you already have your Pathway project deployed and want to switch to an existing Redpanda instance, you can jump directly to the Redpanda section. For both Kafka and Redpanda, we provide the sources of the respective projects.
You can also create a new project from our project template. The creator will ask you to choose between Kafka and Redpanda and will set everything up for you.
You just have been hired by a trendy VOD online platform. As a new team member, your first task is to identify the most popular movies in the catalog; Specifically, you want to find the K movies with the highest scores and how many ratings those movies have received.
For example, this is what the expected table for K=3 could be:
MovieID | Average | RatingNumber | |
---|---|---|---|
218 | 4.9 | 7510 | |
45 | 4.8 | 9123 | |
7456 | 4.8 | 1240 |
The ratings are received as a data stream through a Kafka instance, and you output the table to a CSV file.
With Kafka, we need the following four components:
Each component will be hosted in a different docker container.
Ratings will be sent by the stream producer to Kafka on the topic ratings
.
Pathway listens to the topic, processes the stream, and outputs the ranking in a CSV file best_rating.csv
.
Pathway and the stream generator will have their own Dockerfile to install all the required dependencies.
The stream will be created by streaming the lines of a static data dataset.csv
.
Our project will have the following structure:
.
βββ pathway-src/
β βββ Dockerfile
β βββ process-stream.py
βββ producer-src/
β βββ create-stream.py
β βββ dataset.csv
β βββ Dockerfile
βββ docker-compose.yml
βββ Makefile
Kafka and ZooKeeper are configurable in the docker-compose.yml
file.
To keep things simple, no security mechanisms are used.
version: "3.7"name: tuto-switch-to-redpandanetworks: tutorial_network: driver: bridgeservices: zookeeper: image: confluentinc/cp-zookeeper:5.5.3 environment: ZOOKEEPER_CLIENT_PORT: 2181 networks: - tutorial_network kafka: image: confluentinc/cp-enterprise-kafka:5.5.3 depends_on: [zookeeper] environment: KAFKA_AUTO_CREATE_TOPICS: true KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_ADVERTISED_HOST_NAME: kafka KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 KAFKA_BROKER_ID: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_JMX_PORT: 9991 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT CONFLUENT_SUPPORT_METRICS_ENABLE: false ports: - 9092:9092 command: sh -c "((sleep 15 && kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic ratings)&) && /etc/confluent/docker/run " networks: - tutorial_network
Here we are sending the messages to a topic called ratings
, created in the command
setting.
To generate the stream, we start with a CSV dataset with the following columns: userId
(int), movieId
(int), rating
(float), and timestamp
(int).
This is the schema chosen by GroupLens for their MovieLens25M dataset:
we provide a toy dataset as an example, but the project will work with the whole MovieLens25M dataset.
To generate the stream, you can use a simple Python script to read the CSV file line by line, and each rating will be sent to Kafka using the kafka-python
package.
import csvimport jsonimport timefrom kafka import KafkaProducertopic = "ratings"#We wait for Kafka and ZooKeeper to be readytime.sleep(30)producer = KafkaProducer( bootstrap_servers=["kafka:9092"], security_protocol="PLAINTEXT", api_version=(0, 10, 2),)with open("./dataset.csv", newline="") as csvfile: dataset_reader = csv.reader(csvfile, delimiter=",") first_line = True for row in dataset_reader: # We skip the header if first_line: first_line = False continue message_json = { "userId": int(row[0]), "movieId": int(row[1]), "rating": float(row[2]), "timestamp": int(row[3]), } producer.send(topic, (json.dumps(message_json)).encode("utf-8")) time.sleep(0.1)producer.send(topic, "*COMMIT*".encode("utf-8"))time.sleep(2)producer.close()
Note that we connect to kafka:9092
and not localhost.
This script will have its own container:
stream-producer: build: context: . dockerfile: ./producer-src/Dockerfile depends_on: [kafka] networks: - tutorial_network
You only need to use a Python image and install the associated package:
FROM python:3.10RUN pip install kafka-pythonCOPY ./producer-src/create-stream.py create-stream.pyCOPY ./producer-src/dataset.csv dataset.csvCMD ["python", "-u", "create-stream.py"]
Now you have a stream generated from the dataset and sent to Kafka, so at this point you simply need to connect Pathway to Kafka and process the data. To connect to Kafka, configure the connection:
rdkafka_settings = { "bootstrap.servers": "kafka:9092", "security.protocol": "plaintext", "group.id": "0", "session.timeout.ms": "6000",}
If you want to establish a more secure connection using a SASL-SSL authentication over a SCRAM-SHA-256 mechanism, you can do it as follows:
rdkafka_settings = { "bootstrap.servers": "server:9092", "security.protocol": "sasl_ssl", "sasl.mechanism": "SCRAM-SHA-256", "group.id": "$GROUP_NAME", "session.timeout.ms": "6000", "sasl.username": "username", "sasl.password": "********"}
Let's connect to the ratings
topic using a Kafka connector:
t_ratings = pw.io.kafka.read( rdkafka_settings, topic_names=["ratings"], format="json", value_columns=[ "movieId", "rating", ], types={"movieId": pw.Type.INT, "rating": pw.Type.FLOAT}, autocommit_duration_ms=100,)
You are only interested in the movieId
and rating
columns, so there is no need to include the others.
You can now define a function to find the best-rated movies:
def compute_best(t_ratings, K): t_best_ratings = t_ratings.groupby(pw.this.movieId).reduce( pw.this.movieId, sum_ratings=pw.reducers.sum(pw.this.rating), number_ratings=pw.reducers.count(pw.this.rating), ) t_best_ratings = t_best_ratings.select( pw.this.movieId, pw.this.number_ratings, average_rating=pw.apply( lambda x, y: (x / y) if y != 0 else 0, pw.this.sum_ratings, pw.this.number_ratings, ), ) t_best_ratings = t_best_ratings.select( movie_tuple=pw.apply( lambda x, y, z: (x, y, z), pw.this.average_rating, pw.this.number_ratings, pw.this.movieId, ) ) t_best_ratings = t_best_ratings.reduce( total_tuple=pw.reducers.sorted_tuple(pw.this.movie_tuple) ) t_best_ratings = t_best_ratings.select( K_best=pw.apply(lambda my_tuple: (list(my_tuple))[-K:], pw.this.total_tuple) ) t_best_ratings = t_best_ratings.flatten(pw.this.K_best).select( pw.this.K_best ) t_best_ratings = t_best_ratings.select( movieId=pw.apply(lambda rating_tuple: rating_tuple[2], pw.this.K_best), average_rating=pw.apply(lambda rating_tuple: rating_tuple[0], pw.this.K_best), views=pw.apply(lambda rating_tuple: rating_tuple[1], pw.this.K_best), ) return t_best_ratings
Using the function, your final file will look like this:
import pathway as pwimport timerdkafka_settings = { "bootstrap.servers": "kafka:9092", "security.protocol": "plaintext", "group.id": "0", "session.timeout.ms": "6000",}t_ratings = pw.io.kafka.read( rdkafka_settings, topic_names=["ratings"], format="json", value_columns=[ "movieId", "rating", ], types={"movieId": pw.Type.INT, "rating": pw.Type.FLOAT}, autocommit_duration_ms=100,)t_best_ratings = compute_best(t_ratings, 3)# We output the results in a dedicated CSV filepw.io.csv.write(t_best_ratings, "./best_ratings.csv")# We wait for Kafka and ZooKeeper to be readytime.sleep(20)# We launch the computationpw.run()
You can set up a dedicated container:
pathway: build: context: . dockerfile: ./pathway-src/Dockerfile depends_on: [kafka] networks: - tutorial_network
FROM python:3.10
RUN pip install -U --extra-index-url https://packages.pathway.com/966431ef6ba pathway
COPY ./pathway-src/process-stream.py process-stream.py
CMD ["python", "-u", "process-stream.py"]
We provide the sources of the Kafka project.
Let's use the following toy dataset:
userId,movieId,rating,timestamp
1,296,5.0,1147880044
1,306,3.5,1147868817
1,307,5.0,1147868828
1,665,5.0,1147878820
1,899,3.5,1147868510
1,1088,4.0,1147868495
2,296,4.0,1147880044
2,306,2.5,1147868817
2,307,3.0,1147868828
2,665,2.0,1147878820
2,899,4.5,1147868510
2,1088,2.0,1147868495
3,296,1.0,1147880044
3,306,2.5,1147868817
3,307,4.0,1147868828
3,665,2.0,1147878820
3,899,1.5,1147868510
3,1088,5.0,1147868495
You obtain the following results:
movieId,average_rating,views,time,diff
296,5,1,1680008702067,1
306,3.5,1,1680008702167,1
[...]
296,3.3333333333333335,3,1680008703767,-1
1088,3.6666666666666665,3,1680008703767,1
296,3.3333333333333335,3,1680008703767,1
899,3.1666666666666665,3,1680008703767,-1
As expected, the top 3 get updated whenever the ranking changes due to a new rating.
Congratulations! You can now find the K best-rated movies on your VOD platform; However, your team has discovered a new alternative to Kafka: Redpanda. It is totally Kafka-compatible, does not rely on ZooKeeper, and much easier to manage, is more durable (meaning no data loss, as there is no page cache) and according to their published benchmarks is faster than Kafka! Suppose your team is excited about this and proposes your next task: switch from Kafka to Redpanda.
With Redpanda, the project is simpler, all you have is:
Let's see how to deploy Redpanda in docker and how it impacts your project.
(You can skip this section if you already have an existing Redpanda instance)
First, remove the two services kafka
and zookeeper
, and replace them with a redpanda
service:
services: redpanda: command: - redpanda - start - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092 - --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092 - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082 - --advertise-pandaproxy-addr internal://redpanda:8082,external://localhost:18082 - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081 - --rpc-addr redpanda:33145 - --advertise-rpc-addr redpanda:33145 - --smp 1 - --memory 1G - --mode dev-container - --default-log-level=debug - --set redpanda.enable_transactions=true - --set redpanda.enable_idempotence=true - --set redpanda.auto_create_topics_enabled=true image: docker.redpanda.com/redpandadata/redpanda:v23.1.2 container_name: redpanda volumes: - redpanda:/var/lib/redpanda/data networks: - tutorial_network
You must now connect Pathway and the stream producer to redpanda
instead of kafka
.
This could have been avoided by naming the Kafka container differently or by naming the Redpanda container kafka
.
As previously mentioned, we need to update the server's address in the settings:
producer = KafkaProducer( bootstrap_servers=["redpanda:9092"], security_protocol="PLAINTEXT", api_version=(0, 10, 2),)
This is it! The setting will work exactly the same as with Kafka; However, for consistency, there are also dedicated Redpanda connectors. There is no difference between Kafka and Redpanda connectors, as the same connector is used under the hood.
With the Redpanda connector, here is what your ./pathway-src/process-stream.py
file looks like:
import pathway as pwimport timerdkafka_settings = { "bootstrap.servers": "redpanda:9092", "security.protocol": "plaintext", "group.id": "0", "session.timeout.ms": "6000",}t_ratings = pw.io.redpanda.read( rdkafka_settings, topic_names=["ratings"], format="json", value_columns=[ "movieId", "rating", ], types={"movieId": pw.Type.INT, "rating": pw.Type.FLOAT}, autocommit_duration_ms=100,)t_best_ratings = compute_best(t_ratings, 3)# We output the results in a dedicated CSV filepw.io.csv.write(t_best_ratings, "./best_ratings.csv")# We wait for Kafka and ZooKeeper to be readytime.sleep(20)# We launch the computationpw.run()
If you don't care about the names of the connector and server, you don't have to change the file at all.
As with Pathway, you need to update the server name (if required) for the stream generator:
producer = KafkaProducer( bootstrap_servers=["redpanda:9092"], security_protocol="PLAINTEXT", api_version=(0, 10, 2),)
Small problem, if you look at the results now, we have an empty best_ratings.csv
.
This comes from the creation of the ratings
topic.
While the topic was already ready with Kafka, it was created at the reception of the first message with Redpanda.
Creating a topic makes Redpanda discard the messages until it is ready.
Sending a message at the beginning of the computation should solve this:
producer.send(topic, "*COMMIT*".encode("utf-8"))time.sleep(2)
Note that this difference comes from Redpanda and not Pathway. Pathway connects to Redpanda and Kafka totally transparently. Pathway will receive and process the data the same way whether Kafka or Redpanda is used. Kafka and Redpanda are responsible for handling messages: Redpanda discards the incoming messages while the topic is created.
Despite the fix, the final file is very similar to the one for the Kafka version:
from kafka import KafkaProducerimport csvimport timeimport jsontopic = "ratings"#We wait for Kafka and ZooKeeper to be readytime.sleep(30)producer = KafkaProducer( bootstrap_servers=["kafka:9092"], security_protocol="PLAINTEXT", api_version=(0, 10, 2),)producer.send(topic, "*COMMIT*".encode("utf-8"))time.sleep(2)with open("./dataset.csv", newline="") as csvfile: dataset_reader = csv.reader(csvfile, delimiter=",") first_line = True for row in dataset_reader: # We skip the header if first_line: first_line = False continue message_json = { "userId": int(row[0]), "movieId": int(row[1]), "rating": float(row[2]), "timestamp": int(row[3]), } producer.send(topic, (json.dumps(message_json)).encode("utf-8")) time.sleep(0.1)producer.send(topic, "*COMMIT*".encode("utf-8"))time.sleep(2)producer.close()
You can also take a look at the sources of the Redpanda project.
With this, the results are the same as with Kafka:
movieId,average_rating,views,time,diff
296,5,1,1680008702067,1
306,3.5,1,1680008702167,1
[...]
296,3.3333333333333335,3,1680008703767,-1
1088,3.6666666666666665,3,1680008703767,1
296,3.3333333333333335,3,1680008703767,1
899,3.1666666666666665,3,1680008703767,-1
You've successfully computed the K best-rated movies using Redpanda, and your ranking is automatically updated thanks to Pathway; However, your team still isn't satisfied with the outcome. After taking a closer look, you realize the issue: you've been sending your results in a CSV file, which isn't the most suitable for handling a data stream. Not only that, but the file stays on your local computer, preventing others in the organization from accessing the data in real time.
Your team suggests sending the results back to Redpanda into a best_ratings
topic instead.
Redpanda is optimized for handling data streams, making it a more efficient and effective solution than sending data in a CSV file.
By doing this, you can ensure the data is accessible to everyone who needs it in real time.
Connecting to Redpanda with Pathway is as easy as connecting to Kafka. You need to use a Redpanda connector, which is exactly the same as the Kafka connector:
rdkafka_settings = { "bootstrap.servers": "redpanda:9092", "security.protocol": "plaintext", "group.id": "$GROUP_NAME", "session.timeout.ms": "6000",}pw.io.redpanda.write( t_best_ratings, rdkafka_settings, topic_name="best_ratings", format="json")
As previously mentioned, you can also establish a more secure connection using a SASL-SSL authentication over a SCRAM-SHA-256 mechanism as (e.g. connecting to Redpanda Cloud) follows:
rdkafka_settings = { "bootstrap.servers": "redpanda:9092", "security.protocol": "sasl_ssl", "sasl.mechanism": "SCRAM-SHA-256", "group.id": "$GROUP_NAME", "session.timeout.ms": "6000", "sasl.username": "username", "sasl.password": "********",}
Congratulations! π You have built a K-best-rated application in Pathway and made it work with Redpanda. Being fully Kafka API-compatible, Redpanda just works: if the server name remains the same, you have nothing to do with your Pathway code! For consistency, you can use Pathway's Redpanda connectors which work exactly the same as Kafka connectors.