pw.io.duckdb
This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.
The Pathway Live Data Framework provides an Output connector for
DuckDB. DuckDB is an in-process analytical database, so
the connector needs no external service: data is written natively into a DuckDB
database file. See the write documentation below for the output modes
(stream_of_changes / snapshot), the init_mode options, and the
primary_key rules. The type conversion is given in the table below.
Type Conversion (Output Connector)
The table below describes how Pathway Live Data Framework values are written into
DuckDB columns. List, array, tuple and pw.Json values are bound as a JSON
string and cast back into the destination type inside the generated SQL, because
the underlying DuckDB driver cannot bind a list value as a statement parameter
directly; this is transparent to the user.
Framework types written by the output connector
| Framework type | DuckDB type | Encoding / notes |
|---|---|---|
| bool | BOOLEAN | As-is. |
| int | BIGINT | As-is. |
| float | DOUBLE | As-is. |
| str | VARCHAR | UTF-8 text. |
| bytes | BLOB | Raw bytes. |
| pw.DateTimeNaive | TIMESTAMP | Microsecond resolution. |
| pw.DateTimeUtc | TIMESTAMP | Stored as the UTC wall-clock value (microsecond resolution), independent of any session time zone. |
| pw.Duration | INTERVAL | Microsecond resolution. |
| pw.Json | JSON | A JSON document. |
| pw.Pointer | VARCHAR | Pathway Live Data Framework’s base32 pointer encoding, e.g. ^Z5QKEQ…. |
| list[T] / tuple of one element type | T[] | A native DuckDB list, e.g. list[float] → DOUBLE[]. Searchable with list_cosine_similarity & friends. |
| np.ndarray (int / float elements) | BIGINT[] / DOUBLE[] | A native DuckDB list; multi-dimensional arrays become nested lists (DOUBLE[][] …) following the array shape. |
| heterogeneous tuple | JSON | A JSON array; tuples whose elements have different types have no natural list column type. |
| pw.PyObjectWrapper | BLOB | A bincode payload. |
| None (in optional columns) | NULL | Non-optional columns are created NOT NULL. |
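
To make the list and array rows of the table concrete, the sketch below derives the DuckDB column type of a nested list from its element type and number of dimensions. The names SCALAR_TYPES and duckdb_list_type are hypothetical and only mirror the table above; they are not part of the connector's API.

```python
# Hypothetical helper that mirrors the type-conversion table; not connector code.
SCALAR_TYPES = {
    bool: "BOOLEAN",
    int: "BIGINT",
    float: "DOUBLE",
    str: "VARCHAR",
    bytes: "BLOB",
}


def duckdb_list_type(element_type: type, ndim: int = 1) -> str:
    """Return the DuckDB type of an ndim-dimensional list of element_type."""
    if ndim < 1:
        raise ValueError("ndim must be at least 1")
    # Each dimension adds one level of list nesting, written as a "[]" suffix.
    return SCALAR_TYPES[element_type] + "[]" * ndim


print(duckdb_list_type(float))   # DOUBLE[]
print(duckdb_list_type(int, 2))  # BIGINT[][]
```

A two-dimensional float array would map to DOUBLE[][] under the same rule.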
write(table, *, table_name, database, max_batch_size=None, init_mode='default', output_table_type='stream_of_changes', primary_key=None, name=None, sort_by=None)
Writes table into a table of a DuckDB database
file. DuckDB is an in-process analytical database, so this connector needs no
external service: data is written natively into a database file. Two output
table types are supported, selected with output_table_type.
With "stream_of_changes" (the default) the output table contains a log of
every change seen in the Pathway table. Two extra columns, time (BIGINT)
and diff (SMALLINT), are appended: time is the minibatch timestamp
of the change and diff is 1 for an insertion or -1 for a deletion. A
row modification is therefore expressed as a deletion (diff = -1) followed
by an insertion (diff = 1) within the same minibatch. A schema column named
time or diff collides with these metadata columns and is rejected at
write() time.
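As an illustration of how such a change log can be read back, the following sketch folds a list of logged rows into the current state by summing diff per data row. It is plain Python with made-up sample rows; the function name current_state is hypothetical, and the sketch does not read from DuckDB.

```python
from collections import Counter

# Rows as they might appear in a stream-of-changes table:
# (sensor, temperature, time, diff). The values are made up for illustration.
change_log = [
    ("sensor-a", 21.5, 2, 1),
    ("sensor-b", 19.0, 2, 1),
    ("sensor-a", 21.5, 4, -1),  # an update of sensor-a retracts the old row...
    ("sensor-a", 22.0, 4, 1),   # ...and inserts the new one in the same minibatch
    ("sensor-b", 19.0, 6, -1),  # sensor-b is deleted
]


def current_state(log):
    """Sum diff per data row; rows with a positive total are still present."""
    counts = Counter()
    for *data, _time, diff in log:
        counts[tuple(data)] += diff
    return sorted(row for row, count in counts.items() if count > 0)


print(current_state(change_log))  # [('sensor-a', 22.0)]
```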
With "snapshot" the output table is kept in sync with the current state of
the Pathway table: +1 events are applied as
INSERT ... ON CONFLICT (primary_key) DO UPDATE and -1 events as
DELETE ... WHERE primary_key = ?, so no time / diff columns are
written. primary_key is required in this mode (and forbidden otherwise) and
must list non-nullable columns of table: a NULL key makes
DELETE ... WHERE primary_key = NULL never match on retractions, leaving
stale rows behind. The destination table must carry a PRIMARY KEY /
UNIQUE constraint on those columns for the upsert to target;
init_mode="create_if_not_exists" / "replace" create it for you.
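The upsert and delete statements described above can be tried out with any SQL engine that supports ON CONFLICT. The sketch below uses Python's built-in sqlite3 module purely as a stand-in for DuckDB (it needs SQLite 3.24 or newer); the apply_event helper and the sample values are hypothetical, and the statements the connector actually generates may differ.

```python
import sqlite3

# sqlite3 is only a stand-in SQL engine from the standard library, not DuckDB.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE accounts (account_id INTEGER PRIMARY KEY, balance REAL)")


def apply_event(con, account_id, balance, diff):
    """Apply one change: diff=1 upserts the row, diff=-1 deletes it by key.

    Returns the number of rows deleted for a retraction, or 1 for an upsert.
    """
    if diff == 1:
        con.execute(
            "INSERT INTO accounts (account_id, balance) VALUES (?, ?) "
            "ON CONFLICT (account_id) DO UPDATE SET balance = excluded.balance",
            (account_id, balance),
        )
        return 1
    cur = con.execute("DELETE FROM accounts WHERE account_id = ?", (account_id,))
    return cur.rowcount


apply_event(con, 1, 100.0, 1)
apply_event(con, 2, 250.0, 1)
# An update of account 1 arrives as a retraction followed by an insertion.
apply_event(con, 1, 100.0, -1)
apply_event(con, 1, 175.0, 1)
# An insertion for a key that is already present updates the row in place.
apply_event(con, 2, 300.0, 1)
# A NULL key never satisfies "account_id = NULL", so nothing is deleted.
deleted = apply_event(con, None, 0.0, -1)

rows = con.execute(
    "SELECT account_id, balance FROM accounts ORDER BY account_id"
).fetchall()
print(rows)     # [(1, 175.0), (2, 300.0)]
print(deleted)  # 0
```

The last call shows why nullable key columns are a problem: the retraction is silently ignored and the stale row would stay in the table.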
init_mode decides what state the destination table should be in before the
first write. "default" requires the table to already exist with matching
columns (plus time / diff in stream-of-changes mode);
"create_if_not_exists" creates it if missing, with columns derived from the
Pathway table (and a PRIMARY KEY on primary_key in snapshot mode);
"replace" drops and recreates it. Once the connection is opened the
destination table is validated and a clear error is raised — instead of an
opaque mid-write failure — if it is a view, is missing (under "default"),
lacks a column the writer needs (including time / diff in
stream-of-changes mode), has an extra NOT NULL column without a default, or
has a NOT NULL column that an Optional Pathway column maps onto. Pathway
column names that differ only in case are rejected at write() time, since
DuckDB compares identifiers case-insensitively.
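The two name checks mentioned above (case-only duplicates, and the reserved time / diff columns in stream-of-changes mode) can be sketched in a few lines. The function check_column_names is hypothetical and is not the connector's validation code; comparing the reserved names case-insensitively is an assumption based on DuckDB's identifier rules.

```python
def check_column_names(columns, stream_of_changes=True):
    """Raise ValueError for column names that the text above says are rejected."""
    seen = {}
    for name in columns:
        folded = name.lower()
        # Names that differ only in case map to the same DuckDB identifier.
        if folded in seen:
            raise ValueError(
                f"columns {seen[folded]!r} and {name!r} differ only in case"
            )
        seen[folded] = name
    if stream_of_changes:
        # Assumption: the reserved names are compared case-insensitively too.
        for reserved in ("time", "diff"):
            if reserved in seen:
                raise ValueError(
                    f"column {seen[reserved]!r} collides with the "
                    f"{reserved!r} metadata column"
                )


check_column_names(["sensor", "temperature"])             # passes
check_column_names(["time"], stream_of_changes=False)      # passes in snapshot mode
```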
Column types are mapped onto their natural DuckDB equivalents as described in
the type-conversion table above. In particular, columns holding embeddings —
whether typed as numpy arrays or as list[float] — are stored as DuckDB
DOUBLE[] lists, which can be queried directly with DuckDB’s vector-distance
functions (list_cosine_similarity, list_distance,
list_inner_product); this makes the resulting table usable as a vector store
for retrieval in a RAG pipeline.
DuckDB is an embedded, single-file database — a file can be opened read-write by
only one process at a time. All writes for a single pw.io.duckdb.write run
on one worker even when Pathway runs with several workers; DuckDB serializes
writes to a file regardless, so this costs no throughput while keeping the
output complete and deterministic.
- Parameters
  - table (Table) – The table to write.
  - table_name (str) – The name of the target table in the DuckDB database.
  - database (PathLike | str) – Path to the DuckDB database file. The file is created if it does not exist. ":memory:" opens a private in-process database (only useful within a single run). If the path resolves to an existing directory, a ValueError is raised at call time.
  - max_batch_size (int | None) – Optional upper bound on the number of rows buffered between flushes. Each batch is committed inside a single DuckDB transaction.
  - init_mode (Literal['default', 'create_if_not_exists', 'replace']) – How the destination table is initialized before the first write, one of "default", "create_if_not_exists" or "replace" (see above).
  - output_table_type (Literal['stream_of_changes', 'snapshot']) – How the output table represents the data, either "stream_of_changes" (the default) or "snapshot" (see above).
  - primary_key (list[ColumnReference] | None) – One or more columns of table that form the primary key in the destination table. Required for snapshot mode and forbidden otherwise.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  - sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
- Returns
  None
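The sort_by ordering can be pictured as sorting each minibatch independently on a tuple of column values. The sketch below is plain Python, not the connector's implementation; it assumes rows arrive already grouped by minibatch time, and the function name sort_within_minibatches and the sample rows are hypothetical.

```python
from itertools import groupby
from operator import itemgetter

# (time, sensor, readings) rows, already grouped by minibatch time.
rows = [
    (2, "sensor-b", 80),
    (2, "sensor-a", 100),
    (2, "sensor-a", 40),
    (4, "sensor-c", 10),
    (4, "sensor-a", 70),
]


def sort_within_minibatches(rows, key_indices):
    """Sort rows ascending by the given columns, separately per minibatch."""
    # With several indices, itemgetter returns a tuple, and Python compares
    # tuples lexicographically.
    key = itemgetter(*key_indices)
    ordered = []
    for _time, batch in groupby(rows, key=itemgetter(0)):
        ordered.extend(sorted(batch, key=key))
    return ordered


for row in sort_within_minibatches(rows, key_indices=(1, 2)):
    print(row)
# (2, 'sensor-a', 40)
# (2, 'sensor-a', 100)
# (2, 'sensor-b', 80)
# (4, 'sensor-a', 70)
# (4, 'sensor-c', 10)
```

Rows from different minibatches are never interleaved: the ordering applies only inside each group.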
Examples:
Consider a stream of changes appended to a DuckDB file. A schema may mix
arbitrary column types — here an integer, a float, a string and a boolean —
and the connector adds the time / diff columns automatically:
```python
import pathway as pw


class MeasurementSchema(pw.Schema):
    sensor: str
    temperature: float
    readings: int
    active: bool


measurements = pw.debug.table_from_rows(
    MeasurementSchema,
    [("sensor-a", 21.5, 100, True), ("sensor-b", 19.0, 80, False)],
)

pw.io.duckdb.write(
    measurements,
    table_name="measurements",
    database="./metrics.duckdb",
    init_mode="create_if_not_exists",
)
pw.run()
```
Alternatively, the destination table can be kept in sync with the current
state of the Pathway table by using the snapshot output type. A
primary_key is required, and — unlike the stream of changes — no time
/ diff columns are written:
```python
class AccountSchema(pw.Schema):
    account_id: int
    balance: float


accounts = pw.debug.table_from_rows(
    AccountSchema,
    [(1, 100.0), (2, 250.0)],
)

pw.io.duckdb.write(
    accounts,
    table_name="accounts",
    database="./bank.duckdb",
    output_table_type="snapshot",
    primary_key=[accounts.account_id],
    init_mode="create_if_not_exists",
)
pw.run()
```
The resulting table holds only the data columns — no time / diff:
```sql
SELECT * FROM accounts;
```

```
+------------+---------+
| account_id | balance |
| int64      | double  |
+------------+---------+
| 1          | 100.0   |
| 2          | 250.0   |
+------------+---------+
```
As a final example, a table of document chunks together with their embeddings
is persisted into a DuckDB file for retrieval (RAG). list[float] columns
are stored as native DOUBLE[] lists:
```python
class DocSchema(pw.Schema):
    text: str
    embedding: list[float]


docs = pw.debug.table_from_rows(
    DocSchema,
    [
        ("a cat sat on a mat", [1.0, 0.0, 0.0]),
        ("a dog in the yard", [0.0, 1.0, 0.0]),
    ],
)

pw.io.duckdb.write(
    docs,
    table_name="documents",
    database="./docs.duckdb",
    init_mode="create_if_not_exists",
)
pw.run()
```
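Afterwards the embeddings can be searched with plain DuckDB SQL. The diff = 1
filter selects insertion records; because the stream of changes is a log, a
document that was later deleted still has its insertion record in the table: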
```sql
SELECT text, list_cosine_similarity(embedding, [1.0, 0.0, 0.0]) AS score
FROM documents
WHERE diff = 1
ORDER BY score DESC
LIMIT 5;
```
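For reference, the ranking that this query is expected to produce for the two sample documents can be reproduced in plain Python with the standard cosine-similarity formula. This is a sketch for checking results by hand; the cosine_similarity function here is a hypothetical local helper, not a DuckDB or Pathway call.

```python
import math


def cosine_similarity(a, b):
    """Dot product of a and b divided by the product of their norms."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


# The same rows as in the documents example above.
documents = [
    ("a cat sat on a mat", [1.0, 0.0, 0.0]),
    ("a dog in the yard", [0.0, 1.0, 0.0]),
]
query = [1.0, 0.0, 0.0]

# Score every document and order by descending score, as the SQL does.
ranked = sorted(
    ((text, cosine_similarity(embedding, query)) for text, embedding in documents),
    key=lambda pair: pair[1],
    reverse=True,
)
print(ranked)  # [('a cat sat on a mat', 1.0), ('a dog in the yard', 0.0)]
```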