pw.io.dynamodb

This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.

All internal Pathway types, except Any, can be stored in DynamoDB. The table below explains how Pathway types are serialized into DynamoDB. You can also refer to the official documentation to learn more about the data types supported by the database.

Pathway types conversion into DynamoDB

Pathway type        | DynamoDB type
--------------------|--------------------------------------------------------------
bool                | Boolean
int                 | Number
float               | Number
pointer             | String; can be deserialized back if the pw.Pointer is read by a Pathway connector
str                 | String
bytes               | Binary
Naive DateTime      | String containing the datetime in ISO-8601 format
UTC DateTime        | String containing the datetime in ISO-8601 format
Duration            | Number, serialized with nanosecond precision
JSON                | String containing the serialized JSON value
np.ndarray          | Map with two top-level fields: a List named "shape" denoting the shape of the stored array, and a List named "elements" denoting the elements of the flattened array
tuple               | List with as many fields as there are elements in the tuple; these correspond to the serialized values of the tuple elements
list                | List
pw.PyObjectWrapper  | Binary; can be deserialized back if read by a Pathway connector
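As an illustration of the np.ndarray encoding above, the documented Map layout can be modeled in plain Python. This is a sketch of the serialization format only, not the connector's actual code:

```python
import numpy as np

def ndarray_to_dynamodb_map(arr: np.ndarray) -> dict:
    # Model of the documented encoding: a Map with a "shape" List
    # and an "elements" List holding the flattened array.
    return {
        "shape": list(arr.shape),
        "elements": arr.flatten().tolist(),
    }

item = ndarray_to_dynamodb_map(np.array([[1, 2], [3, 4]]))
# item == {"shape": [2, 2], "elements": [1, 2, 3, 4]}
```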

write(table, table_name, partition_key, *, sort_key=None, init_mode='default', name=None)

Writes the table into a DynamoDB table. The connection settings are retrieved from the environment.

This connector supports three modes: default mode, which performs no preparation on the target table; create_if_not_exists mode, which creates the table if it does not already exist; and replace mode, which replaces the table and clears any previously existing data. The table is created with an on-demand billing mode. Be aware that this mode may not be optimal for your use case, and the provisioned mode with capacity planning might offer better performance or cost efficiency. In such cases, we recommend creating the table yourself in AWS with the desired provisioned throughput settings.

Note that if the table already exists and you use either default or create_if_not_exists mode, the schema of the existing table, including the primary key and optional sort key, must match the schema of the Pathway table being written.

The connector performs writes using the primary key, defined as a combination of the partition key and an optional sort key. Note that, due to how DynamoDB operates, entries may overwrite existing ones if their keys coincide. When an entry is deleted from the Pathway table, the corresponding entry is also removed from the DynamoDB table maintained by the connector. In this sense, the connector behaves similarly to the snapshot mode in the Delta Lake output connector or the Postgres output connector.
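The keyed, snapshot-like behavior described above can be modeled with a plain dictionary keyed by the (partition key, sort key) pair. The helper names below are purely illustrative:

```python
# Toy model of the connector's primary-key semantics: the DynamoDB table
# behaves like a dict keyed by (partition_key, sort_key).
table = {}

def on_insert(pk, sk, attrs):
    table[(pk, sk)] = attrs       # same key -> the previous entry is overwritten

def on_delete(pk, sk):
    table.pop((pk, sk), None)     # a deletion in Pathway removes the item

on_insert(1, None, {"value": "Hello"})
on_insert(1, None, {"value": "Bonjour"})  # overwrites the first write
on_delete(1, None)                        # the table is empty again
```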

  • Parameters
    • table (Table) – The table to write.
    • table_name (str) – The name of the destination table in DynamoDB.
    • partition_key (ColumnReference) – The column to use as the partition key in the destination table. Note that only scalar types, specifically Boolean, String and Number, can be used as index fields in DynamoDB. Therefore, the field you select in the Pathway table must serialize to one of these types. You can verify this using the conversion table provided in the connector documentation.
    • sort_key (ColumnReference | None) – An optional sort key for the destination table. Note that only scalar types can be used as the index fields in DynamoDB. Similarly to the partition key, you can only use fields that serialize into a scalar DynamoDB type.
    • init_mode (Literal['default', 'create_if_not_exists', 'replace']) – The table initialization mode, one of the three described above.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  • Returns
    None

Example:

AWS provides an official DynamoDB Docker image that allows you to test locally. The image is available as amazon/dynamodb-local and can be run as follows:

docker pull amazon/dynamodb-local:latest
docker run -p 8000:8000 --name dynamodb-local amazon/dynamodb-local:latest

The first command pulls the DynamoDB image from the official repository. The second command starts a container and exposes port 8000, which will be used for the connection.

Since the database runs locally and the settings are retrieved from the environment, you will need to configure them accordingly. The easiest way to do this is by setting a few environment variables to point to the running Docker image:

export AWS_ENDPOINT_URL=http://localhost:8000
export AWS_REGION=us-west-2
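Equivalently, the same two variables can be set programmatically from Python before Pathway starts, which is convenient in notebooks or test scripts:

```python
import os

# Point the AWS SDK at the local DynamoDB container; the region value
# is arbitrary but must be present.
os.environ["AWS_ENDPOINT_URL"] = "http://localhost:8000"
os.environ["AWS_REGION"] = "us-west-2"
```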

Please note that specifying the AWS region is required, but its exact value does not matter for the run to succeed; it simply needs to be set. The endpoint, in turn, should point to the database running in the Docker container, accessible through the exposed port.

At this point, the database is ready, and you can start writing a program. For example, you can implement a program that stores data in a table in the locally running database.

First, create a table:

import pathway as pw
table = pw.debug.table_from_markdown('''
   key | value
   1   | Hello
   2   | World
''')

Next, save it as follows:

pw.io.dynamodb.write(
    table,
    table_name="test",
    partition_key=table.key,
    init_mode="create_if_not_exists",
)

Remember to run your program by calling pw.run(). Note that if the table does not already exist, init_mode="default" will fail, as Pathway does not create the table in this mode.

When finished, you can query the local DynamoDB for the table contents using the AWS command-line tool:

aws dynamodb scan --table-name test

This will display the contents of the freshly created table:

{
    "Items": [
        {
            "value": {
                "S": "World"
            },
            "key": {
                "N": "2"
            }
        },
        {
            "value": {
                "S": "Hello"
            },
            "key": {
                "N": "1"
            }
        }
    ],
    "Count": 2,
    "ScannedCount": 2,
    "ConsumedCapacity": null
}
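The scan output uses DynamoDB's attribute-value encoding: each attribute is a one-entry map from a type tag ("S" for String, "N" for Number) to the encoded value. A small helper to flatten such items into plain Python values (illustrative only, not part of Pathway, and handling only integer numbers) could look like:

```python
def unwrap_item(item: dict) -> dict:
    # Each attribute is a one-entry map from a type tag ("S", "N", ...)
    # to the encoded value; DynamoDB transmits numbers as strings.
    out = {}
    for name, attr in item.items():
        (tag, raw), = attr.items()
        out[name] = int(raw) if tag == "N" else raw  # integer "N" values only
    return out

items = [
    {"value": {"S": "World"}, "key": {"N": "2"}},
    {"value": {"S": "Hello"}, "key": {"N": "1"}},
]
plain = [unwrap_item(i) for i in items]
# plain == [{"value": "World", "key": 2}, {"value": "Hello", "key": 1}]
```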

Note that since the table.key field is the partition key, writing an entry with the same partition key will overwrite the existing data. For example, you can create a smaller table with a repeated key:

table = pw.debug.table_from_markdown('''
   key | value
   1   | Bonjour
''')

Then write it again in "default" mode:

pw.io.dynamodb.write(
    table,
    table_name="test",
    partition_key=table.key,
)

Then, the contents of the target table will be updated with this new entry where key equals 1:

{
    "Items": [
        {
            "value": {
                "S": "World"
            },
            "key": {
                "N": "2"
            }
        },
        {
            "value": {
                "S": "Bonjour"
            },
            "key": {
                "N": "1"
            }
        }
    ],
    "Count": 2,
    "ScannedCount": 2,
    "ConsumedCapacity": null
}

Finally, you can run a program in "replace" table initialization mode, which will overwrite the existing data:

table = pw.debug.table_from_markdown('''
   key | value
   3   | Hi
''')
pw.io.dynamodb.write(
    table,
    table_name="test",
    partition_key=table.key,
    init_mode="replace",
)

The next run of aws dynamodb scan --table-name test will then return a single-row table:

{
    "Items": [
        {
            "value": {
                "S": "Hi"
            },
            "key": {
                "N": "3"
            }
        }
    ],
    "Count": 1,
    "ScannedCount": 1,
    "ConsumedCapacity": null
}