pw.io.duckdb

This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.

The Pathway Live Data Framework provides an Output connector for DuckDB. DuckDB is an in-process analytical database, so the connector needs no external service: data is written natively into a DuckDB database file. See the write documentation below for the output modes (stream_of_changes / snapshot), the init_mode options, and the primary_key rules. The type conversion is given in the table below.

Type Conversion (Output Connector)

The table below describes how Pathway Live Data Framework values are written into DuckDB columns. List, array, tuple and pw.Json values are bound as a JSON string and cast back into the destination type inside the generated SQL, because the underlying DuckDB driver cannot bind a list value as a statement parameter directly; this is transparent to the user.

Framework types written by the output connector

| Framework type | DuckDB type | Encoding / notes |
| --- | --- | --- |
| bool | BOOLEAN | As-is. |
| int | BIGINT | As-is. |
| float | DOUBLE | As-is. |
| str | VARCHAR | UTF-8 text. |
| bytes | BLOB | Raw bytes. |
| pw.DateTimeNaive | TIMESTAMP | Microsecond resolution. |
| pw.DateTimeUtc | TIMESTAMP | Stored as the UTC wall-clock value (microsecond resolution), independent of any session time zone. |
| pw.Duration | INTERVAL | Microsecond resolution. |
| pw.Json | JSON | A JSON document. |
| pw.Pointer | VARCHAR | Pathway Live Data Framework’s base32 pointer encoding, e.g. ^Z5QKEQ…. |
| list[T] / tuple of one element type | T[] | A native DuckDB list, e.g. list[float] -> DOUBLE[]. Searchable with list_cosine_similarity & friends. |
| np.ndarray (int / float elements) | BIGINT[] / DOUBLE[] | A native DuckDB list; multi-dimensional arrays become nested lists (DOUBLE[][] …) following the array shape. |
| heterogeneous tuple | JSON | A JSON array; tuples whose elements have different types have no natural list column type. |
| pw.PyObjectWrapper | BLOB | A bincode payload. |
| None (in optional columns) | NULL | Non-optional columns are created NOT NULL. |
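
The naming rule for array columns can be sketched in a few lines of Python. The helper `duckdb_list_type` below is hypothetical and not part of Pathway; it only mirrors the rule stated in the table, namely that each array dimension adds one level of list nesting.

```python
def duckdb_list_type(element_type: str, ndim: int) -> str:
    """Return the DuckDB list type for an array with `ndim` dimensions.

    Hypothetical helper for illustration only; not part of Pathway.
    """
    base = {"int": "BIGINT", "float": "DOUBLE"}
    if element_type not in base:
        raise ValueError(f"unsupported element type: {element_type!r}")
    if ndim < 1:
        raise ValueError("ndim must be at least 1")
    # Each array dimension adds one level of list nesting.
    return base[element_type] + "[]" * ndim


print(duckdb_list_type("float", 1))  # DOUBLE[]
print(duckdb_list_type("float", 2))  # DOUBLE[][]
print(duckdb_list_type("int", 1))    # BIGINT[]
```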

write(table, *, table_name, database, max_batch_size=None, init_mode='default', output_table_type='stream_of_changes', primary_key=None, name=None, sort_by=None)

Writes table into a table of a DuckDB database file. DuckDB is an in-process analytical database, so this connector needs no external service: data is written natively into a database file. Two output table types are supported, selected with output_table_type.

With "stream_of_changes" (the default) the output table contains a log of every change seen in the Pathway table. Two extra columns, time (BIGINT) and diff (SMALLINT), are appended: time is the minibatch timestamp of the change and diff is 1 for an insertion or -1 for a deletion. A row modification is therefore expressed as a deletion (diff = -1) followed by an insertion (diff = 1) within the same minibatch. A schema column named time or diff collides with these metadata columns and is rejected at write() time.
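
To illustrate how such a log can be read back, here is a minimal sketch in plain Python that folds a list of change rows into the current state by summing diff per distinct row. The rows, the time values and the helper `current_state` are made up for illustration; they are not produced by the connector.

```python
from collections import Counter

# Hypothetical rows as they might appear in a stream-of-changes table:
# (sensor, temperature, time, diff). The values are made up for illustration.
change_log = [
    ("sensor-a", 21.5, 2, 1),   # initial insertion
    ("sensor-b", 19.0, 2, 1),   # initial insertion
    ("sensor-a", 21.5, 4, -1),  # modification: retract the old value ...
    ("sensor-a", 22.0, 4, 1),   # ... and insert the new one in the same minibatch
    ("sensor-b", 19.0, 6, -1),  # deletion
]


def current_state(log):
    """Sum diff per distinct row and keep rows whose net count is positive."""
    counts = Counter()
    for *row, _time, diff in log:
        counts[tuple(row)] += diff
    return sorted(row for row, count in counts.items() if count > 0)


print(current_state(change_log))  # [('sensor-a', 22.0)]
```

The same fold can be expressed in SQL by grouping on the data columns and keeping the groups where SUM(diff) is positive.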

With "snapshot" the output table is kept in sync with the current state of the Pathway table: +1 events are applied as INSERT ... ON CONFLICT (primary_key) DO UPDATE and -1 events as DELETE ... WHERE primary_key = ?, so no time / diff columns are written. primary_key is required in this mode (and forbidden otherwise) and must list non-nullable columns of table: a NULL key makes DELETE ... WHERE primary_key = NULL never match on retractions, leaving stale rows behind. The destination table must carry a PRIMARY KEY / UNIQUE constraint on those columns for the upsert to target; init_mode="create_if_not_exists" / "replace" create it for you.
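
The upsert and delete statements described above, and the reason a NULL key leaves stale rows, can be tried out with a small sketch. It uses SQLite from the Python standard library as a stand-in for DuckDB, purely to stay dependency-free; the table and values are illustrative, and this is not the connector's own code. The key column is declared UNIQUE rather than PRIMARY KEY so that a NULL key can be inserted for the demonstration.

```python
import sqlite3

# SQLite (from the standard library) stands in for DuckDB here; this is an
# assumption made to keep the sketch dependency-free, not the connector's code.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE accounts (account_id INTEGER UNIQUE, balance REAL)")

upsert = (
    "INSERT INTO accounts (account_id, balance) VALUES (?, ?) "
    "ON CONFLICT (account_id) DO UPDATE SET balance = excluded.balance"
)
delete = "DELETE FROM accounts WHERE account_id = ?"

con.execute(upsert, (1, 100.0))  # +1 event: new row
con.execute(upsert, (1, 120.0))  # +1 event for the same key: row is updated
con.execute(upsert, (2, 250.0))
con.execute(delete, (2,))        # -1 event: row is removed
rows = con.execute("SELECT * FROM accounts ORDER BY account_id").fetchall()
print(rows)  # [(1, 120.0)]

# A NULL key: the comparison account_id = NULL is never true, so the
# retraction deletes nothing and the row stays behind.
con.execute(upsert, (None, 5.0))
deleted = con.execute(delete, (None,)).rowcount
print(deleted)  # 0
stale = con.execute(
    "SELECT COUNT(*) FROM accounts WHERE account_id IS NULL"
).fetchone()[0]
print(stale)  # 1
```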

init_mode decides what state the destination table should be in before the first write. "default" requires the table to already exist with matching columns (plus time / diff in stream-of-changes mode); "create_if_not_exists" creates it if missing, with columns derived from the Pathway table (and a PRIMARY KEY on primary_key in snapshot mode); "replace" drops and recreates it. Once the connection is opened the destination table is validated and a clear error is raised — instead of an opaque mid-write failure — if it is a view, is missing (under "default"), lacks a column the writer needs (including time / diff in stream-of-changes mode), has an extra NOT NULL column without a default, or has a NOT NULL column that an Optional Pathway column maps onto. Pathway column names that differ only in case are rejected at write() time, since DuckDB compares identifiers case-insensitively.
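
The case-insensitivity rule can be checked ahead of time with a few lines of Python. The helper `case_insensitive_collisions` below is a hypothetical sketch; the connector's own check may be implemented differently.

```python
from collections import defaultdict


def case_insensitive_collisions(column_names):
    """Group column names that are equal when compared case-insensitively.

    Hypothetical helper for illustration; the connector's own check may differ.
    """
    groups = defaultdict(list)
    for name in column_names:
        groups[name.casefold()].append(name)
    return [names for names in groups.values() if len(names) > 1]


print(case_insensitive_collisions(["id", "Value", "value", "ts"]))
# [['Value', 'value']]
```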

Column types are mapped onto their natural DuckDB equivalents as described in the type-conversion table above. In particular, columns holding embeddings — whether typed as numpy arrays or as list[float] — are stored as DuckDB DOUBLE[] lists, which can be queried directly with DuckDB’s vector-distance functions (list_cosine_similarity, list_distance, list_inner_product); this makes the resulting table usable as a vector store for retrieval in a RAG pipeline.
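
To make the ranking behind a cosine-similarity query concrete, the following plain-Python sketch computes the standard cosine similarity between a query vector and two stored embeddings. It does not call DuckDB; the helper `cosine_similarity` and the sample vectors are illustrative only, and zero vectors are not handled.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length, non-zero vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


query = [1.0, 0.0, 0.0]
chunks = {
    "a cat sat on a mat": [1.0, 0.0, 0.0],
    "a dog in the yard": [0.0, 1.0, 0.0],
}
# Rank the stored chunks by similarity to the query, best match first.
ranked = sorted(
    chunks,
    key=lambda text: cosine_similarity(chunks[text], query),
    reverse=True,
)
print(ranked[0])  # a cat sat on a mat
```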

DuckDB is an embedded, single-file database — a file can be opened read-write by only one process at a time. All writes for a single pw.io.duckdb.write run on one worker even when Pathway runs with several workers; DuckDB serializes writes to a file regardless, so this costs no throughput while keeping the output complete and deterministic.

  • Parameters
    • table (Table) – The table to write.
    • table_name (str) – The name of the target table in the DuckDB database.
    • database (PathLike | str) – Path to the DuckDB database file. The file is created if it does not exist. ":memory:" opens a private in-process database (only useful within a single run). If the path resolves to an existing directory, a ValueError is raised at call time.
    • max_batch_size (int | None) – Optional upper bound on the number of rows buffered between flushes. Each batch is committed inside a single DuckDB transaction.
    • init_mode (Literal['default', 'create_if_not_exists', 'replace']) – How the destination table is initialized before the first write, one of "default", "create_if_not_exists" or "replace" (see above).
    • output_table_type (Literal['stream_of_changes', 'snapshot']) – How the output table represents the data, either "stream_of_changes" (the default) or "snapshot" (see above).
    • primary_key (list[ColumnReference] | None) – One or more columns of table that form the primary key in the destination table. Required for snapshot mode and forbidden otherwise.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
  • Returns
    None

Examples:

Consider a stream of changes appended to a DuckDB file. A schema may mix arbitrary column types — here an integer, a float, a string and a boolean — and the connector adds the time / diff columns automatically:

import pathway as pw


class MeasurementSchema(pw.Schema):
    sensor: str
    temperature: float
    readings: int
    active: bool


measurements = pw.debug.table_from_rows(
    MeasurementSchema,
    [("sensor-a", 21.5, 100, True), ("sensor-b", 19.0, 80, False)],
)

# Append every change to ./metrics.duckdb, creating the table if it is missing.
pw.io.duckdb.write(
    measurements,
    table_name="measurements",
    database="./metrics.duckdb",
    init_mode="create_if_not_exists",
)
pw.run()

Alternatively, the destination table can be kept in sync with the current state of the Pathway table by using the snapshot output type. A primary_key is required, and — unlike the stream of changes — no time / diff columns are written:

import pathway as pw


class AccountSchema(pw.Schema):
    account_id: int
    balance: float


accounts = pw.debug.table_from_rows(
    AccountSchema,
    [(1, 100.0), (2, 250.0)],
)

# Keep the destination table in sync with the current state, keyed by account_id.
pw.io.duckdb.write(
    accounts,
    table_name="accounts",
    database="./bank.duckdb",
    output_table_type="snapshot",
    primary_key=[accounts.account_id],
    init_mode="create_if_not_exists",
)
pw.run()

The resulting table holds only the data columns — no time / diff:

SELECT * FROM accounts;
+------------+---------+
| account_id | balance |
|   int64    | double  |
+------------+---------+
|          1 |   100.0 |
|          2 |   250.0 |
+------------+---------+

As a final example, a table of document chunks together with their embeddings is persisted into a DuckDB file for retrieval (RAG). list[float] columns are stored as native DOUBLE[] lists:

import pathway as pw


class DocSchema(pw.Schema):
    text: str
    embedding: list[float]


docs = pw.debug.table_from_rows(
    DocSchema,
    [("a cat sat on a mat", [1.0, 0.0, 0.0]),
     ("a dog in the yard", [0.0, 1.0, 0.0])],
)

# The embedding column is written as a native DOUBLE[] list.
pw.io.duckdb.write(
    docs,
    table_name="documents",
    database="./docs.duckdb",
    init_mode="create_if_not_exists",
)
pw.run()

Afterwards the embeddings can be searched with plain DuckDB SQL:

SELECT text, list_cosine_similarity(embedding, [1.0, 0.0, 0.0]) AS score
FROM documents
WHERE diff = 1
ORDER BY score DESC
LIMIT 5;