pw.io.questdb
All internal Pathway types can be saved into QuestDB. The table below explains how the conversion is done. For the full list of QuestDB types, refer to the official documentation.
Pathway types serialization into QuestDB
Pathway type | QuestDB type |
---|---|
bool | boolean |
int | long |
float | double |
pointer | string |
str | string |
bytes | string containing base64-encoded binary data. Note that the bytes type from QuestDB is currently not supported |
Naive DateTime | timestamp; the UTC timezone is used when passing the value to QuestDB |
UTC DateTime | timestamp |
Duration | long, serialized and deserialized with nanosecond precision |
np.ndarray | string containing a JSON object with two top-level fields: an integer array shape containing the shape of the array, and an array elements containing the flattened elements of the array |
tuple | string containing a JSON array having the order of the elements corresponding to their appearance in the tuple |
list | string containing a JSON array having the order of the elements corresponding to their appearance in the list |
pw.PyObjectWrapper | string containing the base64-encoded serialized object. This value can be deserialized back if read by another Pathway connector |
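
To make the string encodings in the table concrete, here is a minimal sketch of how a client reading the QuestDB table might decode the bytes and np.ndarray columns. The helper names are hypothetical and not part of Pathway, and the sketch assumes the flattened elements are stored in row-major order, which the table above does not state.

```python
import base64
import json


def decode_bytes_column(value: str) -> bytes:
    """Decode a bytes value that was stored as a base64 string."""
    return base64.b64decode(value)


def decode_ndarray_column(value: str):
    """Rebuild a nested list from the JSON form {"shape": [...], "elements": [...]}.

    Assumption: elements are flattened in row-major (C) order.
    """
    payload = json.loads(value)
    shape, elements = payload["shape"], payload["elements"]

    def build(offset: int, dims: list):
        # Base case: no dimensions left, so return a single scalar.
        if not dims:
            return elements[offset]
        # Number of flat elements covered by one step along the current axis.
        stride = 1
        for dim in dims[1:]:
            stride *= dim
        return [build(offset + i * stride, dims[1:]) for i in range(dims[0])]

    return build(0, shape)
```

For instance, a stored value with shape [2, 3] and six elements would come back as a list of two rows with three items each.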
write(table, *, connection_string, table_name, designated_timestamp_policy=None, designated_timestamp=None, name=None, sort_by=None)
Writes updates from table to a QuestDB table.
The output includes all columns from the input table, plus two additional columns: time, which contains the minibatch time from Pathway, and diff, which indicates the type of change (1 for row insertion and -1 for row deletion).
By default, the designated timestamp column in QuestDB is set to the current machine time when the row is written. This behavior can be changed using the designated_timestamp and designated_timestamp_policy parameters. If designated_timestamp is specified, its values will be used as the timestamp.
If you set designated_timestamp_policy to "use_pathway_time", the Pathway minibatch time will be used as the timestamp. You can also use designated_timestamp_policy="use_now" to be more explicit about using the current machine time.
Note that if you use designated_timestamp_policy="use_pathway_time", the minibatch time will not be added as a separate column; it will only be used as the timestamp. The same applies if you set designated_timestamp: this column is used as the designated timestamp and is not duplicated in the output table.
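
The rules above can be summarized in a small pure-Python sketch. It is not Pathway's implementation; the function names are hypothetical, and the designated timestamp column is represented by a plain string for illustration only.

```python
from typing import Optional

VALID_POLICIES = ("use_now", "use_pathway_time", "use_column")


def resolve_timestamp_policy(policy: Optional[str], designated_timestamp: Optional[str]) -> str:
    """Return the effective policy, or raise on an inconsistent combination."""
    if policy is not None and policy not in VALID_POLICIES:
        raise ValueError(f"unknown policy: {policy!r}")
    if designated_timestamp is not None:
        # A timestamp column is only compatible with "use_column" (or no policy).
        if policy not in (None, "use_column"):
            raise ValueError("designated_timestamp requires the 'use_column' policy")
        return "use_column"
    if policy == "use_column":
        raise ValueError("'use_column' requires designated_timestamp")
    # With nothing specified, the current machine time is used.
    return policy or "use_now"


def extra_output_columns(policy: str) -> list:
    """Columns added on top of the input columns for a given effective policy."""
    # With "use_pathway_time" the minibatch time becomes the designated
    # timestamp and is not written as a separate "time" column.
    return ["diff"] if policy == "use_pathway_time" else ["time", "diff"]
```

Under this sketch, passing only a timestamp column resolves to "use_column", while combining a timestamp column with "use_now" is rejected.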
If the target table does not exist, it will be created when the first write happens. If the table already exists, its schema must match the input data structure. An error will occur if the column types do not match.
- Parameters
  - table (Table) – The input table to write to QuestDB.
  - connection_string (str) – The client configuration string used to connect to QuestDB.
  - table_name (str) – The name of the target table in QuestDB.
  - designated_timestamp_policy (Optional[Literal['use_now', 'use_pathway_time', 'use_column']]) – Defines how the designated timestamp column is set. The value can be "use_now", which means the current machine time is used as the timestamp. It can also be "use_pathway_time", in which case the Pathway minibatch time is used. Another option is "use_column", which means a specific column will be used as the timestamp; in this case, the designated_timestamp parameter must be provided. If not specified, the default is "use_now".
  - designated_timestamp (ColumnReference | None) – The name of the column that will be used as the designated timestamp column. This column must have either the DateTimeNaive or DateTimeUtc type. If this parameter is set, designated_timestamp_policy can only be set to "use_column"; otherwise an error will occur.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  - sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
- Returns
None
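
The lexicographic ordering described for sort_by can be illustrated with a short sketch. It models a minibatch as a list of dictionaries, which is an assumption made for illustration and not how Pathway represents rows internally; sort_minibatch is a hypothetical helper.

```python
def sort_minibatch(rows: list, sort_by: list) -> list:
    """Sort rows in ascending order by the tuple of values in the sort_by columns."""
    # Python compares tuples lexicographically: the first column decides,
    # and later columns only break ties.
    return sorted(rows, key=lambda row: tuple(row[column] for column in sort_by))


rows = [
    {"a": 2, "b": 1},
    {"a": 1, "b": 2},
    {"a": 1, "b": 1},
]
sorted_rows = sort_minibatch(rows, ["a", "b"])
```

Here the two rows with a equal to 1 come first, ordered by b, followed by the row with a equal to 2.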
Example:
The easiest way to run QuestDB locally is with Docker. You can use the official image and start it like this:
docker pull questdb/questdb
docker run --name questdb -p 8812:8812 -p 9000:9000 questdb/questdb
The first command pulls the QuestDB image from the official repository. The second command starts a container and exposes two ports: 8812 and 9000. Port 8812 is used for connections over the Postgres wire protocol, which will be demonstrated later. Port 9000 is used for the HTTP API, which supports both data ingestion and queries.
You can now write a simple program. In this example, a table with one column called "data" is created and sent to the database:
import pathway as pw
table = pw.debug.table_from_markdown('''
| data
1 | Hello
2 | World
''')
This table can now be written to QuestDB. If the output table is called "test", the Pathway code looks like this:
pw.io.questdb.write(
    table,
    connection_string="http::addr=localhost:9000;",
    table_name="test",
)
# Start the computation; nothing is written until the pipeline runs.
pw.run()
The connection string specifies that the HTTP InfluxDB Line Protocol is used for sending data.
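
To show how such a configuration string is structured (a protocol, then "::", then semicolon-terminated key=value pairs), here is a deliberately simplified parser. It is a hypothetical helper for illustration only: it is not the parser used by Pathway or the QuestDB client, and it ignores escaping rules.

```python
def parse_connection_string(conf: str):
    """Split a configuration string into its protocol and key/value parameters."""
    protocol, _, rest = conf.partition("::")
    # Each non-empty segment between semicolons is one key=value pair.
    params = dict(item.split("=", 1) for item in rest.split(";") if item)
    return protocol, params


protocol, params = parse_connection_string("http::addr=localhost:9000;")
```

For the string used in this example, the protocol is http and the only parameter is addr, pointing at the HTTP port exposed by the container.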
Once the code has finished, you can connect to QuestDB using any client that supports the Postgres wire protocol. For example, with psql:
psql -h localhost -p 8812 -U admin -d qdb
The command will prompt for a password. Unless you have changed it, the default password is quest. Once connected, you can run:
qdb=> select * from test;
The query returns the contents of the table.
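
Since port 9000 also serves queries over HTTP, the same check can be done without psql. The sketch below assumes QuestDB's REST /exec endpoint, which takes the SQL text in a query parameter and returns JSON with the rows under a dataset key. The helper names are hypothetical, and fetch_rows only works while the container from the Docker step is running.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def build_exec_url(query: str, host: str = "localhost", port: int = 9000) -> str:
    """Build the URL that runs a SQL query through the HTTP API."""
    return f"http://{host}:{port}/exec?{urlencode({'query': query})}"


def fetch_rows(query: str) -> list:
    """Run the query against a local QuestDB instance and return its rows."""
    with urlopen(build_exec_url(query)) as response:
        return json.load(response)["dataset"]


url = build_exec_url("select * from test")
```

Calling fetch_rows("select * from test") after the Pathway program has finished should return the rows written to the test table.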