pw.io.mongodb
pw.io.mongodb.write(table, *, connection_string, database, collection, max_batch_size=None)
Writes table’s stream of updates to a MongoDB collection.
If the specified database or collection doesn’t exist, it will be created during the first write.
The entries in the resulting collection will have two additional fields: time and diff. In particular, time is the processing time of a row, and diff shows the nature of the change: 1 means a row was added and -1 means a row was deleted.
Note: Since MongoDB stores DateTime in milliseconds, the Duration type is also serialized as an integer number of milliseconds for consistency.
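To illustrate the note above, here is a minimal sketch of a duration expressed as an integer number of milliseconds and converted back. The helper names are hypothetical, and the sketch floors sub-millisecond remainders; how the connector itself rounds is not stated here, so treat that as an assumption.

```python
from datetime import timedelta


def duration_to_millis(duration: timedelta) -> int:
    """Express a duration as a whole number of milliseconds (floored)."""
    return duration // timedelta(milliseconds=1)


def millis_to_duration(millis: int) -> timedelta:
    """Rebuild a duration from an integer number of milliseconds."""
    return timedelta(milliseconds=millis)


stored = duration_to_millis(timedelta(minutes=1, seconds=30))
print(stored)                      # 90000
print(millis_to_duration(stored))  # 0:01:30
```

When reading the collection back, a consumer would apply the reverse conversion to any field that was a Duration in the Pathway table.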
- Parameters
  - connection_string (str) – The connection string for the MongoDB database. See the MongoDB documentation for details.
  - database (str) – The name of the database to update.
  - collection (str) – The name of the collection to write to.
  - max_batch_size (int | None) – The maximum number of entries to insert in one batch.
- Returns
None
Example:
To get started, you need to run MongoDB locally. The easiest way to do this, if it isn’t already running, is by using Docker. You can set up MongoDB in Docker with the following commands.
docker pull mongo
docker run -d --name mongo -p 27017:27017 mongo
The first command pulls the latest MongoDB image from Docker. The second command runs MongoDB in the background, naming the container mongo and exposing port 27017 for external connections, such as from your Pathway program.
If the container doesn’t start, check whether port 27017 is already in use. If so, you can map it to a different port.
Once MongoDB is running, you can access its shell with:
docker exec -it mongo mongosh
There’s no need to create anything in the new instance at this point.
With MongoDB running, you can proceed with a Pathway program to write data to the database. Start by importing Pathway and creating a test table.
import pathway as pw
pet_owners = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | dog
9 | Bob | cat
8 | Alice | cat
''')
Next, write this data to your MongoDB instance with the Pathway connector.
pw.io.mongodb.write(
    pet_owners,
    connection_string="mongodb://127.0.0.1:27017/",
    database="pathway-test",
    collection="pet-owners",
)
If you’ve changed the port, make sure to update the connection string with the correct one.
You can modify the code to change the data source or add more processing steps.
Remember to run the program with pw.run() to execute it.
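If the container was started with a different host port, only the port in the connection string changes. The sketch below builds the string from a host and port; the helper name and the example port 27018 (as in a hypothetical `-p 27018:27017` mapping) are illustrative assumptions, not part of the Pathway API.

```python
def local_connection_string(port: int = 27017, host: str = "127.0.0.1") -> str:
    """Build a connection string for an unauthenticated local MongoDB instance."""
    return f"mongodb://{host}:{port}/"


print(local_connection_string())       # mongodb://127.0.0.1:27017/
print(local_connection_string(27018))  # mongodb://127.0.0.1:27018/
```

The resulting string can be passed as the connection_string argument of pw.io.mongodb.write.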
After the program runs, you can check that the database and collection were created. Access the MongoDB shell again and run:
show dbs
You should see the pathway-test database listed, along with some pre-existing databases:
admin 40.00 KiB
config 60.00 KiB
local 40.00 KiB
pathway-test 40.00 KiB
Switch to the pathway-test database and list its collections:
use pathway-test
show collections
You should see:
pet-owners
Finally, check the data in the pet-owners collection with:
db["pet-owners"].find().pretty()
This should return the following entries, along with additional diff and time fields:
[
  {
    _id: ObjectId('67180150d94db90697c07853'),
    age: Long('9'),
    owner: 'Bob',
    pet: 'cat',
    diff: Long('1'),
    time: Long('0')
  },
  {
    _id: ObjectId('67180150d94db90697c07854'),
    age: Long('8'),
    owner: 'Alice',
    pet: 'cat',
    diff: Long('1'),
    time: Long('0')
  },
  {
    _id: ObjectId('67180150d94db90697c07855'),
    age: Long('10'),
    owner: 'Alice',
    pet: 'dog',
    diff: Long('1'),
    time: Long('0')
  }
]
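Because every entry carries a diff of 1 or -1, the collection records changes rather than a snapshot. The following sketch shows one way a consumer might fold such entries into the current set of rows. It works on plain dictionaries standing in for fetched documents; the function name, the sample deletion at time 2, and the choice of value fields are assumptions made for illustration.

```python
from collections import Counter


def current_rows(documents, value_fields=("age", "owner", "pet")):
    """Sum diff per distinct row and keep rows whose net count is positive."""
    counts = Counter()
    for doc in documents:
        key = tuple(doc[field] for field in value_fields)
        counts[key] += doc["diff"]
    # A row with net count n > 0 is present n times; others are absent.
    return [
        dict(zip(value_fields, key))
        for key, net in counts.items()
        for _ in range(net)
    ]


# Hypothetical change log: Bob's row is added and later deleted.
log = [
    {"age": 9, "owner": "Bob", "pet": "cat", "diff": 1, "time": 0},
    {"age": 10, "owner": "Alice", "pet": "dog", "diff": 1, "time": 0},
    {"age": 9, "owner": "Bob", "pet": "cat", "diff": -1, "time": 2},
]
print(current_rows(log))  # [{'age': 10, 'owner': 'Alice', 'pet': 'dog'}]
```

For the static table in this example every diff is 1, so the folded result is simply the three original rows.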
For more advanced setups, such as replica sets, authentication, or custom read/write concerns, refer to the official MongoDB documentation on connection strings.
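As one example of such a setup, a connection string with credentials needs reserved characters in the username and password percent-encoded. The sketch below uses Python's standard library for that; the helper name and the sample credentials are hypothetical, and any further options should follow the MongoDB documentation.

```python
from urllib.parse import quote


def connection_string_with_credentials(
    username: str, password: str, host: str = "127.0.0.1", port: int = 27017
) -> str:
    """Build a MongoDB connection string with percent-encoded credentials."""
    user = quote(username, safe="")
    secret = quote(password, safe="")
    return f"mongodb://{user}:{secret}@{host}:{port}/"


print(connection_string_with_credentials("pathway", "p@ss:word/1"))
# mongodb://pathway:p%40ss%3Aword%2F1@127.0.0.1:27017/
```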