pw.io.mongodb

pw.io.mongodb.write(table, *, connection_string, database, collection, max_batch_size=None)

Writes the table’s stream of updates to a MongoDB collection.

If the specified database or collection doesn’t exist, it is created during the first write.

The entries in the resulting collection have two additional fields: time and diff. time is the processing time of the row, and diff shows the nature of the change: 1 means the row was added and -1 means the row was deleted.
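Because every change is stored as a separate entry, a reader of the collection may need to collapse the entries to recover the rows that are currently present. The following is a minimal sketch of that idea in plain Python, assuming the entries have already been loaded as dictionaries. The helper name current_rows and the sample entries are hypothetical and not part of the Pathway API.

```python
from collections import Counter

def current_rows(entries, value_fields):
    """Sum the diff of each distinct row and keep rows with a positive count."""
    counts = Counter()
    for entry in entries:
        # Identify a row by its value fields, ignoring time and diff.
        key = tuple(entry[field] for field in value_fields)
        counts[key] += entry["diff"]
    # A row inserted once and deleted once sums to 0 and is dropped.
    return [
        dict(zip(value_fields, key))
        for key, count in counts.items()
        for _ in range(max(count, 0))
    ]

# Hypothetical entries: Bob's row is added at time 0 and deleted at time 2.
entries = [
    {"owner": "Alice", "pet": "dog", "diff": 1, "time": 0},
    {"owner": "Bob", "pet": "cat", "diff": 1, "time": 0},
    {"owner": "Bob", "pet": "cat", "diff": -1, "time": 2},
]
remaining = current_rows(entries, ["owner", "pet"])
```

Here remaining contains only Alice's row, since the two entries for Bob cancel out.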

Note: Since MongoDB stores DateTime in milliseconds, the Duration type is also serialized as an integer number of milliseconds for consistency.
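As an illustration of this representation, the sketch below converts between a Python timedelta and an integer number of milliseconds. It only mirrors the format described in the note. The function names are hypothetical, and flooring of sub-millisecond remainders is an assumption of this sketch, not documented connector behavior.

```python
from datetime import timedelta

def duration_to_millis(duration: timedelta) -> int:
    # Floor division by one millisecond yields a whole number of milliseconds.
    return duration // timedelta(milliseconds=1)

def millis_to_duration(millis: int) -> timedelta:
    # Rebuild a timedelta from an integer read back from the collection.
    return timedelta(milliseconds=millis)

stored = duration_to_millis(timedelta(seconds=1, milliseconds=500))
restored = millis_to_duration(stored)
```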

  • Parameters
    • connection_string (str) – The connection string for the MongoDB database. See the MongoDB documentation for the details.
    • database (str) – The name of the database to update.
    • collection (str) – The name of the collection to write to.
    • max_batch_size (int | None) – The maximum number of entries to insert in one batch.
  • Returns
    None

Example:

To get started, you need to run MongoDB locally. The easiest way to do this, if it isn’t already running, is by using Docker. You can set up MongoDB in Docker with the following commands.

docker pull mongo
docker run -d --name mongo -p 27017:27017 mongo

The first command pulls the latest MongoDB image from Docker Hub. The second command runs MongoDB in the background, naming the container mongo and exposing port 27017 for external connections, such as those from your Pathway program.

If the container doesn’t start, check whether port 27017 is already in use. If so, you can map the container to a different host port, for example by replacing -p 27017:27017 with -p 27018:27017.

Once MongoDB is running, you can access its shell with:

docker exec -it mongo mongosh

There’s no need to create anything in the new instance at this point.

With MongoDB running, you can proceed with a Pathway program to write data to the database. Start by importing Pathway and creating a test table.

import pathway as pw

pet_owners = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | cat
8   | Alice | cat
''')

Next, write this data to your MongoDB instance with the Pathway connector.

pw.io.mongodb.write(
    pet_owners,
    connection_string="mongodb://127.0.0.1:27017/",
    database="pathway-test",
    collection="pet-owners",
)

If you’ve changed the port, make sure to update the connection string with the correct one.
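To keep the port in one place, the connection string can be built from the host and port. This is a small sketch, not part of the Pathway API. The helper name mongodb_connection_string is hypothetical, and port 27018 is only an example of a remapped host port.

```python
def mongodb_connection_string(host: str = "127.0.0.1", port: int = 27017) -> str:
    # Build a connection string for an unauthenticated local instance.
    return f"mongodb://{host}:{port}/"

default_uri = mongodb_connection_string()
# Assumes the container was started with the host port remapped to 27018.
remapped_uri = mongodb_connection_string(port=27018)
```

The resulting string can then be passed as the connection_string argument of pw.io.mongodb.write.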

You can modify the code to change the data source or add more processing steps. Remember to call pw.run() at the end of the program to execute it.

After the program runs, you can check that the database and collection were created. Access the MongoDB shell again and run:

show dbs

You should see the pathway-test database listed, along with some pre-existing databases:

admin         40.00 KiB
config        60.00 KiB
local         40.00 KiB
pathway-test  40.00 KiB

Switch to the pathway-test database and list its collections:

use pathway-test
show collections

You should see:

pet-owners

Finally, check the data in the pet-owners collection with:

db["pet-owners"].find().pretty()

This should return the following entries, each including the additional diff and time fields:

[
    {
        _id: ObjectId('67180150d94db90697c07853'),
        age: Long('9'),
        owner: 'Bob',
        pet: 'cat',
        diff: Long('1'),
        time: Long('0')
    },
    {
        _id: ObjectId('67180150d94db90697c07854'),
        age: Long('8'),
        owner: 'Alice',
        pet: 'cat',
        diff: Long('1'),
        time: Long('0')
    },
    {
        _id: ObjectId('67180150d94db90697c07855'),
        age: Long('10'),
        owner: 'Alice',
        pet: 'dog',
        diff: Long('1'),
        time: Long('0')
    }
]

For more advanced setups, such as replica sets, authentication, or custom read/write concerns, refer to the official MongoDB documentation on connection strings.