pw.io.postgres

Conversion of Pathway types to Postgres types

Pathway type          Postgres type
bool                  BOOLEAN
int                   BIGINT
float                 DOUBLE PRECISION
pointer               TEXT
str                   TEXT
bytes                 BYTEA
Naive DateTime        TIMESTAMP
UTC DateTime          TIMESTAMPTZ
Duration              BIGINT, serialized and deserialized with microsecond precision
np.ndarray            Currently not supported
tuple                 <type> ARRAY, only homogeneous tuples are supported
list                  Currently not supported
pw.PyObjectWrapper    BYTEA
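
The conversions above can be restated as a lookup table. The sketch below is plain Python and does not call Pathway; the names PATHWAY_TO_POSTGRES, duration_to_micros and micros_to_duration are hypothetical and exist only for illustration. It also shows what microsecond precision means for a Duration stored as BIGINT, under the assumption that the stored value is the total number of microseconds. The tuple type is left out because its column type depends on the element type.

```python
from datetime import timedelta

# Hypothetical lookup table restating the documented conversions.
# None marks types that are currently not supported.
PATHWAY_TO_POSTGRES = {
    "bool": "BOOLEAN",
    "int": "BIGINT",
    "float": "DOUBLE PRECISION",
    "pointer": "TEXT",
    "str": "TEXT",
    "bytes": "BYTEA",
    "Naive DateTime": "TIMESTAMP",
    "UTC DateTime": "TIMESTAMPTZ",
    "Duration": "BIGINT",
    "np.ndarray": None,
    "list": None,
    "pw.PyObjectWrapper": "BYTEA",
}


def duration_to_micros(d: timedelta) -> int:
    """Encode a duration as a whole number of microseconds (assumed encoding)."""
    return d // timedelta(microseconds=1)


def micros_to_duration(us: int) -> timedelta:
    """Decode a microsecond count back into a duration."""
    return timedelta(microseconds=us)


original = timedelta(hours=1, seconds=2, microseconds=345)
stored = duration_to_micros(original)   # integer that would fit a BIGINT column
restored = micros_to_duration(stored)   # round trip loses nothing at this precision
```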

write(table, postgres_settings, table_name, *, max_batch_size=None, init_mode='default', name=None, sort_by=None)

Writes a table’s stream of updates to a Postgres table.

For the write to succeed, the target Postgres table must contain time and diff columns of an integer type.

  • Parameters
    • table (Table) – Table to be written.
    • postgres_settings (dict) – Components of the connection string for Postgres.
    • table_name (str) – Name of the target table.
    • max_batch_size (int | None) – Maximum number of entries allowed to be committed within a single transaction.
    • init_mode (Literal['default', 'create_if_not_exists', 'replace']) – “default”: the default initialization mode; “create_if_not_exists”: initializes the SQL writer by creating the necessary table if it does not already exist; “replace”: initializes the SQL writer by replacing any existing table.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
  • Returns
    None
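
The ordering described for sort_by can be illustrated without Pathway. The sketch below is a conceptual model only: the minibatch is a hypothetical list of dictionaries, and Python's sorted stands in for whatever the connector does internally. It shows rows being ordered by the tuple of values from two columns, compared lexicographically in ascending order.

```python
from operator import itemgetter

# One hypothetical minibatch of output rows, shown as plain dicts.
minibatch = [
    {"owner": "Bob", "age": 9, "pet": "dog"},
    {"owner": "Alice", "age": 10, "pet": "dog"},
    {"owner": "Alice", "age": 8, "pet": "cat"},
]

# Model of sorting by the columns (owner, age): each row is keyed by the
# tuple (owner, age), and the tuples are compared lexicographically.
ordered = sorted(minibatch, key=itemgetter("owner", "age"))
order = [(row["owner"], row["age"]) for row in ordered]
```

Here the two Alice rows come first, ordered by age, followed by the Bob row.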

Example:

Suppose you need to output a stream of updates from a Pathway table to a table in Postgres. Let’s see how this can be done with the connector.

First, provide the components of the Postgres connection string. The connection string can include a wide variety of settings, such as SSL or connection timeouts, but this example uses the smallest set possible. Suppose that the database is running locally on the standard port 5432, that it is named database, and that it is accessible under the username user with the password pass.

This gives the following components of the connection string:

connection_string_parts = {
    "host": "localhost",
    "port": "5432",
    "dbname": "database",
    "user": "user",
    "password": "pass",
}
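
For orientation, these components correspond to the keyword/value form of a Postgres connection string. The sketch below assumes the components are joined as space-separated key=value pairs; how the connector actually assembles them is not stated here, and build_conninfo is a hypothetical helper. Values that are empty or contain spaces, single quotes or backslashes are quoted following the libpq convention.

```python
def build_conninfo(parts: dict) -> str:
    """Join connection components into a keyword/value string (hypothetical helper)."""

    def quote(value: str) -> str:
        # Plain values can be written as-is.
        if value and not any(ch in value for ch in " '\\"):
            return value
        # Otherwise wrap in single quotes, escaping backslashes and quotes.
        escaped = value.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    return " ".join(f"{key}={quote(str(value))}" for key, value in parts.items())


connection_string_parts = {
    "host": "localhost",
    "port": "5432",
    "dbname": "database",
    "user": "user",
    "password": "pass",
}

conninfo = build_conninfo(connection_string_parts)
```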

Now let’s load a table, which we will output to the database:

import pathway as pw
t = pw.debug.table_from_markdown("age owner pet \n 1 10 Alice 1 \n 2 9 Bob 1 \n 3 8 Alice 2")

To output the table, we first need to create the target table in the database. It must have all the columns that the output data has. It also needs integer columns time and diff, because these values are an essential part of the output. Finally, it is a good idea to add a sequential primary key so that the order of the updates is known.

To sum things up, the table creation boils down to the following SQL command:

CREATE TABLE pets (
    id SERIAL PRIMARY KEY,
    time INTEGER NOT NULL,
    diff INTEGER NOT NULL,
    age INTEGER,
    owner TEXT,
    pet TEXT
);

With the preparation done, call the connector:

pw.io.postgres.write(
    t,
    connection_string_parts,
    "pets",
)
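
Because the target table receives a stream of updates rather than a final state, a reader of pets has to combine the time and diff columns to recover the data at a given moment. The sketch below is a conceptual model in plain Python, assuming that diff is 1 for an inserted row and -1 for a retracted one. The sample rows and the helper state_at are hypothetical.

```python
from collections import Counter

# Hypothetical contents of the pets table: (time, diff, age, owner, pet).
updates = [
    (2, 1, 10, "Alice", "dog"),
    (2, 1, 9, "Bob", "dog"),
    (4, -1, 9, "Bob", "dog"),   # the old version of Bob's row is retracted
    (4, 1, 10, "Bob", "dog"),   # and the new version is inserted
]


def state_at(updates, time):
    """Rows present at the given time: those whose diffs sum to a positive count."""
    counts = Counter()
    for t, diff, *row in updates:
        if t <= time:
            counts[tuple(row)] += diff
    return sorted(row for row, n in counts.items() if n > 0)


before = state_at(updates, 2)
after = state_at(updates, 4)
```

In this model a changed row shows up as a retraction followed by an insertion at the same time, which is why summing diff per row gives the current contents.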

write_snapshot(table, postgres_settings, table_name, primary_key, *, max_batch_size=None, init_mode='default', name=None, sort_by=None)

Maintains a snapshot of a table within a Postgres table.

For the write to succeed, the target Postgres table must contain time and diff columns of an integer type.

  • Parameters
    • table (Table) – Table to be written.
    • postgres_settings (dict) – Components of the connection string for Postgres.
    • table_name (str) – Name of the target table.
    • primary_key (list[str]) – Names of the fields which serve as a primary key in the Postgres table.
    • max_batch_size (int | None) – Maximum number of entries allowed to be committed within a single transaction.
    • init_mode (Literal['default', 'create_if_not_exists', 'replace']) – “default”: the default initialization mode; “create_if_not_exists”: initializes the SQL writer by creating the necessary table if it does not already exist; “replace”: initializes the SQL writer by replacing any existing table.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
  • Returns
    None
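
The effect of max_batch_size can be pictured as splitting the pending entries into groups, each committed in its own transaction. The sketch below is a plain-Python model, not the connector's code: batches is a hypothetical helper, and treating None as "no limit" is an assumption.

```python
from itertools import islice


def batches(entries, max_batch_size=None):
    """Yield lists of at most max_batch_size entries (None is assumed to mean no limit)."""
    iterator = iter(entries)
    if max_batch_size is None:
        everything = list(iterator)
        if everything:
            yield everything
        return
    # Take up to max_batch_size entries at a time until none are left.
    while batch := list(islice(iterator, max_batch_size)):
        yield batch


# Five pending entries with a limit of two give three transactions.
transactions = list(batches(range(5), max_batch_size=2))
```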

Example:

Suppose there is a table stats in Pathway that contains the average number of requests to some service or operation per user over some period of time. The number of requests can be large, so we decide not to store the whole stream of changes, but only a snapshot of the data, which Pathway keeps up to date.

The minimal setup requires a Postgres table with two columns: user_id, the ID of the user, and number_of_requests, the number of requests over some period of time. To maintain consistency, we also need two extra columns: time and diff.

The SQL to create such a table looks as follows:

CREATE TABLE user_stats (
    user_id TEXT PRIMARY KEY,
    number_of_requests INTEGER,
    time INTEGER NOT NULL,
    diff INTEGER NOT NULL
);

Once the table is created, all that remains is to set up the output connector:

import pathway as pw
pw.io.postgres.write_snapshot(
    stats,
    {
        "host": "localhost",
        "port": "5432",
        "dbname": "database",
        "user": "user",
        "password": "pass",
    },
    "user_stats",
    ["user_id"],
)
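
To see how a snapshot differs from a full update log, the sketch below models the target table as a dictionary keyed by the primary key. It is a conceptual illustration only and does not describe the connector's implementation. It assumes that diff is 1 for an inserted row and -1 for a removed one, and that removals at a given time are applied before insertions. The helper apply_to_snapshot and the sample updates are hypothetical.

```python
def apply_to_snapshot(snapshot, updates, primary_key):
    """Apply updates to a dict that maps primary-key tuples to rows."""
    # Process in time order; within one time, removals (-1) before insertions (+1).
    for row in sorted(updates, key=lambda r: (r["time"], r["diff"])):
        key = tuple(row[column] for column in primary_key)
        if row["diff"] > 0:
            snapshot[key] = row       # insert or overwrite the row for this key
        else:
            snapshot.pop(key, None)   # remove the row for this key, if present
    return snapshot


updates = [
    {"user_id": "alice", "number_of_requests": 3, "time": 2, "diff": 1},
    {"user_id": "bob", "number_of_requests": 5, "time": 2, "diff": 1},
    {"user_id": "alice", "number_of_requests": 3, "time": 4, "diff": -1},
    {"user_id": "alice", "number_of_requests": 7, "time": 4, "diff": 1},
    {"user_id": "bob", "number_of_requests": 5, "time": 6, "diff": -1},
]

user_stats = apply_to_snapshot({}, updates, ["user_id"])
```

After all five updates the model holds a single row for alice with the latest value, while the update log written by write would hold all five rows.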