pw.io.postgres

Pathway provides both Input and Output connectors for PostgreSQL.

The Input connector reads changes by consuming diffs from the PostgreSQL Write-Ahead Log (WAL). This allows Pathway to ingest changes in a streaming fashion directly from the database replication stream.

The Output connector writes a Pathway table into PostgreSQL. It supports two operating modes:

  • Stream of changes mode, which propagates updates as a stream of changes.
  • Snapshot mode, which maintains an exact copy (replica) of the Pathway table in PostgreSQL.

TLS Support

TLS is supported in both the Input and Output connectors and behaves identically in each case.

TLS configuration is controlled via the sslmode and sslrootcert parameters passed inside postgres_settings for both connectors.

The sslmode parameter follows the semantics described in the official PostgreSQL documentation for sslmode. It defines the level of TLS verification performed during connection establishment.

If no TLS-related parameters are provided, the connector defaults to sslmode="prefer". In this mode, the system first attempts to establish a TLS-encrypted connection and falls back to an unencrypted connection if TLS negotiation fails.
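As a minimal sketch, a postgres_settings dictionary that requests full certificate verification could look like the one below. The host name and certificate path are hypothetical placeholders. The set of accepted sslmode values is taken from the PostgreSQL libpq documentation, and the check_sslmode helper is only an illustration of the documented default, not part of Pathway's API.

```python
# Values of sslmode accepted by PostgreSQL's libpq client library.
VALID_SSLMODES = {"disable", "allow", "prefer", "require", "verify-ca", "verify-full"}

# Hypothetical connection settings; replace host, credentials and path with your own.
tls_settings = {
    "host": "db.example.com",
    "port": "5432",
    "dbname": "database",
    "user": "user",
    "password": "pass",
    "sslmode": "verify-full",  # verify the certificate chain and the host name
    "sslrootcert": "/path/to/root.crt",  # CA certificate used for verification
}


def check_sslmode(settings: dict) -> str:
    """Return the effective sslmode, falling back to the documented default."""
    mode = str(settings.get("sslmode", "prefer"))
    if mode not in VALID_SSLMODES:
        raise ValueError(f"unsupported sslmode: {mode!r}")
    return mode
```

The same dictionary can be passed as postgres_settings to either connector, since TLS behaves identically in both.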

Type Conversion (Output Connector)

The table below describes how Pathway types map to PostgreSQL types in the output connector. All types can be round-tripped back via the read connector when the original schema type is specified.

Pathway types conversion into Postgres

Pathway type | Postgres type
bool | BOOLEAN
int | BIGINT. If the field type is SMALLINT or INTEGER, the connector will attempt to cast the value to the specified type.
float | DOUBLE PRECISION. If the field type is REAL, the connector will also attempt to cast the value accordingly.
pointer | TEXT
str | TEXT
bytes | BYTEA
Naive DateTime | TIMESTAMP
UTC DateTime | TIMESTAMPTZ
Duration | BIGINT, serialized and deserialized with microsecond precision
JSON | JSONB. If the field type is JSON, the connector will also attempt to cast the value accordingly.
np.ndarray | <element_type> ARRAY. The element type is determined by the PostgreSQL column type. Supports multi-dimensional arrays. Arrays must be rectangular.
tuple (homogeneous) | <element_type> ARRAY. Supports multi-dimensional arrays. Arrays must be rectangular.
list (homogeneous) | <element_type> ARRAY. Supports multi-dimensional arrays. Arrays must be rectangular.
pw.PyObjectWrapper | BYTEA

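To make the Duration row more concrete, the following sketch round-trips a duration through an integer. It assumes that the BIGINT value is the total number of microseconds, which is one reading of "microsecond precision"; the helper names are hypothetical and Python's timedelta stands in for the Pathway Duration type.

```python
from datetime import timedelta

ONE_MICROSECOND = timedelta(microseconds=1)


def duration_to_bigint(value: timedelta) -> int:
    """Encode a duration as a whole number of microseconds (assumed unit)."""
    return value // ONE_MICROSECOND


def bigint_to_duration(value: int) -> timedelta:
    """Decode a microsecond count back into a duration."""
    return timedelta(microseconds=value)


original = timedelta(hours=1, milliseconds=250, microseconds=7)
encoded = duration_to_bigint(original)
decoded = bigint_to_duration(encoded)
```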
Array semantics:
  • Multi-dimensional arrays are supported.
  • Arrays must be rectangular (jagged arrays are rejected).
  • NULL elements inside arrays are supported.
  • The PostgreSQL column type determines the element type.
  • Only built-in PostgreSQL scalar element types are supported.
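The rectangularity rule above can be checked before writing. The function below is a plain-Python sketch with a hypothetical name (array_shape); it is not how the connector validates arrays internally. It returns the shape of a nested list or tuple, or None when the nesting is jagged, and treats None elements as ordinary scalars since NULL elements are allowed.

```python
from typing import Any, Optional, Tuple


def array_shape(value: Any) -> Optional[Tuple[int, ...]]:
    """Return the shape of a nested list/tuple, or None if it is jagged.

    Scalars (including None, which maps to a NULL element) have shape ().
    """
    if not isinstance(value, (list, tuple)):
        return ()
    if len(value) == 0:
        return (0,)
    child_shapes = [array_shape(item) for item in value]
    first = child_shapes[0]
    # A jagged child, or siblings with different shapes, make the whole array jagged.
    if first is None or any(shape != first for shape in child_shapes):
        return None
    return (len(value),) + first
```

For instance, [[1, None], [3, 4]] has shape (2, 2), whereas [[1, 2], [3]] is jagged and would be rejected.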

read(postgres_settings, table_name, schema, *, mode='streaming', is_append_only=False, publication_name=None, schema_name='public', autocommit_duration_ms=1500, name=None, max_backlog_size=None, debug_data=None)

This module is available only under the following licenses: Pathway Scale, Pathway Enterprise.

Reads a table from a PostgreSQL database.

This connector provides a lightweight alternative to pw.io.debezium.read. It supports two modes: "static" and "streaming".

In "static" mode, the table is read once and the connector stops afterward.

In "streaming" mode, a temporary replication slot is created using the pgoutput logical decoding plugin, which is bundled with PostgreSQL and requires no additional installation. The slot is created with the export snapshot option, ensuring a consistent initial read. On startup, the connector first performs a snapshot of the table as it existed at the moment the replication slot was created, and then begins consuming the PostgreSQL write-ahead log (WAL), applying incremental changes on top of that snapshot. Because the replication slot is temporary, PostgreSQL will automatically drop it once the connection is closed (i.e., when the program terminates).
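Conceptually, streaming mode amounts to replaying a sequence of change events on top of the initial snapshot. The sketch below models this in plain Python, with rows keyed by primary key. The event format and the apply_changes function are hypothetical and only illustrate the semantics; they do not reflect the pgoutput wire format or Pathway's implementation.

```python
from typing import Dict, Iterable, Optional, Tuple

Row = Dict[str, object]
# A change is (operation, primary_key, row); row is None for deletions.
Change = Tuple[str, int, Optional[Row]]


def apply_changes(snapshot: Dict[int, Row], changes: Iterable[Change]) -> Dict[int, Row]:
    """Replay insert/update/delete/truncate events on top of an initial snapshot."""
    state = dict(snapshot)  # copy so the snapshot itself stays untouched
    for operation, key, row in changes:
        if operation in ("insert", "update"):
            state[key] = row
        elif operation == "delete":
            state.pop(key, None)
        elif operation == "truncate":
            state.clear()
        else:
            raise ValueError(f"unknown operation: {operation!r}")
    return state


snapshot = {1: {"login": "alice"}, 2: {"login": "bob"}}
changes = [
    ("insert", 3, {"login": "carol"}),
    ("update", 1, {"login": "alice2"}),
    ("delete", 2, None),
]
result = apply_changes(snapshot, changes)
```

Because updates and deletions are matched to existing rows by key, this model also shows why a table that is not append-only needs a primary key.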

To enable replication, a publication must be created in the database beforehand:

CREATE PUBLICATION {publication_name} FOR TABLE {table_name};
  • Parameters
    • postgres_settings (dict) – Connection parameters for PostgreSQL, provided as a dictionary of key-value pairs. The connection string is assembled by joining all pairs with spaces, each formatted as key=value. Keys must be strings; values of other types are converted via Python’s str().
    • table_name (str) – Name of the PostgreSQL table to read from.
    • schema (type[Schema]) – Pathway schema describing the table’s columns and their types.
    • mode (Literal['streaming', 'static']) – Polling mode for the connector. Accepted values are "streaming" (default) and "static". In "streaming" mode, the connector tracks changes in the table via the WAL, reflecting insertions, updates, deletions, and truncations in real time; requires publication_name to be specified. In "static" mode, the connector reads all currently available rows in a single commit and then stops.
    • is_append_only (bool) – Used in streaming mode. Specifies whether the input table is append-only. If the table is not append-only, it must have a primary key, and all primary key columns must be declared as such in the schema. This is required because when reading diffs from the log, update and delete records only expose the primary key columns of the affected rows. If the table is declared as append-only but a deletion, truncation or modification is encountered, an error is raised.
    • publication_name (str | None) – Name of the PostgreSQL publication that covers the target table. Required when mode="streaming".
    • schema_name (str | None) – Name of the PostgreSQL schema in which the table resides. Defaults to "public"; only needs to be changed when using a non-default schema.
    • autocommit_duration_ms (int | None) – Maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. Additionally, if persistence is enabled, it will be used as the name for the snapshot that stores the connector’s progress.
    • max_backlog_size (int | None) – Limit on the number of entries read from the input source and kept in processing at any moment. Reading pauses when the limit is reached and resumes as processing of some entries completes. Useful with large sources that emit an initial burst of data to avoid memory spikes.
    • debug_data (Any) – Static data that replaces the original data when debug mode is active.
  • Returns
    Table – The table read.
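The way postgres_settings is turned into a connection string, as described in the parameter list above, can be sketched as follows. The function name build_connection_string is hypothetical and the code only mirrors the documented rule (key=value pairs joined with spaces, non-string values converted with str()). It performs no quoting or escaping, which real values containing spaces would need.

```python
def build_connection_string(settings: dict) -> str:
    """Join key=value pairs with spaces, converting values with str()."""
    for key in settings:
        if not isinstance(key, str):
            raise TypeError(f"setting keys must be strings, got {key!r}")
    return " ".join(f"{key}={str(value)}" for key, value in settings.items())


settings = {
    "host": "localhost",
    "port": 5432,  # a non-string value, converted with str()
    "dbname": "database",
    "user": "user",
    "password": "pass",
}
connection_string = build_connection_string(settings)
```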

Example:

Suppose you have a users table with the following columns: id (an auto-incremented integer serving as the primary key), login (a string), and last_seen_at (a Unix timestamp). To read this table with Pathway, start by declaring the corresponding schema:

import pathway as pw


class UsersSchema(pw.Schema):
    id: int = pw.column_definition(primary_key=True)
    login: str
    last_seen_at: int

To perform a one-time read of the table, no additional database configuration is required. Simply provide the connection parameters and use "static" mode:

connection_string_parts = {
    "host": "localhost",
    "port": "5432",
    "dbname": "database",
    "user": "user",
    "password": "pass",
}
table = pw.io.postgres.read(
    postgres_settings=connection_string_parts,
    table_name="users",
    schema=UsersSchema,
    mode="static",
)

The resulting table object supports all Pathway transformations and can be passed to any output connector for further processing or storage.

To go beyond a one-time snapshot and perform Change Data Capture (CDC), continuously tracking insertions, updates, deletions, and truncations as they happen, you need to switch to "streaming" mode. This requires the PostgreSQL server to have logical replication enabled (wal_level = logical) and a publication to be created for the target table:

CREATE PUBLICATION users_pub FOR TABLE users;

With the publication in place, the streaming connector can be configured as follows:

table = pw.io.postgres.read(
    postgres_settings=connection_string_parts,
    table_name="users",
    schema=UsersSchema,
    mode="streaming",
    publication_name="users_pub",
)

There is no need to create a replication slot manually, and doing so is strongly discouraged. A replication slot causes PostgreSQL to retain WAL segments until all changes have been acknowledged by the consumer. If a slot is created but its LSN position is not advanced regularly, unacknowledged WAL can accumulate and eventually exhaust disk space on the database server. To prevent this, Pathway manages the replication slot internally: it uses a temporary slot that is automatically dropped when the session ends, and continuously acknowledges processed LSN positions while the program is running.

write(table, postgres_settings, table_name, *, max_batch_size=None, init_mode='default', output_table_type='stream_of_changes', primary_key=None, name=None, sort_by=None)

Writes table to a Postgres table. Two types of output tables are supported: stream of changes and snapshot.

When using stream of changes, the output table contains a log of all changes that occurred in the Pathway table. In this case, the output table is expected to have two additional columns, time and diff, both of integer type. time indicates the transactional minibatch time in which the row change occurred. diff is either 1 for a row insertion or -1 for a row deletion.

When using snapshot, the set of columns in the output table matches the set of columns in the table you are writing. No additional columns are created.
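The relationship between the two output table types can be illustrated in plain Python: summing the diff values per row in a stream of changes yields the rows that are currently present, which is what a snapshot holds. The current_rows function and the sample rows are hypothetical. The sketch assumes that an update is logged as a deletion of the old row followed by an insertion of the new one.

```python
from collections import Counter
from typing import Iterable, List, Tuple

# Each logged change: (row values, time, diff), where diff is 1 or -1.
Change = Tuple[Tuple, int, int]


def current_rows(log: Iterable[Change]) -> List[Tuple]:
    """Sum diffs per row; rows with a positive total are currently present."""
    totals: Counter = Counter()
    for row, _time, diff in log:
        totals[row] += diff
    # Emit each row as many times as its net count; non-positive counts emit nothing.
    return sorted(row for row, count in totals.items() for _ in range(count))


log = [
    ((10, "Alice", "dog"), 2, 1),
    ((9, "Bob", "cat"), 2, 1),
    ((10, "Alice", "dog"), 4, -1),  # old version of the row is retracted
    ((11, "Alice", "dog"), 4, 1),  # new version of the row is inserted
]
state = current_rows(log)
```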

  • Parameters
    • table (Table) – Table to be written.
    • postgres_settings (dict) – Components for the connection string for Postgres. The string is formed by joining key-value pairs from the given dictionary with spaces, with each pair formatted as key=value. Keys must be strings. Values can be of any type; if a value is not a string, it will be converted using Python’s str() function.
    • table_name (str) – Name of the target table.
    • max_batch_size (int | None) – Maximum number of entries allowed to be committed within a single transaction.
    • init_mode (Literal['default', 'create_if_not_exists', 'replace']) – "default": the default initialization mode; "create_if_not_exists": initializes the SQL writer by creating the necessary table if it does not already exist; "replace": initializes the SQL writer by replacing any existing table.
    • output_table_type (Literal['stream_of_changes', 'snapshot']) – Defines how the output table manages its data. If set to "stream_of_changes" (the default), the system outputs a stream of modifications to the target table. This stream includes two additional integer columns: time, representing the computation minibatch, and diff, indicating the type of change (1 for row addition and -1 for row deletion). If set to "snapshot", the table maintains the current state of the data, updated atomically with each minibatch and ensuring that no partial minibatch updates are visible.
    • primary_key (list[ColumnReference] | None) – When using snapshot mode, one or more columns that form the primary key in the target Postgres table.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
  • Returns
    None
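The ordering described for sort_by can be modeled as follows. This is a sketch in plain Python, not the connector's code: rows are dictionaries, plain column names stand in for ColumnReference objects, and the function name sort_within_minibatches is hypothetical.

```python
from itertools import groupby
from operator import itemgetter
from typing import Dict, List, Sequence


def sort_within_minibatches(rows: List[Dict], sort_by: Sequence[str]) -> List[Dict]:
    """Order rows by minibatch time, then by the tuple of sort_by values."""
    ordered: List[Dict] = []
    by_time = sorted(rows, key=itemgetter("time"))
    for _time, batch in groupby(by_time, key=itemgetter("time")):
        # Tuples compare lexicographically: first column, then the next on ties.
        ordered.extend(sorted(batch, key=lambda row: tuple(row[c] for c in sort_by)))
    return ordered


rows = [
    {"time": 2, "owner": "Bob", "age": 9},
    {"time": 2, "owner": "Alice", "age": 10},
    {"time": 2, "owner": "Alice", "age": 8},
    {"time": 4, "owner": "Alice", "age": 7},
]
result = sort_within_minibatches(rows, ["owner", "age"])
```

Sorting never moves a row across minibatches: the row with time 4 stays last even though its sort key is the smallest.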

Example:

Suppose you need to output a stream of updates from a Pathway table to a table in Postgres. Here is how to do it with the connector.

First, provide the components of the Postgres connection string. The connection string can include a wide variety of settings, such as SSL or connection timeouts, but this example keeps it minimal. Suppose that the database is running locally on the standard port 5432, that it is named database, and that it is accessible under the username user with the password pass.

This gives the following content for the connection string:

connection_string_parts = {
    "host": "localhost",
    "port": "5432",
    "dbname": "database",
    "user": "user",
    "password": "pass",
}

Now let’s load a table, which we will output to the database:

import pathway as pw
t = pw.debug.table_from_markdown("age owner pet \n 1 10 Alice 1 \n 2 9 Bob 1 \n 3 8 Alice 2")

To output the table, you need to create a new table in the database. It must have all the columns of the output data, plus the integer columns time and diff, because these values are an essential part of the output. Finally, it is a good idea to add a sequential primary key so that the order of the updates is known.

To sum things up, the table creation boils down to the following SQL command:

CREATE TABLE pets (
    id SERIAL PRIMARY KEY,
    time INTEGER NOT NULL,
    diff INTEGER NOT NULL,
    age INTEGER,
    owner TEXT,
    pet TEXT
);

Now, having done all the preparation, one can simply call:

pw.io.postgres.write(
    t,
    connection_string_parts,
    "pets",
)

Consider another scenario: the pets table is updated and you need to keep only the latest record for each pet, identified by the pet field in this table. In this case, you need the output table type to be "snapshot". The table can be created automatically in the database if you set init_mode to "replace" or "create_if_not_exists". If you create it manually, the command can look like this:

CREATE TABLE pets (
    pet TEXT PRIMARY KEY,
    age INTEGER,
    owner TEXT
);

The primary key in the target table is the pet field. Therefore, the primary_key parameter for the command should be [t.pet]. You can write this table as follows:

pw.io.postgres.write(
    t,
    connection_string_parts,
    "pets",
    output_table_type="snapshot",
    primary_key=[t.pet],
)

write_snapshot(table, postgres_settings, table_name, primary_key, *, max_batch_size=None, init_mode='default', name=None, sort_by=None)

WARNING: This method is deprecated. Please use pw.io.postgres.write with the parameter output_table_type="snapshot" instead. Note that the new version does not create the time and diff columns and maintains a current snapshot of the table you are writing.

Maintains a snapshot of a table within a Postgres table.

For the write to succeed, the target table must contain time and diff columns of integer type.

  • Parameters
    • table (Table) – Table to be written.
    • postgres_settings (dict) – Components of the connection string for Postgres.
    • table_name (str) – Name of the target table.
    • primary_key (list[str]) – Names of the fields which serve as a primary key in the Postgres table.
    • max_batch_size (int | None) – Maximum number of entries allowed to be committed within a single transaction.
    • init_mode (Literal['default', 'create_if_not_exists', 'replace']) – "default": the default initialization mode; "create_if_not_exists": initializes the SQL writer by creating the necessary table if it does not already exist; "replace": initializes the SQL writer by replacing any existing table.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
  • Returns
    None

Example:

Suppose there is a table stats in Pathway containing the average number of requests to some service or operation per user over some period of time. The number of requests can be large, so instead of storing the whole stream of changes, we store only a snapshot of the data, which Pathway keeps up to date.

The minimal setup requires a Postgres table with two columns: the user ID (user_id) and the number of requests over some period of time (number_of_requests). To maintain consistency, two extra columns are also needed: time and diff.

The SQL for creating such a table looks as follows:

CREATE TABLE user_stats (
    user_id TEXT PRIMARY KEY,
    number_of_requests INTEGER,
    time INTEGER NOT NULL,
    diff INTEGER NOT NULL
);

Once the table is created, set up the output connector:

import pathway as pw

# `stats` is the Pathway table described above.
pw.io.postgres.write_snapshot(
    stats,
    {
        "host": "localhost",
        "port": "5432",
        "dbname": "database",
        "user": "user",
        "password": "pass",
    },
    "user_stats",
    ["user_id"],
)