While Pathway is made for processing bounded and unbounded streaming data, entirely static data can also be used for testing and debugging purposes. This article explains what the two modes, streaming and static, are and how they differ.
Pathway is purposely designed to work with streaming data. However, working in the streaming mode may not be the most convenient way to test and debug your application. To ease testing and debugging, Pathway provides a static mode which allows you to manipulate static and finite data. In the following, you are going to see what lies behind those terms, their differences, and how to use both modes.
Let's start with a brief explanation of Pathway's computation graph. In Pathway, the processing pipeline (dataflow) that you define is modeled using a graph. This graph, called the computation graph, models the different transformation steps performed on the data. Each node is either an operation performed on one or more tables (e.g. a transformer), or a connector.
For example, consider the following scenario: you have two tables containing the ages (in T1) and the countries (in T2) of different people. You want to compute the list of people from a given country (let's say the US) and, for the people in this country, whether they are adults or not. In Pathway, you can build a function which works on two tables T1 and T2 as follows:
T1bis = T1.select(*pw.this, adult=pw.apply(lambda x: x > 18, pw.this.age))
T2bis = T2.filter(pw.this.country == "US")
T3 = T1bis.join(T2bis, pw.left.name == pw.right.name).select(
    pw.left.name, pw.left.adult
)
In practice, you would need two input connectors to create T1 and T2. You can then use the previous code sample to build T2bis and T3, and use output connectors to send the resulting tables outside of Pathway. The computation graph of such a pipeline would look like this:
The corresponding notations would be: T1' → T1bis, T2' → T2bis, t1 → select with apply, t2 → filter, and t3 → join followed by select.
The main take-away is that Pathway builds this graph based on the pipeline defined by the user, before any computation is actually done. This graph models the relations between the different connectors, tables, and transformers. Its main purpose is to provide fast updates in streaming mode.
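The idea of describing the pipeline first and computing later can be illustrated with a minimal plain-Python sketch. This is not Pathway's implementation or API: the `Node` class and its `evaluate` method are hypothetical names, used only to show that declaring operations creates graph nodes and that no row is processed until evaluation is explicitly requested.

```python
# Illustrative sketch only: a toy deferred-evaluation graph, not Pathway's engine.
# `Node` and `evaluate` are hypothetical names introduced for this example.


class Node:
    def __init__(self, name, func, parents=()):
        self.name = name
        self.func = func  # how to compute this node from its parents' rows
        self.parents = tuple(parents)

    def evaluate(self, log):
        # Recursively evaluate the parents, then apply this node's operation.
        inputs = [parent.evaluate(log) for parent in self.parents]
        log.append(self.name)  # record that this node did actual work
        return self.func(*inputs)


calls = []  # filled only when the graph is evaluated

# Building the graph: only nodes and edges are created here.
t1 = Node("T1", lambda: [{"name": "Alice", "age": 15}, {"name": "David", "age": 35}])
t2 = Node(
    "T2",
    lambda: [{"name": "Alice", "country": "US"}, {"name": "David", "country": "US"}],
)
t1bis = Node("T1bis", lambda rows: [{**r, "adult": r["age"] > 18} for r in rows], [t1])
t2bis = Node("T2bis", lambda rows: [r for r in rows if r["country"] == "US"], [t2])
t3 = Node(
    "T3",
    lambda left, right: [
        {"name": l["name"], "adult": l["adult"]}
        for l in left
        for r in right
        if l["name"] == r["name"]
    ],
    [t1bis, t2bis],
)

graph_built_without_work = calls == []  # nothing has been computed at this point

result = t3.evaluate(calls)  # the computation happens only now
print(result)
print(calls)
```

In this toy version the graph is walked from the requested table back to its inputs. Pathway's engine works differently, but the separation between building the graph and running it is the same idea.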
In the streaming mode, Pathway assumes unbounded input data updates.
This mode requires input connectors listening to streaming data sources.
Pathway starts to ingest the data into the computation graph only when
pw.run() is called and the data is maintained in the graph during the computation: results cannot be printed but should be accessed using output connectors.
The computation runs until the process is killed: everything after
pw.run() is unreachable code.
In this example, T1 and T2 could be obtained using Kafka connectors, and T2bis and T3 could be output using PostgreSQL connectors.
Pathway uses the graph to provide fast updates: instead of ingesting the data from scratch in the graph every time an update is received, the data is maintained in the graph and locally updated. For instance, receiving an update from Input Data 1 would only modify T1, T1', and T3, without any impact on T2 and T2'. The updates are processed as they are received, without any notion of batch, providing real-time streaming processing.
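The local-update behavior described above can be sketched by hand in plain Python. This is only an illustration under simplifying assumptions (one row per name, insertions and overwrites only), not how Pathway's engine is implemented. The `on_t1_update` function and the `touched` list are hypothetical names used to show which tables an update on the first input reaches.

```python
# Illustrative sketch only: hand-written incremental maintenance in plain Python.
# It is not Pathway's engine; all names below are hypothetical.

t1 = {}  # name -> age
t1bis = {}  # name -> adult flag (derived from t1)
t2bis = {"Alice": "US", "David": "US"}  # already filtered on country == "US"
t3 = {}  # name -> adult flag, for names present in both t1bis and t2bis

touched = []  # tables modified by the latest update


def on_t1_update(name, age):
    """Propagate one new or changed row of T1 through the graph."""
    touched.clear()
    t1[name] = age
    touched.append("T1")
    t1bis[name] = age > 18
    touched.append("T1bis")
    if name in t2bis:  # the join only needs a lookup on this one key
        t3[name] = t1bis[name]
        touched.append("T3")


on_t1_update("Alice", 15)
on_t1_update("David", 35)
print(t3)  # {'Alice': False, 'David': True}
print(touched)  # ['T1', 'T1bis', 'T3']
```

Each update does work proportional to the rows it affects, and the tables on the second branch of the graph are never rewritten.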
Pathway is designed to be run in this streaming mode and it is the standard way to run Pathway.
As briefly mentioned, the streaming mode may not be the most convenient when testing or debugging. For that purpose, Pathway provides a static mode in which static data may be attached to the connectors. In that mode, finite and static data can be loaded, e.g. from a table written in a static CSV file or from a markdown table.
In our example, Input Data 1 and Input Data 2 can be small static tables written by hand in a markdown file. Pathway provides static input connectors to load this static data:
T1 = pw.debug.table_from_markdown(
    """
    | name  | age
 1  | Alice | 15
 2  | Bob   | 32
 3  | Carole| 28
 4  | David | 35
 """
)
T2 = pw.debug.table_from_markdown(
    """
    | name  | country
 1  | Alice | US
 2  | Bob   | France
 3  | Carole| Germany
 4  | David | US
 """
)
When the computation is run in the static mode, all the data is loaded and processed at once. While the static mode does not fully benefit from the computation graph, it allows checking if the graph is correctly built.
To ease the debugging, Pathway provides a function called compute_and_print. When calling
pw.debug.compute_and_print(t), Pathway builds the whole graph, ingests all the available static data, prints the obtained table
t, and then discards the data.
Calling compute_and_print twice will result in ingesting the data twice.
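The "ingest, print, discard" cycle can be mimicked with a small plain-Python sketch. It does not call Pathway: `StaticSource` and `compute_and_print_sketch` are hypothetical stand-ins, introduced only to show why two print calls mean two full ingestions of the static data.

```python
# Illustrative sketch only: mimics the "ingest, print, discard" behavior described above.
# `StaticSource` and `compute_and_print_sketch` are hypothetical names, not Pathway APIs.


class StaticSource:
    def __init__(self, rows):
        self._rows = list(rows)
        self.ingest_count = 0  # how many times the data was read

    def ingest(self):
        self.ingest_count += 1
        return list(self._rows)  # a fresh copy on every ingestion


def compute_and_print_sketch(source, transform):
    rows = transform(source.ingest())  # ingest everything and process it at once
    for row in rows:
        print(row)
    # `rows` goes out of scope here: nothing is kept between calls


def only_us(rows):
    return [row for row in rows if row[1] == "US"]


source = StaticSource([("Alice", "US"), ("Bob", "France"), ("David", "US")])

compute_and_print_sketch(source, only_us)
compute_and_print_sketch(source, only_us)
print(source.ingest_count)  # 2: the static data was ingested once per call
```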
Here is a summary of the main differences between the two modes:
| | Streaming mode | Static mode |
|---|---|---|
| Data type | unbounded, streaming data | finite, static data |
| Can be used in production | yes | no |
| Starting the computation | pw.run() | pw.debug.compute_and_print(t) or pw.run() |
| Builds a computation graph | yes | yes |
| How the data is ingested in the graph | maintained at each update | ingested from scratch every time |
| Termination of the computation | runs forever | terminates automatically once all the available data has been processed |
| Printing data | no, data should be accessed using output connectors | yes, via pw.debug.compute_and_print |
In the following, we implement our full example in Pathway using both modes.
The processing pipeline should be designed in the same way no matter which mode is used in the end. The only difference should be with regard to how the input and output data is manipulated. To highlight the fact that the rest of the implementation remains the same, you can implement the pipeline in a function taking T1 and T2 as parameters and returning T2bis and T3:
import pathway as pw


class SchemaT1(pw.Schema):
    name: str
    age: int


class SchemaT2(pw.Schema):
    name: str
    country: str


def pipeline(T1, T2):
    T1bis = T1.select(*pw.this, adult=pw.apply(lambda x: x > 18, pw.this.age))
    T2bis = T2.filter(pw.this.country == "US")
    T3 = T1bis.join(T2bis, pw.left.name == pw.right.name).select(
        pw.left.name, pw.left.adult
    )
    return (T2bis, T3)
In the streaming mode, we must connect to external data sources such as CSV files for the input and for the output. Our implementation would be like this:
T1 = pw.io.csv.read(inputDir1, schema=SchemaT1, mode="streaming")
T2 = pw.io.csv.read(inputDir2, schema=SchemaT2, mode="streaming")
T2bis, T3 = pipeline(T1, T2)
pw.io.csv.write(T2bis, outputDir1)
pw.io.csv.write(T3, outputDir2)
pw.run()
The computation is started with
pw.run() and will not finish until the process is killed.
The results of the computations are written to CSV files via the output connectors.
In the static mode, if the connectors are compatible, we only need to change the mode from "streaming" to "static":
T1 = pw.io.csv.read(inputDir1, schema=SchemaT1, mode="static")
T2 = pw.io.csv.read(inputDir2, schema=SchemaT2, mode="static")
T2bis, T3 = pipeline(T1, T2)
pw.io.csv.write(T2bis, outputDir1)
pw.io.csv.write(T3, outputDir2)
pw.run()
And that's it! The computation will be launched, all the available data will be ingested and processed, and the output will be written in the same CSV files. The only difference is that the files will not be updated whenever new CSV files are added to the input directories: only the data available at launch time will be processed.
But the static mode has more to offer: we can enter the input data by hand and check step by step what is happening on this data. With manually entered data, our example becomes:
T1 = pw.debug.table_from_markdown(
    """
    | name  | age
 1  | Alice | 15
 2  | Bob   | 32
 3  | Carole| 28
 4  | David | 35
 """
)
T2 = pw.debug.table_from_markdown(
    """
    | name  | country
 1  | Alice | US
 2  | Bob   | France
 3  | Carole| Germany
 4  | David | US
 """
)
T2bis, T3 = pipeline(T1, T2)
Without output connectors, this implementation does not do any computation: it builds the graph but does not add any data. We need to trigger the data insertion into the graph by printing some data. In the static mode, we can check that our tables have been correctly defined and loaded by printing them with pw.debug.compute_and_print. For T1, this gives:
[2023-11-28T19:31:36]:INFO:Preparing Pathway computation
            | name   | age
^YYY4HAB... | Alice  | 15
^Z3QWT29... | Bob    | 32
^3CZ78B4... | Carole | 28
^3HN31E1... | David  | 35
The extra column contains the indexes of the rows (the
... at the end means that they are truncated for display – the full IDs are 128 bit and are a bit long in a text format 😉).
Every table has such a column and
id is a reserved name which cannot be used as column name.
Indexes are pointers and can be generated based on a given input column, otherwise they are generated automatically.
Printing T2 in the same way gives:
[2023-11-28T19:31:36]:INFO:Preparing Pathway computation
            | name   | country
^YYY4HAB... | Alice  | US
^Z3QWT29... | Bob    | France
^3CZ78B4... | Carole | Germany
^3HN31E1... | David  | US
You can also check that the pipeline returns the expected tables:
[2023-11-28T19:31:36]:INFO:Preparing Pathway computation
            | name  | country
^YYY4HAB... | Alice | US
^3HN31E1... | David | US

[2023-11-28T19:31:36]:INFO:Preparing Pathway computation
            | name  | adult
^04VMVCM... | Alice | False
^86W6AGV... | David | True
While Pathway is made for the streaming mode, the static mode can be used to test and debug your pipeline. The implementation should be the same in both modes. Only the way data is input and output differs.
The different ways to access the data in both streaming and static mode are explained in more detail in our guide to connectors.