pw.io.postgres

Pathway provides both Input and Output connectors for PostgreSQL.

The Input connector reads changes by consuming diffs from the PostgreSQL Write-Ahead Log (WAL). This allows Pathway to ingest changes in a streaming fashion directly from the database replication stream.

The Output connector writes a Pathway table into PostgreSQL. It supports two operating modes:

  • Stream of changes mode, which appends every update of the Pathway table to PostgreSQL as a change record.
  • Snapshot mode, which maintains an exact copy (replica) of the Pathway table in PostgreSQL.

TLS Support

TLS is supported in both the Input and Output connectors and behaves identically in each case.

TLS configuration is controlled via the sslmode and sslrootcert parameters passed inside postgres_settings for both connectors.

The sslmode parameter has the semantics described in the sslmode section of the official PostgreSQL documentation. It defines the level of TLS verification performed when the connection is established.

If no TLS-related parameters are provided, the connector defaults to sslmode="prefer". In this mode, the system first attempts to establish a TLS-encrypted connection and falls back to an unencrypted connection if TLS negotiation fails.
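A minimal sketch of TLS settings. It assumes a server reachable as db.example.com and a CA certificate stored at /etc/ssl/certs/postgres-ca.pem. Both are placeholders, not values from this page. The hypothetical to_conninfo helper is not Pathway's code. It only mirrors the rule documented for postgres_settings: key=value pairs joined with spaces, with non-string values passed through str(). The resulting dict is what you would pass as postgres_settings to pw.io.postgres.read or pw.io.postgres.write.

```python
def to_conninfo(postgres_settings: dict) -> str:
    # Hypothetical helper mirroring the documented assembly rule:
    # "key=value" pairs joined by spaces, non-string values converted with str().
    # It does not quote values that contain spaces.
    return " ".join(f"{key}={value}" for key, value in postgres_settings.items())


tls_settings = {
    "host": "db.example.com",  # placeholder host name
    "port": 5432,  # an int is fine: it is converted with str()
    "dbname": "database",
    "user": "user",
    "password": "pass",
    # Require TLS, verify the server certificate against the CA,
    # and check that the host name matches the certificate.
    "sslmode": "verify-full",
    "sslrootcert": "/etc/ssl/certs/postgres-ca.pem",  # placeholder CA bundle path
}

print(to_conninfo(tls_settings))
```

With sslmode="verify-full", a connection that cannot be verified fails instead of silently falling back to plain text, which is the opposite trade-off from the "prefer" default.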

TCP Keepalives

Pathway sets conservative TCP-keepalive parameters on every PostgreSQL connection it opens, so that a stalled or terminated Pathway process is detected by PostgreSQL within minutes rather than the OS-inherited default (≈ 2 hours on Linux). This matters most for the streaming reader: while PostgreSQL still believes the client is alive, it keeps the temporary replication slot active and pins write-ahead log retention on disk.

The defaults apply uniformly across every connection Pathway opens — both for snapshot reads/writes and for the streaming WAL reader — so a single set of parameter names and units describes the behavior.

Pathway-managed connection-string defaults

Parameter (default): what it does

  • keepalives (1): Master switch that enables TCP keepalive probes on idle connections. Set to 0 to disable keepalives entirely.
  • keepalives_idle (300): Seconds of idleness before the first keepalive probe is sent.
  • keepalives_interval (30): Seconds between subsequent probes after the first.
  • keepalives_count (3): Number of missed probes the kernel tolerates before declaring the connection dead.
  • tcp_user_timeout (300000): In milliseconds. Kills connections whose unacknowledged data has been outstanding for this long. It complements the keepalive trio by covering the case where the connection is actively sending bytes that the peer isn't acknowledging.

With these defaults, an idle connection is declared dead at keepalives_idle + keepalives_interval × keepalives_count = 300 + 30 × 3 = 390 seconds (≈ 6.5 minutes), and an actively-streaming connection that stops being acknowledged is killed by tcp_user_timeout after 5 minutes. The values are deliberately conservative on the “minimize false positives” axis: short-lived network blips (NAT rebinding, brief routing changes, single-AZ failover events) routinely take under a minute to resolve and should not force a reconnect.
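The detection-time arithmetic above can be reproduced for any combination of overrides. The function names are hypothetical. Missing keys fall back to the Pathway defaults listed in the previous list.

```python
def idle_dead_after_seconds(settings: dict) -> int:
    """Seconds of silence after which an idle connection is declared dead."""
    idle = int(settings.get("keepalives_idle", 300))
    interval = int(settings.get("keepalives_interval", 30))
    count = int(settings.get("keepalives_count", 3))
    # First probe after `idle`, then `count` missed probes spaced `interval` apart.
    return idle + interval * count


def user_timeout_seconds(settings: dict) -> float:
    """tcp_user_timeout is expressed in milliseconds; convert it to seconds."""
    return int(settings.get("tcp_user_timeout", 300000)) / 1000


print(idle_dead_after_seconds({}))  # defaults: 300 + 30 * 3
print(idle_dead_after_seconds({"keepalives_idle": 60, "keepalives_interval": 10}))
print(user_timeout_seconds({}))
```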

Each of these parameters can be overridden by passing the same key in postgres_settings. The connector uses dict.setdefault() internally, so any value the user provides is preserved verbatim — Pathway never overrides explicit user choices.
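The following is a minimal sketch of how setdefault-based merging behaves, assuming the defaults from the list above. with_keepalive_defaults is a hypothetical name, not Pathway's internal function. This sketch copies the dict before filling it, and whether the connector does the same is not stated here. Keys the user sets are kept exactly as given, including their type. Only absent keys are filled in.

```python
KEEPALIVE_DEFAULTS = {
    "keepalives": 1,
    "keepalives_idle": 300,
    "keepalives_interval": 30,
    "keepalives_count": 3,
    "tcp_user_timeout": 300000,  # milliseconds
}


def with_keepalive_defaults(postgres_settings: dict) -> dict:
    merged = dict(postgres_settings)  # copy so the caller's dict is left untouched
    for key, value in KEEPALIVE_DEFAULTS.items():
        merged.setdefault(key, value)  # only fills keys the user did not provide
    return merged


# A shorter idle period; every other keepalive parameter keeps its default.
print(with_keepalive_defaults({"dbname": "database", "keepalives_idle": "60"}))
# Disabling keepalives entirely.
print(with_keepalive_defaults({"dbname": "database", "keepalives": 0})["keepalives"])
```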

NOTE: Parameter names and units follow the standard PostgreSQL connection-string conventions documented in the PostgreSQL connection documentation. tcp_user_timeout is given in milliseconds, not seconds.

Writer Retries on Transient Errors

The output connector automatically retries flushes that fail with a transient error — broken connections, server shutdowns, deadlocks, and serialization failures. Retries use exponential backoff and are capped at three attempts; if every attempt fails, the original error is surfaced to the pipeline. Permanent failures (syntax errors, missing tables, constraint violations, type mismatches) are not retried and propagate on the first attempt so they do not waste backoff latency.

Each retry reconnects to PostgreSQL from scratch and replays the same buffered rows, so a transient failure mid-batch does not lose rows: either all rows in the batch are committed or the error reaches the pipeline.
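The retry policy can be illustrated with a generic sketch: exponential backoff, a cap of three attempts in total, and no retries for permanent errors. This is not Pathway's implementation. DbError, flush_with_retries and the base delay are hypothetical. The transient set is an assumed selection of PostgreSQL SQLSTATE codes matching the error classes named above. The flush callable stands in for "reconnect and replay the same buffered rows". When every attempt fails, the sketch re-raises the error from the last attempt.

```python
import time

# Assumed mapping of the transient error classes to PostgreSQL SQLSTATE codes.
TRANSIENT_SQLSTATES = {
    "08000",  # connection_exception
    "08003",  # connection_does_not_exist
    "08006",  # connection_failure
    "57P01",  # admin_shutdown
    "40P01",  # deadlock_detected
    "40001",  # serialization_failure
}


class DbError(Exception):
    """Hypothetical database error carrying a SQLSTATE code."""

    def __init__(self, sqlstate: str):
        super().__init__(sqlstate)
        self.sqlstate = sqlstate


def flush_with_retries(flush, rows, max_attempts=3, base_delay=0.1, sleep=time.sleep):
    for attempt in range(1, max_attempts + 1):
        try:
            # Each attempt replays the same buffered rows, so none are lost.
            return flush(rows)
        except DbError as err:
            # Permanent errors (e.g. 42P01 undefined_table) propagate immediately;
            # transient ones propagate only once the attempt budget is spent.
            if err.sqlstate not in TRANSIENT_SQLSTATES or attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))  # 0.1 s, then 0.2 s, ...
```

Injecting sleep keeps the sketch testable without real delays.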

Passwordless Authentication

postgres_settings does not require user, password, or host to be set. Any of these keys may be omitted, and PostgreSQL’s normal resolution rules then apply: the OS user is used for user, ~/.pgpass is consulted for password, and a local UNIX socket is used for host. This makes the connector usable with passwordless pg_hba.conf modes such as trust, peer, ident, and cert without any extra configuration. The server-side pg_hba.conf decides whether the resulting connection is authorized.
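This example assumes a local server with a pg_hba.conf rule such as peer for local connections. Under that assumption, the settings can shrink to just the database name. When a password is still needed, libpq reads it from ~/.pgpass. Each line there has the form hostname:port:database:username:password. A * acts as a wildcard, and : or \ inside a field is escaped with a backslash. The pgpass_line helper is hypothetical and only formats such a line. On Unix, libpq ignores the file if it is readable by group or others, so give it mode 0600.

```python
# Only the database name: user, password and host are resolved by libpq's own rules
# (OS user name, ~/.pgpass, local UNIX socket).
postgres_settings = {"dbname": "database"}


def pgpass_line(host="*", port="*", database="*", user="*", password="") -> str:
    """Format one ~/.pgpass entry; '*' in a field matches anything."""

    def escape(field) -> str:
        # Backslash first, then colon, so the escapes themselves are not re-escaped.
        return str(field).replace("\\", "\\\\").replace(":", "\\:")

    return ":".join(escape(f) for f in (host, port, database, user, password))


print(pgpass_line("localhost", 5432, "database", "user", "pa:ss"))
```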

Type Conversion (Input Connector)

The table below describes how PostgreSQL column types are parsed into Pathway values, and what type to declare in your pw.Schema to receive them correctly.

PostgreSQL types parsed by the input connector

PostgreSQL type: Pathway schema type and notes

  • BOOLEAN: bool
  • SMALLINT / INT2: int
  • INTEGER / INT4: int
  • BIGINT / INT8: int. Alternatively, declare as pw.Duration to interpret the integer value as microseconds. Use this when the column was written by Pathway's output connector, which serializes pw.Duration as a BIGINT microsecond count.
  • REAL / FLOAT4: float
  • DOUBLE PRECISION / FLOAT8: float
  • NUMERIC / DECIMAL: float, parsed as f64. Precision loss is possible for values with more than ~15 significant digits. Special values 'NaN', 'Infinity', and '-Infinity' (the latter two available in PostgreSQL 14+) round-trip as their IEEE-754 counterparts.
  • OID: int. The unsigned 32-bit OID is widened to a signed 64-bit integer.
  • TEXT / VARCHAR / CHAR / NAME: str
  • User-defined ENUM: str, the raw label as declared in CREATE TYPE ... AS ENUM (...).
  • UUID: str, formatted as a standard hyphenated lowercase string, e.g. "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11".
  • INET: str. A plain host address when the host prefix equals the maximum for its address family ("192.168.1.1"); the address with its prefix otherwise ("192.168.1.1/28"). Supports both IPv4 and IPv6.
  • CIDR: str, always including the network prefix, e.g. "192.168.1.0/24" or "2001:db8::/32".
  • MACADDR: str, formatted as "xx:xx:xx:xx:xx:xx" with lowercase hex digits.
  • MACADDR8: str, formatted as "xx:xx:xx:xx:xx:xx:xx:xx" (EUI-64) with lowercase hex digits.
  • BYTEA: bytes. Alternatively, declare as pw.PyObjectWrapper when the column was written by Pathway's output connector for a pw.PyObjectWrapper column; the binary content is then deserialized.
  • JSON / JSONB: pw.Json
  • TIMESTAMP: pw.DateTimeNaive
  • TIMESTAMPTZ: pw.DateTimeUtc
  • DATE: pw.DateTimeNaive at midnight (00:00:00) on the given date.
  • TIME: pw.Duration, the microseconds elapsed since midnight.
  • TIMETZ: pw.Duration. The timezone offset is applied so that the result represents microseconds since UTC midnight (e.g. 12:30:00+02:00 becomes 10.5 h expressed in microseconds).
  • INTERVAL: pw.Duration, in total microseconds: months × 30 × 86 400 × 10⁶ + days × 86 400 × 10⁶ + microseconds. Months are approximated as 30 days.
  • vector (pgvector extension): np.ndarray, a 1-D array of float64.
  • halfvec (pgvector extension): np.ndarray, a 1-D array of float64; each float16 element is promoted to float64.
  • T ARRAY (any array type): list of <pathway_type_of_T>. Multi-dimensional arrays are returned as nested tuple values. Arrays must be rectangular; NULL elements are represented as None. The element type follows the scalar mapping for T from the entries above. For int and float element types, declaring np.ndarray with the corresponding element type instead produces a typed ndarray; dimensionality is validated against the schema declaration.
  • Any nullable column: declare the field as nullable. It is parsed as None if the PostgreSQL value is NULL, and otherwise as the value produced by the mapping for type T. If the schema field is not declared as nullable and a NULL is received, an error is raised.
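The INTERVAL and TIMETZ conversions are plain arithmetic, so they can be checked by hand. The sketch below reproduces the formulas from the list above; the function names are hypothetical. For TIMETZ, it assumes the offset is given in seconds east of UTC. The list does not say what happens when the result falls outside a single day (e.g. 01:00:00+02:00), so this sketch leaves such values unnormalized.

```python
US_PER_SECOND = 1_000_000
US_PER_DAY = 86_400 * US_PER_SECOND


def interval_to_micros(months: int, days: int, micros: int) -> int:
    # Months are approximated as 30 days, as documented.
    return months * 30 * US_PER_DAY + days * US_PER_DAY + micros


def timetz_to_micros(hour: int, minute: int, second: int, utc_offset_seconds: int) -> int:
    # Shift local wall-clock time to UTC by subtracting the offset.
    local = (hour * 3600 + minute * 60 + second) * US_PER_SECOND
    return local - utc_offset_seconds * US_PER_SECOND


print(interval_to_micros(1, 0, 0))  # one month = 30 days in microseconds
print(timetz_to_micros(12, 30, 0, 2 * 3600))  # 12:30:00+02:00 -> 10.5 h
```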

Type Conversion (Output Connector)

The table below describes how Pathway types map to PostgreSQL types in the output connector. All types can be round-tripped back via the read connector when the original schema type is specified.

Pathway types conversion into Postgres

Pathway type: PostgreSQL type

  • bool: BOOLEAN
  • int: BIGINT is the default. SMALLINT and INTEGER columns are also accepted. The connector casts at flush time, and a value that exceeds the destination width surfaces as a Pathway-level error rather than silently wrapping. PostgreSQL's internal single-byte "char" type (the quoted variant, distinct from SQL-standard CHAR(n), which is a string type) is also accepted on the same path; this is mostly useful for round-tripping rows read from catalog-style columns.
  • float: DOUBLE PRECISION. If the column type is REAL, the connector will also attempt to cast the value accordingly.
  • pointer: TEXT
  • str: TEXT is the default type when Pathway creates the table. Pre-existing UUID, INET, CIDR, MACADDR, and MACADDR8 columns are also supported. The string is parsed with the same syntax the input connector emits on read, so round-tripping is exact. Malformed values surface as a Pathway-level error at flush time.
  • bytes: BYTEA
  • Naive DateTime: TIMESTAMP is the default type when Pathway creates the table itself. Pre-existing DATE columns are also supported. Non-midnight time components are silently truncated, matching PostgreSQL's own implicit TIMESTAMP→DATE cast.
  • UTC DateTime: TIMESTAMPTZ
  • Duration: BIGINT (microseconds) is the default type the writer emits when it creates the table itself (init_mode="replace" / "create_if_not_exists"). Pre-existing INTERVAL and TIME columns are also supported. For INTERVAL, the writer packs the total microseconds into the binary layout with zero days and months, so the round trip through the input connector is exact; for TIME, the writer emits the raw microseconds since midnight. SMALLINT and INTEGER columns are accepted as well, with the same microsecond encoding. Be aware that microseconds overflow the 32-bit signed range after roughly 36 minutes (INTEGER) and the 16-bit signed range after about 33 milliseconds (SMALLINT). The failed cast surfaces as a Pathway-level error at flush time, so use these narrow widths only when the duration is bounded by construction.
  • JSON: JSONB. If the column type is JSON, the connector will also attempt to cast the value accordingly.
  • np.ndarray: <element_type> ARRAY. The element type is determined by the PostgreSQL column type. Supports multi-dimensional arrays. Arrays must be rectangular.
  • tuple (homogeneous): <element_type> ARRAY. Supports multi-dimensional arrays. Arrays must be rectangular.
  • list (homogeneous): <element_type> ARRAY. Supports multi-dimensional arrays. Arrays must be rectangular.
  • pw.PyObjectWrapper: BYTEA
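The overflow limits quoted for Duration in SMALLINT and INTEGER columns follow from the signed integer ranges. This small sketch, with hypothetical function names, lets you check whether a bounded duration fits a given column width before choosing it.

```python
def max_micros(bits: int) -> int:
    """Largest microsecond count a signed integer of this width can hold."""
    return 2 ** (bits - 1) - 1


def fits(micros: int, bits: int) -> bool:
    return -(2 ** (bits - 1)) <= micros <= max_micros(bits)


for name, bits in (("SMALLINT", 16), ("INTEGER", 32), ("BIGINT", 64)):
    limit = max_micros(bits)
    print(f"{name}: up to {limit} microseconds ({limit / 1_000_000:.6f} s)")
```

For INTEGER the limit is 2 147 483 647 microseconds, about 35.8 minutes. For SMALLINT it is 32 767 microseconds, about 33 milliseconds.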

Array semantics

  • Multi-dimensional arrays are supported.
  • Arrays must be rectangular (jagged arrays are rejected).
  • NULL elements inside arrays are supported.
  • The PostgreSQL column type determines the element type.
  • Only built-in PostgreSQL scalar element types are supported.
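The rectangularity rule can be made concrete with a small shape check over nested lists or tuples. This is a sketch of the rule as stated above, not the connector's validation code; array_shape is a hypothetical name. None is treated as a scalar element, matching how NULL elements are represented.

```python
def array_shape(value) -> tuple:
    """Return the shape of a nested list/tuple; raise ValueError if it is jagged."""
    if not isinstance(value, (list, tuple)):
        return ()  # scalar element, including None for a NULL element
    if len(value) == 0:
        return (0,)
    shapes = {array_shape(item) for item in value}
    if len(shapes) != 1:
        raise ValueError(f"jagged array: sub-array shapes differ: {sorted(shapes)}")
    return (len(value),) + shapes.pop()


print(array_shape([[1, 2, 3], [4, None, 6]]))  # 2 x 3, with one NULL element
```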

read(postgres_settings, table_name, schema, *, mode='streaming', is_append_only=False, publication_name=None, schema_name='public', autocommit_duration_ms=1500, name=None, max_backlog_size=None, debug_data=None)

This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.

Reads a table from a PostgreSQL database.

This connector provides a lightweight alternative to pw.io.debezium.read. It supports two modes: "static" and "streaming".

In "static" mode, the table is read once and the connector stops afterward.

In "streaming" mode, a temporary replication slot is created using the pgoutput logical decoding plugin, which is bundled with PostgreSQL and requires no additional installation. The slot is created with the export snapshot option, ensuring a consistent initial read. On startup, the connector first performs a snapshot of the table as it existed at the moment the replication slot was created, and then begins consuming the PostgreSQL write-ahead log (WAL), applying incremental changes on top of that snapshot. Because the replication slot is temporary, PostgreSQL will automatically drop it once the connection is closed (i.e., when the program terminates).

To enable replication, a publication must be created in the database beforehand:

CREATE PUBLICATION {publication_name} FOR TABLE {table_name};
  • Parameters
    • postgres_settings (dict) – Connection parameters for PostgreSQL, provided as a dictionary of key-value pairs. The connection string is assembled by joining all pairs with spaces, each formatted as key=value. Keys must be strings; values of other types are converted via Python’s str(). Pathway injects conservative TCP-keepalive defaults (keepalives, keepalives_idle=300, keepalives_interval=30, keepalives_count=3, and tcp_user_timeout=300000) so that an unreachable Pathway process is detected by PostgreSQL within minutes rather than the OS-inherited ~2-hour default; any of these can be overridden by passing the same key in postgres_settings.
    • table_name (str) – Name of the PostgreSQL table to read from. Any PostgreSQL identifier is accepted — the connector quotes the name before interpolating it into generated SQL, so hyphens, mixed case, and reserved words round-trip as-is.
    • schema (type[Schema]) – Pathway schema describing the table’s columns and their types. Column names may be any PostgreSQL identifier for the same reason as table_name.
    • mode (Literal['streaming', 'static']) – Polling mode for the connector. Accepted values are "streaming" (default) and "static". In "streaming" mode, the connector tracks changes in the table via the WAL, reflecting insertions, updates, deletions, and truncations in real time; requires publication_name to be specified. In "static" mode, the connector reads all currently available rows in a single commit and then stops.
    • is_append_only (bool) – Used in streaming mode. Specifies whether the input table is append-only. If the table is not append-only, it must have a primary key, and all primary key columns must be declared as such in the schema. This is required because when reading diffs from the log, update and delete records only expose the primary key columns of the affected rows. If the table is declared as append-only but a deletion, truncation or modification is encountered, an error is raised.
    • publication_name (str | None) – Name of the PostgreSQL publication that covers the target table. Required when mode="streaming".
    • schema_name (str | None) – Name of the PostgreSQL schema in which the table resides. Defaults to "public"; only needs to be changed when using a non-default schema.
    • autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway's computation graph.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. Additionally, if persistence is enabled, it will be used as the name for the snapshot that stores the connector’s progress. It is also surfaced to PostgreSQL as part of the connection’s application_name (pathway:<name>), so operators can filter pg_stat_activity and server logs by connector.
    • max_backlog_size (int | None) – Limit on the number of entries read from the input source and kept in processing at any moment. Reading pauses when the limit is reached and resumes as processing of some entries completes. Useful with large sources that emit an initial burst of data to avoid memory spikes.
    • debug_data (Any) – Static data that replaces the original data when debug mode is active.
  • Returns
    Table – The table read.
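The connector quotes table_name and column names. As a result, quoted identifiers are case-sensitive: table_name="Users" refers to "Users", not to users. The publication you create should name the same identifier. The sketch below applies PostgreSQL's standard quoting rule, which wraps the name in double quotes and doubles any embedded double quote, to build the CREATE PUBLICATION statement. quote_ident and create_publication_sql are hypothetical helpers, not part of the Pathway API.

```python
def quote_ident(name: str) -> str:
    # Standard PostgreSQL identifier quoting: wrap in double quotes, double embedded quotes.
    return '"' + name.replace('"', '""') + '"'


def create_publication_sql(publication: str, table: str, schema: str = "public") -> str:
    return (
        f"CREATE PUBLICATION {quote_ident(publication)} "
        f"FOR TABLE {quote_ident(schema)}.{quote_ident(table)};"
    )


print(create_publication_sql("users-pub", "User Events"))
```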

Example:

Suppose you have a users table with the following columns: id (an auto-incremented integer serving as the primary key), login (a string), and last_seen_at (a unix timestamp). To read this table with Pathway, start by declaring the corresponding schema:

import pathway as pw
class UsersSchema(pw.Schema):
    id: int = pw.column_definition(primary_key=True)
    login: str
    last_seen_at: int

To perform a one-time read of the table, no additional database configuration is required. Simply provide the connection parameters and use "static" mode:

connection_string_parts = {
    "host": "localhost",
    "port": "5432",
    "dbname": "database",
    "user": "user",
    "password": "pass",
}
table = pw.io.postgres.read(
    postgres_settings=connection_string_parts,
    table_name="users",
    schema=UsersSchema,
    mode="static",
)

The resulting table object supports all Pathway transformations and can be passed to any output connector for further processing or storage.

To go beyond a one-time snapshot and perform Change Data Capture (CDC), continuously tracking insertions, updates, deletions, and truncations as they happen, you need to switch to "streaming" mode. This requires the PostgreSQL server to have logical replication enabled (wal_level = logical) and a publication to be created for the target table:

CREATE PUBLICATION users_pub FOR TABLE users;

With the publication in place, the streaming connector can be configured as follows:

table = pw.io.postgres.read(
    postgres_settings=connection_string_parts,
    table_name="users",
    schema=UsersSchema,
    mode="streaming",
    publication_name="users_pub",
)

There is no need to create a replication slot manually, and doing so is strongly discouraged. A replication slot causes PostgreSQL to retain WAL segments until all changes have been acknowledged by the consumer. If a slot is created but its LSN position is not advanced regularly, unacknowledged WAL can accumulate and eventually exhaust disk space on the database server. To prevent this, Pathway manages the replication slot internally: it uses a temporary slot that is automatically dropped when the session ends, and continuously acknowledges processed LSN positions while the program is running.

The examples above use an integer primary key, but other primary key types are supported as well.

Suppose you have a products table where each row is identified by a string product code such as "SKU-001", alongside a name column and a price column. The schema in this case is:

class ProductsSchema(pw.Schema):
    sku: str = pw.column_definition(primary_key=True)
    name: str
    price: float

Both "static" and "streaming" modes are supported, set up in exactly the same way as for an integer primary key. For a one-time snapshot:

table = pw.io.postgres.read(
    postgres_settings=connection_string_parts,
    table_name="products",
    schema=ProductsSchema,
    mode="static",
)

For continuous CDC, create a publication first:

CREATE PUBLICATION products_pub FOR TABLE products;

Then configure the streaming connector:

table = pw.io.postgres.read(
    postgres_settings=connection_string_parts,
    table_name="products",
    schema=ProductsSchema,
    mode="streaming",
    publication_name="products_pub",
)

PostgreSQL’s UUID type is also supported. Because Pathway represents UUID values as strings, the corresponding schema field must be declared as str. Suppose you have a messages table whose primary key is a UUID column id, alongside a string body column:

class MessagesSchema(pw.Schema):
    id: str = pw.column_definition(primary_key=True)
    body: str

Pathway will read the UUID values as standard hyphenated strings, for example "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11". Both modes are supported. For a one-time snapshot:

table = pw.io.postgres.read(
    postgres_settings=connection_string_parts,
    table_name="messages",
    schema=MessagesSchema,
    mode="static",
)

For continuous CDC, create a publication first:

CREATE PUBLICATION messages_pub FOR TABLE messages;

Then configure the streaming connector:

table = pw.io.postgres.read(
    postgres_settings=connection_string_parts,
    table_name="messages",
    schema=MessagesSchema,
    mode="streaming",
    publication_name="messages_pub",
)
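The UUID keys from this table may need to be compared with identifiers coming from elsewhere, such as an API that returns uppercase or braced UUIDs. Python's standard uuid module normalizes them to the same lowercase hyphenated form that the connector produces. canonical_uuid is a hypothetical helper name.

```python
import uuid


def canonical_uuid(text: str) -> str:
    # uuid.UUID accepts upper/lower case, braces, a "urn:uuid:" prefix, or no hyphens;
    # str() always renders the lowercase hyphenated form.
    return str(uuid.UUID(text))


print(canonical_uuid("A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11"))
```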

Tables with composite primary keys — where the primary key spans multiple columns — are supported as well. To declare a composite primary key in Pathway, mark every participating column with pw.column_definition(primary_key=True). Suppose you have an order_items table where each row is uniquely identified by the combination of order_id and product_id, both integers, alongside a quantity column:

class OrderItemsSchema(pw.Schema):
    order_id: int = pw.column_definition(primary_key=True)
    product_id: int = pw.column_definition(primary_key=True)
    quantity: int

Both order_id and product_id are marked as primary key columns, matching the PRIMARY KEY (order_id, product_id) constraint on the PostgreSQL side. In streaming mode, this is especially important: when an update or delete event arrives in the WAL, PostgreSQL only exposes the primary key columns of the affected row, so all primary key columns must be declared as such in the schema.
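The following conceptual sketch shows why the full composite key matters. It is not how Pathway stores state internally. Change events are applied to a dict keyed by the (order_id, product_id) tuple. A delete event carries only the key columns, so without both of them the sketch could not tell which row to remove. The sketch assumes updates do not change the key. apply_change and primary_key are hypothetical names.

```python
PK_COLUMNS = ("order_id", "product_id")


def primary_key(row: dict) -> tuple:
    # Raises KeyError if an event lacks one of the key columns.
    return tuple(row[column] for column in PK_COLUMNS)


def apply_change(state: dict, kind: str, row: dict) -> None:
    if kind in ("insert", "update"):
        state[primary_key(row)] = row
    elif kind == "delete":
        # Delete events expose only the primary key columns of the affected row.
        state.pop(primary_key(row), None)
    else:
        raise ValueError(f"unknown change kind: {kind}")


state = {}
apply_change(state, "insert", {"order_id": 1, "product_id": 7, "quantity": 2})
apply_change(state, "insert", {"order_id": 1, "product_id": 9, "quantity": 5})
apply_change(state, "update", {"order_id": 1, "product_id": 9, "quantity": 6})
apply_change(state, "delete", {"order_id": 1, "product_id": 7})
print(state)
```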

Both modes are supported. For a one-time snapshot:

table = pw.io.postgres.read(
    postgres_settings=connection_string_parts,
    table_name="order_items",
    schema=OrderItemsSchema,
    mode="static",
)

For continuous CDC, create a publication first:

CREATE PUBLICATION order_items_pub FOR TABLE order_items;

Then configure the streaming connector:

table = pw.io.postgres.read(
    postgres_settings=connection_string_parts,
    table_name="order_items",
    schema=OrderItemsSchema,
    mode="streaming",
    publication_name="order_items_pub",
)

write(table, postgres_settings, table_name, *, schema_name='public', max_batch_size=None, init_mode='default', output_table_type='stream_of_changes', primary_key=None, name=None, sort_by=None)

Writes a table to a Postgres table. Two types of output tables are supported: stream of changes and snapshot.

When using stream of changes, the output table contains a log of all changes that occurred in the Pathway table. In this case, the target table is expected to have two additional columns, time and diff, both of integer type. time indicates the transactional minibatch time in which the row change occurred. diff is either 1 for a row insertion or -1 for a row deletion.

When using snapshot, the set of columns in the output table matches the set of columns in the table you are writing. No additional columns are created.
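The two table types are related: summing diff per row over a stream-of-changes table yields the rows a snapshot table would hold. The sketch below shows this in plain Python. It assumes changes are stored as (row, time, diff) tuples and that an update appears as a -1 for the old row plus a +1 for the new row in the same minibatch. snapshot_from_changes is a hypothetical helper, not part of the connector.

```python
from collections import Counter


def snapshot_from_changes(changes) -> Counter:
    """changes: iterable of (row_tuple, time, diff) records from a stream-of-changes table."""
    counts = Counter()
    for row, _time, diff in changes:
        counts[row] += diff
    return +counts  # unary plus keeps only rows with a positive count


changes = [
    ((10, "Alice", 1), 2, 1),
    ((9, "Bob", 1), 2, 1),
    ((10, "Alice", 1), 4, -1),  # update at time 4: the old version is removed...
    ((11, "Alice", 1), 4, 1),   # ...and the new version is inserted
]
print(snapshot_from_changes(changes))
```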

  • Parameters
    • table (Table) – Table to be written.
    • postgres_settings (dict) – Components for the connection string for Postgres. The string is formed by joining key-value pairs from the given dictionary with spaces, with each pair formatted as key=value. Keys must be strings. Values can be of any type; if a value is not a string, it will be converted using Python’s str() function. Pathway injects conservative TCP-keepalive defaults (keepalives, keepalives_idle=300, keepalives_interval=30, keepalives_count=3, and tcp_user_timeout=300000) so that an unreachable Pathway process is detected by PostgreSQL within minutes rather than the OS-inherited ~2-hour default; any of these can be overridden by passing the same key in postgres_settings.
    • table_name (str) – Name of the target table. Any PostgreSQL identifier is accepted — the connector quotes the name before interpolating it into generated SQL, so hyphens, mixed case, and reserved words round-trip as-is. Column names in table and in primary_key are quoted the same way.
    • schema_name (str | None) – Name of the PostgreSQL schema that owns the target table. Defaults to "public". Set this when writing to a non-default schema; the name is quoted identically to table_name.
    • max_batch_size (int | None) – Maximum number of entries allowed to be committed within a single transaction.
    • init_mode (Literal['default', 'create_if_not_exists', 'replace']) – “default”: the default initialization mode; “create_if_not_exists”: initializes the SQL writer by creating the necessary table if it does not already exist; “replace”: initializes the SQL writer by replacing any existing table.
    • output_table_type (Literal['stream_of_changes', 'snapshot']) – Defines how the output table manages its data. If set to "stream_of_changes" (the default), the system outputs a stream of modifications to the target table. This stream includes two additional integer columns: time, representing the computation minibatch, and diff, indicating the type of change (1 for row addition and -1 for row deletion). If set to "snapshot", the table maintains the current state of the data, updated atomically with each minibatch and ensuring that no partial minibatch updates are visible.
    • primary_key (list[ColumnReference] | None) – When using snapshot mode, one or more columns that form the primary key in the target Postgres table.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. It is also surfaced to PostgreSQL as part of the connection’s application_name (pathway:<name>), so operators can filter pg_stat_activity and server logs by connector.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
  • Returns
    None
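sort_by orders rows within each minibatch, not across the whole output. This minimal sketch applies that rule in plain Python. It assumes rows are dicts carrying the minibatch time under a "time" key. order_for_output is a hypothetical name used only for illustration.

```python
from itertools import groupby
from operator import itemgetter


def order_for_output(rows, sort_by):
    """Sort rows by the sort_by columns, but only within each minibatch (same "time")."""
    by_time = sorted(rows, key=itemgetter("time"))  # minibatches stay in time order
    ordered = []
    for _time, batch in groupby(by_time, key=itemgetter("time")):
        # Tuples compare lexicographically, like a multi-column sort_by.
        ordered.extend(sorted(batch, key=lambda row: tuple(row[c] for c in sort_by)))
    return ordered


rows = [
    {"time": 2, "owner": "Bob", "age": 9},
    {"time": 2, "owner": "Alice", "age": 10},
    {"time": 4, "owner": "Alice", "age": 8},
]
print(order_for_output(rows, ["owner", "age"]))
```

The time-4 row for Alice still comes after Bob's row, because sorting never crosses a minibatch boundary.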

Example:

Consider there’s a need to output a stream of updates from a table in Pathway to a table in Postgres. Let’s see how this can be done with the connector.

First of all, you need to provide the credentials for the Postgres connection string. While the connection string can include a wide variety of settings, such as SSL or connection timeouts, this example keeps it as small as possible. Suppose that the database is running locally on the standard port 5432, that it is named database, and that it is accessible as the user user with the password pass.

It gives us the following content for the connection string:

connection_string_parts = {
    "host": "localhost",
    "port": "5432",
    "dbname": "database",
    "user": "user",
    "password": "pass",
}

Now let’s load a table, which we will output to the database:

import pathway as pw
t = pw.debug.table_from_markdown("age owner pet \n 1 10 Alice 1 \n 2 9 Bob 1 \n 3 8 Alice 2")

In order to output the table, we need to create a new table in the database. The table needs all the columns that the output data has. Moreover, it needs a time column of type BIGINT (Pathway timestamps are milliseconds since epoch and routinely exceed the 32-bit range) and a diff column of type SMALLINT. Finally, it is a good idea to create a sequential primary key for the changes, so that the order of the updates is preserved.

To sum things up, the table creation boils down to the following SQL command:

CREATE TABLE pets (
    id SERIAL PRIMARY KEY,
    time BIGINT NOT NULL,
    diff SMALLINT NOT NULL,
    age BIGINT,
    owner TEXT,
    pet BIGINT
);

Now, having done all the preparation, one can simply call:

pw.io.postgres.write(
    t,
    connection_string_parts,
    "pets",
)

Consider another scenario: the pets table is updated and you need to keep only the latest record for each pet, identified by the pet field in this table. In this case, you need the output table type to be "snapshot". The table can be created automatically in the database if you set init_mode to "replace" or "create_if_not_exists". If you create it manually, the command can look like this:

CREATE TABLE pets (
    pet BIGINT PRIMARY KEY,
    age INTEGER,
    owner TEXT
);

The primary key in the target table is the pet field. Therefore, the primary_key parameter for the command should be [t.pet]. You can write this table as follows:

pw.io.postgres.write(
    t,
    connection_string_parts,
    "pets",
    output_table_type="snapshot",
    primary_key=[t.pet],
)

write_snapshot(table, postgres_settings, table_name, primary_key, *, max_batch_size=None, init_mode='default', name=None, sort_by=None)

WARNING: This method is deprecated. Please use pw.io.postgres.write with the parameter output_table_type="snapshot" instead. Note that the new version does not create the time and diff columns and maintains a current snapshot of the table you are writing.

Maintains a snapshot of a table within a Postgres table.

For the write to succeed, the target table must contain time and diff columns of integer type.

  • Parameters
    • table (Table) – Table to be written.
    • postgres_settings (dict) – Components of the connection string for Postgres.
    • table_name (str) – Name of the target table.
    • primary_key (list[str]) – Names of the fields which serve as a primary key in the Postgres table.
    • max_batch_size (int | None) – Maximum number of entries allowed to be committed within a single transaction.
    • init_mode (Literal['default', 'create_if_not_exists', 'replace']) – “default”: the default initialization mode; “create_if_not_exists”: initializes the SQL writer by creating the necessary table if it does not already exist; “replace”: initializes the SQL writer by replacing any existing table.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
  • Returns
    None

Example:

Consider a table stats in Pathway containing the average number of requests to some service or operation per user over some period of time. The number of requests can be large, so we decide not to store the whole stream of changes, but only a snapshot of the data, which Pathway keeps up to date.

The minimum set-up would require us to have a Postgres table with two columns: the ID of the user user_id and the number of requests across some period of time number_of_requests. In order to maintain consistency, we also need two extra columns: time and diff.

The SQL for the creation of such table would look as follows:

CREATE TABLE user_stats (
    user_id TEXT PRIMARY KEY,
    number_of_requests BIGINT,
    time BIGINT NOT NULL,
    diff SMALLINT NOT NULL
);

After the table is created, all you need to do is set up the output connector:

import pathway as pw

# stats is the Pathway table described above, with user_id and number_of_requests columns.
pw.io.postgres.write_snapshot(
    stats,
    {
        "host": "localhost",
        "port": "5432",
        "dbname": "database",
        "user": "user",
        "password": "pass",
    },
    "user_stats",
    ["user_id"],
)