Persistence in Pathway: resume the interrupted computation

In this article, you will learn how to resume the interrupted Pathway computation. We will start with a fairly simple Pathway program, which doesn't persist in its state.

It will be run as a separate process in the streaming mode, that is, waiting for the inputs all the time.

In another process, we will run the streamer: a simple script that puts the data into the input directory every so often. So, we will expect that the Pathway program fetches the data from the streamer and uses it in the computation.

We will then terminate the computation and show that when being re-run, the Pathway program without persistence indeed starts and provides the results from the very beginning.

Finally, we demonstrate how to easily make a persistent program from the one we've had before and will see that it catches up where we stopped last time.

Sample task

First of all, we need to pick a task which we will be solving. In this tutorial, we will do the basic word counting.

It will work as follows: the Pathway program will scan the directory in the filesystem, containing the CSV files. The expected format of any CSV file is just one column named word and containing exactly one word. The output format, for example, can be the JsonLines file, containing the stream of changes for the collection of two columns: the word and the count of this word.

Preparing the environment

First things first, we will need some methods which would help us to test both variants: the simple one and the persistent one. The first of these "helpers" would be the cleanup routine. We will store inputs in the directory inputs, and when doing our experiments, we will need to ensure that we start with an empty one.

So, we obtain the following method:

import multiprocessing
import os
import shutil
import subprocess
import time


def clean_input_directory():
    if os.path.exists("inputs/"):
        shutil.rmtree("inputs/")
    os.mkdir("inputs/")


clean_input_directory()

Next, we will need the streamer method which we were talking about before. The goal for the streamer is to emit the new file in the inputs directory every so often. For our purposes, it would be convenient to use the input files containing only one word.

Proceeding with the interface and the implementation, let's also note a few things important for us:

  • We provide a way to specify the emit interval of the new files. It is the variable interval_sec in the method signature;
  • We provide a way to specify the exact amount of input blocks, which will be denoted and the variable how_many in the signature;
  • Finally, we would want to give some stable and reproducible way to generate the words themselves. So, we can emit then in round-robin manner from the list called words.

Eventually, we obtain the following method. Please note that time.time() has microsecond precision on most platforms and thus it will give us convenient and unique file names:

def generate_inputs(interval_sec, how_many, words):
    for file_id in range(how_many):
      time.sleep(interval_sec)
      with open(f"inputs/{time.time()}", "w") as f:
        f.write(f"word\n{words[file_id % len(words)]}")

Solving the problem in Pathway

Now let's create a simple program in Pathway that would do the word count for us. For now, we don't try to be persistent.

First of all, we need to denote the schema for the entries which will be read from the input. This schema is denoted by the class InputSchema in the next section.

Then, let's proceed with the computational logic description. It will consist of three basic steps:

  1. Defining the input connector: since we read from the CSV directory, we can use the pw.io.csv.read method. As a source, let's point to the directory inputs/ (as you can see above the files are streamed there). We also pass the schema and the auto-commit duration of 10 milliseconds just to ensure that we send the updates to the engine often;
  2. Defining the computation: in fact, word counting is the group-by operation on the stream of words. So, we add a group-by operation with the reducer which takes the count of each word and attaches this count to it;
  3. Defining the output connector: the result we've got in the second step should now be sent to the output. Since we write it to the filesystem in jsonlines format, we can use the pw.io.jsonlines.write method.

For convenience, we don't use the top-like view for monitoring the execution, so we disable it by passing the keyword argument to the pw.run method.

Last but not least point, we simulate the envoronment where the Pathway program is run as a separate process - as it usually does. To do so we save the code into a separate file named wordcount.py and then use subprocess.Popen class to run it.

%%writefile wordcount.py

import pathway as pw


class InputSchema(pw.Schema):
    word: str


if __name__ == "__main__":
    words = pw.io.csv.read(
        "inputs/",
        schema=InputSchema,
        autocommit_duration_ms=10
    )
    word_counts = words.groupby(words.word).reduce(words.word, count=pw.reducers.count())
    pw.io.jsonlines.write(word_counts, "result.jsonlines")
    pw.run(monitoring_level=pw.MonitoringLevel.NONE)
Overwriting wordcount.py

The code that runs the computation, in this case, would look as follows:

def run_pathway_wordcount_program():
    pd = subprocess.Popen(["python", "wordcount.py"])
    return pd

Now let's test the program and the streamer together. To do so, we first start the generation of 200 input files with an interval of 50 milliseconds between their writes. These files will alternatively contain strings "hello" and "world".

