Restarting Pathway computation with new data added

Persistence allows you to pick up computation where it left off, and the presence of a snapshot makes handling data additions between reruns smoother, avoiding the need to restart the entire computation.

This capability is valuable in various scenarios. For instance, real-time logs stored on pods may only become available in hourly increments. In other cases, logs are collected periodically through a cron-like job, such as every ten minutes, every hour, or every day. Typically, analytics need to be rebuilt each time new data appears.

This tutorial explores how to incorporate such additions using a simple example of log statistics calculation. You will start by computing user sessions on historical data and then see how to integrate new information from real-time logs.


Scenario

Suppose you have a website access log. The log is rather simple: each entry consists of an access time, given as a UNIX timestamp, and the user's login.

You would like to analyze the users' behavior on the website. One approach is to create sessions, defined as consecutive, non-extendable time intervals during which the user interacts with the website. Let's assume users spend a maximum of five minutes on a single page before moving to the next page or leaving. Hence, two visits are considered part of the same session if there's no more than a five-minute (or 300-second) gap between them.

The objective is to manage a set of sessions, ensuring each log entry belongs to precisely one session and no two sessions can be merged.
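
To make the grouping rule concrete, here is a minimal plain-Python sketch of it, independent of Pathway; the function name sessionize is illustrative only:

def sessionize(timestamps, max_gap=300):
    """Group UNIX timestamps into sessions: a new session starts whenever
    the gap to the previous event exceeds max_gap seconds."""
    sessions = []
    for t in sorted(timestamps):
        if sessions and t - sessions[-1][-1] <= max_gap:
            sessions[-1].append(t)  # within the gap: extend the current session
        else:
            sessions.append([t])    # gap exceeded: start a new session
    return sessions

For example, sessionize([0, 100, 450]) yields [[0, 100], [450]], because the 350-second gap between the second and third visits exceeds the 300-second limit.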

To accommodate adding new data and rerunning the data pipeline, you need to establish a stream of updates on the output.

What to do with updated data

Imagine you have efficiently processed all the logs in the directory using the static mode, and the program has completed successfully. However, since logs are received regularly, you face the challenge of repeating this computation on new log portions that may be generated later or arrive with a delay.

Starting the computation from scratch and receiving the entire stream of updates from the beginning would be costly. The following discussion will teach you how to handle the computation on new, as-yet-unprocessed log files efficiently.

Preparing inputs

First, you need to prepare the input files the engine will parse. Start by defining the schema. In Pathway, its definition looks as follows:

import pathway as pw


class InputSchema(pw.Schema):
    user_id: str
    access_time: int

That is, the access time is an integer denoting the number of seconds elapsed since the beginning of the UNIX epoch. The login, naturally, is a string.
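
As a quick aside, such a timestamp can be converted to a human-readable date with the standard library. For instance, the first access time used later in this tutorial corresponds to the afternoon of November 27, 2023 (UTC):

from datetime import datetime, timezone

print(datetime.fromtimestamp(1701102730, tz=timezone.utc))
# 2023-11-27 16:32:10+00:00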

Now, you can start creating the inputs that correspond to this schema. Let's place them in a directory called logs. You first need to create it and clear out any old files, which takes a couple of Unix commands:

!rm -rf logs
!mkdir logs

You can now place a few input files in the newly created directory. Let's generate two log files, each containing access entries from a single day. Of course, in a real-life scenario, the time span and the number of distinct users would be much larger:

%%writefile logs/part_one.csv
user_id,access_time
alice,1701102730
bob,1701102743
alice,1701103035
bob,1701102774
bob,1701102805
alice,1701103037
Writing logs/part_one.csv

The second file has the same format:

%%writefile logs/part_two.csv
user_id,access_time
alice,1701103040
alice,1701103331
alice,1701103618
Writing logs/part_two.csv

With this setup, you can solve the problem with Pathway and get some initial results.

The script below first reads the data according to the schema and then uses a temporal session window to build the non-extendable user-session intervals:

%%writefile script.py
import pathway as pw


class InputSchema(pw.Schema):
    user_id: str
    access_time: int


if __name__ == "__main__":
    table = pw.csv.read("logs/", mode="static", schema=InputSchema)
    sessions = table.windowby(
        pw.this.access_time,
        window=pw.temporal.session(max_gap=300),
        shard=pw.this.user_id,
    ).reduce(
        user_id=table.user_id,
        session_length=(
            pw.reducers.max(table.access_time) - pw.reducers.min(table.access_time)
        ),
        session_start=pw.reducers.min(table.access_time),
        session_finish=pw.reducers.max(table.access_time),
        count=pw.reducers.count(),
    )
    pw.csv.write(sessions, "sessions.csv")
    pw.run(monitoring_level=pw.MonitoringLevel.NONE)
Overwriting script.py

The script can then be run with a simple console command:

!python script.py
sys:1: UserWarning: pathway.csv has been moved to pathway.io.csv
/usr/local/lib/python3.10/dist-packages/pathway/internals/table.py:1189: UserWarning: Key sets of self and other in update_cells are the same. Using with_columns instead of update_cells.
  warnings.warn(
[2023-11-29T15:01:32]:INFO:Preparing Pathway computation
[2023-11-29T15:01:32]:INFO:CsvFilesystemReader-0: 0 entries (1 minibatch(es)) have been sent to the engine
[2023-11-29T15:01:32]:INFO:CsvFilesystemReader-0: 9 entries (2 minibatch(es)) have been sent to the engine
[2023-11-29T15:01:32]:WARNING:CsvFilesystemReader-0: Closing the data source
[2023-11-29T15:01:32]:INFO:FileWriter-0: Done writing 0 entries, time 1701270092168. Current batch writes took: 0 ms. All writes so far took: 0 ms.
[2023-11-29T15:01:32]:INFO:FileWriter-0: Done writing 3 entries, closing data sink. Current batch writes took: 0 ms. All writes so far took: 0 ms.

The results can be inspected with cat:

!cat sessions.csv
user_id,session_length,session_start,session_finish,count,time,diff
"bob",62,1701102743,1701102805,3,1701270092168,1
"alice",0,1701102730,1701102730,1,1701270092168,1
"alice",583,1701103035,1701103618,5,1701270092168,1

As you can see, the program works correctly.

Specifically, there are two sessions for the user alice: the first consists of a single page access, and the second spans more than five minutes because it is built from several accesses, each less than five minutes apart from the next.

As for the user bob, there is one session encompassing all of their access events.
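
You can also double-check these numbers directly against the raw timestamps. This is a standalone sanity check, not part of the pipeline:

# bob's three accesses are 31 seconds apart, so they form one 62-second session
bob = [1701102743, 1701102774, 1701102805]
print(max(bob) - min(bob))  # 62

# alice's first visit is 305 seconds before her second one, which exceeds the
# 300-second limit, so it forms a session of its own; the remaining five visits
# are each less than 300 seconds apart and form a 583-second session
print(1701103035 - 1701102730)  # 305
print(1701103618 - 1701103035)  # 583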

Now, let's dive deeper into the problem of adding more data.

Processing newly arrived data in the rerun

Logs may arrive at varying intervals, with a cron-like job delivering data every ten or thirty minutes. Ideally, you'd like to avoid recalculating results from scratch and instead process only the new data. But how can you achieve this?

Persistence can help here. The idea is to store the state of previous calculations so that when new files are added later, there's no need to start from scratch.

So you need to do two things. First, "name" the input sources by assigning persistent IDs to them. This way, when the program recovers, it can match each operator with the correct data dump.

Here, this step is optional because the data processing pipeline doesn't change, but it is shown for completeness. The only difference is the persistent_id parameter:

access_entries = pw.io.csv.read(
    "logs/",
    schema=InputSchema,
    mode="static",
    persistent_id="logs"
)

The second thing is mandatory: you must set up the persistent storage. This is where Pathway stores the dump of its computation state. It can be hosted either locally or in S3-compatible storage.
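
For reference, an S3-backed configuration might look roughly like the sketch below. Treat it as an assumption rather than a verified recipe: the exact names (pw.persistence.Backend.s3, pw.io.s3.AwsS3Settings and its fields) should be checked against the current Pathway documentation.

# Hypothetical sketch of an S3-backed persistence setup; the parameter names
# below are assumptions, so verify them against the Pathway documentation.
s3_settings = pw.io.s3.AwsS3Settings(
    bucket_name="my-bucket",
    region="eu-central-1",
    access_key="...",
    secret_access_key="...",
)
persistence_backend = pw.persistence.Backend.s3(
    root_path="pathway-persistence/",
    bucket_settings=s3_settings,
)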

Let's store it locally in the folder ./PStorage. To correctly handle the notebook's reruns, let's clean it first:

!rm -rf PStorage

Then, you can create the persistence config in the script. It consists of two lines.

The first line defines the backend. Since it is filesystem-based, you can use the method pw.persistence.Backend.filesystem and pass it the path to the storage directory.

The second line builds the persistence config itself. It can be constructed with the method pw.persistence.Config.simple_config, which accepts the backend settings.

persistence_backend = pw.persistence.Backend.filesystem("./PStorage")
persistence_config = pw.persistence.Config.simple_config(persistence_backend)

With the changes described above, the code will look as follows:

%%writefile script.py
import pathway as pw


class InputSchema(pw.Schema):
    user_id: str
    access_time: int


if __name__ == "__main__":
    table = pw.io.csv.read(
        "logs/",
        mode="static",
        schema=InputSchema,
        persistent_id="logs",  # Change: persistent ID assigned
    )
    sessions = table.windowby(
        pw.this.access_time,
        window=pw.temporal.session(max_gap=300),
        shard=pw.this.user_id,
    ).reduce(
        user_id=table.user_id,
        session_length=(
            pw.reducers.max(table.access_time) - pw.reducers.min(table.access_time)
        ),
        session_start=pw.reducers.min(table.access_time),
        session_finish=pw.reducers.max(table.access_time),
        count=pw.reducers.count(),
    )
    pw.io.csv.write(sessions, "sessions.csv")

    # Change: persistence config created
    persistence_backend = pw.persistence.Backend.filesystem("./PStorage")
    persistence_config = pw.persistence.Config.simple_config(persistence_backend)
    pw.run(
        monitoring_level=pw.MonitoringLevel.NONE,

        # Change: persistence config passed to the method
        persistence_config=persistence_config,
    )
Overwriting script.py

Now, you can run it so that it provides the initial artifacts to the persistent storage.

!python script.py
/usr/local/lib/python3.10/dist-packages/pathway/internals/table.py:1189: UserWarning: Key sets of self and other in update_cells are the same. Using with_columns instead of update_cells.
  warnings.warn(
[2023-11-29T15:01:38]:INFO:Preparing Pathway computation
[2023-11-29T15:01:38]:INFO:No time has been advanced in the previous run, therefore no data read from the snapshot
[2023-11-29T15:01:38]:INFO:Reached the end of the snapshot. Exiting the rewind after 0 entries
[2023-11-29T15:01:38]:INFO:Seek the data source to the frontier OffsetAntichain { antichain: {} }
[2023-11-29T15:01:38]:INFO:CsvFilesystemReader-logs: 0 entries (1 minibatch(es)) have been sent to the engine
[2023-11-29T15:01:38]:INFO:CsvFilesystemReader-logs: 9 entries (2 minibatch(es)) have been sent to the engine
[2023-11-29T15:01:38]:WARNING:CsvFilesystemReader-logs: Closing the data source
[2023-11-29T15:01:38]:INFO:FileWriter-0: Done writing 0 entries, time 1701270098482. Current batch writes took: 0 ms. All writes so far took: 0 ms.
[2023-11-29T15:01:38]:INFO:Save offset: (Empty, FilePosition { total_entries_read: 11, path: "/content/logs/part_two.csv", bytes_offset: 71 })
[2023-11-29T15:01:38]:INFO:FileWriter-0: Done writing 3 entries, closing data sink. Current batch writes took: 0 ms. All writes so far took: 0 ms.

Now, let's introduce some new logs. These logs will include access entries for times after the previous data. To demonstrate that the results only capture the new changes, we'll focus solely on the access events for user bob.

%%writefile logs/part_three.csv
user_id,access_time
bob,1701104640
bob,1701104931
bob,1701105218
Writing logs/part_three.csv

Let's run the script with the new entries added:

!python script.py
/usr/local/lib/python3.10/dist-packages/pathway/internals/table.py:1189: UserWarning: Key sets of self and other in update_cells are the same. Using with_columns instead of update_cells.
  warnings.warn(
[2023-11-29T15:01:47]:INFO:Preparing Pathway computation
[2023-11-29T15:01:47]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {250563226431679957366536773997412061550: OffsetAntichain { antichain: {Empty: FilePosition { total_entries_read: 11, path: "/content/logs/part_two.csv", bytes_offset: 71 }} }} }, storage_types: {250563226431679957366536773997412061550: CsvFilesystem}, last_advanced_timestamp: 1701270098483 }
[2023-11-29T15:01:47]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {250563226431679957366536773997412061550: OffsetAntichain { antichain: {Empty: FilePosition { total_entries_read: 11, path: "/content/logs/part_two.csv", bytes_offset: 71 }} }} }, storage_types: {250563226431679957366536773997412061550: CsvFilesystem}, last_advanced_timestamp: 1701270098483 }
[2023-11-29T15:01:47]:INFO:Reached the end of the snapshot. Exiting the rewind after 9 entries
[2023-11-29T15:01:47]:INFO:Seek the data source to the frontier OffsetAntichain { antichain: {Empty: FilePosition { total_entries_read: 11, path: "/content/logs/part_two.csv", bytes_offset: 71 }} }
[2023-11-29T15:01:47]:INFO:CsvFilesystemReader-logs: 0 entries (1 minibatch(es)) have been sent to the engine
[2023-11-29T15:01:47]:INFO:CsvFilesystemReader-logs: 3 entries (2 minibatch(es)) have been sent to the engine
[2023-11-29T15:01:47]:WARNING:CsvFilesystemReader-logs: Closing the data source
[2023-11-29T15:01:48]:INFO:Save offset: (Empty, FilePosition { total_entries_read: 15, path: "/content/logs/part_three.csv", bytes_offset: 65 })
[2023-11-29T15:01:48]:INFO:FileWriter-0: Done writing 1 entries, closing data sink. Current batch writes took: 0 ms. All writes so far took: 0 ms.

And let's check the results.

!cat sessions.csv
user_id,session_length,session_start,session_finish,count,time,diff
"bob",578,1701104640,1701105218,3,1701270107982,1

As you can see, the results were generated only for the modified entries. Specifically, there is no update for the user alice since their session data remains unchanged.

In contrast, there is an update for the user bob; they now have a new session. This fresh session, spanning 578 seconds with three events, is reflected in the output.

To further confirm that each rerun processes only the additions that haven't been handled yet, let's add one more file. For instance, you can extend the last session of the user alice:

%%writefile logs/part_four.csv
user_id,access_time
alice,1701103338
alice,1701103629
alice,1701103916
Writing logs/part_four.csv

And then run the script again:

!python script.py
/usr/local/lib/python3.10/dist-packages/pathway/internals/table.py:1189: UserWarning: Key sets of self and other in update_cells are the same. Using with_columns instead of update_cells.
  warnings.warn(
[2023-11-29T15:01:52]:INFO:Preparing Pathway computation
[2023-11-29T15:01:52]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {250563226431679957366536773997412061550: OffsetAntichain { antichain: {Empty: FilePosition { total_entries_read: 11, path: "/content/logs/part_two.csv", bytes_offset: 71 }} }} }, storage_types: {250563226431679957366536773997412061550: CsvFilesystem}, last_advanced_timestamp: 1701270098483 }
[2023-11-29T15:01:52]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {250563226431679957366536773997412061550: OffsetAntichain { antichain: {Empty: FilePosition { total_entries_read: 15, path: "/content/logs/part_three.csv", bytes_offset: 65 }} }} }, storage_types: {250563226431679957366536773997412061550: CsvFilesystem}, last_advanced_timestamp: 1701270107983 }
[2023-11-29T15:01:52]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {250563226431679957366536773997412061550: OffsetAntichain { antichain: {Empty: FilePosition { total_entries_read: 11, path: "/content/logs/part_two.csv", bytes_offset: 71 }} }} }, storage_types: {250563226431679957366536773997412061550: CsvFilesystem}, last_advanced_timestamp: 1701270098483 }
[2023-11-29T15:01:52]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {250563226431679957366536773997412061550: OffsetAntichain { antichain: {Empty: FilePosition { total_entries_read: 15, path: "/content/logs/part_three.csv", bytes_offset: 65 }} }} }, storage_types: {250563226431679957366536773997412061550: CsvFilesystem}, last_advanced_timestamp: 1701270107983 }
[2023-11-29T15:01:52]:INFO:Reached the end of the snapshot. Exiting the rewind after 12 entries
[2023-11-29T15:01:52]:INFO:Seek the data source to the frontier OffsetAntichain { antichain: {Empty: FilePosition { total_entries_read: 15, path: "/content/logs/part_three.csv", bytes_offset: 65 }} }
[2023-11-29T15:01:52]:INFO:CsvFilesystemReader-logs: 0 entries (1 minibatch(es)) have been sent to the engine
[2023-11-29T15:01:52]:INFO:CsvFilesystemReader-logs: 3 entries (2 minibatch(es)) have been sent to the engine
[2023-11-29T15:01:52]:WARNING:CsvFilesystemReader-logs: Closing the data source
[2023-11-29T15:01:53]:INFO:FileWriter-0: Done writing 0 entries, time 1701270112954. Current batch writes took: 0 ms. All writes so far took: 0 ms.
[2023-11-29T15:01:53]:INFO:Save offset: (Empty, FilePosition { total_entries_read: 19, path: "/content/logs/part_four.csv", bytes_offset: 71 })
[2023-11-29T15:01:53]:INFO:FileWriter-0: Done writing 2 entries, closing data sink. Current batch writes took: 0 ms. All writes so far took: 0 ms.

And let's check the results once more:

!cat sessions.csv
user_id,session_length,session_start,session_finish,count,time,diff
"alice",583,1701103035,1701103618,5,1701270112954,-1
"alice",881,1701103035,1701103916,8,1701270112954,1

This time, the situation is more complex. The last session of the user alice was extended. Consequently, the first output event signifies the removal of the old session, since it's no longer accurate given the new events.

The second event represents the insertion of the new extended session. The updated session lasts 881 seconds instead of 583 and includes eight events instead of five. By applying the removal of the old session and the insertion of the new one, you arrive at a valid updated snapshot.
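
If a downstream consumer needs the current state rather than the change stream, it can apply these diffs itself. Below is a minimal sketch, assuming the diff rows produced across runs have been collected into a single file (each run overwrites sessions.csv, so in practice you would append its contents to a combined file, here hypothetically called all_sessions.csv); the helper name apply_diffs is illustrative:

import csv
from collections import Counter

def apply_diffs(rows):
    # diff == 1 inserts a row, diff == -1 retracts a previously inserted one
    state = Counter()
    for row in rows:
        key = (row["user_id"], row["session_start"], row["session_finish"],
               row["session_length"], row["count"])
        state[key] += int(row["diff"])
    return [key for key, multiplicity in state.items() if multiplicity > 0]

with open("all_sessions.csv") as f:
    print(apply_diffs(csv.DictReader(f)))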

In summary, you've learned how persistence can optimize your computations. Whether running in static mode or streaming, it is a valuable tool for expediting analytics and conserving resources.
