pw.io.deltalake

This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.

All internal Pathway types, except Any, can be stored in Delta Lake. The table below explains how Pathway engine data is saved in Delta Lake. You can also find descriptions of the corresponding Delta types in the protocol description. The values of the corresponding types can also be deserialized from Delta Lake into Pathway values.

Pathway type conversion into Delta Lake

Pathway type → Delta table type

bool → boolean
int → long (8-byte signed integer)
float → double (8-byte double-precision floating-point number)
pointer → string; can be deserialized back if the pw.Pointer type is specified in the Pathway table schema
str → string
bytes → binary
Naive DateTime → timestamp without time zone
UTC DateTime → timestamp
Duration → long; serialized and deserialized with microsecond precision
JSON → string containing the serialized JSON value
np.ndarray → struct with two top-level fields: shape, denoting the shape of the stored array, and elements, denoting the elements of the flattened array
tuple → struct with as many top-level fields as the tuple has elements. The fields are named [0], [1], and so on; their order matches the order of the types in the tuple
list → array
pw.PyObjectWrapper → binary; can be deserialized back if the pw.PyObjectWrapper type is specified in the Pathway table schema
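To make the struct and long encodings above concrete, here is a minimal sketch of how an np.ndarray and a Duration map onto their Delta representations. The helper names are illustrative only, not part of the Pathway API:

```python
# Illustrative sketch of the documented encodings; encode_ndarray,
# decode_ndarray, and encode_duration are hypothetical helper names.
import datetime

import numpy as np


def encode_ndarray(arr: np.ndarray) -> dict:
    # np.ndarray -> struct with "shape" and flattened "elements" fields.
    return {"shape": list(arr.shape), "elements": arr.flatten().tolist()}


def decode_ndarray(struct: dict) -> np.ndarray:
    # Reverse direction: restore the original shape from the flat elements.
    return np.array(struct["elements"]).reshape(struct["shape"])


def encode_duration(d: datetime.timedelta) -> int:
    # Duration -> long with microsecond precision.
    return d // datetime.timedelta(microseconds=1)


arr = np.array([[1, 2], [3, 4]])
encoded = encode_ndarray(arr)
assert encoded == {"shape": [2, 2], "elements": [1, 2, 3, 4]}
assert np.array_equal(decode_ndarray(encoded), arr)
assert encode_duration(datetime.timedelta(seconds=1, microseconds=5)) == 1_000_005
```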

Reading Delta tables produced by other tools

Delta tables produced by Spark, DuckDB, pandas, and similar tools commonly use storage types Pathway never writes itself. pw.io.deltalake.read accepts the following extra Delta types and projects them onto Pathway types according to the Pathway type declared in the schema:

Additional Delta-to-Pathway conversions (read only)

Delta table type → Pathway schema type (notes)

byte / short / integer (1/2/4-byte signed) → int (widened to i64; lossless)
unsigned 1/2/4/8-byte integers (Parquet UINT_8 through UINT_64) → int (widened to i64; values exceeding i64::MAX, only possible for UINT_64, raise a conversion error)
float (4-byte) / float16 (2-byte) → float (widened to f64; lossless)
date → Naive DateTime or UTC DateTime (Pathway has no native Date type; values are materialized at midnight of the calendar day)
timestamp with millisecond precision → Naive DateTime or UTC DateTime (sub-second precision is preserved to the millisecond)
decimal(p, s) → float (goes through f64: lossy in general, since the binary representation carries only ~15-17 significant decimal digits of mantissa; the reader emits a one-time warning at startup naming each affected column)
decimal(p, s) → str (lossless: the unscaled integer is formatted with the column’s scale and passed through as decimal text, e.g. a decimal(8, 2) value with unscaled integer 10050 becomes "100.50"; supports the full Delta range of up to 38 digits of precision)
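The decimal-to-str formatting can be sketched in a few lines of plain Python; the function name is illustrative and this is not the reader’s actual code, only the documented rule that the unscaled integer is rendered as text using the column’s scale:

```python
# Illustrative sketch of decimal(p, s) -> str formatting: render the
# unscaled integer as decimal text at the given scale.
def decimal_to_str(unscaled: int, scale: int) -> str:
    sign = "-" if unscaled < 0 else ""
    # Pad with leading zeros so there is at least one digit before the point.
    digits = str(abs(unscaled)).rjust(scale + 1, "0")
    if scale == 0:
        return sign + digits
    return f"{sign}{digits[:-scale]}.{digits[-scale:]}"


# decimal(8, 2) with unscaled integer 10050 becomes "100.50"
assert decimal_to_str(10050, 2) == "100.50"
assert decimal_to_str(5, 3) == "0.005"
assert decimal_to_str(-10050, 2) == "-100.50"
```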

Partition columns of every type listed in the conversion table above round-trip through pw.io.deltalake.read, including pointer, Duration, and JSON partitions, which are stored in the partition path as their underlying string or long Delta types.

Writing to existing decimal columns

When the destination Delta table already has a decimal(p, s) column and the Pathway column written into it has type str, the writer parses each row’s decimal text into the unscaled integer and stores it as a fixed-point value of the column’s declared precision and scale. This is the symmetric counterpart to reading a Delta decimal column as str: a Delta decimal column read into Pathway, processed as text, and written back into the same Delta column round-trips with no precision loss.

A string that can’t be parsed as a decimal of the column’s shape (non-digit characters, more fractional digits than the column’s scale, or more total digits than its precision) fails the write with an error message naming the offending value, the column’s precision and scale, and the specific constraint it violated. Tables that don’t contain a decimal column are unaffected: writing a Pathway str column into a fresh table or into an existing Delta string column behaves exactly as before.
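The write-side parsing and validation described above can be sketched as follows. The function name, exact checks, and error messages are illustrative, not the writer’s actual implementation:

```python
# Illustrative sketch of parsing decimal text into an unscaled integer,
# enforcing the column's precision and scale as described above.
import re


def parse_decimal_text(text: str, precision: int, scale: int) -> int:
    m = re.fullmatch(r"(-?)(\d+)(?:\.(\d*))?", text)
    if m is None:
        raise ValueError(
            f"{text!r} is not a valid decimal({precision}, {scale}): "
            "non-digit characters"
        )
    sign, whole, frac = m.group(1), m.group(2), m.group(3) or ""
    if len(frac) > scale:
        raise ValueError(f"{text!r} has more than {scale} fractional digits")
    # Scale up to the column's scale and strip leading zeros.
    unscaled_digits = (whole + frac.ljust(scale, "0")).lstrip("0") or "0"
    if len(unscaled_digits) > precision:
        raise ValueError(f"{text!r} exceeds {precision} digits of precision")
    return int(sign + unscaled_digits)


assert parse_decimal_text("100.50", 8, 2) == 10050
assert parse_decimal_text("-0.5", 4, 2) == -50
try:
    parse_decimal_text("1.234", 8, 2)  # too many fractional digits
except ValueError:
    pass
```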

class TableOptimizer(tracked_column, time_format, quick_access_window, compression_frequency, retention_period=datetime.timedelta(0), remove_old_checkpoints=False)


The table optimizer is used to optimize partitioned Delta tables created by the output connector. This optimization is limited to tables that are partitioned by a string column, where the values represent date and time in a specific format.

Once the specified interval has passed, the optimizer runs the OPTIMIZE and VACUUM operations after a write. If these operations fail, they are retried during the next write. Keep in mind that running OPTIMIZE and VACUUM may delay the output, since they take time to complete, but they do not slow down the rest of the computational pipeline. Running them from the writer is necessary to prevent conflicts that could arise from simultaneous writes to the Delta log.

If remove_old_checkpoints is enabled, the background process will also make sure that only the most recent checkpoint file is kept.

Please note that remove_old_checkpoints is currently an experimental feature and works only when the backend uses a filesystem.

  • Parameters
    • tracked_column (ColumnReference) – The partition column for the observed table.
    • time_format (str) – A strftime-like format string that defines how values in the tracked column are interpreted.
    • quick_access_window (timedelta) – All partition values older than this window will be compressed using the OPTIMIZE Delta Lake operation, followed by VACUUM.
    • compression_frequency (timedelta) – Determines how often the compression process is triggered. If a compression attempt fails, it will be retried immediately without waiting.
    • retention_period (timedelta) – Retention period for the VACUUM operation.
    • remove_old_checkpoints (bool) – If True, Pathway will keep only the most recent checkpoint file to reduce storage usage.

Example:

Suppose you are writing to a table that is partitioned by the column day_utc, where the values follow the ISO-8601 format: YYYY-MM-DD. You want to compress data older than 7 days and run this compression once per day.

In that case, the optimizer settings would be configured as follows:

import datetime

import pathway as pw

optimizer = pw.io.deltalake.TableOptimizer(
    tracked_column=table.day_utc,
    time_format="%Y-%m-%d",
    quick_access_window=datetime.timedelta(days=7),
    compression_frequency=datetime.timedelta(days=1),
)

This optimizer object needs to be passed to the pw.io.deltalake.write function.

Note: Background cleanup of old snapshots will not run automatically in this setup. To enable it, you can turn it on explicitly:

optimizer = pw.io.deltalake.TableOptimizer(  
    tracked_column=table.day_utc,
    time_format="%Y-%m-%d",
    quick_access_window=datetime.timedelta(days=7),
    compression_frequency=datetime.timedelta(days=1),
    remove_old_checkpoints=True,
)
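As a minimal sketch of how quick_access_window selects partitions (this mimics the documented behavior, not Pathway’s internals): partition values are parsed with time_format, and everything older than the window becomes eligible for OPTIMIZE:

```python
# Illustrative sketch: which partitions fall outside quick_access_window?
import datetime

time_format = "%Y-%m-%d"
quick_access_window = datetime.timedelta(days=7)
now = datetime.datetime(2024, 5, 15)  # fixed "current" time for the example
partitions = ["2024-05-01", "2024-05-10", "2024-05-14"]

cutoff = now - quick_access_window
to_compress = [
    p for p in partitions
    if datetime.datetime.strptime(p, time_format) < cutoff
]
assert to_compress == ["2024-05-01"]
```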

read(uri, schema=None, *, mode='streaming', s3_connection_settings=None, start_from_timestamp_ms=None, autocommit_duration_ms=1500, name=None, max_backlog_size=None, debug_data=None, **kwargs)

Reads a table from Delta Lake. Currently, local and S3 lakes are supported. The table doesn’t have to be append-only; however, deletion vectors are not supported yet.

Reads are atomic with respect to the data version. This means that if a data update - such as inserting, deleting, or modifying rows - is made within a single Delta transaction, all those changes will be applied together, as one atomic operation, in a single minibatch.

Note that the connector requires either the table to be append-only or the primary key fields to be specified in the schema. You can define the primary key fields using the pw.column_definition function.

  • Parameters
    • uri (str | PathLike) – URI of the Delta Lake source that must be read.
    • schema (type[Schema] | None) – Defines the schema of the resulting table. You can omit this parameter if the table is created using pw.io.deltalake.write, as the schema (excluding the special time and diff fields) will then be automatically stored in the Delta Table’s columns metadata.
    • mode (Literal['streaming', 'static']) – Denotes how the engine polls the new data from the source. Currently "streaming" and "static" are supported. If set to "streaming" the engine will wait for the updates in the specified lake. It will track new row additions and reflect these events in the state. On the other hand, the "static" mode will only consider the available data and ingest all of it in one commit. The default value is "streaming".
    • s3_connection_settings (AwsS3Settings | MinIOSettings | WasabiS3Settings | DigitalOceanS3Settings | None) – Configuration for S3 credentials when using S3 storage. In addition to the access key and secret access key, you can specify a custom endpoint, which is necessary for buckets hosted outside of Amazon AWS. If the custom endpoint is left blank, the authorized user’s credentials for S3 will be used.
    • start_from_timestamp_ms (int | None) – If defined, only changes that occurred after the specified timestamp are read. When used with non-append-only tables, the state of the table at the given timestamp is loaded first, and then all updates are read incrementally.
    • autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. Additionally, if persistence is enabled, it will be used as the name for the snapshot that stores the connector’s progress.
    • max_backlog_size (int | None) – Limit on the number of entries read from the input source and kept in processing at any moment. Reading pauses when the limit is reached and resumes as processing of some entries completes. Useful with large sources that emit an initial burst of data to avoid memory spikes.
    • debug_data (Any) – Static data replacing original one when debug mode is active.

Examples:

Consider an example with a stream of changes on a simple key-value table, streamed by another Pathway program with pw.io.deltalake.write method.

Let’s start writing Pathway code. First, the schema of the table needs to be created:

import pathway as pw
class KVSchema(pw.Schema):
    key: str = pw.column_definition(primary_key=True)
    value: str

Then, this table must be written into a Delta Lake storage. In the example, it can be created from the static data with pw.debug.table_from_markdown method and saved into the locally located lake:

output_table = pw.debug.table_from_markdown("key value \n one Hello \n two World")
lake_path = "./local-lake"
pw.io.deltalake.write(output_table, lake_path)

Now the producer code can be run with a simple pw.run:

pw.run(monitoring_level=pw.MonitoringLevel.NONE)

After that, you can read this table with Pathway as well. It requires the specification of the URI and the schema that was created above. In addition, you can use the "static" mode, so that the program finishes after the data is read:

input_table = pw.io.deltalake.read(lake_path, KVSchema, mode="static")

Please note that the table doesn’t necessarily have to be created by Pathway: an append-only Delta Table created in any other way will also be processed correctly.

Finally, you can check that the resulting table contains the same set of rows by displaying it with pw.debug.compute_and_print:

pw.debug.compute_and_print(input_table, include_id=False)

Please note that you can use the same communication approach if S3 is used as a data storage. To do this, specify an S3 path starting with s3:// or s3a://, and provide the credentials object as a parameter. If no credentials are provided but the path starts with s3:// or s3a://, Pathway will use the credentials of the currently authenticated user.

Reading Delta decimal(p, s) columns. Pathway has no native Decimal type, so a Delta decimal column has to be projected onto something else, and the choice is driven by the Pathway type declared in the schema:

  • Declaring the column as float converts each value through f64. This is lossy in general — both because f64 is binary (e.g. 0.1 is not exact) and because its mantissa carries only ~15-17 significant decimal digits. The reader emits a one-time warning at startup naming each affected column.
  • Declaring the column as str formats the unscaled integer with the column’s scale and passes the resulting decimal text through unchanged (e.g. decimal(8, 2) value 10050 becomes "100.50"). This is lossless for the full Delta precision range (up to 38 digits).

The symmetric write path, described in pw.io.deltalake.write, lets you preserve a Delta decimal(p, s) column type across a Pathway pipeline: read it as str, process it as text, and write it back into the same Delta column.

write(table, uri, *, s3_connection_settings=None, partition_columns=None, min_commit_frequency=60000, name=None, sort_by=None, output_table_type='stream_of_changes', table_optimizer=None)

Writes the stream of changes from table into Delta Lake (https://delta.io/) data storage at the location specified by uri. Supported storage types are S3 and the local filesystem.

The storage type is determined by the URI: paths starting with s3:// or s3a:// are for S3 storage, while all other paths use the filesystem.

If the specified storage location doesn’t exist, it will be created. The schema of the new table is inferred from the table’s schema. Additionally, when the connector creates a table, its Pathway schema is stored in the column metadata. This allows the table to be read using pw.io.deltalake.read without explicitly specifying a schema.

  • Parameters
    • table (Table) – Table to be written.
    • uri (str | PathLike) – URI of the target Delta Lake.
    • s3_connection_settings (AwsS3Settings | MinIOSettings | WasabiS3Settings | DigitalOceanS3Settings | None) – Configuration for S3 credentials when using S3 storage. In addition to the access key and secret access key, you can specify a custom endpoint, which is necessary for buckets hosted outside of Amazon AWS. If the custom endpoint is left blank, the authorized user’s credentials for S3 will be used.
    • partition_columns (Optional[Iterable[ColumnReference]]) – Partition columns for the table. Used if the table is created by Pathway.
    • min_commit_frequency (int | None) – Specifies the minimum time interval between two data commits in storage, measured in milliseconds. If set to None, finalized minibatches will be committed as soon as possible. Keep in mind that each commit in Delta Lake creates a new file and writes an entry in the transaction log. Therefore, it is advisable to limit the frequency of commits to reduce the overhead of processing the resulting table. Note that to further optimize performance and reduce the number of chunks in the table, you can use vacuum or optimize operations afterwards.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
output_table_type (Literal['stream_of_changes', 'snapshot']) – Defines how the output table manages its data. If set to "stream_of_changes" (the default), the system outputs a stream of modifications to the target table. This stream includes two additional integer columns: time, representing the computation minibatch, and diff, indicating the type of change (1 for row addition and -1 for row deletion). If set to "snapshot", the table maintains the current state of the data, updated atomically with each minibatch and ensuring that no partial minibatch updates are visible. To correctly track the relationship between Pathway’s primary key and the output table in this mode, an additional _id field of the Pointer type is added. Please note that this mode may be slower when there are many deletions, because a deletion in a minibatch causes the entire table to be rewritten once that minibatch reaches the output. Please also note that this mode is not suitable for tables that don’t fit in memory.
    • table_optimizer (TableOptimizer | None) – The optimization parameters for the output table.
  • Returns
    None

Example:

Consider a table access_log that needs to be output to a Delta Lake storage located locally at the folder ./logs/access-log. It can be done as follows:

pw.io.deltalake.write(access_log, "./logs/access-log")

Please note that if there is no filesystem object at this path, the corresponding folder will be created. However, if you run this code twice, the new data will be appended to the storage created during the first run.

It is also possible to save the table to S3 storage. To save the table to the access-log path within the logs bucket in the eu-west-3 region, modify the code as follows:

import os

pw.io.deltalake.write(
    access_log,
    "s3://logs/access-log/",
    s3_connection_settings=pw.io.s3.AwsS3Settings(
        bucket_name="logs",
        region="eu-west-3",
        access_key=os.environ["S3_ACCESS_KEY"],
        secret_access_key=os.environ["S3_SECRET_ACCESS_KEY"],
    ),
)

Note that it is not necessary to specify the credentials explicitly if you are logged into S3. Pathway can deduce them for you. For an authorized user, the code can be simplified as follows:

pw.io.deltalake.write(access_log, "s3://logs/access-log/")

Writing to existing Delta decimal(p, s) columns. When the destination Delta table already has a decimal(p, s) column and the Pathway column written into it has type str, the writer parses each row’s decimal text into the underlying unscaled integer and stores it as a fixed-point value of the column’s declared precision and scale — no f64 detour, no precision loss. This is the symmetric counterpart to reading a Delta decimal column as str: a Delta decimal column read into Pathway, processed as text, and written back into the same Delta column round-trips with no loss.

A string that can’t be parsed as a decimal of the column’s shape (non-digit characters, more fractional digits than the column’s scale, or more total digits than its precision) fails the write with an error message naming the offending value, the column’s precision and scale, and the specific constraint it violated. Tables that don’t contain a decimal column are unaffected: writing a Pathway str column into a fresh table or into an existing Delta string column behaves exactly as before.