pw.io.iceberg
pw.io.iceberg.write(table, catalog_uri, namespace, table_name, *, warehouse=None, min_commit_frequency=60000)
Writes the stream of changes from table into Iceberg data storage. The data storage must be defined with the REST catalog URI, the namespace, and the table name.

If the namespace or the table doesn’t exist, they will be created by the connector. The schema of the new table is inferred from the table’s schema. The output table must include two additional integer columns: time, representing the computation minibatch, and diff, indicating the type of change (1 for row addition and -1 for row deletion).
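To illustrate how the time and diff columns can be read, here is a minimal plain-Python sketch that folds a change stream into the set of rows currently present. It does not use Pathway or Iceberg. The replay helper, the sample rows, and the user_id and name columns are hypothetical, and the sketch assumes that an update shows up as a deletion of the old row plus an addition of the new row in the same minibatch.

```python
from collections import Counter


def replay(changes):
    """Fold a change stream into the multiset of rows currently present."""
    state = Counter()
    # Apply changes in minibatch order (hypothetical rows, see below).
    for row in sorted(changes, key=lambda r: r["time"]):
        key = (row["user_id"], row["name"])
        state[key] += row["diff"]  # +1 adds the row, -1 removes it
        if state[key] == 0:
            del state[key]
    return state


# Hypothetical change stream: Alice is renamed to Alicia at time 4.
changes = [
    {"user_id": 1, "name": "Alice", "time": 2, "diff": 1},
    {"user_id": 2, "name": "Bob", "time": 2, "diff": 1},
    {"user_id": 1, "name": "Alice", "time": 4, "diff": -1},
    {"user_id": 1, "name": "Alicia", "time": 4, "diff": 1},
]

current = replay(changes)
print(sorted(current))  # [(1, 'Alicia'), (2, 'Bob')]
```

Summing diff per row is what makes the stored table an append-only log of changes rather than a snapshot: the current contents are the rows whose diffs sum to a positive count.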
- Parameters
  - table (Table) – Table to be written.
  - catalog_uri (str) – URI of the Iceberg REST catalog.
  - namespace (list[str]) – The name of the namespace containing the target table. If the namespace doesn’t exist, it will be created by the connector.
  - table_name (str) – The name of the table to be written. If a table with such a name doesn’t exist, it will be created by the connector.
  - warehouse (str | None) – Optional path to the Iceberg storage warehouse.
  - min_commit_frequency (int | None) – Specifies the minimum time interval between two data commits in storage, measured in milliseconds. If set to None, finalized minibatches will be committed as soon as possible. Keep in mind that each commit in Iceberg creates a new Parquet file and writes an entry in the transaction log. Therefore, it is advisable to limit the frequency of commits to reduce the overhead of processing the resulting table.
- Returns
  None
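As a rough guide to choosing min_commit_frequency, the following back-of-the-envelope sketch bounds how many commits (and therefore how many new Parquet files) a run of a given length can produce. The max_commits helper is hypothetical and models only the spacing constraint described above, assuming commits are at least min_commit_frequency milliseconds apart. It says nothing about how the connector actually schedules commits.

```python
def max_commits(run_duration_ms: int, min_commit_frequency_ms: int) -> int:
    """Upper bound on commits that are spaced at least the given interval apart."""
    # One commit at the start of the window, plus one per full interval after it.
    return run_duration_ms // min_commit_frequency_ms + 1


one_hour_ms = 60 * 60 * 1000

print(max_commits(one_hour_ms, 60_000))  # default interval: 61
print(max_commits(one_hour_ms, 1_000))   # one-second interval: 3601
```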
Example:
Consider a users data table stored locally in a file called users.txt in CSV format. The Iceberg output connector provides the capability to place this table into Iceberg storage, defined by the catalog with URI http://localhost:8181. The target table is users, located in the app namespace.
First, the table must be read. To do this, you need to define the schema. For simplicity, consider that it consists of two fields: the user ID and the name.
The schema definition may look as follows:
import pathway as pw


class InputSchema(pw.Schema):
    user_id: int
    name: str
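To try the example without an existing dataset, a sample users.txt matching this schema could be generated as in the sketch below. The three rows are made-up placeholder data, and the sketch assumes the file starts with a header row whose column names match the schema fields.

```python
import csv

# Hypothetical sample data: (user_id, name) pairs.
rows = [(1, "Alice"), (2, "Bob"), (3, "Carol")]

# newline="" lets the csv module control line endings itself.
with open("users.txt", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["user_id", "name"])  # header matching the schema fields
    writer.writerows(rows)
```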
Using this schema, you can read the table from the input file. You need to use the pw.io.csv.read connector. Here, you can use the static mode since the text file with the users doesn’t change dynamically.
users = pw.io.csv.read("./users.txt", schema=InputSchema, mode="static")
Once the table is read, you can use pw.io.iceberg.write to save this table into Iceberg storage.
pw.io.iceberg.write(
    users,
    catalog_uri="http://localhost:8181/",
    namespace=["app"],
    table_name="users",
)
Don’t forget to run your program with pw.run once you define all necessary computations. After execution, you will be able to see the users’ data in the Iceberg storage.