pw.io.questdb

All internal Pathway types can be saved into QuestDB. The table below explains how each type is converted. For the full list of QuestDB types, refer to the official QuestDB documentation.

Pathway types serialization into QuestDB

| Pathway type | QuestDB type |
| --- | --- |
| bool | boolean |
| int | long |
| float | double |
| pointer | string |
| str | string |
| bytes | string containing base64-encoded binary data. Note that the bytes type from QuestDB is currently not supported |
| Naive DateTime | timestamp; the value is treated as UTC when it is passed to QuestDB |
| UTC DateTime | timestamp |
| Duration | long, serialized and deserialized with nanosecond precision |
| np.ndarray | string containing a JSON object with two top-level fields: shape, an integer array holding the shape of the array, and elements, an array holding the flattened elements of the array |
| tuple | string containing a JSON array whose elements appear in the same order as in the tuple |
| list | string containing a JSON array whose elements appear in the same order as in the list |
| pw.PyObjectWrapper | string containing the base64-encoded serialized object. This value can be deserialized back if it is read by another Pathway connector |
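
As a rough illustration of the string-encoded rows in the table above, the following sketch decodes such values after they have been read back from QuestDB. It uses only the Python standard library. The helper names and sample values are hypothetical; only the shape and elements field names come from the description above, and row-major ordering of the flattened elements is an assumption.

```python
import base64
import json
from datetime import timedelta


def decode_bytes(value: str) -> bytes:
    """Decode a Pathway bytes value that was stored as a base64 string."""
    return base64.b64decode(value)


def decode_ndarray(value: str):
    """Rebuild a nested list from the JSON form with `shape` and `elements`.

    Assumption: the elements were flattened in row-major (C) order.
    """
    payload = json.loads(value)
    shape, elements = payload["shape"], payload["elements"]

    def build(offset, dims):
        if not dims:
            return elements[offset]
        # Number of flat elements covered by one step along the first axis.
        step = 1
        for dim in dims[1:]:
            step *= dim
        return [build(offset + i * step, dims[1:]) for i in range(dims[0])]

    return build(0, shape)


def decode_duration_ns(value: int) -> timedelta:
    """Convert a nanosecond count to a timedelta.

    timedelta only has microsecond resolution, so the sub-microsecond
    remainder is dropped here.
    """
    return timedelta(microseconds=value // 1000)
```

For example, `decode_ndarray('{"shape": [2, 3], "elements": [1, 2, 3, 4, 5, 6]}')` yields a nested list with two rows of three elements each.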

write(table, *, connection_string, table_name, designated_timestamp_policy=None, designated_timestamp=None, name=None, sort_by=None)

Writes updates from table to a QuestDB table.

The output includes all columns from the input table, plus two additional columns: time, which contains the minibatch time from Pathway, and diff, which indicates the type of change (1 for row insertion and -1 for row deletion).
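
Because every change is stored as its own row, a reader usually has to fold the diff values to recover the current state of the table. The following is a minimal sketch of that folding in plain Python. The function name current_state and the sample rows are made up for illustration, and the rows are assumed to have been fetched already as (data, time, diff) tuples.

```python
from collections import Counter


def current_state(rows):
    """Fold (data, time, diff) rows into the values that are still present.

    A diff of 1 adds one occurrence of the value and a diff of -1 removes one.
    """
    counts = Counter()
    for data, _time, diff in rows:
        counts[data] += diff
    # Keep only values whose insertions outnumber their deletions.
    return {value: count for value, count in counts.items() if count > 0}


# Hypothetical change log: "Hello" is inserted and later deleted.
rows = [("Hello", 2, 1), ("World", 2, 1), ("Hello", 4, -1)]
state = current_state(rows)
```

With these sample rows, only "World" remains in the resulting state.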

By default, the designated timestamp column in QuestDB is set to the current machine time when the row is written.

This behavior can be changed using the designated_timestamp and designated_timestamp_policy parameters. If designated_timestamp is specified, its values will be used as the timestamp. If you set designated_timestamp_policy to use_pathway_time, the Pathway minibatch time will be used as the timestamp. You can also use designated_timestamp_policy="use_now" to be more explicit about using the current machine time.

Note that if you use designated_timestamp_policy="use_pathway_time", the minibatch time will not be added as a separate column; it will only be used as the timestamp. The same applies if you set designated_timestamp: this column is used as the designated timestamp and is not duplicated in the output table.

If the target table does not exist, it will be created when the first write happens. If the table already exists, its schema must match the input data structure. An error will occur if the column types do not match.

  • Parameters
    • table (Table) – The input table to write to QuestDB.
    • connection_string (str) – The client configuration string used to connect to QuestDB.
    • table_name (str) – The name of the target table in QuestDB.
    • designated_timestamp_policy (Optional[Literal['use_now', 'use_pathway_time', 'use_column']]) – Defines how the designated timestamp column is set. The value can be "use_now", which means the current machine time is used as the timestamp. It can also be "use_pathway_time", in which case the Pathway minibatch time is used. Another option is "use_column", which means a specific column will be used as the timestamp; in this case, the designated_timestamp parameter must be provided. If not specified, the default is "use_now".
    • designated_timestamp (ColumnReference | None) – The column to use as the designated timestamp. It must have either the DateTimeNaive or DateTimeUtc type. If this parameter is set, designated_timestamp_policy must either be omitted or set to "use_column"; any other value causes an error.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
  • Returns
    None
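
The interaction between designated_timestamp and designated_timestamp_policy described above can be summarized in a few lines of plain Python. This is only a sketch of the documented rules, not Pathway's implementation: the function resolve_policy is hypothetical, and raising ValueError is an assumption about how the error is reported.

```python
from typing import Optional

VALID_POLICIES = ("use_now", "use_pathway_time", "use_column")


def resolve_policy(policy: Optional[str], has_designated_timestamp: bool) -> str:
    """Return the effective timestamp policy for a given parameter combination."""
    if policy is not None and policy not in VALID_POLICIES:
        raise ValueError(f"unknown policy: {policy!r}")
    if has_designated_timestamp:
        # A designated column is only compatible with "use_column".
        if policy not in (None, "use_column"):
            raise ValueError(
                "designated_timestamp can only be combined with 'use_column'"
            )
        return "use_column"
    if policy == "use_column":
        raise ValueError("'use_column' requires designated_timestamp")
    # With nothing specified, the current machine time is used.
    return policy or "use_now"
```

Under this reading, passing only designated_timestamp selects "use_column", and passing nothing selects "use_now".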

Example:

The easiest way to run QuestDB locally is with Docker. You can use the official image and start it like this:

docker pull questdb/questdb
docker run --name questdb -p 8812:8812 -p 9000:9000 questdb/questdb

The first command pulls the QuestDB image from the official repository. The second command starts a container and exposes two ports: 8812 and 9000. Port 8812 is used for connections over the Postgres wire protocol, which will be demonstrated later. Port 9000 is used for the HTTP API, which supports both data ingestion and queries.

You can now write a simple program. In this example, a table with one column called "data" is created and sent to the database:

import pathway as pw
table = pw.debug.table_from_markdown('''
     | data
   1 | Hello
   2 | World
''')

This table can now be written to QuestDB. If the output table is called "test", the Pathway code looks like this:

pw.io.questdb.write(
    table,
    connection_string="http::addr=localhost:9000;",
    table_name="test",
)
# Start the computation; the rows are written once the pipeline runs.
pw.run()

The connection string specifies that data is sent using the InfluxDB Line Protocol over HTTP, to the server at localhost:9000.
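
The connection string used above has the layout protocol::key=value; with one key=value; pair per setting. The sketch below builds and parses strings of that shape. The helpers build_conf and parse_conf are hypothetical and not part of Pathway or the QuestDB client, and they do not handle values that contain special characters such as a semicolon.

```python
def build_conf(protocol: str, **settings: str) -> str:
    """Assemble a configuration string such as 'http::addr=localhost:9000;'."""
    pairs = "".join(f"{key}={value};" for key, value in settings.items())
    return f"{protocol}::{pairs}"


def parse_conf(conf: str):
    """Split a configuration string into its protocol and a dict of settings."""
    protocol, _, rest = conf.partition("::")
    settings = dict(item.split("=", 1) for item in rest.split(";") if item)
    return protocol, settings


conf = build_conf("http", addr="localhost:9000")
```

Here conf is the same string that is passed as connection_string in the example above.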

Once the code has finished, you can connect to QuestDB using any client that supports the Postgres wire protocol. For example, with psql:

psql -h localhost -p 8812 -U admin -d qdb

The command will prompt for a password. Unless you have changed it, the default password is quest. Once connected, you can run:

qdb=> select * from test;

The query returns the contents of the table.
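
Since port 9000 also serves queries over HTTP, the same check can be done from Python without a Postgres client. The following is a minimal sketch that uses QuestDB's REST /exec endpoint with the standard library only. The function names are hypothetical, and it assumes the HTTP endpoint does not require authentication and is reachable at localhost:9000, as in the Docker setup above.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def exec_url(query: str, host: str = "localhost", port: int = 9000) -> str:
    """Build the URL of a REST /exec request for the given SQL query."""
    return f"http://{host}:{port}/exec?{urlencode({'query': query})}"


def fetch_rows(query: str, host: str = "localhost", port: int = 9000):
    """Run the query over HTTP and return one dict per result row."""
    with urlopen(exec_url(query, host, port)) as response:
        payload = json.load(response)
    # The response lists column metadata separately from the row data.
    names = [column["name"] for column in payload["columns"]]
    return [dict(zip(names, row)) for row in payload["dataset"]]
```

With the container running and the table written, calling fetch_rows("select * from test;") should return the rows as dictionaries keyed by column name.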