Having that given, in 10 seconds we will have 200 input files, 100 of which will contain the word "hello" while the other 100 will contain the word "world".

Right after launching the streaming, we launched our Pathway program. We wait for 5 seconds, which is not enough for the streamer to produce all input blocks and then we terminate it with the kill method of subprocess.Popen. And then we wait for another five seconds for the streamer to complete.

# Start streaming inputs
pd_gen = multiprocessing.Process(
    target=generate_inputs,
     args=(0.05, 200, ["hello", "world"])
)
pd_gen.start()

# Run Pathway program
pd_comp = run_pathway_wordcount_program()
time.sleep(5)
pd_comp.kill()
pd_gen.join()

Let's see the results! Reading the whole file can be tedious, and we only need to see its tail.

!tail -5 result.jsonlines
{"word":"hello","count":49,"diff":1,"time":1699279664772}
{"word":"world","count":48,"diff":-1,"time":1699279664822}
{"word":"world","count":49,"diff":1,"time":1699279664822}
{"word":"hello","count":49,"diff":-1,"time":1699279664874}
{"word":"hello","count":50,"diff":1,"time":1699279664874}

As we can see, the results are incomplete. The counts are 50 per word, while the produced amount was 100 for each of these words. No doubt, the reason was that the program was killed during its' normal execution.

But let's run it again. Now we can see that all files are present in the directory. Our goal is to see that now it produces the expected result.

pd_comp = run_pathway_wordcount_program()
time.sleep(5)
pd_comp.kill()

!tail -5 result.jsonlines
{"word":"hello","count":98,"diff":1,"time":1699279678510}
{"word":"world","count":97,"diff":-1,"time":1699279678512}
{"word":"world","count":100,"diff":1,"time":1699279678512}
{"word":"hello","count":98,"diff":-1,"time":1699279678512}
{"word":"hello","count":100,"diff":1,"time":1699279678512}

But now we would also like to see where it started. So we can check the first lines of the output:

!head -5 result.jsonlines
{"word":"world","count":3,"diff":1,"time":1699279678436}
{"word":"hello","count":4,"diff":1,"time":1699279678436}
{"word":"world","count":3,"diff":-1,"time":1699279678438}
{"word":"world","count":4,"diff":1,"time":1699279678438}
{"word":"hello","count":4,"diff":-1,"time":1699279678438}

As we can see, the program started the computations from the beginning. Let's see if we can avoid this.

Introducing Persistence

Persistence is the way for the program to remember where it stopped the computations, the read, and the output during the last execution.

The main idea behind this is that Pathway periodically dumps the state the the given data storage backend. When being restarted, Pathway first looks for the snapshot of the data that was made and if it finds this snapshot, the snapshot gets loaded into the engine and it doesn't have to do the reads and the processing for the data that was preserved there.

Persistence is easy to get started with. Now we will walk you through its setup, after which we will show the program resumes the computation.

Persistence Backend

The persistence mechanism saves the snapshot of the computation. This snapshot consists of some raw data which can be proportional to the input size and some metadata which contains smaller bits of information. These entities must be stored in a durable storage.

Pathway currently gives two choices for this storage:

  • the filesystem, namely the folder on your local drive;
  • the S3 bucket where you can allocate the root directory;

In this demo, we will use the locally-hosted persistent storage, which is the first option among the above.

Since you may want to rerun this demo several times, it's a good idea to have a helper that would clean this locally-hosted storage. We will suppose that it will be hosted in the directory named PStorage:

def clean_persistent_storage():
    if os.path.exists("./PStorage"):
        shutil.rmtree("./PStorage")


clean_persistent_storage()

Now we can proceed with the configuration for the persistence. It will contain two lines.

In the first one, we create the config for the persistent backend. Since the config is filesystem-based, we use the pw.persistence.Backend.filesystem method to create it. It only requires the path where the storage can be hosted.

The second line is where we create the persistence config itself. For our case, we host the snapshot and the metadata in the same place, so we simply use the pw.persistence.Config.simple_config method.

All in all, we have the following configuration code:

backend = pw.persistence.Backend.filesystem("./PStorage")
persistence_config = pw.persistence.Config.simple_config(backend)

Persistent IDs

The second (and optional) thing we need to do is the persistent ID assignment. The persistent IDs are required for the engine to match the data sources between different runs.

In principle, the persistent ID assignment can be done automatically by the engine. In this case, it will assign the persistent IDs to the sources in the order of their appearance and construction. However, this is not generally recommended if you need to change your Pathway program and the data source in the future.

