Kafka ETL: Processing event streams in Python
In this tutorial, you will learn how to write a Kafka ETL in Python using Pathway, an open-source event processing framework. You will use Pathway connectors and transformations to extract, transform, and load event streams across multiple Kafka topics.
Python's simplicity and ease of use make it a popular choice for data processing tasks. However, Kafka, one of the most widely used tools for building ETL pipelines, is implemented in Java and mainly used by Java and Scala users. Pathway, a Python stream processing framework with a Rust engine, makes building ETL pipelines over Kafka in pure Python simple, without compromising on performance.
Imagine you've been hired by a fraud-detection company. The company monitors the logs from different servers and raises an alert whenever a pattern is suspicious. Your job is to manage your company's data and ensure the data received by the data science team is clean and ready to use. A new challenge emerges when the company expands its monitoring to servers in Paris. Suddenly, the times you're receiving are no longer uniform. From New York, they bear the Eastern Standard Time signature, while those from Paris are stamped with Central European Time.
Until now, the times you received were from servers in New York:
2024-02-05 10:01:52.884548 -0500
And this is what the new times from the new servers in Paris look like:
2024-02-05 16:02:34.934749 +0100
You must unify these disparate times into a single format to maintain data integrity.
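Both formats carry an explicit UTC offset, so once parsed they designate unambiguous instants that can be compared. Here is a quick plain-Python illustration (independent of the pipeline built below):
from datetime import datetime, timezone

# Both sample logs use the same layout; only the UTC offset differs.
fmt = "%Y-%m-%d %H:%M:%S.%f %z"
ny = datetime.strptime("2024-02-05 10:01:52.884548 -0500", fmt)
paris = datetime.strptime("2024-02-05 16:02:34.934749 +0100", fmt)

# Converted to UTC, the two instants become directly comparable.
print(ny.astimezone(timezone.utc))     # 2024-02-05 15:01:52.884548+00:00
print(paris.astimezone(timezone.utc))  # 2024-02-05 15:02:34.934749+00:00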
Enter ETL, a three-step process used to clean and unify data before sharing it for training a model or doing business analytics. First, you extract (E) the data, then you transform (T) it, and finally you load (L) it. This process is crucial whenever data comes from different sources, to ensure the data used within the company follows a consistent format and meets its quality requirements.
This article shows how to create a Kafka ETL pipeline in Python that extracts data from multiple topics, transforms it, and loads it into a combined Kafka topic. More precisely, you will learn how to do the different ETL steps with Pathway:
- (E) extracting the different data streams from Kafka using the Pathway Kafka input connector,
- (T) converting the times with varying time zones into timestamps using the Pathway datetime module,
- (T) concatenating the resulting data streams using the Pathway concatenation function,
- (L) loading the final data stream back into Kafka.
The full Pathway script is here for those in a hurry.
ETL architecture with Kafka in, Kafka out
The logs are sent to two distinct Kafka topics, one per time zone. You want to use Pathway to do the Kafka ETL: connect to the topics, extract the data, do the time zone conversion, and concatenate the resulting data streams into one. Finally, you want to return the result to a third Kafka topic.
Docker containers
The project uses several Docker containers:
- one for Kafka,
- one for Zookeeper,
- one for Pathway, which does the ETL,
- one imitating the company's servers and generating the data, called "producer".
Kafka and Zookeeper are directly managed in the docker-compose.yml file. Pathway and the producer are managed using dedicated Dockerfiles.
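For reference, a minimal docker-compose.yml could look like the sketch below. The image tags and environment variables are illustrative assumptions, not the exact configuration used in the repository:
version: "3.7"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  pathway:
    build:
      context: .
      dockerfile: ./pathway-src/Dockerfile
    depends_on:
      - kafka
  producer:
    build:
      context: .
      dockerfile: ./producer-src/Dockerfile
    depends_on:
      - kafka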
Here is the final architecture of the project:
.
├── pathway-src/
│ ├── Dockerfile
│ ├── etl.py
│ └── read-results.py
├── producer-src/
│ ├── create-stream.py
│ └── Dockerfile
├── docker-compose.yml
└── Makefile
You can find the whole project on GitHub.
Data generation
The data is generated by a Python script. Every second, a new log is created with the current datetime. The message is randomly associated with one of the two time zones and sent to the corresponding Kafka topic. To make the logs easy to identify, each one also contains a "message" field with the log's number.
import json
import random
import time
from datetime import datetime
from zoneinfo import ZoneInfo

from kafka import KafkaProducer

str_repr = "%Y-%m-%d %H:%M:%S.%f %z"
input_size = 1000  # number of logs to produce

topic1 = "timezone1"
topic2 = "timezone2"

timezone1 = ZoneInfo("America/New_York")
timezone2 = ZoneInfo("Europe/Paris")

producer1 = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    security_protocol="PLAINTEXT",
    api_version=(0, 10, 2),
)
producer2 = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    security_protocol="PLAINTEXT",
    api_version=(0, 10, 2),
)


def send_message(timezone: ZoneInfo, producer: KafkaProducer, topic: str, i: int):
    # Timestamp the log in the server's local time zone.
    timestamp = datetime.now(timezone)
    message_json = {"date": timestamp.strftime(str_repr), "message": str(i)}
    producer.send(topic, (json.dumps(message_json)).encode("utf-8"))


# Every second, send a log to one of the two topics, chosen at random.
for i in range(input_size):
    if random.choice([True, False]):
        send_message(timezone1, producer1, topic1, i)
    else:
        send_message(timezone2, producer2, topic2, i)
    time.sleep(1)
You can find the entire file here.
ETL with Pathway: Concatenating data streams
Now that the setup is ready, Pathway can handle all the rest!
Extract from Kafka
Pathway provides connectors to extract data from different data sources. You can connect to Kafka using the Pathway Kafka connectors. In Pathway, data is represented using tables, and you need to define the data types of the extracted data using schemas:
class InputStreamSchema(pw.Schema):
    date: str
    message: str
You need one connector per topic, but the connectors can use the same settings.
rdkafka_settings = {
    "bootstrap.servers": "kafka:9092",
    "security.protocol": "plaintext",
    "group.id": "0",
    "session.timeout.ms": "6000",
    "auto.offset.reset": "earliest",
}

timestamps_timezone_1 = pw.io.kafka.read(
    rdkafka_settings,
    topic="timezone1",
    format="json",
    schema=InputStreamSchema,
    autocommit_duration_ms=100,
)
timestamps_timezone_2 = pw.io.kafka.read(
    rdkafka_settings,
    topic="timezone2",
    format="json",
    schema=InputStreamSchema,
    autocommit_duration_ms=100,
)
You can learn more about Pathway Kafka connectors in our dedicated tutorial.
Transform into a single data stream
Now that you have the logs, you need to do the conversion. Dealing with time zones can be very frustrating. Fortunately, Pathway provides a datetime module with all the functions you need to make it easy. Here is how to convert the datetimes from the different time zones into timestamps and concatenate the results into a single data stream with Pathway:
def convert_to_timestamp(table):
    # Parse the date string into a time-zone-aware datetime.
    table = table.select(
        date=pw.this.date.dt.strptime(fmt=str_repr, contains_timezone=True),
        message=pw.this.message,
    )
    # Convert the datetime into a time-zone-independent timestamp (in ms).
    table_timestamp = table.select(
        timestamp=pw.this.date.dt.timestamp(unit="ms"),
        message=pw.this.message,
    )
    return table_timestamp


timestamps_timezone_1 = convert_to_timestamp(timestamps_timezone_1)
timestamps_timezone_2 = convert_to_timestamp(timestamps_timezone_2)

timestamps_unified = timestamps_timezone_1.concat_reindex(timestamps_timezone_2)
First, since both values (date and message) are strings, you need to convert the date into a datetime using the strptime function of the datetime module. The time zone is automatically determined in this step. Then, you can convert it to a timestamp (independent of the time zone) using the timestamp function. Finally, you can concatenate the two tables using the concat_reindex function.
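As a sanity check, here is the same conversion reproduced in plain Python on the first sample log (this snippet is only an illustration, not part of the pipeline):
from datetime import datetime

fmt = "%Y-%m-%d %H:%M:%S.%f %z"
dt = datetime.strptime("2024-02-05 10:01:52.884548 -0500", fmt)
# Milliseconds since the Unix epoch, independent of the original time zone:
print(dt.timestamp() * 1000)  # ≈ 1707145312884.5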
This is a very simple example of what Pathway can do. Pathway supports more complex operations such as stateful (groupby, windows, etc.) and temporal (e.g., ASOF join) operations.
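For instance, a running aggregation over the unified stream could look like the sketch below. It assumes the timestamps_unified table defined above and uses standard Pathway reducers; it is illustrative and not part of this pipeline:
# Maintain a live count of all logs and the most recent timestamp seen so far.
stats = timestamps_unified.reduce(
    count=pw.reducers.count(),
    latest=pw.reducers.max(pw.this.timestamp),
)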
Load to Kafka
Now that you have successfully transformed the data, you still need to send it back to Kafka. Using a Kafka output connector should do the trick:
pw.io.kafka.write(
    timestamps_unified, rdkafka_settings, topic_name="unified_timestamps", format="json"
)
The settings are the same as for the input connectors since the data is sent to the same Kafka instance.
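Each row of the table is sent as a JSON message. In addition to the table's columns, Pathway attaches its update metadata: a time field (the time of the update) and a diff field (+1 for an insertion, -1 for a deletion). A message on the topic should look roughly like this (the values shown are illustrative):
{"timestamp": 1707217879632.242, "message": "11", "diff": 1, "time": 1707217879944}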
Run it
Congratulations, your Kafka ETL pipeline is ready!
Until now, you were only building the pipeline: defining the connectors and the different operators. No data is actually loaded into the system, so if you ran your Pathway code now, the pipeline would be built, but no computation would happen. To run the pipeline and start ingesting data, you need to use the Pathway run function:
pw.run()
Now the input connectors will connect and load the data! Thanks to its powerful Rust engine, Pathway computations are extremely fast. You are not bound by the usual limits of Python: Pathway natively supports multithreading, multiprocessing, and distributed computing.
If you are curious about how the pipeline works, don't hesitate to read our article about Pathway core concepts.
Read the output
The unified logs are now available on Kafka's unified_timestamps topic. You can access this topic using your favorite tool. Nonetheless, you can easily use Pathway to check that everything works well. Create a file read-results.py in pathway-src/ to access the data:
# The unified topic contains timestamps, not dates, so it needs its own schema.
class OutputStreamSchema(pw.Schema):
    timestamp: float
    message: str


table = pw.io.kafka.read(
    rdkafka_settings,
    topic="unified_timestamps",
    schema=OutputStreamSchema,
    format="json",
    autocommit_duration_ms=100,
)
pw.io.csv.write(table, "./results.csv")
pw.run()
The entire script is available here. This script reads the data and outputs it as a CSV file:
timestamp,message,time,diff
1707217879632.242,"11",1707217879944,1
1707217876629.236,"8",1707217879944,1
1707217872469.24,"4",1707217879944,1
1707217868355.006,"0",1707217879944,1
1707217870466.797,"2",1707217879944,1
1707217873626.241,"5",1707217879944,1
1707217869465.5308,"1",1707217879944,1
1707217871468.065,"3",1707217879944,1
1707217874627.24,"6",1707217879944,1
1707217877630.239,"9",1707217879944,1
1707217875628.488,"7",1707217879944,1
1707217878631.242,"10",1707217879944,1
1707217880633.24,"12",1707217880644,1
1707217881634.5,"13",1707217881644,1
1707217882635.752,"14",1707217882644,1
The times have been unified and are now time-zone-independent timestamps. 🎉 The extra time and diff columns are update metadata added by Pathway: time is the time of the update, and diff indicates whether the row was inserted (1) or deleted (-1); here, all rows are insertions.
You can learn more about the output here.
Full solution
The entire project is publicly available on GitHub.
Here is the complete etl.py file:
import pathway as pw

rdkafka_settings = {
    "bootstrap.servers": "kafka:9092",
    "security.protocol": "plaintext",
    "group.id": "0",
    "session.timeout.ms": "6000",
    "auto.offset.reset": "earliest",
}

str_repr = "%Y-%m-%d %H:%M:%S.%f %z"


class InputStreamSchema(pw.Schema):
    date: str
    message: str


timestamps_timezone_1 = pw.io.kafka.read(
    rdkafka_settings,
    topic="timezone1",
    format="json",
    schema=InputStreamSchema,
    autocommit_duration_ms=100,
)
timestamps_timezone_2 = pw.io.kafka.read(
    rdkafka_settings,
    topic="timezone2",
    format="json",
    schema=InputStreamSchema,
    autocommit_duration_ms=100,
)


def convert_to_timestamp(table):
    table = table.select(
        date=pw.this.date.dt.strptime(fmt=str_repr, contains_timezone=True),
        message=pw.this.message,
    )
    table_timestamp = table.select(
        timestamp=pw.this.date.dt.timestamp(unit="ms"),
        message=pw.this.message,
    )
    return table_timestamp


timestamps_timezone_1 = convert_to_timestamp(timestamps_timezone_1)
timestamps_timezone_2 = convert_to_timestamp(timestamps_timezone_2)

timestamps_unified = timestamps_timezone_1.concat_reindex(timestamps_timezone_2)

pw.io.kafka.write(
    timestamps_unified, rdkafka_settings, topic_name="unified_timestamps", format="json"
)

pw.run()
Going further with Pathway
Congratulations! You are now ready to do Kafka ETL with Pathway.
Your setup probably differs slightly, and your ETL pipeline may require different operators. Pathway offers many connectors for extracting and loading the data from and to various sources. In addition to standard table operations, Pathway also supports temporal operations such as ASOF joins and interval joins.
Don't hesitate to take a look at Pathway documentation and reach out to us on Discord if you don't find the operator you are looking for.