pw.io.postgres
Pathway provides both Input and Output connectors for PostgreSQL.
The Input connector reads changes by consuming diffs from the PostgreSQL Write-Ahead Log (WAL). This allows Pathway to ingest changes in a streaming fashion directly from the database replication stream.
The Output connector writes a Pathway table into PostgreSQL. It supports two operating modes:
- Stream of changes mode which propagates updates as a stream of changes.
- Snapshot mode which maintains an exact copy (replica) of the Pathway table in PostgreSQL.
TLS Support
TLS is supported in both the Input and Output connectors and behaves identically in each case.
TLS configuration is controlled via the sslmode and sslrootcert parameters passed inside postgres_settings for both connectors.
The sslmode parameter follows the official PostgreSQL documentation: sslmode. It defines the level of TLS verification performed during connection establishment.
If no TLS-related parameters are provided, the connector defaults to sslmode="prefer". In this mode, the system first attempts to establish a TLS-encrypted connection and falls back to an unencrypted connection if TLS negotiation fails.
Type Conversion (Output Connector)
The table below describes how Pathway types map to PostgreSQL types in the output connector. All types can be round-tripped back via the read connector when the original schema type is specified.
Pathway types conversion into Postgres
| Pathway type | Postgres type |
|---|---|
bool | BOOLEAN |
int | BIGINT. If the field type is SMALLINT or INTEGER, the connector will attempt to cast the value to the specified type. |
float | DOUBLE PRECISION. If the field type is REAL, the connector will also attempt to cast the value accordingly. |
pointer | TEXT |
str | TEXT |
bytes | BYTEA |
Naive DateTime | TIMESTAMP |
UTC DateTime | TIMESTAMPTZ |
Duration | BIGINT, serialized and deserialized with microsecond precision |
JSON | JSONB. If the field type is JSON, the connector will also attempt to cast the value accordingly. |
np.ndarray | <element_type> ARRAY. The element type is determined by the PostgreSQL column type. Supports multi-dimensional arrays. Arrays must be rectangular. |
tuple (homogeneous) | <element_type> ARRAY. Supports multi-dimensional arrays. Arrays must be rectangular. |
list (homogeneous) | <element_type> ARRAY. Supports multi-dimensional arrays. Arrays must be rectangular. |
pw.PyObjectWrapper | BYTEA |
| Array semantics: |
- Multi-dimensional arrays are supported.
- Arrays must be rectangular (jagged arrays are rejected).
NULLelements inside arrays are supported.- The PostgreSQL column type determines the element type.
- Only built-in PostgreSQL scalar element types are supported.
read(postgres_settings, table_name, schema, *, mode='streaming', is_append_only=False, publication_name=None, schema_name='public', autocommit_duration_ms=1500, name=None, max_backlog_size=None, debug_data=None)
sourceThis module is available when using one of the following licenses only:Pathway Scale, Pathway Enterprise.
Reads a table from a PostgreSQL database.
This connector provides a lightweight alternative to pw.io.debezium.read.
It supports two modes: "static" and "streaming".
In "static" mode, the table is read once and the connector stops afterward.
In "streaming" mode, a temporary replication slot is created using the
pgoutput logical decoding plugin, which is bundled with PostgreSQL and requires
no additional installation. The slot is created with the export snapshot option,
ensuring a consistent initial read. On startup, the connector first performs a
snapshot of the table as it existed at the moment the replication slot was created,
and then begins consuming the PostgreSQL write-ahead log (WAL), applying incremental
changes on top of that snapshot. Because the replication slot is temporary,
PostgreSQL will automatically drop it once the connection is closed (i.e., when the
program terminates).
To enable replication, a publication must be created in the database beforehand:
CREATE PUBLICATION {publication_name} FOR TABLE {table_name};
- Parameters
- postgres_settings (
dict) – Connection parameters for PostgreSQL, provided as a dictionary of key-value pairs. The connection string is assembled by joining all pairs with spaces, each formatted askey=value. Keys must be strings; values of other types are converted via Python’sstr(). - table_name (
str) – Name of the PostgreSQL table to read from. - schema (
type[Schema]) – Pathway schema describing the table’s columns and their types. - mode (
Literal['streaming','static']) – Polling mode for the connector. Accepted values are"streaming"(default) and"static". In"streaming"mode, the connector tracks changes in the table via the WAL, reflecting insertions, updates, deletions, and truncations in real time; requirespublication_nameto be specified. In"static"mode, the connector reads all currently available rows in a single commit and then stops. - is_append_only (
bool) – Used in streaming mode. Specifies whether the input table is append-only. If the table is not append-only, it must have a primary key, and all primary key columns must be declared as such in the schema. This is required because when reading diffs from the log, update and delete records only expose the primary key columns of the affected rows. If the table is declared as append-only but a deletion, truncation or modification is encountered, an error is raised. - publication_name (
str|None) – Name of the PostgreSQL publication that covers the target table. Required whenmode="streaming". - schema_name (
str|None) – Name of the PostgreSQL schema in which the table resides. Defaults to"public"; only needs to be changed when using a non-default schema. - autocommit_duration_ms (
int|None) – the maximum time between two commits. Everyautocommit_duration_msmilliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph. - name (
str|None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. Additionally, if persistence is enabled, it will be used as the name for the snapshot that stores the connector’s progress. - max_backlog_size (
int|None) – Limit on the number of entries read from the input source and kept in processing at any moment. Reading pauses when the limit is reached and resumes as processing of some entries completes. Useful with large sources that emit an initial burst of data to avoid memory spikes. - debug_data (
Any) – Static data replacing original one when debug mode is active.
- postgres_settings (
- Returns
Table – The table read.
Example:
Suppose you have a users table with the following columns: id (an auto-incremented
integer serving as the primary key), login (a string), and last_seen_at (a unix
timestamp). To read this table with Pathway, start by declaring the corresponding schema:
import pathway as pw
class UsersSchema(pw.Schema):
id: int = pw.column_definition(primary_key=True)
login: str
last_seen_at: int
To perform a one-time read of the table, no additional database configuration is required.
Simply provide the connection parameters and use "static" mode:
connection_string_parts = {
"host": "localhost",
"port": "5432",
"dbname": "database",
"user": "user",
"password": "pass",
}
table = pw.io.postgres.read(
postgres_settings=connection_string_parts,
table_name="users",
schema=UsersSchema,
mode="static",
)
The resulting table object supports all Pathway transformations and can be passed
to any output connector for further processing or storage.
To go beyond a one-time snapshot and perform Change Data Capture (CDC), continuously
tracking insertions, updates, deletions, and truncations as they happen, you need to
switch to "streaming" mode. This requires the PostgreSQL server to have logical replication
enabled (wal_level = logical) and a publication to be created for the target table:
CREATE PUBLICATION users_pub FOR TABLE users;
With the publication in place, the streaming connector can be configured as follows:
table = pw.io.postgres.read(
postgres_settings=connection_string_parts,
table_name="users",
schema=UsersSchema,
mode="streaming",
publication_name="users_pub",
)
There is no need to create a replication slot manually, and doing so is strongly discouraged. A replication slot causes PostgreSQL to retain WAL segments until all changes have been acknowledged by the consumer. If a slot is created but its LSN position is not advanced regularly, unacknowledged WAL can accumulate and eventually exhaust disk space on the database server. To prevent this, Pathway manages the replication slot internally: it uses a temporary slot that is automatically dropped when the session ends, and continuously acknowledges processed LSN positions while the program is running.
write(table, postgres_settings, table_name, *, max_batch_size=None, init_mode='default', output_table_type='stream_of_changes', primary_key=None, name=None, sort_by=None, )
sourceWrites table to a Postgres table. Two types of output tables are supported:
stream of changes and snapshot.
When using stream of changes, the output table contains a log of all changes that
occurred in the Pathway table. In this case, it is expected to have two additional columns,
time and diff, both of integer type. time indicates the transactional
minibatch time in which the row change occurred. diff can be either 1 for
row insertion or -1 for row deletion.
When using snapshot, the set of columns in the output table matches the set of columns in the table you are writing. No additional columns are created.
- Parameters
- table (
Table) – Table to be written. - postgres_settings (
dict) – Components for the connection string for Postgres. The string is formed by joining key-value pairs from the given dictionary with spaces, with each pair formatted as key=value. Keys must be strings. Values can be of any type; if a value is not a string, it will be converted using Python’s str() function. - table_name (
str) – Name of the target table. - max_batch_size (
int|None) – Maximum number of entries allowed to be committed within a single transaction. - init_mode (
Literal['default','create_if_not_exists','replace']) – “default”: The default initialization mode; “create_if_not_exists”: initializes the SQL writer by creating the necessary table if they do not already exist; “replace”: Initializes the SQL writer by replacing any existing table. - output_table_type (
Literal['stream_of_changes','snapshot']) – Defines how the output table manages its data. If set to"stream_of_changes"(the default), the system outputs a stream of modifications to the target table. This stream includes two additional integer columns:time, representing the computation minibatch, anddiff, indicating the type of change (1for row addition and-1for row deletion). If set to"snapshot", the table maintains the current state of the data, updated atomically with each minibatch and ensuring that no partial minibatch updates are visible. - primary_key (
list[ColumnReference] |None) – When using snapshot mode, one or more columns that form the primary key in the target Postgres table. - name (
str|None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. - sort_by (
Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
- table (
- Returns
None
Example:
Consider there’s a need to output a stream of updates from a table in Pathway to a table in Postgres. Let’s see how this can be done with the connector.
First of all, one needs to provide the required credentials for Postgres
connection string.
While the connection string can include a wide variety of settings, such as SSL
or connection timeouts, in this example we will keep it simple and provide the
smallest example possible. Suppose that the database is running locally on the standard
port 5432, that it has the name database and is accessible under the username
user with a password pass.
It gives us the following content for the connection string:
connection_string_parts = {
"host": "localhost",
"port": "5432",
"dbname": "database",
"user": "user",
"password": "pass",
}
Now let’s load a table, which we will output to the database:
import pathway as pw
t = pw.debug.table_from_markdown("age owner pet \n 1 10 Alice 1 \n 2 9 Bob 1 \n 3 8 Alice 2")
In order to output the table, we will need to create a new table in the database. The table
would need to have all the columns that the output data has. Moreover it will need
integer columns time and diff, because these values are an essential part of the
output. Finally, it is also a good idea to create the sequential primary key for
our changes so that we know the updates’ order.
To sum things up, the table creation boils down to the following SQL command:
CREATE TABLE pets (
id SERIAL PRIMARY KEY,
time INTEGER NOT NULL,
diff INTEGER NOT NULL,
age INTEGER,
owner TEXT,
pet TEXT
);
Now, having done all the preparation, one can simply call:
pw.io.postgres.write(
t,
connection_string_parts,
"pets",
)
Consider another scenario: the pets table is updated and you need to keep only
the latest record for each pet, identified by the pet field in this table.
In this case, you need the output table type to be "snapshot". The table can be
created automatically in the database if you set init_mode to "replace" or
"create_if_not_exists". If you create it manually, the command can look like this:
CREATE TABLE pets (
pet TEXT PRIMARY KEY,
age INTEGER,
owner TEXT
);
The primary key in the target table is the pet field. Therefore, the primary_key
parameter for the command should be \[t.pet\]. You can write this table as follows:
pw.io.postgres.write(
t,
connection_string_parts,
"pets",
output_table_type="snapshot",
primary_key=[t.pet],
)
write_snapshot(table, postgres_settings, table_name, primary_key, *, max_batch_size=None, init_mode='default', name=None, sort_by=None, )
sourceWARNING: This method is deprecated. Please use pw.io.postgres.write with
the parameter output_table_type="snapshot" instead. Note that the new version
does not create the time and diff columns and maintains a current snapshot
of the table you are writing.
Maintains a snapshot of a table within a Postgres table.
In order for write to be successful, it is required that the table contains time
and diff columns of the integer type.
- Parameters
- postgres_settings (
dict) – Components of the connection string for Postgres. - table_name (
str) – Name of the target table. - primary_key (
list[str]) – Names of the fields which serve as a primary key in the Postgres table. - max_batch_size (
int|None) – Maximum number of entries allowed to be committed within a single transaction. - init_mode (
Literal['default','create_if_not_exists','replace']) – “default”: The default initialization mode; “create_if_not_exists”: initializes the SQL writer by creating the necessary table if they do not already exist; “replace”: Initializes the SQL writer by replacing any existing table. - name (
str|None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. - sort_by (
Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
- postgres_settings (
- Returns
None
Example:
Consider there is a table stats in Pathway, containing the average number of requests to some
service or operation per user, over some period of time. The number of requests
can be large, so we decide not to store the whole stream of changes, but to only store
a snapshot of the data, which can be actualized by Pathway.
The minimum set-up would require us to have a Postgres table with two columns: the ID
of the user user_id and the number of requests across some period of time number_of_requests.
In order to maintain consistency, we also need two extra columns: time and diff.
The SQL for the creation of such table would look as follows:
CREATE TABLE user_stats (
user_id TEXT PRIMARY KEY,
number_of_requests INTEGER,
time INTEGER NOT NULL,
diff INTEGER NOT NULL
);
After the table is created, all you need is just to set up the output connector:
import pathway as pw
pw.io.postgres.write_snapshot(
stats,
{
"host": "localhost",
"port": "5432",
"dbname": "database",
"user": "user",
"password": "pass",
},
"user_stats",
["user_id"],
)