pw.io.clickhouse
This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.
The table below explains how each Pathway Live Data Framework type is stored in ClickHouse. Optional columns are stored as the Nullable wrapper of the type below, except bytes, which is always stored as Nullable(String). The array-like types (list, tuple and np.ndarray) are supported in specific forms only, described in the table; because ClickHouse arrays cannot be wrapped in Nullable, an optional array-like column is not supported. For the full list of ClickHouse types you can refer to the official documentation.
Pathway Live Data Framework types serialization into ClickHouse
| Framework’s type | ClickHouse type |
|---|---|
| bool | Bool |
| int | Int64 |
| float | Float64 |
| pointer | String |
| str | String |
| bytes | Nullable(String), containing the raw binary data |
| Naive DateTime | DateTime64(9, 'UTC'); the value is interpreted as UTC when it is passed to ClickHouse |
| UTC DateTime | DateTime64(9, 'UTC') |
| Duration | Int64, serialized and deserialized with nanosecond precision |
| JSON | String, containing the serialized JSON value |
| pw.PyObjectWrapper | String, containing the base64-encoded serialized object. This value can be deserialized back if it is read by another Pathway Live Data Framework connector |
| list[T] | Array(<T>), where T is one of bool, int, float, str, pointer, JSON, Duration or pw.PyObjectWrapper; each element is stored exactly as that scalar type is stored on its own. Datetime elements (list[Naive DateTime] / list[UTC DateTime]) are not supported because the array column would only keep second precision. Only one level of nesting is supported: a list whose element type is itself a list, an optional, or bytes (i.e. list[list[...]], list[T \| None], list[bytes]) is not supported either. All of these unsupported cases are rejected when the computation starts |
| tuple | a fixed-length tuple whose elements all have the same supported scalar type (one of bool, int, float, str, pointer, JSON, Duration or pw.PyObjectWrapper), for example tuple[float, float, float], is stored as Array(<T>) exactly like a list[T]; the tuple length is not preserved, because a ClickHouse Array is variable-length. A variable-length tuple[T, ...] is treated as a list[T]. A heterogeneous tuple (mixed element types, e.g. tuple[int, str]) would require a ClickHouse Tuple, which the connector does not support; such tuples, and tuples of an unsupported element type, are rejected when the computation starts |
| np.ndarray | a one-dimensional int or float array with a declared element type (e.g. np.ndarray[tuple[int], np.dtype[np.float64]]) is stored as Array(Int64) / Array(Float64), which ClickHouse vector functions such as cosineDistance and L2Distance can operate on. Multi-dimensional arrays, arrays of unknown dimensionality (a bare np.ndarray), and other element types are not supported and are rejected when the computation starts |
write(table, *, connection_string, table_name, output_table_type='stream_of_changes', primary_key=None, init_mode='default', max_batch_size=None, name=None, sort_by=None)
Writes table to a ClickHouse table.
The output table supports two formats, controlled by output_table_type:
a "stream_of_changes" format that appends the full history of updates, and
a "snapshot" format that maintains the current state of the table.
In "stream_of_changes" mode (the default) the output includes every column
of the input table plus two additional columns: time, which holds the
Pathway Live Data Framework minibatch time of the change, and diff, which
describes the type of change (1 for a row insertion and -1 for a row
deletion). A row update is represented as a deletion of the old value followed
by an insertion of the new one, both within the same minibatch. Because
time and diff are reserved column names in this format, the input table
must not contain columns with these names; otherwise a ValueError is raised
at construction time. Each pw.run() invocation appends to the table; use
init_mode="replace" to recreate it from scratch instead.
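As a rough model of this format, the sketch below computes the rows appended for one minibatch from the state of the table before and after it, and rejects input tables that use the reserved names. The helpers stream_of_changes_rows and check_columns, and the minibatch time 2 used in the example, are hypothetical illustrations rather than the connector's implementation.

```python
RESERVED = {"time", "diff"}


def check_columns(column_names):
    """Reject input tables that use the reserved metadata column names."""
    clash = RESERVED.intersection(column_names)
    if clash:
        raise ValueError(f"reserved column names: {sorted(clash)}")


def stream_of_changes_rows(before, after, time):
    """Rows appended for one minibatch, given the state before and after it."""
    rows = []
    for key in sorted(before.keys() | after.keys()):
        old, new = before.get(key), after.get(key)
        if old == new:
            continue  # unchanged rows produce no output
        if old is not None:
            rows.append({**old, "time": time, "diff": -1})  # retract the old value
        if new is not None:
            rows.append({**new, "time": time, "diff": 1})  # insert the new value
    return rows


check_columns(["data"])
# Row 1 is updated ("Hello" -> "Hi") and row 2 is inserted in the same minibatch.
rows = stream_of_changes_rows(
    before={1: {"data": "Hello"}},
    after={1: {"data": "Hi"}, 2: {"data": "World"}},
    time=2,
)
for row in rows:
    print(row)
```

The update of row 1 shows up as two rows with the same time: a diff = -1 row for the old value and a diff = 1 row for the new one.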
In "snapshot" mode the connector maintains the current state of the table
rather than its history. ClickHouse is an insert-only, append-oriented store
with no synchronous primary-key upsert or delete, so the current state is
maintained with a ReplacingMergeTree(version, is_deleted)
engine ordered by the primary_key columns, plus two bookkeeping columns
the connector appends to every row:
- version: a 64-bit counter that increases by one with every change, in the order the changes are produced. It is not the minibatch time used in "stream_of_changes" mode; see below for why a separate counter is needed.
- is_deleted: 1 if the row is a retraction (deletion), 0 if it is a live state row. This is exactly the diff of "stream_of_changes" mode re-expressed as a flag (diff = -1 becomes is_deleted = 1, diff = 1 becomes is_deleted = 0).
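The translation from diff to the two bookkeeping columns can be sketched as follows. This is a minimal model assuming a single writer, a counter that happens to start at 1, and changes given as (key, owner, diff) tuples; SnapshotRow and to_snapshot_rows are hypothetical names, not part of the connector.

```python
from dataclasses import dataclass
from itertools import count


@dataclass
class SnapshotRow:
    key: str  # primary_key value
    owner: str  # a regular data column
    version: int  # bookkeeping: global change counter
    is_deleted: int  # bookkeeping: 1 for a retraction, 0 for a live row


def to_snapshot_rows(changes, counter):
    """Turn one minibatch of (key, owner, diff) changes into snapshot rows."""
    # Retractions (diff = -1) go first; sorted() is stable, so production
    # order is otherwise preserved.
    ordered = sorted(changes, key=lambda change: change[2])
    return [
        SnapshotRow(key, owner, next(counter), 1 if diff == -1 else 0)
        for key, owner, diff in ordered
    ]


counter = count(1)  # hypothetical starting value
# Updating Cat's owner: Pathway emits a retraction and an insertion.
rows = to_snapshot_rows([("Cat", "Carol", 1), ("Cat", "Alice", -1)], counter)
for row in rows:
    print(row)
```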
Every Pathway change is just appended as a row (no in-place update). What
turns that append-only stream into a live snapshot is how ReplacingMergeTree
behaves at the ClickHouse level: when it merges data parts in the background,
it collapses all rows that share the same ORDER BY key (here the
primary_key) down to the single row with the largest version, and
if that surviving row has is_deleted = 1 it is physically dropped. Both
columns are therefore required, and each does a specific job:
- version makes “the newest write wins” deterministic, and is the reason this mode needs a dedicated counter rather than reusing time. An update to a row is emitted as a retraction of the old value followed by an insertion of the new one, both with the same primary key and the same minibatch time, so time cannot tell the engine which one is newer. version can: the connector assigns each change the next counter value in production order, emitting retractions before insertions within a minibatch, so the insertion always gets the higher version and the merge keeps the new values. (The counter only ever has to break ties between rows of the same primary key; identical version values across different keys are irrelevant, since ReplacingMergeTree collapses each ORDER BY key independently.) Reusing time would give the retraction and the insertion an equal version, and ReplacingMergeTree breaks such ties arbitrarily: it could keep the retraction and silently drop the update.
- is_deleted lets a key be removed at all. Because only inserts are possible, a deletion is represented as an appended row with the highest version and is_deleted = 1; the merge then removes that key. Without it a deleted key could never leave the table.
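The merge behaviour described above can be approximated with a toy function. final below is a hypothetical model of what SELECT … FINAL returns, not ClickHouse code: rows are plain dicts with key, owner, version and is_deleted fields, and on equal versions it simply keeps the row it saw first, as a stand-in for the arbitrary tie-break.

```python
def final(rows):
    """Toy model of SELECT ... FINAL on ReplacingMergeTree(version, is_deleted)."""
    latest = {}
    for row in rows:
        kept = latest.get(row["key"])
        # Keep the largest version per key; on a tie this model keeps the row
        # seen first, standing in for the arbitrary tie-break.
        if kept is None or row["version"] > kept["version"]:
            latest[row["key"]] = row
    # Drop keys whose surviving row is a deletion marker.
    return [r for r in latest.values() if r["is_deleted"] == 0]


rows = [
    # First minibatch: two insertions.
    {"key": "Cat", "owner": "Alice", "version": 1, "is_deleted": 0},
    {"key": "Dog", "owner": "Bob", "version": 2, "is_deleted": 0},
    # Second minibatch: update Cat, delete Dog (retractions first).
    {"key": "Cat", "owner": "Alice", "version": 3, "is_deleted": 1},
    {"key": "Dog", "owner": "Bob", "version": 4, "is_deleted": 1},
    {"key": "Cat", "owner": "Carol", "version": 5, "is_deleted": 0},
]
print(final(rows))

# The same update with the minibatch time reused as version: a tie.
tied = [
    {"key": "Cat", "owner": "Alice", "version": 2, "is_deleted": 1},
    {"key": "Cat", "owner": "Carol", "version": 2, "is_deleted": 0},
]
print(final(tied), final(tied[::-1]))
```

In the tied case the outcome depends only on which row the merge happens to keep, so the update may vanish; distinct versions remove that ambiguity.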
Because merges run asynchronously, query the table with
SELECT … FINAL
to observe the current, deduplicated state on demand — FINAL applies the
same keep-largest-version / drop-is_deleted logic at query time. A plain
SELECT (without FINAL) may still return superseded or deleted rows
until a merge has run.
version and is_deleted are reserved column names in this format, so
the input table must not contain columns with these names. This mode requires
the primary_key parameter and runs on a single worker so that the
version counter is globally monotonic; on start-up it is seeded above the
table’s current maximum version so that a restart against an existing
table keeps producing winning rows.
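The need to seed the counter above the existing maximum can be shown with a small simulation. seeded_counter, apply_update and the sample rows are hypothetical; the sketch assumes an empty table would start from version 1 and only looks at the rows of a single primary key.

```python
from itertools import count


def seeded_counter(existing_versions):
    """Continue strictly above the largest version already in the table."""
    # Assumption: an empty table starts the counter at 1.
    return count(max(existing_versions, default=0) + 1)


def apply_update(rows, counter, old_owner, new_owner):
    """Append an update for one key: a retraction, then an insertion."""
    return rows + [
        {"owner": old_owner, "version": next(counter), "is_deleted": 1},
        {"owner": new_owner, "version": next(counter), "is_deleted": 0},
    ]


def surviving_row(rows):
    """The row ReplacingMergeTree keeps for a key: the one with the largest version."""
    return max(rows, key=lambda row: row["version"])


# Rows left behind for key "Cat" by a previous run.
previous_run = [
    {"owner": "Alice", "version": 1, "is_deleted": 0},
    {"owner": "Alice", "version": 2, "is_deleted": 1},
    {"owner": "Bob", "version": 3, "is_deleted": 0},
]

# After a restart the owner changes from Bob to Carol.
seeded = apply_update(
    previous_run, seeded_counter(r["version"] for r in previous_run), "Bob", "Carol"
)
restarted_at_one = apply_update(previous_run, count(1), "Bob", "Carol")
print(surviving_row(seeded)["owner"], surviving_row(restarted_at_one)["owner"])
```

With the counter restarted at 1, the stale Bob row (version 3) still has the highest version, so the update to Carol would be lost once the parts are merged.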
- Parameters
  - table (Table) – The table to write to ClickHouse.
  - connection_string (str) – The connection string for the ClickHouse server, in the native-protocol form "tcp://user:password@host:9000/database". Connection options such as compression may be appended as query parameters, e.g. "tcp://localhost:9000/default?compression=lz4".
  - table_name (str) – The name of the target table in ClickHouse.
  - output_table_type (Literal['stream_of_changes', 'snapshot']) – Either "stream_of_changes" (the default), which appends the full history of changes with time/diff columns, or "snapshot", which maintains the current state of the table in a ReplacingMergeTree keyed by primary_key (queried with SELECT ... FINAL).
  - primary_key (Optional[Iterable[ColumnReference]]) – The columns identifying each row. Required when output_table_type="snapshot" (they become the table's ORDER BY); must not be set otherwise.
  - init_mode (Literal['default', 'create_if_not_exists', 'replace']) – Determines how the table is initialized before the first write. "default" (the default) does nothing and requires the table to already exist. "create_if_not_exists" creates the table if it is not present. "replace" drops the table if it exists and recreates it. In the latter two modes the table is created with the engine and metadata columns for the chosen output_table_type: a MergeTree with time/diff columns for "stream_of_changes", or a ReplacingMergeTree(version, is_deleted) ordered by primary_key with version/is_deleted columns for "snapshot". Regardless of init_mode, the connector validates the destination table when the computation starts: it must exist and contain every output column (the input columns plus the metadata columns) with a compatible type, and in "snapshot" mode it must use the ReplacingMergeTree engine. A missing table, an absent column, an incompatible column type, or a wrong engine is reported immediately rather than on the first write. ClickHouse inserts perform almost no implicit conversion, so a pre-existing column must have exactly the type the connector produces for it (a String column may also be FixedString(N), and a datetime column may be any DateTime/DateTime64 variant).
  - max_batch_size (int | None) – The maximum number of changes to accumulate before sending an insertion block to the server.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  - sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output is sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples are compared lexicographically.
- Returns
None
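The start-up validation described for init_mode can be sketched as a comparison between the column types the connector would produce and those found in the table. is_compatible and validate are hypothetical helpers, and the matching rules simplify the ones stated above (exact match, String also accepting FixedString(N), any DateTime/DateTime64 variant for datetimes); the column names and types in the example are made up.

```python
import re

_DATETIME = re.compile(r"DateTime(64)?(\(.*\))?")
_FIXED_STRING = re.compile(r"FixedString\(\d+\)")


def is_compatible(expected, actual):
    """Simplified model of the column-type check performed at start-up."""
    if expected == actual:
        return True
    if expected == "String" and _FIXED_STRING.fullmatch(actual):
        return True
    if _DATETIME.fullmatch(expected) and _DATETIME.fullmatch(actual):
        return True
    return False


def validate(expected_columns, actual_columns):
    """Return a list of problems; both arguments map column name -> type."""
    problems = []
    for name, expected in expected_columns.items():
        actual = actual_columns.get(name)
        if actual is None:
            problems.append(f"missing column {name!r}")
        elif not is_compatible(expected, actual):
            problems.append(f"column {name!r}: expected {expected}, found {actual}")
    return problems


expected = {"name": "String", "seen_at": "DateTime64(9, 'UTC')", "count": "Int64"}
actual = {"name": "FixedString(8)", "seen_at": "DateTime", "count": "Int32"}
print(validate(expected, actual))
```

Because ClickHouse performs almost no implicit conversion on insert, the Int32 column is reported even though its values might fit; the FixedString(8) and DateTime columns are accepted.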
Example:
The easiest way to run ClickHouse locally is with Docker. You can use the official image and start it like this:
docker pull clickhouse/clickhouse-server
docker run -d --name clickhouse \
-p 8123:8123 -p 9000:9000 \
clickhouse/clickhouse-server
Port 9000 is used for the native protocol, which this connector relies on.
Port 8123 exposes the HTTP interface, which is convenient for inspecting
the data afterwards.
You can now write a simple program. In this example a table with one column
called "data" is created and sent to the database:
import pathway as pw
table = pw.debug.table_from_markdown('''
| data
1 | Hello
2 | World
''')
This table can now be written to ClickHouse. If the output table is called
"test" and you want the connector to create it for you, the code looks
like this:
pw.io.clickhouse.write(
table,
connection_string="tcp://localhost:9000/default",
table_name="test",
init_mode="create_if_not_exists",
)
You can run this pipeline with pw.run().
Once the program has finished, you can inspect the data with any ClickHouse client, for example over the HTTP interface:
curl 'http://localhost:8123/?query=SELECT%20*%20FROM%20test'
Note that if you run the program again it will append data to the table. To
recreate the table from scratch, use init_mode="replace".
If instead of the history of changes you want to maintain the current state of
the table, use output_table_type="snapshot" and provide the column(s) that
identify each row via primary_key:
pets = pw.debug.table_from_markdown('''
| name | owner
1 | Cat | Alice
2 | Dog | Bob
''')
pw.io.clickhouse.write(
pets,
connection_string="tcp://localhost:9000/default",
table_name="pets",
output_table_type="snapshot",
primary_key=[pets.name],
init_mode="create_if_not_exists",
)
Because the deduplication happens during background merges, query the snapshot
with FINAL to always observe the current state:
curl 'http://localhost:8123/?query=SELECT%20name,owner%20FROM%20pets%20FINAL'
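If you prefer to query from Python rather than curl, the same HTTP requests can be built with the standard library. The helper names http_query_url and run_query are illustrative only; run_query needs a running ClickHouse server and simply returns the raw response body.

```python
from urllib.parse import quote
from urllib.request import urlopen


def http_query_url(sql, host="localhost", port=8123):
    """Build a ClickHouse HTTP-interface URL with the SQL in the query parameter."""
    # Spaces become %20; '*' and ',' are kept as-is so the URL matches the curl examples.
    return f"http://{host}:{port}/?query={quote(sql, safe='*,')}"


def run_query(sql, host="localhost", port=8123):
    """Send the query to a running server and return the raw response body."""
    with urlopen(http_query_url(sql, host, port)) as response:
        return response.read().decode("utf-8")


print(http_query_url("SELECT name,owner FROM pets FINAL"))
```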