Pathway in Minutes: Quick ETL Examples
In this article, you will learn how to start your journey with Pathway and build a simple ETL pipeline.
First, make sure Pathway is installed. On Python 3.10+, install it with a single pip command:
pip install pathway
and you are ready to roll!
A simple sum example
Let's start with a simple sum over positive values stored in CSV files, and written to a JSON Lines file:
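(A minimal sketch; the input directory ./sum_input_data/ and the output file sum_output.jsonl are placeholder names, so adapt them to your setup.)

import pathway as pw

# The input CSV files have a single column: an integer value.
class InputSchema(pw.Schema):
    value: int

# Read all the CSV files from the input directory.
table = pw.io.csv.read("./sum_input_data/", schema=InputSchema)
# Keep only the positive values.
table = table.filter(pw.this.value > 0)
# Sum them into a single-row table.
table = table.reduce(sum=pw.reducers.sum(pw.this.value))
# Write every update of the sum to a JSON Lines file.
pw.io.jsonlines.write(table, "sum_output.jsonl")
# Launch the computation.
pw.run()

Whenever a new CSV file lands in the input directory, the sum is updated and the change is appended to the output file.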
Join and filter ETL example
This example takes two data sources:
- live measurements from a Kafka topic,
- thresholds from local CSV files.
The aim is to combine these two data sources and find the live measurements that are above their threshold.
Source Code
This is how you can do the whole pipeline in Pathway:
import pathway as pw  # import Pathway

# Declare the schema of your tables using pw.Schema.
# There are two input tables: measurements and thresholds.
# Both have two columns: a name (str) and a float.
class MeasurementSchema(pw.Schema):
    name: str
    value: float

class ThresholdSchema(pw.Schema):
    name: str
    threshold: float

# Define the Kafka configuration to connect to your Kafka instance.
rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}

# Access the measurements using the Kafka connector.
measurements_table = pw.io.kafka.read(
    rdkafka_settings,
    topic="topic",
    schema=MeasurementSchema,
    format="json",
    autocommit_duration_ms=1000,
)

# Access the threshold data stored in CSV files.
thresholds_table = pw.io.csv.read(
    './threshold-data/',
    schema=ThresholdSchema,
)

# Join the tables on the column name.
joined_table = measurements_table.join(  # The left table is measurements_table (pw.left)
    thresholds_table,  # The right table is thresholds_table (pw.right)
    pw.left.name == pw.right.name,  # The join is done on the column name of each table
).select(  # The columns of the joined table are chosen using select
    *pw.left,  # All the columns of measurements_table are kept
    pw.right.threshold,  # The threshold column of thresholds_table is kept
)

# Keep only the values strictly higher than the threshold.
alerts_table = joined_table.filter(
    pw.this.value > pw.this.threshold
).select(pw.this.name, pw.this.value)  # Only the name and value fields are kept

# Send the results to another Kafka topic, on the same Kafka instance.
pw.io.kafka.write(alerts_table, rdkafka_settings, topic_name="alerts_topic", format="json")

# Launch the Pathway computation.
pw.run()
More details
This example connects to two data sources using input connectors.
Each connector connects to a data source and represents the input data stream as a table, measurements_table and thresholds_table in this case.
You can specify the schema of the data using pw.Schema. You can use the same schema for several tables.
Now that you have data, you can process it however you want: joins, temporal windows, filtering... You can get a glimpse of the available operations in our basic operations guide.
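For instance, here is a minimal sketch of a grouped aggregation on the measurements table from the example above, maintaining a live average value per name (avg_table and avg_value are hypothetical names):

# Group the measurements by name and maintain the average value per name.
# Each update to measurements_table automatically updates the averages.
avg_table = measurements_table.groupby(pw.this.name).reduce(
    pw.this.name,
    avg_value=pw.reducers.avg(pw.this.value),
)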
With pw.run(), the computation is launched.
Each update in the input data streams will automatically trigger the update of the whole pipeline.
When new input is received in either measurements_table or thresholds_table, the tables joined_table and alerts_table are updated and the changes are forwarded to the Kafka topic by the Kafka output connector.
Pathway polls for new updates until the process is terminated: the computation runs forever by default.
This is the normal behavior of Pathway.
If you want to test your pipeline on static and finite data, Pathway also provides a static mode and a demo module.
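For example, with the debug utilities you can build a small static table in-place and print the result of a computation directly, which is handy for quick tests (the sample values below are made up):

import pathway as pw

# Build a small static table for testing.
static_measurements = pw.debug.table_from_markdown(
    """
    name | value
    A    | 8
    B    | 10
    """
)
# Run the computation on the static data and print the result.
pw.debug.compute_and_print(static_measurements.filter(pw.this.value > 9))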
You can learn more about the concepts of Pathway in our dedicated article.
Understanding the output
Suppose that the following input data has been received on the Kafka topic:
{"name": "A", "value":8}
{"name": "B", "value":10}
And the threshold values are:
name, threshold
"A", 9
"B", 9
Then the output is:
{"name": "B", "value":10, "time":1, "diff":1}
The output contains two more fields: time and diff:
- time represents the time at which the update happened. In practice, the time is a timestamp.
- diff represents whether the row is an addition or a deletion. An update is represented by two rows: one to remove the old value, one to add the new value. Those two rows share the same time to ensure the atomicity of the operation.
In this case, we assume the first values were computed at time 1. The value of diff is equal to 1 as the row is an insertion.
Suppose now that the thresholds are updated:
name, threshold
"A", 7
"B", 11
Then the output is:
{"name": "B", "value":10, "time":1, "diff":1}
{"name": "B", "value":10, "time":2, "diff":-1}
{"name": "A", "value":8, "time":2, "diff":1}
There are two more lines: the old alert is removed (diff=-1) and a new one is inserted for the other measurement (diff=1).
The old values are still present, as the output is a log of insertions and deletions.
Other examples
Looking for more? Check out our showcase catalog.