Streaming and static modes
While Pathway is made for processing unbounded streaming data, static data can be used for testing and debugging purposes. This article explains the two modes, streaming and static, and how they differ.
Pathway is a streaming framework and thus is purposely designed to work with streaming data. However, working in the streaming mode may not be the most convenient way to test and debug your application. To ease testing and debugging, Pathway provides a static mode which allows you to manipulate static and finite data. In the following, you are going to see what lies behind those terms, their differences, and how to use both modes.
Let's start with a brief explanation of Pathway's computation graph. In Pathway, the processing pipeline that you define is modeled using a graph. This graph, called the computation graph, models the different transformation steps performed on the data. Each node is either a table, an operation performed on one or more tables (e.g. a transformer), or a connector.
For example, consider the following scenario: you have two tables containing the ages (in T1) and the countries (in T2) of different people. You want to compute the list of people from a given country (let's say the US), and, for the people in this country, whether they are adults or not. In Pathway, you can build a function which works on two tables T1 and T2 as follows:
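    # Add an "adult" column to T1.
    T1bis = T1.select(*pw.this, adult=pw.apply(lambda x: x > 18, pw.this.age))
    # Keep only the people living in the US.
    T2bis = T2.filter(pw.this.country == "US")
    # Join on the name; the "adult" column lives in T1bis, not in T1.
    T3 = T1bis.join(T2bis, pw.left.name == pw.right.name).select(pw.left.name, pw.left.adult)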
In practice, you would need two input connectors to create T1 and T2. You can then use the previous code sample to build T2bis and T3, and use output connectors to send the resulting tables outside of Pathway. The computation graph of such a pipeline would look like this:
The corresponding notations would be: T1' for T1bis, T2' for T2bis, t1 for the select with apply, t2 for the filter, and t3 for the join followed by the select.
The main take-away is that Pathway builds this graph based on the pipeline defined by the user, before any computation is actually done. This graph models the relations between the different connectors, tables, and transformers. Its main purpose is to provide fast updates in streaming mode.
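To make the idea of "graph first, computation later" concrete, here is a minimal plain-Python sketch. It is a toy model and not Pathway's implementation: the names `Node`, `source`, and `run` are hypothetical and exist only for this illustration. Declaring a filter only adds a node to the graph; rows are processed only when `run` is called.

```python
# Toy model of lazy graph building; NOT Pathway's implementation.
# Node, source, and run are hypothetical names used only for illustration.


class Node:
    """A graph node: remembers its operation and parents, holds no data."""

    def __init__(self, name, op, parents=()):
        self.name = name
        self.op = op  # function applied to the rows produced by the parents
        self.parents = list(parents)

    def select(self, name, fn):
        # Declaring a transformation only adds a node to the graph.
        return Node(name, lambda rows: [fn(r) for r in rows], [self])

    def filter(self, name, pred):
        return Node(name, lambda rows: [r for r in rows if pred(r)], [self])


def source(name, rows):
    # A source node has no parents; its rows are read only when run() is called.
    return Node(name, lambda: list(rows))


def run(node):
    """Evaluate a node by first evaluating its parents (the actual computation)."""
    inputs = [run(p) for p in node.parents]
    return node.op(*inputs)


T2 = source("T2", [{"name": "Alice", "country": "US"},
                   {"name": "Bob", "country": "France"}])
T2bis = T2.filter("T2bis", lambda r: r["country"] == "US")

# At this point the graph exists, but no row has been processed yet.
graph_shape = [T2bis.name] + [p.name for p in T2bis.parents]

# Only this call reads the source and applies the filter.
result = run(T2bis)
```

In this sketch, `graph_shape` can be inspected before any data is touched, which mirrors the way the pipeline definition and its execution are separated.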
In the streaming mode, Pathway assumes unbounded incoming updates.
This mode requires input connectors listening to streaming data sources.
Pathway starts to ingest the data into the computation graph only when pw.run() is called, and the data is maintained in the graph during the computation: results cannot be printed but should be accessed using output connectors.
The computation runs until the process is killed: everything after pw.run() is unreachable code.
In this example, T1 and T2 could be obtained using Kafka connectors, and T2bis and T3 could be output using PostgreSQL connectors.
Pathway uses the graph to provide fast updates: instead of ingesting the data from scratch in the graph every time an update is received, the data is maintained in the graph and locally updated. For instance, an update received from Input Data 1 would only modify T1, T1', and T3, without any impact on T2 and T2'. The updates are processed as they are received, without any notion of batch, providing real-time streaming processing.
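The following plain-Python sketch illustrates what "locally updated" means for the example pipeline. It is a toy model under simplifying assumptions, not Pathway's engine: the names `state`, `touched`, `on_input1_update`, and `on_input2_update` are hypothetical, and only insertions or overwrites of rows are modeled (no deletions of input rows).

```python
# Toy model of incremental maintenance; NOT Pathway's engine.
# All names below (state, touched, on_input1_update, ...) are hypothetical.

state = {
    "T1": {}, "T1bis": {},  # name -> age, name -> adult flag
    "T2": {}, "T2bis": {},  # name -> country (T2bis keeps only "US")
    "T3": {},               # name -> adult flag, for people present in T2bis
}
touched = []  # tables modified by the most recent update


def on_input1_update(name, age):
    """Apply one update from Input Data 1 by patching only dependent tables."""
    touched.clear()
    state["T1"][name] = age
    state["T1bis"][name] = age > 18
    touched.extend(["T1", "T1bis"])
    if name in state["T2bis"]:  # T3 changes only if the join matches
        state["T3"][name] = state["T1bis"][name]
        touched.append("T3")


def on_input2_update(name, country):
    """Apply one update from Input Data 2 by patching only dependent tables."""
    touched.clear()
    state["T2"][name] = country
    touched.append("T2")
    if country == "US":
        state["T2bis"][name] = country
        touched.append("T2bis")
        if name in state["T1bis"]:
            state["T3"][name] = state["T1bis"][name]
            touched.append("T3")
    else:
        # A person who is no longer in the US disappears downstream.
        if state["T2bis"].pop(name, None) is not None:
            touched.append("T2bis")
        if state["T3"].pop(name, None) is not None:
            touched.append("T3")


on_input2_update("David", "US")
on_input1_update("David", 35)
```

After the last call, `touched` lists only T1, T1bis, and T3: the tables derived from Input Data 2 are left alone, which is the behavior described above.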
Pathway is designed to be run in this streaming mode and it is the standard way to run Pathway.
As briefly mentioned, the streaming mode may not be the most convenient when testing or debugging. For that purpose, Pathway provides a static mode in which static data may be attached to the connectors. In that mode, finite and static data can be loaded, e.g. from a table written in a static CSV file or from a markdown table.
In our example, Input Data 1 and Input Data 2 can be small static tables written by hand in markdown. Pathway provides static input connectors to load this static data:
    T1 = pw.debug.table_from_markdown("""
        | name   | age
      1 | Alice  | 15
      2 | Bob    | 32
      3 | Carole | 28
      4 | David  | 35
    """)
    T2 = pw.debug.table_from_markdown("""
        | name   | country
      1 | Alice  | US
      2 | Bob    | France
      3 | Carole | Germany
      4 | David  | US
    """)
When the computation is run in the static mode, all the data is loaded and processed at once. While the static mode does not fully benefit from the computation graph, it allows checking if the graph is correctly built.
To ease debugging, Pathway provides a function called pw.debug.compute_and_print. When pw.debug.compute_and_print(t) is called, Pathway builds the whole graph, ingests all the available static data, prints the obtained table t, and then discards the data. Calling compute_and_print twice will result in ingesting the data twice.
Summary of differences
Here is a summary of the main differences between the two modes:
| | Streaming mode | Static mode |
|---|---|---|
| Data type | unbounded, streaming data | finite, static data |
| Can be used in production | yes | no |
| Starting the computation | pw.run() | pw.debug.compute_and_print(t) |
| Builds a computation graph | yes | yes |
| How the data is ingested in the graph | maintained at each update | ingested from scratch every time |
| Termination of the computation | runs forever | automatically terminated once the graph is built |
| Printing data | no, data should be accessed using output connectors | yes, via pw.debug.compute_and_print |
In the following, we implement our full example in Pathway using both modes.
The processing pipeline should be designed in the same way no matter which mode is used in the end. The only difference should be with regard to how the input and output data are manipulated. To highlight the fact that the rest of the implementation remains the same, you can implement the pipeline in a function taking T1 and T2 as parameters and returning T2bis and T3:
    import pathway as pw

    def pipeline(T1, T2):
        T1bis = T1.select(*pw.this, adult=pw.apply(lambda x: x > 18, pw.this.age))
        T2bis = T2.filter(pw.this.country == "US")
        T3 = T1bis.join(T2bis, pw.left.name == pw.right.name).select(pw.left.name, pw.left.adult)
        return (T2bis, T3)
In the streaming mode, we need to connect to external data sources such as Kafka for the input or PostgreSQL for the output. Our implementation would be like this:
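    # settingsKafka1, settingsKafka2, settingsPostgres1, and settingsPostgres2 are
    # placeholders for your connection settings. The connector calls are simplified:
    # see the connectors documentation for the exact parameters.
    T1 = pw.io.kafka.read(settingsKafka1, ["name", "age"])
    T2 = pw.io.kafka.read(settingsKafka2, ["name", "country"])
    T2bis, T3 = pipeline(T1, T2)
    # Output the results of the pipeline (T2bis and T3), not the inputs.
    pw.io.postgres.write(settingsPostgres1, T2bis)
    pw.io.postgres.write(settingsPostgres2, T3)
    pw.run()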
The computation is started with pw.run() and will not finish until the process is killed.
The results of the computations are sent to PostgreSQL via the output connectors.
In the static mode, we can enter the input by hand and check step by step what happens to this data. Our example becomes:
    T1 = pw.debug.table_from_markdown("""
        | name   | age
      1 | Alice  | 15
      2 | Bob    | 32
      3 | Carole | 28
      4 | David  | 35
    """)
    T2 = pw.debug.table_from_markdown("""
        | name   | country
      1 | Alice  | US
      2 | Bob    | France
      3 | Carole | Germany
      4 | David  | US
    """)
    T2bis, T3 = pipeline(T1, T2)
In the static mode, there is no output connector. At this point, the implementation does not compute anything: it builds the graph but does not add any data to it. We need to trigger the insertion of the data into the graph by printing some data. We can check that our tables have been well defined and loaded by printing them, for example with pw.debug.compute_and_print(T1):
                | name   | age
    ^YYY4HAB... | Alice  | 15
    ^Z3QWT29... | Bob    | 32
    ^3CZ78B4... | Carole | 28
    ^3HN31E1... | David  | 35
The extra column contains the indexes of the rows (the ... at the end means that they are truncated for display: the full IDs are 128-bit and are a bit long in a text format 😉).
Every table has such a column, and id is a reserved name which cannot be used as a column name.
Indexes are pointers and can be generated based on a given input column; otherwise they are generated automatically.
Printing T2 in the same way gives:

                | name   | country
    ^YYY4HAB... | Alice  | US
    ^Z3QWT29... | Bob    | France
    ^3CZ78B4... | Carole | Germany
    ^3HN31E1... | David  | US
You can also check that the pipeline returns the expected tables:
                | name  | country
    ^YYY4HAB... | Alice | US
    ^3HN31E1... | David | US

                | name  | adult
    ^04VMVCM... | Alice | False
    ^86W6AGV... | David | True
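When testing, it can help to compute the expected tables independently of the pipeline. The sketch below is a plain-Python reference that mirrors the select, filter, and join steps on the sample data. It does not use Pathway, and `reference_pipeline` is a hypothetical helper written only for this check.

```python
# Plain-Python reference for the expected results; it does not use Pathway.
# reference_pipeline is a hypothetical helper written for this check only.

T1_rows = [("Alice", 15), ("Bob", 32), ("Carole", 28), ("David", 35)]
T2_rows = [("Alice", "US"), ("Bob", "France"),
           ("Carole", "Germany"), ("David", "US")]


def reference_pipeline(t1_rows, t2_rows):
    """Mirror the select, filter, and join steps on lists of tuples."""
    # select: compute the "adult" flag for every person in T1
    t1bis = {name: age > 18 for name, age in t1_rows}
    # filter: keep only the people living in the US
    t2bis = [(name, country) for name, country in t2_rows if country == "US"]
    # join on the name, keeping the name and the "adult" flag
    t3 = [(name, t1bis[name]) for name, _ in t2bis if name in t1bis]
    return t2bis, t3


expected_T2bis, expected_T3 = reference_pipeline(T1_rows, T2_rows)
print(expected_T2bis)
print(expected_T3)
```

The rows obtained this way match the name, country, and adult columns printed above, ignoring the row indexes.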
While Pathway is made for the streaming mode, the static mode can be used to test and debug your pipeline. The implementation should be the same in both modes. Only the way data is input and output differs.
The different ways to access the data in both streaming and static modes are explained in more detail in our article about connectors.