pw.io.iceberg

This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.

All internal Pathway types, except Any, can be stored in Apache Iceberg. The table below explains how Pathway engine data is saved in Iceberg when Pathway creates the destination table. You can also find descriptions of the corresponding Iceberg types in the specification. The values of the corresponding types can also be deserialized from Iceberg storage into Pathway values.

Pathway types conversion into Iceberg

| Pathway type | Iceberg type |
| --- | --- |
| bool | boolean |
| int | long (8-byte signed integer number) |
| float | double (8-byte double-precision floating-point number) |
| pointer | string, can be deserialized back if pw.Pointer type is specified in Pathway table schema |
| str | string |
| bytes | binary |
| Naive DateTime | timestamp or timestamp_ns depending on the chosen timestamp_unit |
| UTC DateTime | timestamptz or timestamptz_ns depending on the chosen timestamp_unit. Not supported with the Glue catalog, since the backing Hive metastore has no timezone-aware timestamp type. |
| Duration | long, serialized and deserialized with microsecond precision |
| JSON | string, containing the serialized JSON value |
| np.ndarray | struct with two top-level fields: shape denoting the shape of the stored array, and elements denoting the elements of a flattened array. Same on-disk encoding as pathway.io.deltalake.write(). |
| tuple | struct with as many top-level fields as the elements of the tuple. The elements are named [0], [1], and so on; the order of the elements corresponds to the appearance of the types in the tuple |
| list | list<T> |
| pw.PyObjectWrapper | binary, can be deserialized back if the pw.PyObjectWrapper type is specified in Pathway table schema |
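
To make the struct layouts in the table more concrete, the sketch below mimics them with plain Python dictionaries. It is not Pathway's serializer: the helper names encode_ndarray, encode_tuple, and encode_duration are hypothetical, nested lists stand in for np.ndarray so that only the standard library is needed, and the row-major flattening order is an assumption.

```python
from datetime import timedelta


def encode_ndarray(nested):
    """Mimic the {shape, elements} struct for a rectangular nested list."""
    # Walk down the first element of each level to collect the dimensions.
    shape = []
    level = nested
    while isinstance(level, list):
        shape.append(len(level))
        level = level[0] if level else None

    def flatten(value):
        # Row-major traversal (an assumption made for this sketch).
        if isinstance(value, list):
            for item in value:
                yield from flatten(item)
        else:
            yield value

    return {"shape": shape, "elements": list(flatten(nested))}


def encode_tuple(values):
    """Mimic the tuple struct: one field per position, named [0], [1], ..."""
    return {f"[{index}]": value for index, value in enumerate(values)}


def encode_duration(value: timedelta) -> int:
    """Mimic the Duration encoding: a whole number of microseconds."""
    return value // timedelta(microseconds=1)


if __name__ == "__main__":
    print(encode_ndarray([[1, 2, 3], [4, 5, 6]]))
    print(encode_tuple(("alice", 42)))
    print(encode_duration(timedelta(seconds=1, microseconds=5)))
```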

Iceberg map<K, V> is not supported on either read or write — Pathway has no native map type. Columns of this type can’t be included in a Pathway schema.

Reading Iceberg tables produced by other tools

Iceberg tables produced by Spark, Flink, pyiceberg, and similar tools commonly use storage types Pathway never writes itself. pw.io.iceberg.read accepts the following extra Iceberg types and projects them onto Pathway types according to the Pathway type declared in the schema:

Additional Iceberg-to-Pathway conversions (read only)

| Iceberg type | Pathway schema type | Notes |
| --- | --- | --- |
| int (32-bit) / float (32-bit) | int / float | Widened to 64-bit; lossless. |
| date | Naive DateTime | Pathway has no native Date; values are materialized at midnight on the calendar day. A date outside Pathway’s representable timestamp range (roughly years 1678–2262) surfaces a per-row conversion error instead of an out-of-range value. |
| time | Duration | Microseconds since midnight, same convention as the Postgres TIME mapping. |
| uuid | str | Canonical 8-4-4-4-12 hex string. |
| uuid | bytes | Raw 16 bytes. |
| fixed(N) | bytes | Length-N byte string. |
| decimal(p, s) | float | Goes through 64-bit float: lossy in general (binary representation, ~15-17 significant decimal digits of mantissa). The reader emits a one-time warning at startup naming each affected column. |
| decimal(p, s) | str | Lossless. The unscaled integer is formatted with the column’s scale and passed through as decimal text. Supports the full Iceberg range of up to 38 digits of precision. |
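
The read-side projections in this table can be approximated with the Python standard library. The sketch below only illustrates the conventions and is not the connector's code: all function names are hypothetical, and the exact text Pathway produces for a decimal (for example, whether trailing zeros are kept) is an assumption.

```python
import uuid
from datetime import date, datetime, time, timedelta


def date_to_naive_datetime(value: date) -> datetime:
    # A date-only value is placed at midnight of the same calendar day.
    return datetime(value.year, value.month, value.day)


def time_to_duration(value: time) -> timedelta:
    # A time of day becomes the elapsed time since midnight.
    return timedelta(
        hours=value.hour,
        minutes=value.minute,
        seconds=value.second,
        microseconds=value.microsecond,
    )


def uuid_to_text(raw: bytes) -> str:
    # 16 raw bytes rendered as the canonical 8-4-4-4-12 hex string.
    return str(uuid.UUID(bytes=raw))


def decimal_to_text(unscaled: int, scale: int) -> str:
    # Place the decimal point `scale` digits from the right using string
    # operations only, so no digit is lost. Assumes scale >= 0.
    sign = "-" if unscaled < 0 else ""
    digits = str(abs(unscaled)).rjust(scale + 1, "0")
    if scale == 0:
        return sign + digits
    return f"{sign}{digits[:-scale]}.{digits[-scale:]}"


if __name__ == "__main__":
    print(date_to_naive_datetime(date(2024, 3, 1)))
    print(time_to_duration(time(1, 2, 3, 4)))
    print(uuid_to_text(bytes(range(16))))
    print(decimal_to_text(12345, 2))

    # The float route cannot hold all 38 digits, which is why it is lossy.
    big = 12345678901234567890123456789012345678
    print(int(float(decimal_to_text(big, 0))) == big)
```

Declaring such a column as str keeps every digit, while declaring it as float trades exactness for a numeric type.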

Writing into existing typed columns

When pw.io.iceberg.write is pointed at an Iceberg table that already exists (created by another writer with a hand-rolled schema, or by pyiceberg, Spark, etc.), the writer reconciles each Pathway column type against the destination column’s declared type. If the destination is one of the following narrow or specialized Iceberg types, the writer encodes the value into the existing column’s type:

Pathway-to-existing-Iceberg-column conversions (write only)

| Pathway type | Existing Iceberg column | Notes |
| --- | --- | --- |
| int | int (32-bit) | Out-of-range values raise an error rather than silently truncating. |
| float | float (32-bit) | Cast to 32-bit float; precision beyond ~7 significant decimal digits is lost. Values outside the 32-bit float range materialize as ±∞ (IEEE 754 semantics). |
| str | decimal(p, s) | Each row’s text is parsed as a decimal of the destination’s precision and scale. |
| str | uuid | Canonical 8-4-4-4-12 hex. |
| bytes | fixed(N) | Length-checked at write time. |
| Duration | time | Microseconds since midnight, same convention as pathway.io.postgres.write() for TIME columns. |
| Naive DateTime | date | The time-of-day component is silently truncated. Same convention as pathway.io.postgres.write() for DATE columns. |
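
Some of the narrowing rules above are easy to reproduce in isolation. The following sketch shows what they imply for individual values. It is an illustration under assumptions rather than the writer's implementation: the function names are hypothetical, and the exception types are placeholders, since the table only says that an error is raised.

```python
import math
import struct
from datetime import date, datetime

INT32_MIN, INT32_MAX = -2**31, 2**31 - 1


def to_int32(value: int) -> int:
    # Out-of-range values are rejected instead of being truncated.
    if not INT32_MIN <= value <= INT32_MAX:
        raise OverflowError(f"{value} does not fit into a 32-bit int column")
    return value


def to_float32(value: float) -> float:
    # Round-trip through 4 bytes to drop what a 32-bit float cannot hold.
    try:
        return struct.unpack("<f", struct.pack("<f", value))[0]
    except OverflowError:
        # struct refuses out-of-range values; IEEE 754 narrowing gives +/-inf.
        return math.copysign(math.inf, value)


def to_fixed(value: bytes, length: int) -> bytes:
    # fixed(N) columns accept exactly N bytes.
    if len(value) != length:
        raise ValueError(f"expected {length} bytes, got {len(value)}")
    return value


def to_date(value: datetime) -> date:
    # The time-of-day component is dropped without any error.
    return value.date()


if __name__ == "__main__":
    print(to_float32(0.1))    # close to 0.1, but not equal to it
    print(to_float32(1e40))   # beyond the 32-bit float range
    print(to_date(datetime(2024, 3, 1, 13, 45)))
```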

When the destination column is a struct (bound to a Pathway tuple) or a list<struct<...>>, the same overrides apply recursively to each tuple position or inner element type. Iceberg date is not supported when Pathway is creating the destination table, since Pathway has no date-only type to derive from.

Preflight validation

At construction time, both pw.io.iceberg.read and pw.io.iceberg.write compare the Pathway schema against the Iceberg table’s actual schema and fail fast with a targeted error rather than producing per-row parse errors that don’t surface to pw.run(). The detected mismatches are:

  • a Pathway column not present in the Iceberg table;
  • a Pathway type that has no defined encoding/decoding against the Iceberg column’s type (checked recursively into the inner types of tuple/struct, list, and np.ndarray columns);
  • a tuple/struct arity mismatch.

pw.io.iceberg.write additionally rejects:

  • a required (non-null) Iceberg column the Pathway table doesn’t produce;
  • a timestamp_unit choice that doesn’t match the destination column’s precision (timestamp vs timestamp_ns).
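
As a rough mental model of these checks, the sketch below compares two toy schemas and collects every mismatch before any row is processed. Everything in it is hypothetical: the preflight function, the string type names, and the COMPATIBLE set (a small subset of the pairs listed in the tables above) exist only for illustration, and the recursive, arity, and timestamp_unit checks are left out.

```python
# (Pathway type, Iceberg type) pairs taken from the conversion tables above.
COMPATIBLE = {
    ("bool", "boolean"),
    ("int", "long"),
    ("int", "int"),
    ("float", "double"),
    ("float", "float"),
    ("str", "string"),
    ("str", "uuid"),
    ("bytes", "binary"),
}


def preflight(pathway_columns, iceberg_columns, *, writing=False):
    """Return a list of human-readable schema problems (empty if none).

    pathway_columns: {column name: Pathway type name}
    iceberg_columns: {column name: (Iceberg type name, required flag)}
    """
    problems = []
    for name, pathway_type in pathway_columns.items():
        if name not in iceberg_columns:
            problems.append(f"column {name!r} is missing from the Iceberg table")
            continue
        iceberg_type, _required = iceberg_columns[name]
        if (pathway_type, iceberg_type) not in COMPATIBLE:
            problems.append(
                f"column {name!r}: no conversion between "
                f"{pathway_type} and {iceberg_type}"
            )
    if writing:
        # Only the writer has to fill required destination columns.
        for name, (_type, required) in iceberg_columns.items():
            if required and name not in pathway_columns:
                problems.append(f"required column {name!r} is not produced")
    return problems


if __name__ == "__main__":
    pathway_schema = {"user_id": "int", "name": "str", "score": "float"}
    iceberg_schema = {
        "user_id": ("long", True),
        "name": ("boolean", False),
        "created": ("long", True),
    }
    for problem in preflight(pathway_schema, iceberg_schema, writing=True):
        print(problem)
```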

class GlueCatalog(warehouse, uri=None, catalog_id=None, aws_settings=None)

Configuration settings for a Glue Iceberg catalog.

  • Parameters
    • warehouse (str) – The path to the data warehouse.
    • uri (str | None) – The URI of the Glue catalog endpoint.
    • catalog_id (str | None) – The ID of the Glue catalog.
    • aws_settings (AwsS3Settings | None) – The AWS connection settings.
  • Returns
    A configuration object.

Example:

Suppose you need to connect to a Glue catalog running in the datalake AWS bucket, located in the eu-central-1 region.

If the data warehouse path is storage/root, the configuration object can be constructed as follows:

import pathway as pw

settings = pw.io.iceberg.GlueCatalog(
    warehouse="s3://datalake/storage/root",
    aws_settings=pw.io.s3.AwsS3Settings(region="eu-central-1"),
)

If possible, the AWS credentials are inferred from the environment. You can also specify the credentials explicitly in the AwsS3Settings object.

class RestCatalog(uri, warehouse=None)

Configuration settings for a REST Iceberg catalog.

  • Parameters
    • uri (str) – The URI of the catalog.
    • warehouse (str | None) – Optional data warehouse path.
  • Returns
    A configuration object.

Example:

Suppose you need to connect to a REST catalog running at http://localhost:8181. The connection settings can be constructed as follows:

settings = pw.io.iceberg.RestCatalog(uri="http://localhost:8181")

read(catalog, namespace, table_name, schema, *, mode='streaming', autocommit_duration_ms=1500, name=None, max_backlog_size=None, debug_data=None, **kwargs)

Reads a table from Apache Iceberg. In "streaming" mode the connector polls the catalog for new snapshots and reflects the diff between the previous and new snapshot’s data files in the Pathway table: files added to the new plan become row additions, files removed from it become row deletions. Note that this is a file-level diff: Iceberg V2 row-level delete files (positional or equality deletes added alongside the same data files) are not separately tracked.
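
The file-level diff described above can be pictured with two sets of data file paths. This is only a conceptual sketch, not the connector's code: diff_snapshots and the file names are hypothetical, and a real snapshot plan carries far more metadata than a path.

```python
def diff_snapshots(previous_files: set, current_files: set):
    """Return (added, removed) data files between two snapshot plans."""
    # Files only in the new plan: their rows are emitted as additions.
    added = sorted(current_files - previous_files)
    # Files only in the old plan: their rows are emitted as deletions.
    removed = sorted(previous_files - current_files)
    return added, removed


if __name__ == "__main__":
    previous = {"data/a.parquet", "data/b.parquet"}
    current = {"data/b.parquet", "data/c.parquet"}
    added, removed = diff_snapshots(previous, current)
    print("row additions come from:", added)
    print("row deletions come from:", removed)
```

A file present in both plans, such as data/b.parquet here, contributes nothing to the diff, which is why delete files attached to an unchanged data file go unnoticed.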

The connector requires primary key fields to be specified in the schema. Use pw.column_definition(primary_key=True) to mark the fields that make up the primary key.

Side effect: if the namespace given in the namespace parameter doesn’t exist in the catalog at construction time, the connector creates it. The table itself must already exist; a missing table surfaces as a catalog error.

  • Parameters
    • catalog (RestCatalog | GlueCatalog) – Settings for Iceberg catalog connection.
    • namespace (list[str]) – The name of the namespace containing the table to be read.
    • table_name (str) – The name of the table to be read.
    • schema (type[Schema]) – Schema of the resulting table.
    • mode (Literal['streaming', 'static']) – Denotes how the engine polls the source for new data. Currently "streaming" and "static" are supported. In "streaming" mode the engine waits for updates in the specified lake, tracks the resulting row additions and deletions, and reflects them in the state. In "static" mode it considers only the currently available data and ingests all of it in one commit. The default value is "streaming".
    • autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. Additionally, if persistence is enabled, it will be used as the name for the snapshot that stores the connector’s progress.
    • max_backlog_size (int | None) – Limit on the number of entries read from the input source and kept in processing at any moment. Reading pauses when the limit is reached and resumes as processing of some entries completes. Useful with large sources that emit an initial burst of data to avoid memory spikes.
    • debug_data (Any) – Static data that replaces the original source data when debug mode is active.
  • Returns
    Table – Table read from the Iceberg source.

Example:

Consider a users data table stored in the Iceberg storage. The table is located in the app namespace and is named users. The catalog URI is http://localhost:8181. Below is an example of how to read this table into Pathway.

First, the schema of the table needs to be created. The schema doesn’t have to contain all the columns of the table; you can specify only the ones that are needed for the computation:

import pathway as pw


class InputSchema(pw.Schema):
    user_id: int = pw.column_definition(primary_key=True)
    name: str

Then, this table must be read from the Iceberg storage.

input_table = pw.io.iceberg.read(
    catalog=pw.io.iceberg.RestCatalog(uri="http://localhost:8181/"),
    namespace=["app"],
    table_name="users",
    schema=InputSchema,
    mode="static",
)

Don’t forget to run your program with pw.run once you define all necessary computations. Note that you can also change the mode to "streaming" if you want the changes in the table to be reflected in your computational pipeline.

write(table, catalog, namespace, table_name, *, timestamp_unit='ns', min_commit_frequency=60000, name=None, sort_by=None)

Writes the stream of changes from table into Iceberg data storage. The data storage must be defined with the catalog, the namespace, and the table name.

If the namespace or the table doesn’t exist, they will be created by the connector. The schema of the new table is inferred from the table’s schema.

  • Parameters
    • table (Table) – Table to be written.
    • catalog (RestCatalog | GlueCatalog) – The catalog of the target storage.
    • namespace (list[str]) – The name of the namespace containing the target table. If the namespace doesn’t exist, it will be created by the connector.
    • table_name (str) – The name of the table to be written. If a table with such a name doesn’t exist, it will be created by the connector.
    • timestamp_unit (Literal['us', 'ns']) – The precision used for timestamp serialization. It can be either "us" for microseconds or "ns" for nanoseconds. When selecting the precision, ensure that your catalog supports the chosen unit, as different underlying data types are used: timestamp for microsecond precision and timestamp_ns for nanosecond precision. Note that some catalogs only support a specific timestamp format. In the case of the Glue catalog, only nanoseconds are supported.
    • min_commit_frequency (int | None) – Specifies the minimum time interval between two data commits in storage, measured in milliseconds. If set to None, finalized minibatches will be committed as soon as possible. Keep in mind that each commit in Iceberg creates a new Parquet file and writes an entry in the transaction log. Therefore, it is advisable to limit the frequency of commits to reduce the overhead of processing the resulting table.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
  • Returns
    None
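
Two of the parameters above describe simple transformations that can be sketched without Pathway. The snippet is illustrative only: sort_minibatch and timestamp_to_int are hypothetical helpers, rows are modeled as dictionaries, and the integer encoding assumes timestamps are counted from 1970-01-01T00:00:00, as in the Iceberg specification.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def sort_minibatch(rows, columns):
    # Ascending order; several columns are compared as tuples, which is
    # the lexicographic comparison described for sort_by.
    return sorted(rows, key=lambda row: tuple(row[name] for name in columns))


def timestamp_to_int(value: datetime, unit: str) -> int:
    # Python datetimes carry microseconds, so "ns" just rescales the count.
    micros = (value - EPOCH) // timedelta(microseconds=1)
    if unit == "us":
        return micros
    if unit == "ns":
        return micros * 1000
    raise ValueError(f"unsupported timestamp unit: {unit!r}")


if __name__ == "__main__":
    batch = [
        {"user_id": 2, "name": "b"},
        {"user_id": 1, "name": "z"},
        {"user_id": 1, "name": "a"},
    ]
    print(sort_minibatch(batch, ["user_id", "name"]))
    print(timestamp_to_int(datetime(1970, 1, 1, 0, 0, 1), "us"))
    print(timestamp_to_int(datetime(1970, 1, 1, 0, 0, 1), "ns"))
```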

Example:

Consider a users data table stored locally in a file called users.txt in CSV format. The Iceberg output connector provides the capability to place this table into Iceberg storage, defined by the REST catalog with URI http://localhost:8181. The target table is users, located in the app namespace.

First, the table must be read. To do this, you need to define the schema. For simplicity, consider that it consists of two fields: the user ID and the name.

The schema definition may look as follows:

import pathway as pw


class InputSchema(pw.Schema):
    user_id: int = pw.column_definition(primary_key=True)
    name: str

Using this schema, you can read the table from the input file. You need to use the pw.io.csv.read connector. Here, you can use the static mode since the text file with the users doesn’t change dynamically.

users = pw.io.csv.read("./users.txt", schema=InputSchema, mode="static")

Once the table is read, you can use pw.io.iceberg.write to save this table into Iceberg storage.

pw.io.iceberg.write(
    users,
    catalog=pw.io.iceberg.RestCatalog(uri="http://localhost:8181/"),
    namespace=["app"],
    table_name="users",
)

Don’t forget to run your program with pw.run once you define all necessary computations. After execution, you will be able to see the users’ data in the Iceberg storage.