Persistence in Pathway

The Pathway framework helps to build computational pipelines easily and flexibly. In the course of development, you may need to save the state of the computation. This is where persistence comes to help.

For example, persistence may be needed for one or several of the following reasons:

  • To preserve the state of the computation, so that the program, if restarted, continues working from the place where it stopped;
  • To be able to recover from a failure without recomputing the whole data pipeline from scratch.

Example

Suppose that we're solving a simple wordcount task. The input consists of a directory of CSV files, which needs to be polled constantly. Each file consists of a header and several lines, each containing a word.
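
For instance, one of the input files could look like this (hypothetical contents, matching the schema used below):

word
apple
banana
apple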

The output should be a file containing a sequence of JSON lines, each with the fields "word" and "count", denoting a word and the number of its occurrences.
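
For example, the result file might contain lines like the following (values are illustrative; the output connector may also append auxiliary metadata fields):

{"word": "apple", "count": 3}
{"word": "banana", "count": 2}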

Given that, the problem can be solved with the following small Pathway program:

import pathway as pw


class InputSchema(pw.Schema):
    word: str


# Poll the directory of CSV files for new entries.
words = pw.io.csv.read("inputs/", schema=InputSchema)
# Count the number of occurrences of each word.
word_counts = words.groupby(words.word).reduce(words.word, count=pw.reducers.count())
# Output the counts as JSON lines.
pw.io.jsonlines.write(word_counts, "result.jsonlines")
pw.run()

This program is not persistent. That means that, when restarted, it will start the computation from the beginning: scanning the files one by one, computing the word counts, and outputting them.

However, it can easily be made persistent.

The only essential building block that Pathway needs is the persistent storage configuration. Suppose that we store the intermediate state in the local directory called state. Then, we obtain the following persistence config:

persistence_backend = pw.persistence.Backend.filesystem("./state/")
persistence_config = pw.persistence.Config.simple_config(persistence_backend)

Now, the persistence config needs to be passed into the pw.run method. It can be done as follows:

pw.run(persistence_config=persistence_config)
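
For clarity, here is the complete persistent version of the word count program, assembled from the pieces above:

import pathway as pw


class InputSchema(pw.Schema):
    word: str


words = pw.io.csv.read("inputs/", schema=InputSchema)
word_counts = words.groupby(words.word).reduce(words.word, count=pw.reducers.count())
pw.io.jsonlines.write(word_counts, "result.jsonlines")

# The persistence setup: state is kept in the local ./state/ directory.
persistence_backend = pw.persistence.Backend.filesystem("./state/")
persistence_config = pw.persistence.Config.simple_config(persistence_backend)
pw.run(persistence_config=persistence_config)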

With these modifications, the program becomes persistent: if restarted, it will continue the computation and the output from the place where it stopped last time. Now let's understand the main concepts of the persistence setup in more detail.

Please note that there is also a tutorial with a step-by-step explanation available.

Main Concepts

Persistent Storage

First, Pathway needs storage that can persist the internal state, along with checkpoints and meta-information. The persistence mechanism combines internal state serialization with a write-ahead logging mechanism. It can store the data either locally or in S3.

The storage can be configured with an instance of the pw.persistence.Config class, which should be passed as an argument to the pw.run command executing the data pipeline. The persistence config accumulates several settings:

  • metadata storage, denoting the storage to be used for the metadata: the times advanced, the current positions in the data sources, and the description of the computational graph. The size of the metadata doesn't depend on the volume of data being processed;
  • snapshot storage, denoting the storage to be used for the data snapshot. The data snapshot is a bigger structure, whose size depends on the volume of data;
  • snapshot interval, denoting the desired freshness of the maintained snapshot. This interval is a tradeoff between borrowing too many resources from the computation and keeping a stale snapshot. It denotes how recent the new updates may be, relative to the last dumped state, for the engine to be allowed to postpone storing them.

Both the metadata and the snapshot storage are configurable with the class pw.persistence.Backend, which has two methods, for S3 and filesystem storage configuration.
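
The filesystem flavor was shown above. As a sketch, an S3-based backend might be configured as follows, assuming the method is named s3 and takes bucket settings via pw.io.s3.AwsS3Settings (check the API reference for the exact signatures and your credentials setup):

# Hypothetical sketch: the method and settings field names are assumptions.
bucket_settings = pw.io.s3.AwsS3Settings(
    bucket_name="my-bucket",
    region="eu-central-1",
    access_key="...",
    secret_access_key="...",
)
s3_backend = pw.persistence.Backend.s3(
    root_path="pathway-state/",
    bucket_settings=bucket_settings,
)
persistence_config = pw.persistence.Config.simple_config(s3_backend)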

The config itself is created via the constructor, which accepts the keyword arguments metadata_storage and snapshot_storage, both of the type pw.persistence.Backend, and optionally the snapshot interval given in milliseconds.
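
For example, a config keeping the metadata and the snapshot in two separate local directories could look like the sketch below; the interval keyword is assumed to be named snapshot_interval_ms (check the API reference for the exact name):

metadata_backend = pw.persistence.Backend.filesystem("./state/metadata/")
snapshot_backend = pw.persistence.Backend.filesystem("./state/snapshot/")
persistence_config = pw.persistence.Config(
    metadata_storage=metadata_backend,
    snapshot_storage=snapshot_backend,
    snapshot_interval_ms=1000,  # assumed keyword: dump a snapshot at most once a second
)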

Please note that if the same backend is used for both, there's a convenience method simple_config that allows creating the persistence config with just a single backend.

This, however, is not the only thing needed for persistence to work, which brings us to persistent IDs.

Persistent IDs

To persist certain input sources, Pathway requires them to have the persistent_id parameter. This identifier denotes the factual data source, so it is expected to stay the same across reruns of the program.

The motivation behind the persistent ID is that the data source can change as much as needed for the computations, as long as it contains data with the same schema. For example, it is possible that:

  • a data format in the source is changed. For instance, JSON was used before, while the newer entries arrive in CSV;
  • a path to the data source is changed. For instance, the logs that are parsed by Pathway are now stored on a different volume;
  • some fields are renamed in the data source. As an example, it is possible that the field "date" was renamed to "datetime" to have a more accurate name.

All in all, the variety of possible changes can be huge. Still, by using the same persistent_id, the engine knows that the data still corresponds to the same table.

These IDs can be assigned in two different ways. The first one is automatic generation. It assigns the IDs to the sources based on the order in which they are added. For example, if the program first reads a dataset from CSV and then reads a stream of events from Kafka, two persistent IDs will be automatically generated: the first one pointing to the dataset and the second one pointing to the stream of events, as in the sketch below. Please note that this approach is only fine if the code is not going to be updated. Otherwise, the set of sources can change, which would result in different persistent IDs being generated, no longer corresponding to the old ones.
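
To illustrate, consider a sketch with two sources reading from hypothetical directories; the automatically generated IDs follow the order of the reads:

# Both sources rely on automatically generated persistent IDs: the first read
# gets the first generated ID and the second read the next one. Reordering or
# inserting reads in a later version of the code would change this assignment.
datasets = pw.io.csv.read("dataset/", schema=InputSchema)
events = pw.io.csv.read("events/", schema=InputSchema)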

The second way can be a bit more difficult, but it allows more flexibility: the persistent IDs can be assigned manually in the input connector. To do so, one needs to specify the string parameter persistent_id. For example, the word count program above relied on the automatic persistent_id generation. However, it would also be possible to specify it explicitly this way:

words = pw.io.csv.read("inputs/", schema=InputSchema, persistent_id="words_source")

Requirements and assumptions

The framework currently maintains a snapshot of the internal state, along with the metadata required for recovery. When a program starts, Pathway first looks for persisted checkpoints. It then loads the snapshot data and the information about the next unread offsets in the data sources. For this to work, the data sources themselves must be persistent, so that if the program is terminated for whatever reason, the next entries can be re-read on restart. Please note that most data sources comply with this requirement: S3 objects, Kafka topics, or filesystem entries can easily be re-read if the program restarts.

The guarantee given by Pathway's persistent recovery is at-least-once delivery. Internally, the inputs are split into smaller transactional batches. If the program is interrupted during execution, the outputs corresponding to the unclosed transactional batches may be duplicated. However, this shouldn't be the case for graceful program termination, where exactly-once semantics can be guaranteed.