Using database connectors
Connect Pathway on top of your PostgreSQL/Debezium database using pw.io.debezium.read and pw.io.postgres.write.
If you're looking to set up CDC (Change Data Capture) with MongoDB instead, you can check out our MongoDB tutorial. Both tutorials follow a similar structure but emphasize the unique configuration steps required for each database.
In this tutorial, you will learn how to set up an architecture using Pathway on top of PostgreSQL/Debezium.
The aim is to show you how to use Pathway to perform real-time processing of the changes in your PostgreSQL database.
Traditional databases, such as PostgreSQL, are not designed for streaming scenarios: we need a change data capture (CDC) mechanism to monitor a database and stream the changes.
To create our input stream from PostgreSQL, we will use Debezium and its associated Pathway connector pw.io.debezium.read.
The output stream will be sent back to the PostgreSQL database using Pathway's PostgreSQL connector pw.io.postgres.write.
This tutorial is a bit longer than the others, as you will need to deploy several containers using docker-compose to have a working example.
If you have never used docker-compose, don't be afraid!
This tutorial does not require any prior knowledge of docker-compose: we will provide all the required settings in order to make it as pleasant as possible 😉, and you will see that it makes the deployment of the database quite easy.
If you have your own PostgreSQL and Debezium instances already deployed, you may skip the architecture part to focus on the input and output connectors.
Short version
Let's consider a simple scenario: you have a SQL table values with only one column value which receives new entries, and you want to compute the sum of the values and store it in another table sum_table. The changes in the table values are captured and streamed by Debezium.
You can do it as follows in Pathway:
import pathway as pw

# Debezium settings
input_rdkafka_settings = {
    "bootstrap.servers": "kafka:9092",
    "security.protocol": "plaintext",
    "group.id": "0",
    "session.timeout.ms": "6000",
}

# PostgreSQL settings
output_postgres_settings = {
    "host": "postgres",
    "port": "5432",
    "dbname": "values_db",
    "user": "user",
    "password": "password",
}

# We define a schema for the table
# It sets all the columns and their types
class InputSchema(pw.Schema):
    value: int

# We use the Debezium connector to listen to the "postgres.public.values" topic
t = pw.io.debezium.read(
    input_rdkafka_settings,
    topic_name="postgres.public.values",
    schema=InputSchema,
)

# We compute the sum (this part is independent of the connectors).
t = t.reduce(sum=pw.reducers.sum(t.value))

# We use the PostgreSQL connector to send the resulting output stream containing the sum
pw.io.postgres.write(t, output_postgres_settings, "sum_table")

# We launch the computation.
pw.run()
Architecture
First, let's take a moment to study the architecture of our project.
You need a PostgreSQL database with two tables: the table values, on which regular updates will be performed, creating the input data stream, and the output table sum_table, in which the sum will be stored and periodically updated by Pathway.
Then, you need to set up the CDC to capture the changes in the PostgreSQL database and create a stream from them. This part is done by a Debezium instance.
The updates from Debezium are sent to Kafka (which relies on ZooKeeper for coordination), and Kafka then propagates them to Pathway.
Finally, Pathway receives the updates from Kafka, processes the data, and sends the sum back to PostgreSQL.
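In short, the data travels in a loop:

PostgreSQL (values) -> Debezium -> Kafka -> Pathway -> PostgreSQL (sum_table)

(ZooKeeper only coordinates Kafka; it does not carry the data itself.)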
Docker containers
To summarize, our puzzle has five pieces:
- PostgreSQL
- Debezium
- Kafka
- ZooKeeper
- Pathway
Needless to say, installing those from scratch can be a hurdle...
Fortunately, we can use Docker containers and Docker Compose to ease the deployment of our project. In a nutshell, a container is a virtual environment in which we can run standalone applications. As an example, the PostgreSQL container you will use in this project contains a minimal distribution to run a PostgreSQL database: it is lightweight and PostgreSQL is already installed, but it may not work for other purposes. In our case, the main advantage of using Docker containers is the simplicity of the installation and the settings. If you are interested in Docker containers, or if you want to adapt this example to your project, check out the doc.
You will use a docker-compose.yml in the root of this project to set it up.
Such a file is organized as follows:
version: "3.7"
services:
  postgres:
    build:
    environment:
    volumes:
  kafka:
    build:
  ...
Each application, called a service here, is defined there with all its relevant parameters; for example, the parameters in build define how to build the container (which image and so on). Note that postgres here is the name given to the service; whether a PostgreSQL image is used is determined by the build parameter.
Postgres
You first need to configure the database:
postgres:
  container_name: db_tuto_postgres
  image: debezium/postgres:13
  ports:
    - 5432:5432
  environment:
    - POSTGRES_USER=user
    - POSTGRES_PASSWORD=password
    - POSTGRES_DB=values_db
    - PGPASSWORD=password
  volumes:
    - ./sql/init-db.sql:/docker-entrypoint-initdb.d/init-db.sql
    - ./sql/update_db.sh:/update_db.sh
Most of the variables are self-explanatory.
The only subtlety here is how the database is created and updated.
The container will use the script in /docker-entrypoint-initdb.d/ to initialize the database; Docker Compose copies files into the container using volumes.
We create a file init-db.sql in which we create our two tables values and sum_table:
CREATE TABLE IF NOT EXISTS values (
    value integer NOT NULL
);

CREATE TABLE IF NOT EXISTS sum_table (
    sum REAL NOT NULL,
    time BIGINT NOT NULL,
    diff INTEGER NOT NULL
);
You also need a script update_db.sh to insert data into the table, creating the stream:
#!/bin/bash
export PGPASSWORD='password'

for LOOP_ID in {1..1000}
do
    psql -d values_db -U user -c "INSERT INTO values VALUES ($LOOP_ID);"
    sleep 0.5
done
⚠️ This file should have executable rights.
We will copy this file to the root of the container, not to /docker-entrypoint-initdb.d/, as we want to launch it manually.
Debezium
For Debezium, we need to configure ZooKeeper and Kafka. For all of those, we are going to use very specific images which considerably limit the amount of configuration required.
ZooKeeper
zookeeper:
  container_name: db_tuto_zookeeper
  image: confluentinc/cp-zookeeper:5.5.3
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
Kafka
Kafka will be connected to ZooKeeper.
Thanks to Docker Compose, all the containers share the same network.
To connect to a given service, we just need to use its name: to connect to our ZooKeeper container, we only need to write "zookeeper:2181".
kafka:
  container_name: db_tuto_kafka
  image: confluentinc/cp-enterprise-kafka:5.5.3
  depends_on: [zookeeper]
  environment:
    KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
    KAFKA_BROKER_ID: 1
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_JMX_PORT: 9991
  ports:
    - 9092:9092
Debezium
debezium:
  container_name: db_tuto_debezium
  image: debezium/connect:1.4
  environment:
    BOOTSTRAP_SERVERS: kafka:9092
    GROUP_ID: 1
    CONFIG_STORAGE_TOPIC: connect_configs
    OFFSET_STORAGE_TOPIC: connect_offsets
  volumes:
    - ./debezium/connector.sh:/kafka/connector.sh
  depends_on: [kafka]
  ports:
    - 8083:8083
We need to connect the Debezium connector to our PostgreSQL database:
#!/bin/bash
curl -H 'Content-Type: application/json' debezium:8083/connectors --data '
{
  "name": "values-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "password",
    "database.dbname": "values_db",
    "database.server.name": "postgres",
    "table.include.list": "public.values"
  }
}'
⚠️ This file should have executable rights.
Copy this script to the root of the container to execute it manually.
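To check that the connector has been registered correctly, you can query the Kafka Connect REST API exposed by the Debezium container. Here is a minimal sketch using only the Python standard library; it assumes it is run from a container on the same Docker network, so that the debezium hostname resolves:

import json
import urllib.request

# /connectors/<name>/status is a standard Kafka Connect REST endpoint.
url = "http://debezium:8083/connectors/values-connector/status"
with urllib.request.urlopen(url) as response:
    status = json.load(response)

# A healthy connector reports the state "RUNNING".
print(status["connector"]["state"])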
Pathway
Now you need to set up the container for Pathway. Pathway does not have its own docker image, so you are going to use a Dockerfile to configure the container.
pathway:
  container_name: db_tuto_pathway
  build:
    context: .
    dockerfile: ./pathway-src/Dockerfile
  depends_on: [kafka, postgres]
In our Dockerfile, we use a Python image. You just need to use the pip install command to install Pathway.
FROM --platform=linux/x86_64 python:3.10
RUN pip install -U pathway
RUN pip install kafka-python
COPY ./pathway-src/sum.py sum.py
CMD ["python", "-u", "sum.py"]
⚠️ For compatibility reasons, we use a x86_64 Linux container.
The file sum.py is the entry point of our container: it will be automatically launched when the container starts.
Once it terminates, the container is stopped.
Our pipeline will be defined in this file.
That's it! It is a large docker-compose file, but when you consider that this is enough to launch the 5 containers and make them work together, it is quite impressive!
Makefile
To launch the containers, we only need to run docker-compose up in the same directory as docker-compose.yml.
Nevertheless, it may be simpler to have a dedicated Makefile:
build:
	chmod +x ./debezium/connector.sh
	chmod +x ./sql/update_db.sh
	docker-compose up -d
	sleep 5
	docker-compose exec debezium ./connector.sh
	docker-compose exec postgres ./update_db.sh

stop:
	docker-compose down -v
	docker rmi tutorial-example-pathway:latest
You can launch the experiment with make and stop it with make stop.
Now you only need to write the pipeline in sum.py.
Debezium input connector
Data stream: For the input connector, the stream should be in the form of Debezium messages received on a given topic. Each received update is atomic, and triggers the update of the pipeline created in Pathway.
Note that Debezium connectors only work in streaming mode.
⚠️ We talk about Debezium messages, but this is a simplification. Debezium works with Kafka: in practice, the connector should be connected to Kafka. The main difference from the regular Kafka connector is the expected formatting of the messages.
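To give some intuition about this formatting, here is a simplified sketch, written as a Python dict, of the payload carried by a Debezium change message for an insertion into the table values; the exact envelope depends on your converter configuration:

# A simplified Debezium change event payload (an illustration, not the full envelope).
debezium_payload = {
    "before": None,            # the previous version of the row; None for an insertion
    "after": {"value": 1},     # the new version of the row
    "op": "c",                 # "c" = create/insert, "u" = update, "d" = delete
    "source": {"table": "values"},  # metadata about the origin (trimmed here)
}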
Usage:
The Debezium input connector pw.io.debezium.read takes several arguments:
- rdkafka_settings: the settings used to connect to the Kafka instance receiving the Debezium messages; they follow the format of librdkafka,
- topic_name: the topic which is listened to,
- schema: the schema of the table. It defines the column names and their types. It also defines the primary keys.
- autocommit_duration_ms: the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway's dataflow.
⚠️ Note that a Debezium connector listens to only one topic.
Usage:

class InputSchema(pw.Schema):
    value: int

t = pw.io.debezium.read(
    input_rdkafka_settings,
    topic_name="postgres.public.values",
    schema=InputSchema,
    autocommit_duration_ms=100,
)
The optional parameter types, used to cast the input values, relies on dedicated pw.Type values: types={"value": pw.Type.INT}.
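The schema is also where primary keys are declared. A minimal sketch, assuming your Pathway version provides pw.column_definition with a primary_key flag:

class KeyedInputSchema(pw.Schema):
    # The id column is used to compute the primary key of the entries.
    id: int = pw.column_definition(primary_key=True)
    value: int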
PostgreSQL output connector
The output connector pw.io.postgres.write adds the updates made to a table t to a given PostgreSQL table.
Usage: the output connector takes the following arguments:
- table: the Pathway table to send to PostgreSQL,
- postgres_settings: the settings used to connect to the PostgreSQL database (host, port, dbname, user, and password),
- table_name: the PostgreSQL table to which the messages are sent.
pw.io.postgres.write(t, output_postgres_settings, "sum_table")
Every time the table t is updated, the changes will be automatically appended to the PostgreSQL table sum_table.
⚠️ The table should already be created in PostgreSQL: the creation is not done by Pathway.
The table must include all the columns of the table t, as well as two extra columns: time and diff.
The columns time and diff express, respectively, the timestamp of the update and whether the update is an insertion or a deletion (an update is simply the simultaneous deletion of the old value along with the insertion of the new one).
In our example, we must create the sum_table using this SQL command:
CREATE TABLE IF NOT EXISTS sum_table (
    sum REAL NOT NULL,
    time BIGINT NOT NULL,
    diff INTEGER NOT NULL
);
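For instance, if the sum goes from 10 to 16 at a time t, two rows are appended to sum_table: (10, t, -1), which removes the old value, and (16, t, 1), which inserts the new one.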
Complete example
To summarize, we have a project with the following structure:
.
├── debezium/
│ └── connector.sh
├── pathway-src/
│ ├── Dockerfile
│ └── sum.py
├── sql/
│ ├── init-db.sql
│ └── update_db.sh
├── docker-compose.yml
└── Makefile
Except for sum.py, all the files have been explained previously.
The full example is accessible in our public repository.
Let's see in more detail how the pipeline is defined in Pathway in sum.py:
import pathway as pw

input_rdkafka_settings = {
    "bootstrap.servers": "kafka:9092",
    "security.protocol": "plaintext",
    "group.id": "0",
    "session.timeout.ms": "6000",
}
output_postgres_settings = {
    "host": "postgres",
    "port": "5432",
    "dbname": "values_db",
    "user": "user",
    "password": "password",
}

class InputSchema(pw.Schema):
    value: int

t = pw.io.debezium.read(
    input_rdkafka_settings,
    topic_name="postgres.public.values",
    schema=InputSchema,
    autocommit_duration_ms=100,
)

t = t.reduce(sum=pw.reducers.sum(t.value))

pw.io.postgres.write(t, output_postgres_settings, "sum_table")

pw.run()
Don't forget to run the computation with pw.run(), otherwise nothing will happen.
Once pw.run() is called, the computation will run forever until it gets killed.
If you need some reminder on Pathway operations, don't hesitate to take a look at our First steps guide.
To launch the computation, you just need to run make at the root of the project; it will launch all the containers, initialize the database, and start adding new values.
Every addition to the table values in PostgreSQL will trigger an update, through Debezium, and Pathway will send an update to the table sum_table.
To monitor the changes, you can log into the PostgreSQL container:
docker-compose exec postgres psql values_db -U user -W
After typing your password, you should be able to see all the updates to the table sum_table.
To see the latest value of the sum, we just need to type:
SELECT sum FROM sum_table ORDER BY time DESC, diff DESC LIMIT 1;
This value is updated in real time by Pathway.
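If you prefer to follow the sum from Python rather than from psql, here is a minimal polling sketch; it assumes the psycopg2 package is installed (for instance via pip install psycopg2-binary) and that it runs on the same Docker network, so that the postgres hostname resolves:

import time

import psycopg2

# Connection parameters matching the docker-compose configuration above.
connection = psycopg2.connect(
    host="postgres",
    port=5432,
    dbname="values_db",
    user="user",
    password="password",
)

with connection, connection.cursor() as cursor:
    for _ in range(10):
        # The same query as above: the latest value of the sum.
        cursor.execute("SELECT sum FROM sum_table ORDER BY time DESC, diff DESC LIMIT 1;")
        row = cursor.fetchone()
        print(row[0] if row is not None else "no data yet")
        time.sleep(1)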
If you want more details, you can see the 10 latest updates by typing:
SELECT * FROM sum_table ORDER BY time DESC, diff DESC LIMIT 10;
To learn more about what those updates mean, don't hesitate to read the explanation provided in our first example.