pw.io.postgres
pw.io.postgres.write(table, postgres_settings, table_name, max_batch_size=None)
sourceWrites table
’s stream of updates to a postgres table.
In order for write to be successful, it is required that the table contains time
and diff
columns of the integer type.
- Parameters
- postgres_settings (
dict
) – Components for the connection string for Postgres. - table_name (
str
) – Name of the target table. - max_batch_size (
int
|None
) – Maximum number of entries allowed to be committed within a single transaction.
- postgres_settings (
- Returns
None
Example:
Consider there’s a need to output a stream of updates from a table in Pathway to a table in Postgres. Let’s see how this can be done with the connector.
First of all, one needs to provide the required credentials for Postgres
connection string.
While the connection string can include a wide variety of settings, such as SSL
or connection timeouts, in this example we will keep it simple and provide the
smallest example possible. Suppose that the database is running locally on the standard
port 5432, that it has the name database
and is accessible under the username
user
with a password pass
.
It gives us the following content for the connection string:
connection_string_parts = {
"host": "localhost",
"port": "5432",
"dbname": "database",
"user": "user",
"password": "pass",
}
Now let’s load a table, which we will output to the database:
import pathway as pw
t = pw.debug.table_from_markdown("age owner pet \n 1 10 Alice 1 \n 2 9 Bob 1 \n 3 8 Alice 2")
In order to output the table, we will need to create a new table in the database. The table
would need to have all the columns that the output data has. Moreover it will need
integer columns time
and diff
, because these values are an essential part of the
output. Finally, it is also a good idea to create the sequential primary key for
our changes so that we know the updates’ order.
To sum things up, the table creation boils down to the following SQL command:
CREATE TABLE pets (
id SERIAL PRIMARY KEY,
time INTEGER NOT NULL,
diff INTEGER NOT NULL,
age INTEGER,
owner TEXT,
pet TEXT
);
Now, having done all the preparation, one can simply call:
pw.io.postgres.write(
t,
connection_string_parts,
"pets",
)
pw.io.postgres.write_snapshot(table, postgres_settings, table_name, primary_key, max_batch_size=None)
sourceMaintains a snapshot of a table within a Postgres table.
In order for write to be successful, it is required that the table contains time
and diff
columns of the integer type.
- Parameters
- postgres_settings (
dict
) – Components of the connection string for Postgres. - table_name (
str
) – Name of the target table. - primary_key (
list
[str
]) – Names of the fields which serve as a primary key in the Postgres table. - max_batch_size (
int
|None
) – Maximum number of entries allowed to be committed within a single transaction.
- postgres_settings (
- Returns
None
Example:
Consider there is a table stats
in Pathway, containing the average number of requests to some
service or operation per user, over some period of time. The number of requests
can be large, so we decide not to store the whole stream of changes, but to only store
a snapshot of the data, which can be actualized by Pathway.
The minimum set-up would require us to have a Postgres table with two columns: the ID
of the user user_id
and the number of requests across some period of time number_of_requests
.
In order to maintain consistency, we also need two extra columns: time
and diff
.
The SQL for the creation of such table would look as follows:
CREATE TABLE user_stats (
user_id TEXT PRIMARY KEY,
number_of_requests INTEGER,
time INTEGER NOT NULL,
diff INTEGER NOT NULL
);
After the table is created, all you need is just to set up the output connector:
import pathway as pw
pw.io.postgres.write_snapshot(
stats,
{
"host": "localhost",
"port": "5432",
"dbname": "database",
"user": "user",
"password": "pass",
},
"user_stats",
["user_id"],
)