Streaming and static modes

While Pathway is made for processing unbounded streaming data, static data can be used for testing and debugging purposes. This article demystifies those two modes, streaming and static, and explains their differences.

Pathway is a streaming framework and thus is purposely designed to work with streaming data. However, working in the streaming mode may not be the most convenient way to test and debug your application. To ease testing and debugging, Pathway provides a static mode which allows the user to manipulate static and finite data. In the following, we look at what is behind those terms, how the two modes differ, and how to use each of them.

Computation graph

We start with a quick reminder about Pathway and its computation graph. In Pathway, the processing pipeline defined by the user is modeled using a graph. This graph, called the computation graph, models the different transformation steps performed on the data. Each node is either a table, an operation performed on one or more tables (e.g. a transformer), or a connector.

For example, consider the following scenario: we have two tables containing the ages (T1) and the countries (T2) of different people, and we want to compute the list of people from a given country (let's say the US) and, for each of them, whether they are an adult or not. In Pathway, we can build this processing on the two tables T1 and T2 as follows:

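# T1bis: T1 with an extra boolean column telling whether the person is an adult
T1bis = T1.select(*pw.this, adult=pw.apply(lambda x: x > 18, pw.this.age))
# T2bis: only the people living in the US
T2bis = T2.filter(pw.this.country == "US")
# T3: join on the name; the `adult` column only exists in T1bis, not in T1
T3 = T1bis.join(T2bis, pw.left.name == pw.right.name).select(
    pw.left.name, pw.left.adult
)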

In practice, we would need two input connectors to create T1 and T2, the previous code sample to build T2bis and T3, and output connectors to send the resulting tables outside of Pathway. The computation graph of such a pipeline looks like this:

[Figure: computation graph of the example pipeline]

In this graph, T1' denotes T1bis and T2' denotes T2bis, while t1 is the select with apply, t2 the filter, and t3 the join followed by a select.

The main take-away is that Pathway builds this graph based on the pipeline defined by the user, before any computation is actually done. This graph models the relations between the different connectors, tables, and transformers. Its main purpose is to provide fast updates in streaming mode.
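To make the "graph first, data later" idea concrete, here is a toy model in plain Python. It is not how Pathway is implemented: the `Node` class and its `evaluate` method are hypothetical, and tables are plain lists of dicts. Defining the nodes only records operations; rows flow through the graph only when `evaluate` is called with some source data.

```python
# Toy model of a lazily built computation graph (NOT Pathway's implementation).
class Node:
    """A node records its inputs and an operation; nothing runs at definition."""

    def __init__(self, name, op=None, inputs=()):
        self.name = name
        self.op = op          # function combining the results of the inputs
        self.inputs = inputs  # upstream nodes

    def evaluate(self, sources, log):
        # Source nodes (no operation) read their rows from the `sources` mapping.
        if self.op is None:
            log.append(self.name)
            return sources[self.name]
        args = [node.evaluate(sources, log) for node in self.inputs]
        log.append(self.name)
        return self.op(*args)


# Building the graph: only Node objects are created, no data is touched yet.
t1 = Node("T1")
t2 = Node("T2")
t1bis = Node("T1bis", lambda rows: [{**r, "adult": r["age"] > 18} for r in rows], (t1,))
t2bis = Node("T2bis", lambda rows: [r for r in rows if r["country"] == "US"], (t2,))
t3 = Node(
    "T3",
    lambda left, right: [
        {"name": a["name"], "adult": a["adult"]}
        for a in left
        for b in right
        if a["name"] == b["name"]
    ],
    (t1bis, t2bis),
)

# Data is only provided (and processed) when the graph is evaluated.
log = []
result = t3.evaluate(
    {
        "T1": [{"name": "Alice", "age": 15}, {"name": "David", "age": 35}],
        "T2": [{"name": "Alice", "country": "US"}, {"name": "David", "country": "US"}],
    },
    log,
)
```

The `log` list records the order in which nodes were computed, which mirrors the dependencies encoded in the graph.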

Streaming mode

In the streaming mode, Pathway assumes unbounded incoming updates. This mode requires input connectors listening to streaming data sources. Pathway starts ingesting data into the computation graph only when pw.run() is called, and the data is maintained in the graph during the computation: results cannot be printed and must instead be accessed using output connectors. The computation runs until the process is killed, so everything after pw.run() is unreachable code.

In our example, T1 and T2 could be obtained using Kafka connectors, and T2bis and T3 could be output using PostgreSQL connectors.

Pathway uses the graph to provide fast updates: instead of ingesting the data into the graph from scratch every time an update is received, the data is maintained in the graph and updated locally. For instance, an update from Input Data 1 would only modify T1, T1', and T3, without any impact on T2 and T2'. Updates are processed as they are received, without any notion of batch, providing real-time stream processing.
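The idea of local updates can be sketched with a toy incremental version of the example pipeline. This is a simplified illustration under stated assumptions, not Pathway's engine: the hypothetical `IncrementalPipeline` class keeps its intermediate tables as dictionaries, handles insertions of new names only (no deletions or modifications), and records in `touched` which nodes the latest update modified.

```python
class IncrementalPipeline:
    """Toy incremental version of the example pipeline (NOT Pathway's engine).

    State is kept between updates, and each update only touches the nodes
    that depend on it. Only insertions of new names are handled.
    """

    def __init__(self):
        self.t1 = {}          # T1:  name -> age
        self.t1bis = {}       # T1': name -> adult flag
        self.t2 = {}          # T2:  name -> country
        self.t2bis = set()    # T2': names of people whose country is "US"
        self.t3 = {}          # T3:  name -> adult flag, for US people only
        self.touched = set()  # nodes modified by the latest update

    def insert_t1(self, name, age):
        self.touched = {"T1", "T1'"}
        self.t1[name] = age
        self.t1bis[name] = age > 18
        # Join only the new row against the maintained T2' state.
        if name in self.t2bis:
            self.t3[name] = self.t1bis[name]
            self.touched.add("T3")

    def insert_t2(self, name, country):
        self.touched = {"T2"}
        self.t2[name] = country
        if country == "US":
            self.t2bis.add(name)
            self.touched.add("T2'")
            # Join only the new row against the maintained T1' state.
            if name in self.t1bis:
                self.t3[name] = self.t1bis[name]
                self.touched.add("T3")


inc = IncrementalPipeline()
for name, country in [("Alice", "US"), ("Bob", "France"), ("David", "US")]:
    inc.insert_t2(name, country)

inc.insert_t1("Alice", 15)
# Only T1, T1' and T3 were modified by this update; T2 and T2' were left alone.
```

The design choice here is that the join state (T2') is kept around, so a new row of T1 is matched against it directly instead of recomputing the whole join.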

Pathway is designed to be run in this streaming mode and it is the standard way to run Pathway.

Static mode

The streaming mode may not be the most convenient when testing or debugging. For that purpose, Pathway provides a static mode in which static data may be attached to the connectors. In that mode, finite and static data can be loaded, e.g. from a static CSV file or from a markdown table.

In our example, Input Data 1 and Input Data 2 can be small static tables written by hand in markdown. Pathway provides static input connectors to load this static data:

T1 = pw.debug.table_from_markdown(
    """
    | name  | age
 1  | Alice | 15
 2  | Bob   | 32
 3  | Carole| 28
 4  | David | 35
 """
)
T2 = pw.debug.table_from_markdown(
    """
    | name  | country
 1  | Alice | US
 2  | Bob   | France
 3  | Carole| Germany
 4  | David | US
 """
)

When the computation is run in the static mode, all the data is loaded and processed at once. While the static mode does not fully benefit from the computation graph, it allows you to check whether the graph is correctly built.

To ease debugging, Pathway provides a function called compute_and_print. When calling pw.debug.compute_and_print(t), Pathway builds the whole graph, ingests all the available static data, prints the obtained table t, and then discards the data. Calling compute_and_print twice results in ingesting the data twice.
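The behavior of compute_and_print can be mimicked with a toy batch run. In this sketch, `StaticSource` and `compute_and_print_toy` are hypothetical stand-ins, not Pathway APIs: each call ingests the whole static source, computes the result, prints it, and keeps nothing, so two calls mean two ingestions.

```python
class StaticSource:
    """Toy static input (hypothetical): counts how often its rows are ingested."""

    def __init__(self, rows):
        self.rows = rows
        self.ingest_count = 0

    def ingest(self):
        self.ingest_count += 1
        return list(self.rows)  # a fresh copy on every ingestion


def compute_and_print_toy(source, transform):
    """Toy one-shot static run (hypothetical): ingest everything, compute,
    print, then drop the data; no state is kept between calls."""
    result = transform(source.ingest())
    for row in result:
        print(row)
    return result


people = StaticSource([
    {"name": "Alice", "country": "US"},
    {"name": "Bob", "country": "France"},
])


def us_only(rows):
    return [row for row in rows if row["country"] == "US"]


# Each call starts from scratch, so the source is ingested once per call.
compute_and_print_toy(people, us_only)
compute_and_print_toy(people, us_only)
```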

Summary of differences

Here is a summary of the main differences between the two modes:

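|                                        | Streaming mode                                    | Static mode                                             |
|----------------------------------------|---------------------------------------------------|---------------------------------------------------------|
| Data type                              | unbounded, streaming data                         | finite, static data                                     |
| Computation type                       | streaming                                         | batch                                                   |
| Can be used in production              | yes                                               | no                                                      |
| Starting the computation               | pw.run()                                          | pw.debug.compute_and_print()                            |
| Builds a computation graph             | yes                                               | yes                                                     |
| How the data is ingested in the graph  | maintained and updated locally at each update     | ingested from scratch every time                        |
| Termination of the computation         | runs forever                                      | terminates automatically once all the data is processed |
| Printing data                          | no, data must be accessed using output connectors | yes, via pw.debug.compute_and_print                     |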

Comparative example

In the following, we implement our full example in Pathway using both modes.

Common pipeline

The processing pipeline should be designed in the same way no matter which mode is used in the end. The only difference should be how the input and output data is handled. To highlight the fact that the rest of the implementation remains the same, we are going to implement our pipeline in a function taking T1 and T2 as parameters and returning T2bis and T3:

import pathway as pw


def pipeline(T1, T2):
    T1bis = T1.select(*pw.this, adult=pw.apply(lambda x: x > 18, pw.this.age))
    T2bis = T2.filter(pw.this.country == "US")
    T3 = T1bis.join(T2bis, pw.left.name == pw.right.name).select(
        pw.left.name, pw.left.adult
    )
    return (T2bis, T3)

Streaming mode

In the streaming mode, we need to connect to external data sources, such as Kafka for the input and PostgreSQL for the output. Our implementation would look like this:

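# settingsKafka1, settingsKafka2, settingsPostgres1 and settingsPostgres2
# stand for connection settings defined elsewhere.
T1 = pw.io.kafka.read(settingsKafka1, ["name", "age"])
T2 = pw.io.kafka.read(settingsKafka2, ["name", "country"])
T2bis, T3 = pipeline(T1, T2)
# Send the results of the pipeline (not the raw inputs) to PostgreSQL.
pw.io.postgres.write(settingsPostgres1, T2bis)
pw.io.postgres.write(settingsPostgres2, T3)
pw.run()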

The computation is started with pw.run() and will not finish until the process is killed. The results of the computation are sent to PostgreSQL via the output connectors.

Static mode

In the static mode, we can enter the input data by hand and check step by step what happens to it. In the static mode, our example becomes:

T1 = pw.debug.table_from_markdown(
    """
    | name  | age
 1  | Alice | 15
 2  | Bob   | 32
 3  | Carole| 28
 4  | David | 35
 """
)
T2 = pw.debug.table_from_markdown(
    """
    | name  | country
 1  | Alice | US
 2  | Bob   | France
 3  | Carole| Germany
 4  | David | US
 """
)
T2bis, T3 = pipeline(T1, T2)

In the static mode, there is no output connector. On its own, this implementation does nothing: it builds the graph but does not add any data to it. We need to trigger the insertion of the data into the graph by printing some data. We can check that our tables have been correctly defined and loaded by printing them:

pw.debug.compute_and_print(T1)
            | name   | age
^2TMTFGY... | Alice  | 15
^YHZBTNY... | Bob    | 32
^SERVYWW... | Carole | 28
^8GR6BSX... | David  | 35
pw.debug.compute_and_print(T2)
            | name   | country
^2TMTFGY... | Alice  | US
^YHZBTNY... | Bob    | France
^SERVYWW... | Carole | Germany
^8GR6BSX... | David  | US

We can also check that our pipeline returns the expected tables:

pw.debug.compute_and_print(T2bis)
            | name  | country
^2TMTFGY... | Alice | US
^8GR6BSX... | David | US
pw.debug.compute_and_print(T3)
            | name  | adult
^VJ3K9DF... | Alice | False
^XG83A8H... | David | True
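When debugging, it can help to compare these printed tables with a reference written without Pathway. The sketch below re-implements the pipeline logic on plain lists of dicts holding the same sample data; `reference_pipeline` is a hypothetical helper, not part of Pathway. Since the printed Pathway tables carry row ids and their row order is not something to rely on, comparing rows by content (for instance sorted by name) is the safer check.

```python
# Same sample data as the markdown tables above, as plain Python rows.
T1_rows = [
    {"name": "Alice", "age": 15},
    {"name": "Bob", "age": 32},
    {"name": "Carole", "age": 28},
    {"name": "David", "age": 35},
]
T2_rows = [
    {"name": "Alice", "country": "US"},
    {"name": "Bob", "country": "France"},
    {"name": "Carole", "country": "Germany"},
    {"name": "David", "country": "US"},
]


def reference_pipeline(t1_rows, t2_rows):
    """Hypothetical plain-Python reference for the pipeline logic (no Pathway)."""
    t1bis = [{**row, "adult": row["age"] > 18} for row in t1_rows]
    t2bis = [row for row in t2_rows if row["country"] == "US"]
    # Nested-loop join on the name, keeping only the name and adult columns.
    t3 = [
        {"name": left["name"], "adult": left["adult"]}
        for left in t1bis
        for right in t2bis
        if left["name"] == right["name"]
    ]
    return t2bis, t3


expected_t2bis, expected_t3 = reference_pipeline(T1_rows, T2_rows)
```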

Conclusion

While Pathway is made for the streaming mode, the static mode can be used to test and debug your pipeline. The implementation should be the same in both modes: only the way data is input and output differs.

The different ways to access the data in both streaming and static mode are explained in more detail in our article about connectors.

Olivier Ruas

Algorithm and Data Processing Magician