Using database connectors

Connect Pathway on top of your PostgreSQL/Debezium database using pw.io.debezium.read and pw.io.postgres.write.

In this tutorial, you will learn how to set up an architecture using Pathway on top of PostgreSQL/Debezium. The aim is to show you how to use Pathway to process the changes of your PostgreSQL database in real time. Traditional databases, such as PostgreSQL, are not designed for streaming scenarios: we need a change data capture (CDC) mechanism to monitor the database and stream the changes. To create our input stream from PostgreSQL, we will use Debezium and the associated Pathway connector pw.io.debezium.read. The output stream will be sent back to the PostgreSQL database using Pathway's PostgreSQL connector pw.io.postgres.write.

This tutorial is a bit longer than the other ones, as you will need to deploy several containers using docker-compose to have a working example. If you have never used docker-compose, don't be afraid! This tutorial does not require any prior knowledge of docker-compose: we will provide all the required settings to make it as pleasant as possible 😉. You will see that it makes the deployment of the database quite easy.

If you have your own PostgreSQL and Debezium instances already deployed, you may skip the architecture part to focus on the input and output connectors.

Short version

Let's consider a simple scenario: you have a SQL table values with a single column value, which regularly receives new entries, and you want to compute the sum of the values and store it in another table, sum_table. The changes in the table values are captured and streamed by Debezium.

You can do it as follows in Pathway:

sum.py
import pathway as pw

# Debezium settings
input_rdkafka_settings = {
    "bootstrap.servers": "kafka:9092",
    "security.protocol": "plaintext",
    "group.id": "0",
    "session.timeout.ms": "6000",
}
# PostgreSQL settings
output_postgres_settings = {
    "host": "postgres",
    "port": "5432",
    "dbname": "values_db",
    "user": "user",
    "password": "password",
}

# We define a schema for the table
# It sets all the columns and their types
class InputSchema(pw.Schema):
    value: int


# We use the Debezium connector to listen to the "postgres.public.values" topic
t = pw.io.debezium.read(
    input_rdkafka_settings,
    topic_name="postgres.public.values",
    schema=InputSchema,
)

# We compute the sum (this part is independent of the connectors).
t = t.reduce(sum=pw.reducers.sum(t.value))

# We use the PostgreSQL connector to send the resulting output stream containing the sum
pw.io.postgres.write(t, output_postgres_settings, "sum_table")

# We launch the computation.
pw.run()

Architecture

First, let's take a moment to study the architecture of our project.

You need a PostgreSQL database with two tables: the table values, on which regular updates will be performed, creating the input data stream, and the output table sum_table, in which the sum will be stored and updated by Pathway.

Then, you need to set up a CDC mechanism to capture the changes in the PostgreSQL database and turn them into a stream. This part is done by a Debezium instance.

The updates from Debezium are sent to Kafka, which relies on ZooKeeper; Kafka then propagates them to Pathway.

Finally, Pathway receives the updates from Kafka, processes the data, and sends the sum back to PostgreSQL.

Architecture

Docker containers

To summarize, our puzzle has five pieces:

  • PostgreSQL
  • Debezium
  • Kafka
  • ZooKeeper
  • Pathway

Needless to say, installing those from scratch can be a hurdle...

Fortunately, we can use Docker containers and Docker Compose to ease the deployment of our project. In a nutshell, a container is a virtual environment in which we can run standalone applications. As an example, the PostgreSQL container you will use in this project contains a minimal distribution to run a PostgreSQL database: it is lightweight and PostgreSQL is already installed, but it may not work for other purposes. In our case, the main advantage of using Docker containers is the simplicity of the installation and the configuration. If you are interested in Docker containers, or if you want to adapt this example to your project, check out the documentation.

You will use a docker-compose.yml in the root of this project to set it up. Such a file is organized as follows:

version: "3.7"
services:
  postgres:
    build:
    environment:
    volumes:
  kafka:
    build:
    ...

Each application, called a service here, is defined there with all its relevant parameters. For example, the parameters in build define how to build the container (which image to use, and so on). Note that postgres here is only the name given to the service: whether a PostgreSQL image is actually used is determined by the build (or image) parameter.

Postgres

You first need to configure the database:

  postgres:
    container_name: db_tuto_postgres
    image: debezium/postgres:13
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=values_db
      - PGPASSWORD=password
    volumes:
      - ./sql/init-db.sql:/docker-entrypoint-initdb.d/init-db.sql
      - ./sql/update_db.sh:/update_db.sh

Most of the variables are self-explanatory. The only subtlety here is how the database is created and updated. The container will use the scripts in /docker-entrypoint-initdb.d/ to initialize the database. Docker Compose makes files available inside the container using volumes. We create a file init-db.sql in which we create our two tables, values and sum_table:

./sql/init-db.sql
CREATE TABLE IF NOT EXISTS values (
    value integer NOT NULL
);

CREATE TABLE IF NOT EXISTS sum_table (
    sum REAL NOT NULL,
    time BIGINT NOT NULL,
    diff INTEGER NOT NULL
);

You also need a script update_db.sh to insert data into the table, to create the stream:

./sql/update_db.sh
#!/bin/bash
export PGPASSWORD='password'

for LOOP_ID in {1..1000}
do
    psql -d values_db -U user -c "INSERT INTO values VALUES ($LOOP_ID);"
    sleep 0.5
done

⚠️ This file should have executable rights.

This file is placed at the root of the container, not in /docker-entrypoint-initdb.d/, because we want to launch it manually.
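As a quick sanity check for later, note that update_db.sh inserts the integers 1 to 1000, so once every insertion has been processed the sum should reach 500500. The sketch below only reproduces that arithmetic in plain Python; the helper name expected_sum is hypothetical and is not part of the tutorial's files.

```python
def expected_sum(first: int, last: int) -> int:
    """Sum of the integers inserted by update_db.sh, from first to last inclusive."""
    return sum(range(first, last + 1))


# update_db.sh loops over {1..1000} and inserts one value per iteration.
final_sum = expected_sum(1, 1000)
print(final_sum)  # 500500
```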

Debezium

For Debezium, we need to configure ZooKeeper and Kafka. For all of those, we are going to use very specific images, which considerably reduce the amount of configuration required.

ZooKeeper

  zookeeper:
    container_name: db_tuto_zookeeper
    image: confluentinc/cp-zookeeper:5.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

Kafka

Kafka will be connected to ZooKeeper. Thanks to docker compose, all the containers share the same network. To connect to a given service, we just need to use its name: to connect to our ZooKeeper container we only need to write "zookeeper:2181".

  kafka:
    container_name: db_tuto_kafka
    image: confluentinc/cp-enterprise-kafka:5.5.3
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
    ports:
    - 9092:9092

Debezium

  debezium:
    container_name: db_tuto_debezium
    image: debezium/connect:1.4
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
    volumes:
      - ./debezium/connector.sh:/kafka/connector.sh
    depends_on: [kafka]
    ports:
      - 8083:8083

We need to connect the Debezium connector to our PostgreSQL database:

./debezium/connector.sh
#!/bin/bash

curl -H 'Content-Type: application/json' debezium:8083/connectors --data '
{
  "name": "values-connector",  
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "plugin.name": "pgoutput",
    "database.hostname": "postgres", 
    "database.port": "5432", 
    "database.user": "user", 
    "database.password": "password", 
    "database.dbname" : "values_db", 
    "database.server.name": "postgres", 
    "table.include.list": "public.values" 

  }
}'

⚠️ This file should have executable rights.

The volume defined above makes this script available in the container at /kafka/connector.sh, so that it can be executed manually.
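For readers who prefer to register the connector from Python rather than with curl, here is a minimal sketch that builds the same request using only the standard library. The helper names build_request and register_connector are hypothetical, and the URL assumes the code runs inside the docker-compose network, where the service is reachable as debezium:8083. The last lines show how the topic name used later in sum.py is derived from the server name and the table.

```python
import json
import urllib.request

# Same settings as in connector.sh.
CONNECTOR_CONFIG = {
    "name": "values-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "user",
        "database.password": "password",
        "database.dbname": "values_db",
        "database.server.name": "postgres",
        "table.include.list": "public.values",
    },
}


def build_request(url: str = "http://debezium:8083/connectors") -> urllib.request.Request:
    """Build the POST request that connector.sh sends with curl (hypothetical helper)."""
    body = json.dumps(CONNECTOR_CONFIG).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register_connector() -> int:
    """Send the request and return the HTTP status code.

    This only works from a machine that can resolve the debezium host.
    """
    with urllib.request.urlopen(build_request()) as response:
        return response.status


# The topic is named after the server name followed by the schema and table.
server = CONNECTOR_CONFIG["config"]["database.server.name"]
table = CONNECTOR_CONFIG["config"]["table.include.list"]
print(f"{server}.{table}")  # postgres.public.values
```

Building the request separately from sending it makes the payload easy to inspect without a running Debezium instance.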

Pathway

Now you need to set up the container for Pathway. Pathway does not have its own docker image, so you are going to use a Dockerfile to configure our container.

  pathway:
    container_name: db_tuto_pathway
    build:
      context: .
      dockerfile: ./pathway-src/Dockerfile
    depends_on: [kafka, postgres]

In our Dockerfile, we use a Python image. You just need to use the pip install command to install Pathway.

./pathway-src/Dockerfile
FROM --platform=linux/x86_64 python:3.10

RUN pip install -U pathway
RUN pip install kafka-python
COPY ./pathway-src/sum.py sum.py

CMD ["python", "-u", "sum.py"]

⚠️ For compatibility reasons, we use an x86_64 Linux container.

The file sum.py is the entry point of our container: it is launched automatically when the container starts. Once it terminates, the container is stopped. Our pipeline will be defined in this file.

That's it! That's a large docker-compose file, but when you think that this is enough to launch the 5 containers and make them work together, it is quite impressive!

Makefile

To launch the containers, we only need to run docker-compose up in the same directory as docker-compose.yml. Nevertheless, it may be simpler to have a dedicated Makefile:

./Makefile
build:
    chmod +x ./debezium/connector.sh
    chmod +x ./sql/update_db.sh
    docker-compose up -d
    sleep 5
    docker-compose exec debezium ./connector.sh
    docker-compose exec postgres ./update_db.sh

stop:
    docker-compose down -v
    docker rmi tutorial-example-pathway:latest

You can launch the experiment with make and stop it with make stop.
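The Makefile waits a fixed 5 seconds before registering the connector, which may be too short on a slow machine. As an optional alternative, here is a minimal sketch of a polling helper that retries a readiness check until it succeeds or a timeout elapses. The names wait_until and debezium_is_ready are hypothetical; the probe assumes that the Kafka Connect REST API answers a GET on /connectors once Debezium is up and that the code runs inside the docker-compose network.

```python
import time
import urllib.request
from typing import Callable


def wait_until(probe: Callable[[], bool], timeout_s: float = 30.0, interval_s: float = 0.5) -> bool:
    """Call probe repeatedly until it returns True or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            if probe():
                return True
        except OSError:
            # Connection refused, timeout, or HTTP error: the service is not ready yet.
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)


def debezium_is_ready(url: str = "http://debezium:8083/connectors") -> bool:
    """Return True when the Kafka Connect REST API answers (hypothetical probe)."""
    with urllib.request.urlopen(url, timeout=2) as response:
        return response.status == 200


# Typical use, from inside the docker-compose network:
# if wait_until(debezium_is_ready):
#     ...register the connector...
```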

Now you only need to write the pipeline in sum.py.

Debezium input connector

Data stream: For the input connector, the stream should be in the form of Debezium messages received on a given topic. Each received update is atomic, and triggers the update of the pipeline created in Pathway.

Note that Debezium connectors only work in streaming mode.

⚠️ We talk about Debezium messages but it is a simplification. Debezium works with Kafka: in practice, the connector should be connected to Kafka. The main difference with the regular Kafka connector is the expected formatting of the messages.
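To give an idea of what such a message carries, here is a simplified sketch that turns a Debezium change event into insertions and deletions. It assumes the usual Debezium envelope, in which payload.before and payload.after hold the row state before and after the change; the sample messages are trimmed down by hand, and the function to_changes is hypothetical. It is not how Pathway's connector is implemented.

```python
import json
from typing import Optional


def to_changes(message: str) -> list[tuple[dict, int]]:
    """Turn one Debezium change event into (row, diff) pairs.

    diff is +1 for a row that appears and -1 for a row that disappears.
    """
    payload = json.loads(message)["payload"]
    before: Optional[dict] = payload.get("before")
    after: Optional[dict] = payload.get("after")
    changes = []
    if before is not None:
        changes.append((before, -1))
    if after is not None:
        changes.append((after, 1))
    return changes


# Hand-written, simplified events: "c" is a creation, "u" an update.
insert_event = json.dumps({"payload": {"before": None, "after": {"value": 3}, "op": "c"}})
update_event = json.dumps({"payload": {"before": {"value": 3}, "after": {"value": 5}, "op": "u"}})

print(to_changes(insert_event))  # [({'value': 3}, 1)]
print(to_changes(update_event))  # [({'value': 3}, -1), ({'value': 5}, 1)]
```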

Usage: The Debezium input connector pw.io.debezium.read takes several arguments:

  • rdkafka_settings: the settings used to connect to the Kafka instance receiving the Debezium messages; they follow the format of librdkafka,
  • topic_name: the topic which is listened to,
  • schema: the schema of the table. It defines the column names and their types, as well as the primary keys,
  • autocommit_duration_ms: the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway's dataflow.

⚠️ Note that a Debezium connector listens to only one topic.

Example:

class InputSchema(pw.Schema):
    value: int


t = pw.io.debezium.read(
    input_rdkafka_settings,
    topic_name="postgres.public.values",
    schema=InputSchema,
    autocommit_duration_ms=100,
)

The optional parameter types, used to cast the input values, relies on specific pw.Type values: types={"value": pw.Type.INT}.

PostgreSQL output connector

The output connector pw.io.postgres.write adds the updates made to a table t to a given PostgreSQL table.

Usage: the output connector takes the following arguments:

  • table: the Pathway table to send to PostgreSQL,
  • postgres_settings: the settings used to connect to the PostgreSQL database, given as a dictionary such as output_postgres_settings above (host, port, dbname, user, and password),
  • table_name: the PostgreSQL table to which the updates are written.

pw.io.postgres.write(t, output_postgres_settings, "sum_table")

Every time the table t is updated, the changes will be automatically appended to the PostgreSQL table sum_table.

⚠️ The table should already be created in PostgreSQL: the creation is not done by Pathway. The table must include all the columns the table t has, as well as two extra columns: time and diff. The columns time and diff express the timestamp of the update and whether the update is an insertion or a deletion (an update is simply the simultaneous deletion of the old value and insertion of the new one).

In our example, we must create a sum_table using this SQL command:

CREATE TABLE IF NOT EXISTS sum_table (
    sum REAL NOT NULL,
    time BIGINT NOT NULL,
    diff INTEGER NOT NULL
);
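To make the time and diff columns more concrete, here is a small plain-Python simulation of the rows that would be appended to sum_table as values arrive one at a time. It is only an illustration under simplifying assumptions: the function sum_updates is hypothetical, and the timestamps are made-up even numbers rather than the values Pathway actually writes.

```python
def sum_updates(values, start_time=0, step=2):
    """Simulate the (sum, time, diff) rows produced as values arrive one by one."""
    rows = []
    total = None
    timestamp = start_time
    for value in values:
        if total is not None:
            # The previous sum is retracted: a deletion, encoded as diff = -1.
            rows.append((total, timestamp, -1))
        total = value if total is None else total + value
        # The new sum is inserted at the same timestamp, with diff = 1.
        rows.append((total, timestamp, 1))
        timestamp += step
    return rows


for row in sum_updates([1, 2, 3]):
    print(row)
# (1, 0, 1)
# (1, 2, -1)
# (3, 2, 1)
# (3, 4, -1)
# (6, 4, 1)
```

Each new value after the first produces two rows sharing the same time: one removing the old sum and one adding the new one.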

Complete example

To summarize, we have a project with the following structure:

.
├── debezium/
│   └── connector.sh
├── pathway-src/
│   ├── Dockerfile
│   └── sum.py
├── sql/
│   ├── init-db.sql
│   └── update_db.sh
├── docker-compose.yml
└── Makefile

Except for sum.py, all the files have been explained previously. The full example is accessible in our public repository.

Let's see in more detail how the pipeline is written in Pathway in sum.py:

./pathway-src/sum.py
import pathway as pw

input_rdkafka_settings = {
    "bootstrap.servers": "kafka:9092",
    "security.protocol": "plaintext",
    "group.id": "0",
    "session.timeout.ms": "6000",
}
output_postgres_settings = {
    "host": "postgres",
    "port": "5432",
    "dbname": "values_db",
    "user": "user",
    "password": "password",
}

class InputSchema(pw.Schema):
    value: int


t = pw.io.debezium.read(
    input_rdkafka_settings,
    topic_name="postgres.public.values",
    schema=InputSchema,
    autocommit_duration_ms=100
)

t = t.reduce(sum=pw.reducers.sum(t.value))

pw.io.postgres.write(t, output_postgres_settings, "sum_table")

pw.run()

Don't forget to run the computation with pw.run(), otherwise nothing will happen. Once pw.run() is called, the computation will be run forever until it gets killed. If you need some reminder on Pathway operations, don't hesitate to take a look at our First steps guide.

To launch the computation, you just need to run make in the root of the project: it will launch all the containers, initialize the database, and start adding new values. Every addition to the table values in PostgreSQL will trigger an update, through Debezium, and Pathway will send an update to the table sum_table.

To monitor the changes, you can log in to the PostgreSQL container:

docker-compose exec postgres psql values_db -U user -W

After typing your password, you should be able to see all the updates to the table sum_table.

To see the latest value of the sum, we just need to type:

SELECT sum FROM sum_table ORDER BY time DESC, diff DESC LIMIT 1;

This value is updated in real time by Pathway.

If you want more details, you can see the 10 latest updates by typing:

SELECT * FROM sum_table ORDER BY time DESC, diff DESC LIMIT 10;
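The ordering used in these queries can be mirrored in plain Python to see why diff is part of the sort key. In the sketch below, the rows are hand-written (sum, time, diff) tuples with made-up timestamps, and the function latest_sum is hypothetical. At the most recent time there are two rows, a deletion of the old sum and an insertion of the new one, and sorting by diff in descending order picks the insertion.

```python
# Hand-written sample of sum_table rows: (sum, time, diff).
rows = [
    (1, 0, 1),
    (1, 2, -1),
    (3, 2, 1),
    (3, 4, -1),
    (6, 4, 1),
]


def latest_sum(rows):
    """Mirror ORDER BY time DESC, diff DESC LIMIT 1 on in-memory rows."""
    total, _, _ = max(rows, key=lambda row: (row[1], row[2]))
    return total


print(latest_sum(rows))  # 6
```

Without diff in the sort key, the retracted value 3 could be returned instead of 6, since both rows share the same time.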

To learn more about what those updates mean, don't hesitate to read the explanation provided in our first example.