pw.io.iceberg

This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.

All primitive Pathway types can be stored in Apache Iceberg. The table below explains how Pathway engine data is saved in Iceberg. You can also find descriptions of the corresponding Iceberg types in the specification. The values of the corresponding types can also be deserialized from Iceberg storage into Pathway values.

Pathway types conversion into Iceberg

Pathway typeIceberg type
boolboolean
intlong (8-byte signed integer number)
floatdouble (8-byte double-precision floating-point number)
pointerstr, can be deserialized back if pw.Pointer type is specified in Pathway table schema
strstring
bytesbinary
Naive DateTimetimestamp
UTC DateTimetimestamptz
Durationlong, serialized and deserialized with microsecond precision
np.ndarrayCurrently not supported
tupleCurrently not supported
listCurrently not supported
pw.PyObjectWrapperbinary, can be deserialized back if the pw.PyObjectWrapper type is specified in Pathway table schema

read(catalog_uri, namespace, table_name, schema, *, mode='streaming', warehouse=None, autocommit_duration_ms=1500, name=None, debug_data=None, **kwargs)

sourceReads a table from Apache Iceberg. If ran in a streaming mode, the connector tracks new row additions and old row deletions and reflects them in the table read.

Note that the connector requires primary key fields to be specified in the schema. You can specify the fields to be used in the primary key with pw.column_definition function.

  • Parameters
    • catalog_uri (str) – URI of the Iceberg REST catalog.
    • namespace (list[str]) – The name of the namespace containing the table read.
    • table_name (str) – The name of the table to be read.
    • schema (type[Schema]) – Schema of the resulting table.
    • mode (str) – Denotes how the engine polls the new data from the source. Currently "streaming" and "static" are supported. If set to "streaming" the engine will wait for the updates in the specified lake. It will track new row additions and reflect these events in the state. On the other hand, the "static" mode will only consider the available data and ingest all of it in one commit. The default value is "streaming".
    • warehouse (str | None) – Optional, path to the Iceberg storage warehouse.
    • autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. Additionally, if persistence is enabled, it will be used as the name for the snapshot that stores the connector’s progress.
    • debug_data (Any) – Static data replacing original one when debug mode is active.
  • Returns
    Table – Table read from the Iceberg source.

Example:

Consider a users data table stored in the Iceberg storage. The table is located in the app namespace and is named users. The catalog URI is http://localhost:8181. Below is an example of how to read this table into Pathway.

First, the schema of the table needs to be created. The schema doesn’t have to contain all the columns of the table, you can only specify the ones that are needed for the computation:

import pathway as pw
class InputSchema(pw.Schema):
    user_id: int = pw.column_definition(primary_key=True)
    name: str

Then, this table must be read from the Iceberg storage.

input_table = pw.io.iceberg.read(
    catalog_uri="http://localhost:8181/",
    namespace=["app"],
    table_name="users",
    schema=InputSchema,
    mode="static",
)

Don’t forget to run your program with pw.run once you define all necessary computations. Note that you can also change the mode to "streaming" if you want the changes in the table to be reflected in your computational pipeline.

write(table, catalog_uri, namespace, table_name, *, warehouse=None, min_commit_frequency=60000, name=None)

sourceWrites the stream of changes from table into Iceberg data storage. The data storage must be defined with the REST catalog URI, the namespace, and the table name.

If the namespace or the table doesn’t exist, they will be created by the connector. The schema of the new table is inferred from the table’s schema. The output table must include two additional integer columns: time, representing the computation minibatch, and diff, indicating the type of change (1 for row addition and -1 for row deletion).

  • Parameters
    • table (Table) – Table to be written.
    • catalog_uri (str) – URI of the Iceberg REST catalog.
    • namespace (list[str]) – The name of the namespace containing the target table. If the namespace doesn’t exist, it will be created by the connector.
    • table_name (str) – The name of the table to be written. If a table with such a name doesn’t exist, it will be created by the connector.
    • warehouse (str | None) – Optional, path to the Iceberg storage warehouse.
    • min_commit_frequency (int | None) – Specifies the minimum time interval between two data commits in storage, measured in milliseconds. If set to None, finalized minibatches will be committed as soon as possible. Keep in mind that each commit in Iceberg creates a new Parquet file and writes an entry in the transaction log. Therefore, it is advisable to limit the frequency of commits to reduce the overhead of processing the resulting table.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  • Returns
    None

Example:

Consider a users data table stored locally in a file called users.txt in CSV format. The Iceberg output connector provides the capability to place this table into Iceberg storage, defined by the catalog with URI http://localhost:8181. The target table is users, located in the app namespace.

First, the table must be read. To do this, you need to define the schema. For simplicity, consider that it consists of two fields: the user ID and the name.

The schema definition may look as follows:

import pathway as pw
class InputSchema(pw.Schema):
    user_id: int = pw.column_definition(primary_key=True)
    name: str

Using this schema, you can read the table from the input file. You need to use the pw.io.csv.read connector. Here, you can use the static mode since the text file with the users doesn’t change dynamically.

users = pw.io.csv.read("./users.txt", schema=InputSchema, mode="static")

Once the table is read, you can use pw.io.iceberg.write to save this table into Iceberg storage.

pw.io.iceberg.write(
    users,
    catalog_uri="http://localhost:8181/",
    namespace=["app"],
    table_name="users",
)

Don’t forget to run your program with pw.run once you define all necessary computations. After execution, you will be able to see the users’ data in the Iceberg storage.