pw.io.iceberg
This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.
All internal Pathway types, except Any, can be stored in Apache Iceberg. The table below explains how Pathway engine data is saved in Iceberg when Pathway creates the destination table. You can also find descriptions of the corresponding Iceberg types in the specification. The values of the corresponding types can also be deserialized from Iceberg storage into Pathway values.
Pathway types conversion into Iceberg
| Pathway type | Iceberg type |
|---|---|
| bool | boolean |
| int | long (8-byte signed integer number) |
| float | double (8-byte double-precision floating-point number) |
| pointer | string, can be deserialized back if the pw.Pointer type is specified in the Pathway table schema |
| str | string |
| bytes | binary |
| Naive DateTime | timestamp or timestamp_ns, depending on the chosen timestamp_unit |
| UTC DateTime | timestamptz or timestamptz_ns, depending on the chosen timestamp_unit. Not supported with the Glue catalog, since the backing Hive metastore has no timezone-aware timestamp type. |
| Duration | long, serialized and deserialized with microsecond precision |
| JSON | string, containing the serialized JSON value |
| np.ndarray | struct with two top-level fields: shape, denoting the shape of the stored array, and elements, denoting the elements of the flattened array. Same on-disk encoding as pathway.io.deltalake.write(). |
| tuple | struct with as many top-level fields as the tuple has elements. The fields are named [0], [1], and so on; their order corresponds to the order of the types in the tuple |
| list | list<T> |
| pw.PyObjectWrapper | binary, can be deserialized back if the pw.PyObjectWrapper type is specified in the Pathway table schema |
Iceberg map<K, V> is not supported on either read or write — Pathway has no native map type. Columns of this type can’t be included in a Pathway schema.
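To make the struct layouts in the table concrete, the plain-Python sketch below shows the logical shape of a few encoded values. It only illustrates the mapping described above and is not Pathway's implementation: the function names are hypothetical, nested lists stand in for np.ndarray, and the row-major flattening order is an assumption (it matches the default of np.ndarray.flatten()).

```python
from datetime import timedelta


def tuple_to_struct(values: tuple) -> dict:
    """Name tuple elements "[0]", "[1]", ... in order of appearance."""
    return {f"[{i}]": value for i, value in enumerate(values)}


def ndarray_to_struct(nested: list) -> dict:
    """Split a rectangular nested list into `shape` and flattened `elements`."""
    shape = []
    level = nested
    # Walk down the first element of each level to read off the dimensions.
    while isinstance(level, list):
        shape.append(len(level))
        level = level[0] if level else None

    elements = []

    def flatten(item) -> None:
        # Depth-first traversal yields row-major (C) order.
        if isinstance(item, list):
            for child in item:
                flatten(child)
        else:
            elements.append(item)

    flatten(nested)
    return {"shape": shape, "elements": elements}


def duration_to_long(value: timedelta) -> int:
    """Encode a duration as a signed count of microseconds."""
    return (value.days * 86_400 + value.seconds) * 1_000_000 + value.microseconds
```

For example, a 2×3 array becomes a struct whose shape is [2, 3] and whose elements hold the six values in row order.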
Reading Iceberg tables produced by other tools
Iceberg tables produced by Spark, Flink, pyiceberg, and similar tools commonly use storage types Pathway never writes itself. pw.io.iceberg.read accepts the following extra Iceberg types and projects them onto Pathway types according to the Pathway type declared in the schema:
Additional Iceberg-to-Pathway conversions (read only)
| Iceberg type | Pathway schema type | Notes |
|---|---|---|
| int (32-bit) / float (32-bit) | int / float | Widened to 64-bit; lossless. |
| date | Naive DateTime | Pathway has no native date type; values are materialized at midnight of the calendar day. A date outside Pathway’s representable timestamp range (roughly years 1678–2262) surfaces as a per-row conversion error instead of an out-of-range value. |
| time | Duration | Microseconds since midnight, same convention as the Postgres TIME mapping. |
| uuid | str | Canonical 8-4-4-4-12 hex string. |
| uuid | bytes | Raw 16 bytes. |
| fixed(N) | bytes | Length-N byte string. |
| decimal(p, s) | float | Converted through a 64-bit float, so lossy in general (binary representation with roughly 15 to 17 significant decimal digits). The reader emits a one-time warning at startup naming each affected column. |
| decimal(p, s) | str | Lossless. The unscaled integer is formatted with the column’s scale and passed through as decimal text. Supports the full Iceberg range of up to 38 digits of precision. |
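The read-side projections in this table can be approximated in plain Python as follows. This is a sketch of the documented conventions, not Pathway's code: the helper names are hypothetical, a ValueError stands in for the per-row conversion error, the timestamp bounds assume Pathway stores timestamps as signed 64-bit nanosecond counts (consistent with the range quoted above), and the decimal formatter assumes a non-negative scale.

```python
import uuid
from datetime import date, datetime, time, timedelta

EPOCH = datetime(1970, 1, 1)
# Assumption: signed 64-bit nanosecond timestamps, which is consistent with
# the "roughly years 1678-2262" range in the table above.
MIN_NAIVE = EPOCH + timedelta(microseconds=-(2**63) // 1000)
MAX_NAIVE = EPOCH + timedelta(microseconds=(2**63 - 1) // 1000)


def date_to_naive_datetime(value: date) -> datetime:
    """Materialize an Iceberg date at midnight, rejecting out-of-range days."""
    result = datetime.combine(value, time())  # time() is 00:00:00
    if not MIN_NAIVE <= result <= MAX_NAIVE:
        # Stands in for the per-row conversion error described above.
        raise ValueError(f"{value} is outside the representable timestamp range")
    return result


def time_to_micros(value: time) -> int:
    """Iceberg time -> microseconds since midnight (the Duration encoding)."""
    seconds = (value.hour * 60 + value.minute) * 60 + value.second
    return seconds * 1_000_000 + value.microsecond


def uuid_to_str(raw: bytes) -> str:
    """Canonical 8-4-4-4-12 hex form of a 16-byte UUID."""
    return str(uuid.UUID(bytes=raw))


def decimal_to_str(unscaled: int, scale: int) -> str:
    """Format an unscaled integer with `scale` fractional digits, losslessly."""
    sign = "-" if unscaled < 0 else ""
    # Left-pad so there is at least one digit before the decimal point.
    digits = str(abs(unscaled)).rjust(scale + 1, "0")
    if scale == 0:
        return sign + digits
    return f"{sign}{digits[:-scale]}.{digits[-scale:]}"
```

Because decimal_to_str works on the integer digits directly, it keeps all 38 digits of precision. By contrast, float(decimal_to_str(10**37 + 1, 0)) can no longer be turned back into the exact integer, which is the loss the decimal(p, s) to float row warns about.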
Writing into existing typed columns
When pw.io.iceberg.write is pointed at an Iceberg table that already exists (created by another writer with a hand-rolled schema, or by pyiceberg, Spark, etc.), the writer reconciles each Pathway column type against the destination column’s declared type. If the destination is one of the following narrow or specialized Iceberg types, the writer encodes the value into the existing column’s type:
Pathway-to-existing-Iceberg-column conversions (write only)
| Pathway type | Existing Iceberg column | Notes |
|---|---|---|
| int | int (32-bit) | Out-of-range values raise an error rather than silently truncating. |
| float | float (32-bit) | Cast to 32-bit float; precision beyond ~7 significant decimal digits is lost. Values outside the 32-bit float range materialize as ±∞ (IEEE 754 semantics). |
| str | decimal(p, s) | Each row’s text is parsed as a decimal of the destination’s precision and scale. |
| str | uuid | Canonical 8-4-4-4-12 hex. |
| bytes | fixed(N) | Length-checked at write time. |
| Duration | time | Microseconds since midnight, same convention as pathway.io.postgres.write() for TIME columns. |
| Naive DateTime | date | The time-of-day component is silently truncated. Same convention as pathway.io.postgres.write() for DATE columns. |
When the destination column is a struct (bound to a Pathway tuple) or a list<struct<...>>, the same overrides apply recursively to each tuple position or inner element type. Iceberg date is not supported when Pathway is creating the destination table, since Pathway has no date-only type to derive from.
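A few of these write-side conversions can be approximated in plain Python as shown below. This is a sketch of the documented behavior for the int, float, fixed(N), and date rows, not Pathway's implementation; the helper names are hypothetical, and raising OverflowError or ValueError stands in for the connector's error reporting.

```python
import math
import struct
from datetime import date, datetime

INT32_MIN, INT32_MAX = -(2**31), 2**31 - 1


def to_int32(value: int) -> int:
    """Pathway int -> Iceberg int: reject instead of truncating."""
    if not INT32_MIN <= value <= INT32_MAX:
        raise OverflowError(f"{value} does not fit into a 32-bit Iceberg int")
    return value


def to_float32(value: float) -> float:
    """Pathway float -> Iceberg float: round to IEEE 754 single precision."""
    try:
        return struct.unpack("<f", struct.pack("<f", value))[0]
    except OverflowError:
        # struct refuses finite values that round to infinity in float32;
        # an IEEE 754 conversion yields a signed infinity instead.
        return math.copysign(math.inf, value)


def to_fixed(value: bytes, length: int) -> bytes:
    """Pathway bytes -> Iceberg fixed(N): the length must match exactly."""
    if len(value) != length:
        raise ValueError(f"expected {length} bytes, got {len(value)}")
    return value


def to_date(value: datetime) -> date:
    """Naive DateTime -> Iceberg date: the time of day is dropped."""
    return value.date()
```

For instance, to_float32(0.1) returns the nearest single-precision value (about 0.10000000149), and to_float32(1e40) returns inf.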
Preflight validation
At construction time, both pw.io.iceberg.read and pw.io.iceberg.write compare the Pathway schema against the Iceberg table’s actual schema. On a mismatch they fail fast with a targeted error, rather than producing per-row parse errors that don’t surface to pw.run(). Both connectors detect:
- a Pathway column not present in the Iceberg table;
- a Pathway type that has no defined encoding/decoding against the Iceberg column’s type (checked recursively into the inner types of tuple/struct, list, and np.ndarray columns);
- a tuple/struct arity mismatch.

pw.io.iceberg.write additionally rejects:
- a required (non-null) Iceberg column the Pathway table doesn’t produce;
- a timestamp_unit choice that doesn’t match the destination column’s precision (timestamp vs timestamp_ns).
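The idea behind the preflight check, collecting every schema mismatch before any row is processed, can be sketched in plain Python. Everything here is hypothetical and simplified: types are plain strings, IcebergColumn and preflight are illustrative names rather than Pathway APIs, the compatibility set holds only a few pairs from the tables above, and the recursive, arity, and timestamp_unit checks are omitted.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class IcebergColumn:
    """Hypothetical stand-in for one column of the Iceberg table schema."""

    type: str
    required: bool = False


# Deliberately incomplete: a few (Pathway type, Iceberg type) pairs from the
# tables above that work in both directions.
COMPATIBLE = {
    ("bool", "boolean"),
    ("int", "long"),
    ("int", "int"),
    ("float", "double"),
    ("float", "float"),
    ("str", "string"),
    ("str", "uuid"),
    ("str", "decimal"),
    ("bytes", "binary"),
    ("bytes", "fixed"),
}


def preflight(
    pathway_schema: Dict[str, str],
    iceberg_schema: Dict[str, IcebergColumn],
    *,
    writing: bool,
) -> List[str]:
    """Collect every schema mismatch up front instead of failing row by row."""
    errors = []
    for name, pw_type in pathway_schema.items():
        column = iceberg_schema.get(name)
        if column is None:
            errors.append(f"column {name!r} is not present in the Iceberg table")
        elif (pw_type, column.type) not in COMPATIBLE:
            errors.append(f"column {name!r}: no encoding between {pw_type} and {column.type}")
    if writing:
        # Only the writer must cover every required destination column.
        for name, column in iceberg_schema.items():
            if column.required and name not in pathway_schema:
                errors.append(f"required column {name!r} is not produced by the Pathway table")
    return errors
```

Returning the full list lets a single error message name all problems at once, which is the point of failing fast at construction time.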
class GlueCatalog(warehouse, uri=None, catalog_id=None, aws_settings=None)
Configuration settings for a Glue Iceberg catalog.
- Parameters
  - warehouse (str) – The path to the data warehouse.
  - uri (str | None) – The URI of the Glue catalog endpoint.
  - catalog_id (str | None) – The ID of the Glue catalog.
  - aws_settings (AwsS3Settings | None) – The AWS connection settings.
- Returns
  A configuration object.
Example:
Suppose the data warehouse of your Glue catalog is stored under the storage/root path of the datalake S3 bucket, located in the eu-central-1 region. The configuration object can be constructed as follows:
```python
import pathway as pw

settings = pw.io.iceberg.GlueCatalog(
    warehouse="s3://datalake/storage/root",
    aws_settings=pw.io.s3.AwsS3Settings(region="eu-central-1"),
)
```
If possible, the AWS credentials are inferred from the environment. You can also
specify the credentials explicitly in the AwsS3Settings object.
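For example, explicit credentials might be passed as in the sketch below. It assumes the access_key and secret_access_key keyword arguments of pw.io.s3.AwsS3Settings, and the environment variable names DATALAKE_ACCESS_KEY and DATALAKE_SECRET_KEY are hypothetical placeholders; reading the secrets from the environment keeps them out of the source code.

```python
import os

import pathway as pw

# Hypothetical environment variable names; use whatever your deployment defines.
settings = pw.io.iceberg.GlueCatalog(
    warehouse="s3://datalake/storage/root",
    aws_settings=pw.io.s3.AwsS3Settings(
        region="eu-central-1",
        access_key=os.environ["DATALAKE_ACCESS_KEY"],
        secret_access_key=os.environ["DATALAKE_SECRET_KEY"],
    ),
)
```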
class RestCatalog(uri, warehouse=None)
Configuration settings for a REST Iceberg catalog.
- Parameters
  - uri (str) – The URI of the catalog.
  - warehouse (str | None) – Optional data warehouse path.
- Returns
  A configuration object.
Example:
Suppose you need to connect to a REST catalog running at
http://localhost:8181. The connection settings can be constructed as follows:
```python
import pathway as pw

settings = pw.io.iceberg.RestCatalog(uri="http://localhost:8181")
```
read(catalog, namespace, table_name, schema, *, mode='streaming', autocommit_duration_ms=1500, name=None, max_backlog_size=None, debug_data=None, **kwargs)
Reads a table from Apache Iceberg. In "streaming" mode, the connector polls the catalog for new snapshots and reflects the difference between the data files of the previous and the new snapshot in the Pathway table: files added to the new snapshot’s scan plan become row additions, and files removed from it become row deletions. Note that this is a file-level diff: Iceberg V2 row-level delete files (positional or equality deletes added alongside the same data files) are not tracked separately.
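The file-level diff described above can be sketched in plain Python. Data files are identified by hypothetical path strings mapped to their rows, and changes are written as (row, +1) / (row, -1) pairs; this notation is only for illustration and is not Pathway's internal API.

```python
from typing import Dict, List, Tuple

Row = Tuple[int, str]


def diff_snapshots(
    previous: Dict[str, List[Row]],
    current: Dict[str, List[Row]],
) -> List[Tuple[Row, int]]:
    """Emit (row, -1) for rows of removed files and (row, +1) for added files."""
    added = current.keys() - previous.keys()
    removed = previous.keys() - current.keys()
    changes: List[Tuple[Row, int]] = []
    # Files present in both snapshots contribute nothing, whatever their rows.
    for path in sorted(removed):
        changes.extend((row, -1) for row in previous[path])
    for path in sorted(added):
        changes.extend((row, +1) for row in current[path])
    return changes
```

Because only data files are compared, a snapshot that merely adds a positional or equality delete file leaves the data-file mapping unchanged and yields no changes, which is the limitation noted above. A compaction that rewrites existing files into a new one shows up as deletions followed by additions of the same rows.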
Note that the connector requires the primary key fields to be specified in the schema. You can mark the fields that form the primary key with the pw.column_definition function.
Side effect: if the namespace passed in namespace doesn’t exist in the catalog at construction time, the connector creates it. The table itself must already exist; a missing table surfaces as a catalog error.
- Parameters
  - catalog (RestCatalog | GlueCatalog) – Settings for the Iceberg catalog connection.
  - namespace (list[str]) – The name of the namespace containing the table to read.
  - table_name (str) – The name of the table to read.
  - schema (type[Schema]) – Schema of the resulting table.
  - mode (Literal['streaming', 'static']) – Denotes how the engine polls new data from the source. Currently, "streaming" and "static" are supported. If set to "streaming", the engine waits for updates in the specified lake, tracks new snapshots, and reflects the resulting row additions and deletions in the state. The "static" mode, on the other hand, only considers the currently available data and ingests all of it in one commit. The default value is "streaming".
  - autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. Additionally, if persistence is enabled, it will be used as the name for the snapshot that stores the connector’s progress.
  - max_backlog_size (int | None) – Limit on the number of entries read from the input source and kept in processing at any moment. Reading pauses when the limit is reached and resumes as processing of some entries completes. Useful with large sources that emit an initial burst of data, to avoid memory spikes.
  - debug_data (Any) – Static data replacing the original one when debug mode is active.
- Returns
Table – Table read from the Iceberg source.
Example:
Consider a users data table stored in the Iceberg storage. The table is located in the
app namespace and is named users. The catalog URI is http://localhost:8181.
Below is an example of how to read this table into Pathway.
First, the schema of the table needs to be created. The schema doesn’t have to contain all the columns of the table; you can specify only the ones that are needed for the computation:
```python
import pathway as pw


class InputSchema(pw.Schema):
    user_id: int = pw.column_definition(primary_key=True)
    name: str
```
Then, this table must be read from the Iceberg storage.
```python
input_table = pw.io.iceberg.read(
    catalog=pw.io.iceberg.RestCatalog(uri="http://localhost:8181/"),
    namespace=["app"],
    table_name="users",
    schema=InputSchema,
    mode="static",
)
```
Don’t forget to run your program with pw.run once you define all necessary
computations. Note that you can also change the mode to "streaming" if you want
the changes in the table to be reflected in your computational pipeline.
write(table, catalog, namespace, table_name, *, timestamp_unit='ns', min_commit_frequency=60000, name=None, sort_by=None)
Writes the stream of changes from table into Iceberg data storage. The data storage must be defined with the catalog, the namespace, and the table name.
If the namespace or the table doesn’t exist, they will be created by the connector.
The schema of the new table is inferred from the table’s schema.
- Parameters
  - table (Table) – Table to be written.
  - catalog (RestCatalog | GlueCatalog) – The catalog of the target storage.
  - namespace (list[str]) – The name of the namespace containing the target table. If the namespace doesn’t exist, it will be created by the connector.
  - table_name (str) – The name of the table to be written. If a table with such a name doesn’t exist, it will be created by the connector.
  - timestamp_unit (Literal['us', 'ns']) – The precision used for timestamp serialization. It can be either "us" for microseconds or "ns" for nanoseconds. When selecting the precision, ensure that your catalog supports the chosen unit, as different underlying data types are used: timestamp for microsecond precision and timestamp_ns for nanosecond precision. Note that some catalogs support only a specific timestamp format. In the case of the Glue catalog, only nanoseconds are supported.
  - min_commit_frequency (int | None) – Specifies the minimum time interval between two data commits in storage, measured in milliseconds. If set to None, finalized minibatches will be committed as soon as possible. Keep in mind that each commit in Iceberg writes new Parquet data files and creates a new table snapshot in the metadata. Therefore, it is advisable to limit the frequency of commits to reduce the overhead of processing the resulting table.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  - sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
- Returns
None
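Two of these parameters can be illustrated in plain Python. The first helper sketches which Iceberg timestamp type corresponds to a DateTime column when Pathway creates the table, following the conversion table at the top of this page and the Glue restrictions stated there and under timestamp_unit. The second shows the per-minibatch ordering that sort_by describes, with column names as strings standing in for ColumnReference objects. Both helpers are hypothetical illustrations, not Pathway APIs.

```python
from typing import Dict, List


def iceberg_timestamp_type(utc: bool, timestamp_unit: str, *, glue: bool = False) -> str:
    """Iceberg type for a DateTime column when Pathway creates the table."""
    if timestamp_unit not in ("us", "ns"):
        raise ValueError("timestamp_unit must be 'us' or 'ns'")
    if glue and utc:
        raise ValueError("UTC DateTime is not supported with the Glue catalog")
    if glue and timestamp_unit != "ns":
        raise ValueError("the Glue catalog supports only nanosecond timestamps")
    base = "timestamptz" if utc else "timestamp"
    return base if timestamp_unit == "us" else base + "_ns"


def sort_minibatch(rows: List[Dict[str, object]], sort_by: List[str]) -> List[Dict[str, object]]:
    """Sort one minibatch ascending; value tuples compare lexicographically."""
    return sorted(rows, key=lambda row: tuple(row[column] for column in sort_by))
```

Note that the ordering applies within each minibatch only; rows from different commits are not merged into one globally sorted sequence.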
Example:
Consider a users data table stored locally in a file called users.txt in CSV format.
The Iceberg output connector provides the capability to place this table into
Iceberg storage, defined by the REST catalog with URI http://localhost:8181. The target
table is users, located in the app namespace.
First, the table must be read. To do this, you need to define the schema. For simplicity, consider that it consists of two fields: the user ID and the name.
The schema definition may look as follows:
```python
import pathway as pw


class InputSchema(pw.Schema):
    user_id: int = pw.column_definition(primary_key=True)
    name: str
```
Using this schema, you can read the table from the input file. You need to use the
pw.io.csv.read connector. Here, you can use the static mode since the text file
with the users doesn’t change dynamically.
```python
users = pw.io.csv.read("./users.txt", schema=InputSchema, mode="static")
```
Once the table is read, you can use pw.io.iceberg.write to save this table into
Iceberg storage.
```python
pw.io.iceberg.write(
    users,
    catalog=pw.io.iceberg.RestCatalog(uri="http://localhost:8181/"),
    namespace=["app"],
    table_name="users",
)
```
Don’t forget to run your program with pw.run once you define all necessary
computations. After execution, you will be able to see the users’ data in the
Iceberg storage.