Using the Pathway Debezium Connector for MongoDB

Connect Pathway on top of your MongoDB/Debezium database using pw.io.debezium.read and pw.io.mongodb.write.

This tutorial will guide you through setting up an architecture using Pathway with MongoDB and Debezium to process real-time changes in your MongoDB database. If you're looking to set up CDC (Change Data Capture) with a PostgreSQL database instead, you can check out our PostgreSQL tutorial. Both tutorials follow a similar structure but emphasize the unique configuration steps required for each database.

Traditional databases like MongoDB aren't built for streaming scenarios, so you need a change data capture (CDC) mechanism to monitor the database and stream any changes. To create an input stream from MongoDB, this tutorial uses Debezium along with Pathway's connector pw.io.debezium.read. The output stream will then be sent back to the MongoDB database using Pathway's native MongoDB connector pw.io.mongodb.write.

This tutorial is a bit longer than others, as it requires deploying several containers using docker-compose to create a working example. But don't worry if you've never used docker-compose before! No prior experience is needed; the tutorial provides all the necessary settings to make it straightforward and enjoyable 😉. You'll find that docker-compose makes deploying the database surprisingly easy.

If you have your own MongoDB and Debezium instances already deployed, you may skip the architecture part to focus on the input and output connectors.

Short version

Imagine a simple scenario: you have a MongoDB collection called values with documents that contain a field named value. New entries are added over time, and you want to calculate the sum of these values in real-time. The result should be stored in another collection called sum_values. Debezium captures and streams any changes in the values collection to make this real-time processing possible.

You can do it as follows in Pathway:

sum.py
import pathway as pw

# To use advanced features with Pathway Scale, get your free license key from
# https://pathway.com/features and paste it below.
# To use Pathway Community, comment out the line below.
pw.set_license_key("demo-license-key-with-telemetry")

# Kafka settings
input_rdkafka_settings = {
    "bootstrap.servers": "kafka:9092",
    "security.protocol": "plaintext",
    "group.id": "0",
    "session.timeout.ms": "6000",
    "auto.offset.reset": "earliest",
}


class InputSchema(pw.Schema):
    value: int


if __name__ == "__main__":
    # The Debezium connector is used to listen to the "my_mongo_db.test_database.values" Kafka topic.
    t = pw.io.debezium.read(
        input_rdkafka_settings,
        topic_name="my_mongo_db.test_database.values",
        schema=InputSchema,
        autocommit_duration_ms=100,
    )

    # The sum is computed (this part is independent of the connectors).
    t = t.reduce(sum=pw.reducers.sum(t.value))

    # The MongoDB and regular filesystem connectors are used to send the
    # resulting output stream containing the sum to the database and to
    # the filesystem.
    pw.io.mongodb.write(
        t,
        connection_string="mongodb://mongodb:27017/?replicaSet=rs0",
        database="my_mongo_db",
        collection="sum_values",
    )
    pw.io.csv.write(t, "output_stream.csv")

    # The computation is launched.
    pw.run()

Architecture

First, let's take a moment to understand the architecture of our project.

You'll need a MongoDB database with two collections:

  • values: where regular updates will create an input data stream.
  • sum_values: where Pathway will store and periodically update the running sum. There's no need to manually create this output collection, as the output connector will create it automatically if it doesn't exist.

Next, you'll set up change data capture (CDC) to track changes in the MongoDB database and generate a data stream. This is handled by a Debezium instance.

Debezium sends the updates to Kafka, which coordinates with ZooKeeper and propagates the data to Pathway.

Finally, Pathway receives these updates from Kafka, processes the data, and sends the sum back to MongoDB.

Docker containers

To summarize, our puzzle has five pieces:

  • MongoDB
  • Debezium
  • Kafka
  • ZooKeeper
  • Pathway

Needless to say, configuring all of these from scratch can be a hurdle...

Luckily, you can simplify the deployment of the project by using Docker containers and Docker Compose. In brief, a container is a lightweight virtual environment for running standalone applications. For instance, the MongoDB container you'll use in this project includes just enough to run a MongoDB database: it's lightweight, with MongoDB pre-installed, though it's not suited for other tasks. The main benefits of using Docker containers here are ease of installation and straightforward setup. If you're interested in learning more about Docker containers or adapting this example to your own project, check out the Docker documentation.

You will use a docker-compose.yml in the root of this project to set it up. Such a file is organized as follows:

version: "3.7"
services:
  mongodb:
    build:
    environment:
    volumes:
  kafka:
    build:
    ...

Each application, called a service here, is defined with all its relevant parameters: for example, the parameters under build define how to build the container (which image to use, and so on). Note that mongodb is simply the name given to the service; whether a MongoDB image is actually used is determined by the build (or image) parameter.

MongoDB

You first need to configure the database:

  mongodb:
    container_name: mongodb
    image: mongo
    command: ["--replSet", "rs0", "--port", "27017"]
    healthcheck:
      test: echo "try { rs.status() } catch (err) { rs.initiate({_id:'rs0',members:[{_id:0,host:'mongodb:27017'}]}) }" | mongosh --port 27017 --quiet
      interval: 5s
      timeout: 120s

The configuration may seem complex at first, so let's go through it step-by-step.

  • The container_name field defines the name of the container - it's straightforward.
  • The image field points to the official MongoDB Docker image, available on Docker Hub.

The command parameter requires a closer look. For MongoDB to support replication with Debezium, a replica set needs to be configured. A replica set is a group of MongoDB instances with the same data set, providing redundancy and higher availability. When you start MongoDB with --replSet rs0, you're enabling replica set functionality and specifying the replica set name (rs0). The --port 27017 parameter simply tells MongoDB to run on port 27017.

Once MongoDB starts, replication isn't fully set up yet. To complete the setup, you need to initialize the replica set using the rs.initiate command, which sets the replication parameters. The healthcheck block handles this: every 5 seconds it calls rs.status(), and if the replica set isn't initialized yet, it falls back to rs.initiate with the parameters of a single-member replica set, retrying until the database is ready to accept the command.

Once successful, your replicated MongoDB database is ready to use.
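
Before moving on, you can optionally double-check that replication is up. Below is a minimal sketch using pymongo, relying on the replSetGetStatus admin command, which reports the state of the replica set; run it from any container on the Compose network (for instance the streamer introduced in the next section):

from pymongo import MongoClient

# Connect to the replica set started by the mongodb service.
client = MongoClient("mongodb://mongodb:27017/?replicaSet=rs0")

# replSetGetStatus reports the replica set name and the state of each member.
status = client.admin.command("replSetGetStatus")
print(status["set"], [member["stateStr"] for member in status["members"]])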

Streaming Input Data to the MongoDB Instance

Once the database is up and running, you'll need to start adding data to it. In this tutorial, you'll use a Python script, referred to here as the input data streamer, to do this. The script is straightforward: it runs a loop from 1 to 1000, emitting a number into the input collection every 500 milliseconds.

import sys
import time

from pymongo import MongoClient

if __name__ == "__main__":
    client = MongoClient("mongodb://mongodb:27017/?replicaSet=rs0")
    db = client["test_database"]
    collection = db["values"]
    for i in range(1, 1001):
        collection.insert_one({"value": i})
        print("Insert:", i, file=sys.stderr)
        time.sleep(0.5)

This section of the Docker Compose configuration only takes up a few lines:

  streamer:
    container_name: mongodb-streamer
    build:
      context: .
      dockerfile: ./data-streaming/Dockerfile
    depends_on: [mongodb]

The only important line here is the one that specifies depends_on. This dependency on the mongodb container ensures that the input data streamer waits until the MongoDB database is fully initialized, preventing it from attempting to stream data to an uninitialized database.
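
The tutorial doesn't reproduce ./data-streaming/Dockerfile itself. A minimal version, mirroring the Pathway Dockerfile shown later in this tutorial, could look like the sketch below (the exact contents in the repository may differ):

FROM --platform=linux/x86_64 python:3.10

# The streamer only needs the MongoDB client library.
RUN pip install -U pymongo
COPY ./data-streaming/streamer.py streamer.py

CMD ["python", "-u", "streamer.py"]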

Debezium

To set up Debezium, you need to configure ZooKeeper and Kafka. You can use specific images for both, which significantly simplifies the configuration process and reduces the number of settings required.

ZooKeeper

  zookeeper:
    container_name: db_tuto_zookeeper
    image: confluentinc/cp-zookeeper:5.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

Kafka

Kafka will be connected to ZooKeeper. With Docker Compose, all the containers share the same network, so you can easily connect to a service using its name. For example, to connect to our ZooKeeper container, you can simply use "zookeeper:2181".

  kafka:
    container_name: db_tuto_kafka
    image: confluentinc/cp-enterprise-kafka:5.5.3
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991

Debezium

  debezium:
    container_name: db_tuto_debezium
    image: debezium/connect:1.4
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
    volumes:
      - ./debezium/connector.sh:/kafka/connector.sh
    depends_on: [kafka]
    ports:
      - 8083:8083

You need to connect the Debezium connector to our MongoDB database:

./debezium/connector.sh
#!/bin/bash

while true; do
  http_code=$(curl -o /dev/null -w "%{http_code}" -H 'Content-Type: application/json' debezium:8083/connectors --data '{
    "name": "values-connector",
    "config": {
      "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
      "mongodb.hosts": "rs0/mongodb:27017",
      "mongodb.name": "my_mongo_db",
      "database.include.list": "test_database",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "dbhistory.mongo"
    }
  }')
  if [ "$http_code" -eq 201 ]; then
    echo "Debezium connector has been created successfully"
    break
  else
    echo "Retrying Debezium connection creation in 1 second..."
    sleep 1
  fi
done

As you can see, the container is configured using an HTTP request. The body of this request must include essential MongoDB configuration parameters, along with any optional settings you wish to specify. You can find the complete list of configuration parameters for MongoDB in the Debezium MongoDB connector documentation.

⚠️ This file must be executable; the Makefile shown below takes care of this with chmod +x.

The volumes entry above mounts this script inside the container so that you can execute it manually (the Makefile below does this with docker compose exec).
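
Once the script has run successfully, you can verify that the connector was registered through the Kafka Connect REST API exposed by the Debezium container. Here is a small sketch in Python, assuming the requests package is available; from the host, localhost:8083 also works since the port is published:

import requests

# Kafka Connect (the Debezium container) exposes its REST API on port 8083.
resp = requests.get("http://debezium:8083/connectors/values-connector/status")
resp.raise_for_status()

status = resp.json()
# "RUNNING" means the connector and its tasks are streaming changes to Kafka.
print(status["connector"]["state"], [task["state"] for task in status.get("tasks", [])])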

Pathway

Next, set up the container for Pathway. This tutorial uses a straightforward Dockerfile for configuration. With a Dockerfile, the container section can be set up as follows:

  pathway:
    container_name: db_tuto_pathway
    build:
      context: .
      dockerfile: ./pathway-src/Dockerfile
    depends_on: [kafka, mongodb]

In the Dockerfile itself, you'll use a Python image. You only need to run the pip install command to install Pathway.

./pathway-src/Dockerfile
FROM --platform=linux/x86_64 python:3.10

RUN pip install -U pathway
COPY ./pathway-src/sum.py sum.py

CMD ["python", "-u", "sum.py"]

⚠️ For compatibility reasons, an x86_64 Linux container is used here.

The file sum.py serves as the entry point for our container; it will automatically run when the container starts. Once the script finishes executing, the container will stop. This file will define our data pipeline.

And that’s it! While the Docker Compose file is quite large, it’s impressive that this single file launches and coordinates all of these containers so they work together seamlessly!

Makefile

To launch the containers, you only need to run docker-compose up in the same directory as docker-compose.yml. Nevertheless, it is simpler to use a dedicated Makefile:

./Makefile
SERVICE_NAME_PATHWAY = pathway


build:
        chmod +x ./debezium/connector.sh
        docker compose up -d
        docker compose exec debezium ./connector.sh

stop:
        docker compose down -v

connect-pathway:
        docker compose exec $(SERVICE_NAME_PATHWAY) bash

You can start the experiment by running make, and stop it with make stop. If you execute make connect-pathway, you’ll be able to see the text file created by the filesystem connector, which contains the sum of the values.

Now, all that’s left is to implement our pipeline in sum.py.

Debezium input connector

Data stream: For the input connector, the stream must consist of Debezium messages received on a specific topic. Each update is atomic and triggers an update in the pipeline created in Pathway.

Keep in mind that Debezium connectors operate only in streaming mode.

⚠️ While this tutorial refers to Debezium messages, this is a simplification. Debezium works with Kafka, so the connector should actually be connected to Kafka. The key difference between this and a regular Kafka connector is the expected formatting of the messages.
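
To give an idea of what such a message contains, here is a simplified sketch of the payload of a Debezium change event produced by an insertion into the values collection. It is written as a Python dictionary for readability; the exact fields depend on the connector version and configuration, and the values shown are purely illustrative:

# Simplified, illustrative sketch of a Debezium change-event payload.
event_payload = {
    "before": None,  # no previous version of the document: this is an insertion
    "after": '{"_id": {"$oid": "..."}, "value": 42}',  # the new document, serialized as JSON
    "source": {"rs": "rs0", "db": "test_database", "collection": "values"},
    "op": "c",  # "c" = create/insert, "u" = update, "d" = delete
    "ts_ms": 1700000000000,  # hypothetical timestamp, for illustration only
}

pw.io.debezium.read parses these events and turns them into row insertions, updates, and deletions in the resulting Pathway table.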

Usage: The Debezium input connector pw.io.debezium.read takes several arguments:

  • rdkafka_settings: the settings used to connect to the Kafka instance receiving the Debezium messages; they follow the format of librdkafka,
  • topic_name: the topic to listen to,
  • schema: the schema of the table created in Pathway. It defines the column names and their types. It also defines the primary keys.
  • autocommit_duration_ms: the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway's dataflow.

⚠️ Note that a Debezium connector listens to only one topic.

Usage:

class InputSchema(pw.Schema):
    value: int

t = pw.io.debezium.read(
    input_rdkafka_settings,
    topic_name="my_mongo_db.test_database.values",
    schema=InputSchema,
    autocommit_duration_ms=100
)

MongoDB output connector

The output connector pw.io.mongodb.write adds updates made to a table t into a specified MongoDB collection.

Usage: The output connector requires the following arguments:

  • table: The Pathway table to send to MongoDB.
  • connection_string: The connection string for the MongoDB database. You can find more details in the MongoDB documentation.
  • database: The name of the database to update.
  • collection: The name of the collection to write to.
  • max_batch_size: The maximum number of entries to insert in a single batch.

pw.io.mongodb.write(
    t,
    connection_string="mongodb://mongodb:27017/",
    database="my_mongo_db",
    collection="sum_values",
)

Whenever the table t is updated, the changes will be automatically appended to both the sum_values collection and the text file output_stream.csv. The output collection will include all the columns from table t, along with two additional fields: time and diff. If the collection does not already exist, it will be created automatically.

The time column indicates the timestamp of the update, while the diff column specifies whether the update is an insertion or a deletion. In the case of an update, it represents the simultaneous removal of the old value and the insertion of the new one.
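
As a concrete illustration, after the streamer has inserted the first two values (1, then 2), the documents appended to sum_values could look roughly like the sketch below. The field values are hypothetical, and real documents also contain a MongoDB _id:

# Hypothetical sequence of documents appended to sum_values:
[
    {"sum": 1, "time": 1700000000000, "diff": 1},   # sum = 1 is inserted
    {"sum": 1, "time": 1700000000500, "diff": -1},  # the old sum is retracted...
    {"sum": 3, "time": 1700000000500, "diff": 1},   # ...and replaced by sum = 3
]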

Complete example

To summarize, you have a project with the following structure:

.
├── data-streaming/
│   ├── streamer.py
│   └── Dockerfile
├── debezium/
│   └── connector.sh
├── pathway-src/
│   ├── Dockerfile
│   └── sum.py
├── docker-compose.yml
└── Makefile

Aside from sum.py, all the other files have been explained previously. You can find the complete example in our public repository.

Now, let's take a closer look at how the pipeline is implemented in Pathway within sum.py:

./pathway-src/sum.py
import pathway as pw

# To use advanced features with Pathway Scale, get your free license key from
# https://pathway.com/features and paste it below.
# To use Pathway Community, comment out the line below.
pw.set_license_key("demo-license-key-with-telemetry")

# Kafka settings
input_rdkafka_settings = {
    "bootstrap.servers": "kafka:9092",
    "security.protocol": "plaintext",
    "group.id": "0",
    "session.timeout.ms": "6000",
    "auto.offset.reset": "earliest",
}


class InputSchema(pw.Schema):
    value: int


if __name__ == "__main__":
    # The Debezium connector is used to listen to the "my_mongo_db.test_database.values" Kafka topic.
    t = pw.io.debezium.read(
        input_rdkafka_settings,
        topic_name="my_mongo_db.test_database.values",
        schema=InputSchema,
        autocommit_duration_ms=100,
    )

    # The sum is computed (this part is independent of the connectors).
    t = t.reduce(sum=pw.reducers.sum(t.value))

    # The MongoDB and regular filesystem connectors are used to send the
    # resulting output stream containing the sum to the database and to
    # the filesystem.
    pw.io.mongodb.write(
        t,
        connection_string="mongodb://mongodb:27017/",
        database="my_mongo_db",
        collection="sum_values",
    )
    pw.io.csv.write(t, "output_stream.csv")

    # The computation is launched.
    pw.run()

Don’t forget to run the computation with pw.run(), or else nothing will happen. Once pw.run() is called, the computation will continue running indefinitely until it is stopped. If you need a refresher on Pathway operations, feel free to check out our First Steps Guide.

To launch the computation, simply run make in the root directory of the project. This command will start all the containers, initialize the database, and begin adding new values. Every time a new entry is added to the values collection in MongoDB, it will trigger an update through Debezium. Pathway will then send an update to the sum_values collection and reflect those changes in a CSV file named output_stream.csv, which you can find in the pathway container by using make connect-pathway.

To monitor the changes, you can log into the MongoDB container:

docker-compose exec mongodb mongosh

After connecting, you should be able to see all the updates to the sum_values collection.

To view the latest sum value, simply type:

use my_mongo_db
db["sum_values"].find().sort({ time: -1 }).pretty()

This value is updated in real-time by Pathway.

To understand what these updates mean, feel free to check out the explanation provided in our first example.