For the sake of completeness, in this tutorial, we will demonstrate the manual persistent ID assignment. The only difference from the non-persistent variant of the input is the parameter persistent_id which should be passed to the pw.io.csv.read method. So, if we name the data source words_data_source the assignment may look as follows:

pw.io.csv.read(
    ...,
    persistent_id="words_data_source"
)

Revisited Pathway Program

Now we can apply the described changes to the program that we had in the beginning. We still need a way to run it in a separate process that can be interrupted, so the interface stays the same. At the same time, we have the following persistent code saved as the wordcount program:

%%writefile wordcount.py

import pathway as pw


class InputSchema(pw.Schema):
    word: str


if __name__ == "__main__":
    words = pw.io.csv.read(
        "inputs/",
        schema=InputSchema,
        autocommit_duration_ms=10,
        persistent_id="words_input_source",  # Changed: now persistent_id is assigned here
    )
    word_counts = words.groupby(words.word).reduce(words.word, count=pw.reducers.count())
    pw.io.jsonlines.write(word_counts, "result.jsonlines")

    backend = pw.persistence.Backend.filesystem("./PStorage")
    persistence_config = pw.persistence.Config.simple_config(backend)
    pw.run(
        monitoring_level=pw.MonitoringLevel.NONE,
        persistence_config=persistence_config,  # Changed: now persistence_config is passed here
    )
Overwriting wordcount.py

Let's test it! Like the last time, let's generate the 200 files with 50 millisecond intervals, each containing either the word "hello" or the word "world" in an alternating manner. Similarly, we will terminate the Pathway program before it manages to read all input files because not all of them will be in place:

# Clean the old files: remove old results and inputs
!rm -rf result.jsonlines
clean_input_directory()

# Start streaming inputs
pd_gen = multiprocessing.Process(
    target=generate_inputs,
     args=(0.05, 200, ["hello", "world"])
)
pd_gen.start()

# Run Pathway program
pd_comp = run_pathway_wordcount_program()
time.sleep(5)
pd_comp.kill()

As a check, we can observe what we have on the output. The streamer wouldn't be able to produce much more than half of the input files by the time the program is interrupted:

!tail -5 result.jsonlines
{"word":"hello","count":49,"diff":1,"time":1699279708352}
{"word":"world","count":48,"diff":-1,"time":1699279708402}
{"word":"world","count":49,"diff":1,"time":1699279708402}
{"word":"hello","count":49,"diff":-1,"time":1699279708452}
{"word":"hello","count":50,"diff":1,"time":1699279708452}

But now let's run the program again! Our goal is to see, with the fill input, where it will start to produce the results. We are running it for 5 seconds because, in the streaming mode, it won't finish by itself.

pd_comp = run_pathway_wordcount_program()
time.sleep(5)
pd_comp.kill()

Let's check the results. This time we need a few first and a few last lines of the output.

The last ones are needed to check if the result is correct: the last count for each word should be equal to 100.

The first lines are needed to see where the program started the computations from.

!head -5 result.jsonlines
!echo "==="
!tail -5 result.jsonlines
{"word":"world","count":49,"diff":-1,"time":1699279716584}
{"word":"world","count":51,"diff":1,"time":1699279716584}
{"word":"hello","count":50,"diff":-1,"time":1699279716584}
{"word":"hello","count":51,"diff":1,"time":1699279716584}
{"word":"world","count":51,"diff":-1,"time":1699279716586}
===
{"word":"hello","count":99,"diff":1,"time":1699279716634}
{"word":"world","count":98,"diff":-1,"time":1699279716636}
{"word":"world","count":100,"diff":1,"time":1699279716636}
{"word":"hello","count":99,"diff":-1,"time":1699279716636}
{"word":"hello","count":100,"diff":1,"time":1699279716636}

As we can see, the results are correct!

Moreover, now the results are produced from the counts around 50 (which may vary from run to run), which means that we did not have to recompute and output the previously computed and outputted data.

Please note that the first lines of the output above may intersect with the last lines of the previous run. Here we can observe the "at-least-once" semantics, with duplicated deliveries happening for the transaction minibatch which wasn't committed by the time the initial computation was interrupted.

Conclusion

Here we demonstrated persistence in action on a very simple example.

While being a powerful mechanism, persistence is also capable of solving different tasks. For instance, in the later tutorial, we will show how it can handle the data source change under certain conditions.

persistencerecovery