pw.io.clickhouse

This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.

The table below explains how each Pathway Live Data Framework type is stored in ClickHouse. Optional columns are stored as the Nullable wrapper of the type below, except bytes, which is always stored as Nullable(String). The array-like types (list, tuple and np.ndarray) are supported in specific forms only, described in the table; because ClickHouse arrays cannot be wrapped in Nullable, an optional array-like column is not supported. For the full list of ClickHouse types you can refer to the official documentation.

Pathway Live Data Framework types serialization into ClickHouse

Framework’s type | ClickHouse type
bool | Bool
int | Int64
float | Float64
pointer | String
str | String
bytes | Nullable(String), containing the raw binary data
Naive DateTime | DateTime64(9, 'UTC'); the UTC timezone is used when passing the value to ClickHouse
UTC DateTime | DateTime64(9, 'UTC')
Duration | Int64, serialized and deserialized with nanosecond precision
JSON | String, containing the serialized JSON value
pw.PyObjectWrapper | String, containing the base64-encoded serialized value. This value can be deserialized back if it is read by another Pathway Live Data Framework connector
list[T] | Array(<T>), where T is one of bool, int, float, str, pointer, JSON, Duration or pw.PyObjectWrapper; each element is stored exactly as that scalar type is stored on its own. Datetime elements (list[Naive DateTime] / list[UTC DateTime]) are not supported because the array column would only keep second precision. Only one level of nesting is supported: a list whose element type is itself a list, an optional, or bytes (i.e. list[list[...]], list[T | None], list[bytes]) is not supported either. All of these unsupported cases are rejected when the computation starts
tuple | A fixed-length tuple whose elements are all the same supported scalar type (one of bool, int, float, str, pointer, JSON, Duration or pw.PyObjectWrapper), for example tuple[float, float, float], is stored as Array(<T>) exactly like a list[T] (the tuple length is not preserved, because a ClickHouse Array is variable-length). A variable-length tuple[T, ...] is treated as a list[T]. A heterogeneous tuple (mixed element types, e.g. tuple[int, str]) would require a ClickHouse Tuple, which the connector does not support; it, and tuples of an unsupported element type, are rejected when the computation starts
np.ndarray | A one-dimensional, element-typed int or float array (e.g. np.ndarray[tuple[int], np.dtype[np.float64]]) is stored as Array(Int64) / Array(Float64), which ClickHouse vector functions such as cosineDistance and L2Distance can operate on. Multi-dimensional arrays, arrays of unknown dimensionality (a bare np.ndarray), and other element types are not supported and are rejected when the computation starts
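
The mapping rules in the table can be summarized as a small lookup. The sketch below is only an illustration: clickhouse_type and the informal string type names are hypothetical and are not part of the connector's API. It covers scalar types and one-level lists only.

```python
# Scalar mapping copied from the table; the keys are informal type names
# chosen for this sketch, not identifiers from the connector.
SCALAR_TYPES = {
    "bool": "Bool",
    "int": "Int64",
    "float": "Float64",
    "pointer": "String",
    "str": "String",
    "Duration": "Int64",
    "JSON": "String",
    "PyObjectWrapper": "String",
}
DATETIME_TYPES = {"Naive DateTime", "UTC DateTime"}


def clickhouse_type(type_name, optional=False, is_list=False):
    """Return the ClickHouse type the table lists for a type (hypothetical helper)."""
    if is_list:
        # Arrays cannot be Nullable, and only plain scalar elements are allowed,
        # so list[bytes] and lists of datetimes are rejected here.
        if optional:
            raise ValueError("an optional array-like column is not supported")
        if type_name not in SCALAR_TYPES:
            raise ValueError(f"list[{type_name}] is not supported")
        return f"Array({SCALAR_TYPES[type_name]})"
    if type_name == "bytes":
        # bytes is always stored as Nullable(String), optional or not.
        return "Nullable(String)"
    if type_name in DATETIME_TYPES:
        base = "DateTime64(9, 'UTC')"
    else:
        base = SCALAR_TYPES[type_name]
    return f"Nullable({base})" if optional else base
```

For instance, clickhouse_type("float", optional=True) gives "Nullable(Float64)", while clickhouse_type("str", is_list=True) gives "Array(String)".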

write(table, *, connection_string, table_name, output_table_type='stream_of_changes', primary_key=None, init_mode='default', max_batch_size=None, name=None, sort_by=None)

Writes table to a ClickHouse table.

The output table supports two formats, controlled by output_table_type: a "stream_of_changes" format that appends the full history of updates, and a "snapshot" format that maintains the current state of the table.

In "stream_of_changes" mode (the default) the output includes every column of the input table plus two additional columns: time, which holds the Pathway Live Data Framework minibatch time of the change, and diff, which describes the type of change (1 for a row insertion and -1 for a row deletion). A row update is represented as a deletion of the old value followed by an insertion of the new one, both within the same minibatch. Because time and diff are reserved column names in this format, the input table must not contain columns with these names; otherwise a ValueError is raised at construction time. Each pw.run() invocation appends to the table; use init_mode="replace" to recreate it from scratch instead.
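
To make the time/diff encoding concrete, here is a minimal sketch in plain Python that replays such a history. The rows, the time values and the state_at helper are made up for illustration; they are not produced by the connector.

```python
from collections import Counter

# Hypothetical rows of a "stream_of_changes" table: the input column `data`
# plus the `time` and `diff` metadata columns. The time values are invented.
history = [
    {"data": "Hello", "time": 2, "diff": 1},   # insertion
    {"data": "World", "time": 2, "diff": 1},   # insertion
    {"data": "Hello", "time": 4, "diff": -1},  # update: retract the old value
    {"data": "Hi", "time": 4, "diff": 1},      # update: insert the new value
    {"data": "World", "time": 6, "diff": -1},  # deletion
]


def state_at(rows, time):
    """Rebuild the table contents as of `time` by summing diffs per value."""
    counts = Counter()
    for row in rows:
        if row["time"] <= time:
            counts[row["data"]] += row["diff"]
    # A value is present if its insertions outnumber its deletions.
    return sorted(value for value, count in counts.items() if count > 0)


print(state_at(history, 2))  # state after the first minibatch
print(state_at(history, 6))  # state after all changes
```

Summing diff per row value is what lets a consumer of the history table recover the state at any minibatch time.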

In "snapshot" mode the connector maintains the current state of the table rather than its history. ClickHouse is an insert-only, append-oriented store with no synchronous primary-key upsert or delete, so the current state is maintained with a ReplacingMergeTree(version, is_deleted) engine ordered by the primary_key columns, plus two bookkeeping columns the connector appends to every row:

  • version – a 64-bit counter that increases by one with every change, in the order changes are produced. It is not the minibatch time used in "stream_of_changes" mode; see below for why a separate counter is needed.
  • is_deleted – 1 if the row is a retraction (deletion), 0 if it is a live state row. This is exactly the diff of "stream_of_changes" mode re-expressed as a flag (diff = -1 becomes is_deleted = 1, diff = 1 becomes is_deleted = 0).

Every Pathway change is just appended as a row (no in-place update). What turns that append-only stream into a live snapshot is how ReplacingMergeTree behaves at the ClickHouse level: when it merges data parts in the background, it collapses all rows that share the same ORDER BY key (here the primary_key) down to the single row with the largest version, and if that surviving row has is_deleted = 1 it is physically dropped. Both columns are therefore required, and each does a specific job:

  • version makes “the newest write wins” deterministic, and is the reason this mode needs a dedicated counter rather than reusing time. An update to a row is emitted as a retraction of the old value followed by an insertion of the new one — both with the same primary key and the same minibatch time — so time cannot tell the engine which one is newer. version can: the connector assigns each change the next counter value in production order, emitting retractions before insertions within a minibatch, so the insertion always gets the higher version and the merge keeps the new values. (The counter only ever has to break ties between rows of the same primary key; identical version values across different keys are irrelevant, since ReplacingMergeTree collapses each ORDER BY key independently.) Reusing time would give the retraction and the insertion an equal version, and ReplacingMergeTree breaks such ties arbitrarily — it could keep the retraction and silently drop the update.
  • is_deleted lets a key be removed at all. Because only inserts are possible, a deletion is represented as an appended row with the highest version and is_deleted = 1; the merge then removes that key. Without it a deleted key could never leave the table.
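
The collapse rule described above can be simulated in a few lines of plain Python. This is a simplified model, not ClickHouse's implementation: the merge function and the sample rows are hypothetical, and real merges run asynchronously on data parts.

```python
def merge(rows, key="name"):
    """Model a ReplacingMergeTree(version, is_deleted) merge on a list of rows."""
    newest = {}
    for row in rows:
        kept = newest.get(row[key])
        # Keep only the row with the largest version for each key.
        if kept is None or row["version"] > kept["version"]:
            newest[row[key]] = row
    # A key whose surviving row is a deletion marker is dropped entirely.
    return [row for row in newest.values() if row["is_deleted"] == 0]


# Rows as they would be appended, one per change, with increasing versions.
appended = [
    {"name": "Cat", "owner": "Alice", "version": 1, "is_deleted": 0},
    {"name": "Dog", "owner": "Bob", "version": 2, "is_deleted": 0},
    # Update of Cat: the retraction comes first, the insertion gets the higher version.
    {"name": "Cat", "owner": "Alice", "version": 3, "is_deleted": 1},
    {"name": "Cat", "owner": "Carol", "version": 4, "is_deleted": 0},
    # Deletion of Dog: an appended row with the highest version for that key.
    {"name": "Dog", "owner": "Bob", "version": 5, "is_deleted": 1},
]

snapshot = merge(appended)
print(snapshot)
```

After the merge only the Cat row owned by Carol remains: the update wins because its version (4) exceeds that of the retraction (3), and Dog disappears because its newest row is a deletion marker.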

Because merges run asynchronously, query the table with SELECT … FINAL to observe the current, deduplicated state on demand — FINAL applies the same keep-largest-version / drop-is_deleted logic at query time. A plain SELECT (without FINAL) may still return superseded or deleted rows until a merge has run.

version and is_deleted are reserved column names in this format, so the input table must not contain columns with these names. This mode requires the primary_key parameter and runs on a single worker so that the version counter is globally monotonic; on start-up it is seeded above the table’s current maximum version so that a restart against an existing table keeps producing winning rows.
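
The start-up seeding can be pictured as follows. This is a sketch under stated assumptions: version_counter is a hypothetical helper, and starting an empty table at 1 is a choice made here, not something the connector is documented to do.

```python
import itertools


def version_counter(existing_versions):
    """Return a counter that starts strictly above every stored version."""
    # Assumption: an empty table starts at 1; the text only requires that
    # new versions exceed the table's current maximum.
    start = max(existing_versions, default=0) + 1
    return itertools.count(start)


# Versions already present in the table from an earlier run (invented values).
counter = version_counter([3, 7, 5])
first = next(counter)
second = next(counter)
print(first, second)
```

Because every new version exceeds the stored maximum, rows written after a restart still win over rows written before it for the same primary key.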

  • Parameters
    • table (Table) – The table to write to ClickHouse.
    • connection_string (str) – The connection string for the ClickHouse server, in the native-protocol form "tcp://user:password@host:9000/database". Connection options such as compression may be appended as query parameters, e.g. "tcp://localhost:9000/default?compression=lz4".
    • table_name (str) – The name of the target table in ClickHouse.
    • output_table_type (Literal['stream_of_changes', 'snapshot']) – Either "stream_of_changes" (the default), which appends the full history of changes with time/diff columns, or "snapshot", which maintains the current state of the table in a ReplacingMergeTree keyed by primary_key (queried with SELECT ... FINAL).
    • primary_key (Optional[Iterable[ColumnReference]]) – The columns identifying each row. Required when output_table_type="snapshot" (they become the table’s ORDER BY); must not be set otherwise.
    • init_mode (Literal['default', 'create_if_not_exists', 'replace']) – Determines how the table is initialized before the first write. "default" (the default) does nothing and requires the table to already exist. "create_if_not_exists" creates the table if it is not present. "replace" drops the table if it exists and recreates it. In the two latter modes the table is created with the engine and metadata columns for the chosen output_table_type: a MergeTree with time/diff columns for "stream_of_changes", or a ReplacingMergeTree(version, is_deleted) ordered by primary_key with version/is_deleted columns for "snapshot". Regardless of init_mode, the connector validates the destination table when the computation starts: it must exist and contain every output column (the input columns plus the metadata columns) with a compatible type, and in "snapshot" mode it must use the ReplacingMergeTree engine. A missing table, an absent column, an incompatible column type, or a wrong engine is reported immediately rather than on the first write. ClickHouse inserts perform almost no implicit conversion, so a pre-existing column must have exactly the type the connector produces for it (a String column may also be FixedString(N), and a datetime column may be any DateTime/DateTime64 variant).
    • max_batch_size (int | None) – The maximum number of changes to accumulate before sending an insertion block to the server.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output is sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples are compared lexicographically.
  • Returns
    None
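
The ordering that sort_by describes (ascending, lexicographic over the value tuples, applied within each minibatch) can be illustrated in plain Python. The rows, the time values and the order_output helper are hypothetical, and the sketch assumes the rows already arrive grouped by minibatch time.

```python
from itertools import groupby
from operator import itemgetter

# Invented changes; `time` marks the minibatch each row belongs to.
changes = [
    {"owner": "Bob", "name": "Dog", "time": 2},
    {"owner": "Alice", "name": "Hamster", "time": 2},
    {"owner": "Alice", "name": "Cat", "time": 2},
    {"owner": "Alice", "name": "Zebra", "time": 4},
    {"owner": "Aaron", "name": "Yak", "time": 4},
]


def order_output(rows, sort_by):
    """Sort rows inside each minibatch by the `sort_by` columns, ascending."""
    ordered = []
    # groupby only groups consecutive rows, hence the grouping assumption above.
    for _, batch in groupby(rows, key=itemgetter("time")):
        # Tuples compare lexicographically: first column, then the next one.
        ordered.extend(sorted(batch, key=lambda row: tuple(row[c] for c in sort_by)))
    return ordered


names = [row["name"] for row in order_output(changes, ["owner", "name"])]
print(names)
```

Rows are never reordered across minibatches: Yak sorts before every other owner alphabetically, yet it stays after the rows of the earlier minibatch.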

Example:

The easiest way to run ClickHouse locally is with Docker. You can use the official image and start it like this:

docker pull clickhouse/clickhouse-server
docker run -d --name clickhouse \
    -p 8123:8123 -p 9000:9000 \
    clickhouse/clickhouse-server

Port 9000 is used for the native protocol, which this connector relies on. Port 8123 exposes the HTTP interface, which is convenient for inspecting the data afterwards.

You can now write a simple program. In this example a table with one column called "data" is created and sent to the database:

import pathway as pw

# Build a small static table with a single "data" column.
table = pw.debug.table_from_markdown('''
     | data
   1 | Hello
   2 | World
''')

This table can now be written to ClickHouse. If the output table is called "test" and you want the connector to create it for you, the code looks like this:

pw.io.clickhouse.write(
    table,
    connection_string="tcp://localhost:9000/default",
    table_name="test",
    init_mode="create_if_not_exists",
)

You can run this pipeline with pw.run().

Once the program has finished, you can inspect the data with any ClickHouse client, for example over the HTTP interface:

curl 'http://localhost:8123/?query=SELECT%20*%20FROM%20test'

Note that if you run the program again it will append data to the table. To recreate the table from scratch, use init_mode="replace".

If instead of the history of changes you want to maintain the current state of the table, use output_table_type="snapshot" and provide the column(s) that identify each row via primary_key:

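# Each pet is identified by its name, which serves as the primary key.
pets = pw.debug.table_from_markdown('''
     | name  | owner
   1 | Cat   | Alice
   2 | Dog   | Bob
''')

# Maintain the current state of the table instead of the history of changes.
pw.io.clickhouse.write(
    pets,
    connection_string="tcp://localhost:9000/default",
    table_name="pets",
    output_table_type="snapshot",
    primary_key=[pets.name],
    init_mode="create_if_not_exists",
)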

Because the deduplication happens during background merges, query the snapshot with FINAL to always observe the current state:

curl 'http://localhost:8123/?query=SELECT%20name,owner%20FROM%20pets%20FINAL'
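
The same query can also be sent from Python using only the standard library. This is a minimal sketch: build_query_url and run_query are hypothetical helper names, and the host, port and table name are the ones assumed in the example above.

```python
from urllib.parse import quote
from urllib.request import urlopen


def build_query_url(sql, base="http://localhost:8123/"):
    """Percent-encode `sql` and attach it as the `query` parameter."""
    return f"{base}?query={quote(sql)}"


def run_query(sql):
    """Send the query over the HTTP interface and return the response text."""
    # Requires a running ClickHouse server; not called in this sketch.
    with urlopen(build_query_url(sql)) as response:
        return response.read().decode("utf-8")


url = build_query_url("SELECT name, owner FROM pets FINAL")
print(url)
```

Calling run_query("SELECT name, owner FROM pets FINAL") against the local server started earlier should return the deduplicated rows as tab-separated text, which is the default output format of the HTTP interface.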