pw.io.deltalake

This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.

All internal Pathway types, except Any, can be stored in Delta Lake. The table below explains how Pathway engine data is saved in Delta Lake. You can also find descriptions of the corresponding Delta types in the Delta Lake protocol description. Values of these types can also be deserialized from Delta Lake back into Pathway values.

Pathway types conversion into Delta Lake

Pathway type – Delta table type
  • bool – boolean
  • int – long (8-byte signed integer number)
  • float – double (8-byte double-precision floating-point number)
  • pointer – binary; can be deserialized back if the pw.Pointer type is specified in the Pathway table schema
  • str – string
  • bytes – binary
  • Naive DateTime – timestamp without time zone
  • UTC DateTime – timestamp
  • Duration – long, serialized and deserialized with microsecond precision
  • np.ndarray – struct with two top-level fields: shape, denoting the shape of the stored array, and elements, containing the elements of the flattened array
  • tuple – struct with as many top-level fields as the tuple has elements; the fields are named [0], [1], and so on, in the order in which the element types appear in the tuple
  • list – array
  • pw.PyObjectWrapper – binary; can be deserialized back if the pw.PyObjectWrapper type is specified in the Pathway table schema
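To make the struct layouts above concrete, here is a plain-Python sketch of how a tuple, a two-dimensional array, and a duration map onto the described Delta values. The helper names (tuple_to_struct, array_to_struct, duration_to_long) are hypothetical, the array is modeled as nested lists instead of np.ndarray to keep the example dependency-free, and row-major (C-order) flattening is an assumption. This illustrates the layout only; it is not Pathway's serialization code.

```python
from datetime import timedelta


def tuple_to_struct(values: tuple) -> dict:
    # Hypothetical helper: one top-level field per tuple element,
    # named "[0]", "[1]", ... in the order the elements appear.
    return {f"[{i}]": v for i, v in enumerate(values)}


def array_to_struct(rows: list) -> dict:
    # Hypothetical helper for a rectangular 2-D array given as nested lists.
    # Assumption: elements are flattened in row-major (C) order.
    shape = [len(rows), len(rows[0]) if rows else 0]
    elements = [x for row in rows for x in row]
    return {"shape": shape, "elements": elements}


def duration_to_long(d: timedelta) -> int:
    # A duration is stored as a long counting microseconds.
    return d // timedelta(microseconds=1)


print(tuple_to_struct((1, "a")))  # {'[0]': 1, '[1]': 'a'}
print(array_to_struct([[1, 2, 3], [4, 5, 6]]))  # {'shape': [2, 3], 'elements': [1, 2, 3, 4, 5, 6]}
print(duration_to_long(timedelta(seconds=1, microseconds=5)))  # 1000005
```

Storing the shape next to the flat element list is what allows a multi-dimensional array to fit into a single struct column and be reassembled on the way back.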

read(uri, schema, *, mode='streaming', s3_connection_settings=None, autocommit_duration_ms=1500, persistent_id=None, debug_data=None)

Reads a table from Delta Lake. Currently, local and S3 lakes are supported. The table doesn't have to be append-only; however, deletion vectors are not supported yet.

Note that the connector requires primary key fields to be specified in the schema. You can specify the fields to be used in the primary key with the pw.column_definition function.

  • Parameters
    • uri (str | PathLike) – URI of the Delta Lake source that must be read.
    • schema (type[Schema]) – Schema of the resulting table.
    • mode (str) – Denotes how the engine polls new data from the source. Currently, "streaming" and "static" are supported. If set to "streaming", the engine waits for updates in the specified lake, tracks new row additions, and reflects these events in the state. In the "static" mode, the engine only considers the data that is already available and ingests all of it in one commit. The default value is "streaming".
    • s3_connection_settings (AwsS3Settings | MinIOSettings | WasabiS3Settings | DigitalOceanS3Settings | None) – Configuration for S3 credentials when using S3 storage. In addition to the access key and secret access key, you can specify a custom endpoint, which is necessary for buckets hosted outside of Amazon AWS. If the custom endpoint is left blank, the authorized user’s credentials for S3 will be used.
    • persistent_id (str | None) – (unstable) An identifier, under which the state of the table will be persisted or None, if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This way it’s possible to configure the start of computations from the moment they were terminated last time.
    • autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
    • debug_data (Any) – Static data that replaces the original data when debug mode is active.
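The effect of autocommit_duration_ms can be pictured with a small model in which updates arriving within the same time window are grouped into one commit. The sketch below assumes fixed windows aligned to time zero and arrival times given in milliseconds; group_into_commits is a hypothetical helper, not part of Pathway, and since the parameter is only an upper bound on the time between commits, the real connector may commit earlier.

```python
from itertools import groupby


def group_into_commits(arrivals_ms: list[int], autocommit_duration_ms: int) -> list[list[int]]:
    # Hypothetical model: each update falls into the fixed window
    # [k * duration, (k + 1) * duration), and every non-empty window
    # becomes one commit pushed into the computation graph.
    ordered = sorted(arrivals_ms)
    return [
        list(batch)
        for _, batch in groupby(ordered, key=lambda t: t // autocommit_duration_ms)
    ]


print(group_into_commits([0, 200, 1400, 1600, 4000], 1500))
# [[0, 200, 1400], [1600], [4000]]
```

With the default of 1500 ms, the five updates above would reach the computation graph in three commits rather than five.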

Examples:

Consider an example with a stream of changes on a simple key-value table, streamed by another Pathway program with the pw.io.deltalake.write method.

Let’s start writing Pathway code. First, the schema of the table needs to be created:

import pathway as pw
class KVSchema(pw.Schema):
    key: str = pw.column_definition(primary_key=True)
    value: str

Then, this table must be written into Delta Lake storage. In this example, it is created from static data with the pw.debug.table_from_markdown method and saved into a local lake:

output_table = pw.debug.table_from_markdown("key value \n one Hello \n two World")
lake_path = "./local-lake"
pw.io.deltalake.write(output_table, lake_path)

Now the producer code can be run with a simple pw.run:

pw.run(monitoring_level=pw.MonitoringLevel.NONE)

After that, you can read this table with Pathway as well. This requires the URI of the lake and the schema created above. In addition, you can use the "static" mode so that the program finishes after the data is read:

input_table = pw.io.deltalake.read(lake_path, KVSchema, mode="static")

Please note that the table doesn't necessarily have to be created by Pathway: an append-only Delta table created in any other way will also be processed correctly.

Finally, you can check that the resulting table contains the same set of rows by displaying it with pw.debug.compute_and_print:

pw.debug.compute_and_print(input_table, include_id=False)

Please note that you can use the same communication approach if S3 is used as a data storage. To do this, specify an S3 path starting with s3:// or s3a://, and provide the credentials object as a parameter. If no credentials are provided but the path starts with s3:// or s3a://, Pathway will use the credentials of the currently authenticated user.

write(table, uri, *, s3_connection_settings=None, min_commit_frequency=60000)

Writes the stream of changes from table into Delta Lake (https://delta.io/) data storage at the location specified by uri. Supported storage types are S3 and the local filesystem.

The storage type is determined by the URI: paths starting with s3:// or s3a:// are for S3 storage, while all other paths use the filesystem.
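The rule above can be written as a tiny function. storage_type is a hypothetical helper that only mirrors the documented prefix check; it is not how Pathway implements the dispatch internally.

```python
from __future__ import annotations

import os
from pathlib import Path


def storage_type(uri: str | os.PathLike) -> str:
    # Hypothetical helper mirroring the documented rule: the s3:// and s3a://
    # prefixes select S3; any other path, including PathLike objects, is
    # treated as a local filesystem path.
    path = os.fspath(uri)
    if path.startswith(("s3://", "s3a://")):
        return "s3"
    return "filesystem"


print(storage_type("s3://logs/access-log/"))    # s3
print(storage_type("s3a://logs/access-log/"))   # s3
print(storage_type(Path("./logs/access-log")))  # filesystem
```

One practical consequence in plain Python: pathlib.Path collapses the double slash in s3://bucket into s3:/bucket, so S3 URIs are best passed as plain strings rather than Path objects.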

If the specified storage location doesn't exist, it will be created. The schema of the new Delta table is inferred from the schema of table. In addition to the columns of table, the output includes two integer columns: time, representing the computation minibatch, and diff, indicating the type of change (1 for row addition and -1 for row deletion).
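Because every row of the output carries time and diff, the current contents of the table can be recovered by replaying the changes in time order. The sketch below works on plain (key, value, time, diff) tuples rather than on a real Delta table, and replay_changes is a hypothetical helper. In this model, an update of a row appears as a deletion of the old row and an addition of the new one within the same minibatch.

```python
from collections import Counter


def replay_changes(rows: list[tuple[str, str, int, int]]) -> Counter:
    # Hypothetical helper: rows are (key, value, time, diff) tuples, where
    # diff is 1 for a row addition and -1 for a row deletion.
    state: Counter = Counter()
    for key, value, _time, diff in sorted(rows, key=lambda r: r[2]):
        state[(key, value)] += diff
        if state[(key, value)] == 0:
            # Drop rows whose additions and deletions cancel out.
            del state[(key, value)]
    return state


changes = [
    ("one", "Hello", 2, 1),
    ("two", "World", 2, 1),
    # At time 4 the value for "one" changes: old row removed, new row added.
    ("one", "Hello", 4, -1),
    ("one", "Hi", 4, 1),
]
print(sorted(replay_changes(changes)))  # [('one', 'Hi'), ('two', 'World')]
```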

  • Parameters
    • table (Table) – Table to be written.
    • uri (str | PathLike) – URI of the target Delta Lake.
    • s3_connection_settings (AwsS3Settings | MinIOSettings | WasabiS3Settings | DigitalOceanS3Settings | None) – Configuration for S3 credentials when using S3 storage. In addition to the access key and secret access key, you can specify a custom endpoint, which is necessary for buckets hosted outside of Amazon AWS. If the custom endpoint is left blank, the authorized user’s credentials for S3 will be used.
    • min_commit_frequency (int | None) – Specifies the minimum time interval between two data commits in storage, measured in milliseconds. If set to None, finalized minibatches will be committed as soon as possible. Keep in mind that each commit in Delta Lake creates a new file and writes an entry in the transaction log. Therefore, it is advisable to limit the frequency of commits to reduce the overhead of processing the resulting table. Note that to further optimize performance and reduce the number of chunks in the table, you can use vacuum or optimize operations afterwards.
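The trade-off behind min_commit_frequency can be illustrated with a simple buffering model: finalized minibatches accumulate until at least min_commit_frequency milliseconds have passed since the previous commit, and are then written together. plan_commits is a hypothetical helper, and both the decision to commit at the arrival of a batch and the final flush at the end of the stream are simplifying assumptions, not a description of the connector's internals.

```python
from __future__ import annotations


def plan_commits(batch_times_ms: list[int], min_commit_frequency: int | None) -> list[list[int]]:
    # Hypothetical model: minibatches finalized at the given times (ms) are
    # buffered and flushed in one commit once at least min_commit_frequency
    # ms have passed since the previous commit. None commits every batch.
    if min_commit_frequency is None:
        return [[t] for t in batch_times_ms]
    commits: list[list[int]] = []
    pending: list[int] = []
    last_commit: int | None = None
    for t in batch_times_ms:
        pending.append(t)
        if last_commit is None or t - last_commit >= min_commit_frequency:
            commits.append(pending)
            pending = []
            last_commit = t
    if pending:
        # Assumption: anything still buffered is flushed when the stream ends.
        commits.append(pending)
    return commits


times = [0, 10_000, 20_000, 70_000, 75_000]
print(len(plan_commits(times, None)))  # 5
print(plan_commits(times, 60_000))     # [[0], [10000, 20000, 70000], [75000]]
```

In this model the default of 60000 ms turns five minibatches into three commits, which means fewer files and fewer transaction log entries for readers of the table to process.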
  • Returns
    None

Example:

Consider a table access_log that needs to be output to a Delta Lake storage located locally at the folder ./logs/access-log. It can be done as follows:

pw.io.deltalake.write(access_log, "./logs/access-log")

Please note that if there is no filesystem object at this path, the corresponding folder will be created. If you run this code twice, the new data will be appended to the storage created during the first run.
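To confirm that a second run appended to the existing lake instead of replacing it, you can inspect the Delta transaction log. Under the Delta Lake protocol, each commit is recorded as a JSON file in the _delta_log directory, named after the zero-padded 20-digit table version (for example, 00000000000000000000.json). The sketch below uses a hypothetical list_versions helper and only the standard library; it ignores checkpoint files and other auxiliary files, and it demonstrates itself on a fake log so that it runs anywhere.

```python
import re
import tempfile
from pathlib import Path

# Commit files in _delta_log are named <20-digit version>.json.
_COMMIT_FILE = re.compile(r"^(\d{20})\.json$")


def list_versions(lake_path: str) -> list[int]:
    # Hypothetical helper: returns the table versions recorded in the log,
    # skipping checkpoints (*.checkpoint.parquet) and other files.
    log_dir = Path(lake_path) / "_delta_log"
    versions = []
    for entry in log_dir.iterdir():
        match = _COMMIT_FILE.match(entry.name)
        if match:
            versions.append(int(match.group(1)))
    return sorted(versions)


# Self-contained demo on a fake log; pass "./logs/access-log" instead
# to inspect a lake written by pw.io.deltalake.write.
with tempfile.TemporaryDirectory() as tmp:
    log = Path(tmp) / "_delta_log"
    log.mkdir()
    for name in ["00000000000000000000.json", "00000000000000000001.json",
                 "00000000000000000001.checkpoint.parquet"]:
        (log / name).touch()
    demo_versions = list_versions(tmp)
    print(demo_versions)  # [0, 1]
```

If the second run wrote any data, the list of versions should be longer than after the first run, while the versions created earlier stay in place.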

It is also possible to save the table to S3 storage. To save the table to the access-log path within the logs bucket in the eu-west-3 region, modify the code as follows:

import os

import pathway as pw

# Credentials are read from environment variables.
pw.io.deltalake.write(
    access_log,
    "s3://logs/access-log/",
    s3_connection_settings=pw.io.s3.AwsS3Settings(
        bucket_name="logs",
        region="eu-west-3",
        access_key=os.environ["S3_ACCESS_KEY"],
        secret_access_key=os.environ["S3_SECRET_ACCESS_KEY"],
    ),
)

Note that it is not necessary to specify the credentials explicitly if you are logged into S3. Pathway can deduce them for you. For an authorized user, the code can be simplified as follows:

pw.io.deltalake.write(access_log, "s3://logs/access-log/")