Your first realtime app with Pathway

Get started with realtime processing through this small example, which computes a sum in real time using Pathway's CSV directory connectors.

Pathway is a framework for realtime data processing, so being able to connect to streaming data sources is essential. For the sake of simplicity, we will create a small streaming application which tracks CSV files added to a folder and maintains, as its output, a special CSV file with a compact representation of how the result changes over time. To see how to run Pathway with distributed streaming message brokers, such as Kafka, see our tutorials. The streaming data takes the form of new CSV files added to a common directory, and we want the output to be updated whenever a new CSV file is added to that directory. We will go through it step by step, from the installation of Pathway to the debugging of the app.

Ready? You'll see, Pathway makes things really easy for us.

Pathway's solution

Using Pathway, the described problem can be easily solved by the following code:

import pathway as pw
t = pw.csv.read('./sum_input_data/', ["value"], poll_new_objects=True)
t = t.select(value=pw.apply_with_type(int, int, t.value))
t = t.reduce(sum=pw.reducers.sum(t.value))
pw.csv.write(t, "output_stream.csv")
pw.run()

Quite compact, right? Pathway provides everything to make realtime data processing easy.

Let's now go through, in practice, all the steps required to make the app work:

Installation of Pathway

You can download the current Pathway release, which is now available in Open Beta on a free-to-use license:

pip install --extra-index-url <index-url> pathway

on a Python 3.8+ installation, and we are ready to roll!

Doing a realtime sum using Pathway

The first step to using Pathway is to import it:

import pathway as pw

Now, we can use Pathway to do realtime streaming data processing!

To solve our problem, we need two things:

  1. CSV connectors to read the input stream and to output our results.
  2. A processing pipeline to compute the sum of the incoming values.

CSV connectors

First, we need two CSV connectors: one for reading the input stream and one for the output.

For the input, we consider a stream in the form of CSV files: each new update is contained in its own CSV file. We need to specify the directory in which we are going to listen for updates and the columns to read, and we also need to specify that we are in streaming mode. It can be done easily in one line, assuming all the values are stored in a column named value:

t = pw.csv.read('./sum_input_data/', ["value"], poll_new_objects=True)

Every time a new CSV file is added into ./sum_input_data/, its content will be automatically added to the table t.
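As a rough mental model of what "listening to a directory" means, here is a plain-Python sketch that polls a folder and reports only the CSV files it has not seen before. It does not use Pathway and is not how Pathway's connector is implemented; the helper name poll_new_csv_files and the temporary directory are assumptions made for the illustration.

```python
import os
import tempfile


def poll_new_csv_files(directory, seen):
    """Return CSV paths in `directory` not yet in `seen`, and record them."""
    new_files = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if name.endswith(".csv") and path not in seen:
            seen.add(path)
            new_files.append(path)
    return new_files


# Small demonstration in a temporary directory (hypothetical data).
with tempfile.TemporaryDirectory() as tmp:
    seen = set()
    with open(os.path.join(tmp, "input_table_1.csv"), "w") as f:
        f.write("value\n1\n")
    first_poll = [os.path.basename(p) for p in poll_new_csv_files(tmp, seen)]

    with open(os.path.join(tmp, "input_table_2.csv"), "w") as f:
        f.write("value\n2\n")
    second_poll = [os.path.basename(p) for p in poll_new_csv_files(tmp, seen)]

print(first_poll)   # only the first file
print(second_poll)  # only the file added since the previous poll
```

Each poll returns only the files added since the previous one, which is the behaviour we rely on when every new CSV file carries one update.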

For the output connector, it's even easier: we just need to specify the table we want to output and the address of the CSV file in which the output will be written:

pw.csv.write(t, "output_stream.csv")

Every time the table t is updated, the changes will be automatically appended to output_stream.csv.

Doing a sum

If you need a small reminder on how to manipulate data with Pathway, don't hesitate to read our survival guide. To do our sum, we first need to convert our data to integers (by default the values are strings) and then reduce the whole table to a single row containing the sum of the values:

t = t.select(value=pw.apply_with_type(int, int, t.value))
t = t.reduce(sum=pw.reducers.sum(t.value))

That's it!
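To see why the conversion step is needed, here is a small plain-Python sketch of the same two steps on static data, using only the standard csv module rather than Pathway. The sample content raw_csv is a hypothetical stand-in for the input files.

```python
import csv
import io

# Hypothetical sample: the rows of three one-row input files, concatenated.
raw_csv = "value\n1\n2\n3\n"

rows = list(csv.DictReader(io.StringIO(raw_csv)))

# Values read from a CSV file are strings, so they must be cast before summing.
values = [int(row["value"]) for row in rows]
total = sum(values)

print(type(rows[0]["value"]).__name__)
print(total)
```

The Pathway version performs the same cast and the same aggregation, but keeps the result up to date as new rows arrive.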

Creating the input stream

To try our example live, we need to periodically insert new CSV files into a given directory. This can easily be done using a bash script which writes a new value every second:

for LOOP_ID in {1..30}
do
    filename="./sum_input_data/input_table_$LOOP_ID.csv"
    printf "value\n$LOOP_ID\n" >> $filename
    sleep 1
done

With the CSV connector, the addition of a single CSV file is seen as the reception of a *COMMIT* message (you can see more about how Pathway's connectors work here).
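If you prefer to stay in Python, the same stream can be produced with a short script. This is a sketch under assumptions: the function name generate_stream and its count and delay parameters are made up for this example, and the demonstration writes into a temporary directory with no delay so that it finishes immediately.

```python
import os
import tempfile
import time


def generate_stream(directory, count=30, delay=1.0):
    """Write `count` one-row CSV files into `directory`, one every `delay` seconds."""
    os.makedirs(directory, exist_ok=True)
    paths = []
    for loop_id in range(1, count + 1):
        path = os.path.join(directory, f"input_table_{loop_id}.csv")
        with open(path, "w") as f:
            f.write(f"value\n{loop_id}\n")
        paths.append(path)
        time.sleep(delay)
    return paths


# Quick check in a temporary directory, with no delay between files.
with tempfile.TemporaryDirectory() as tmp:
    created = generate_stream(tmp, count=3, delay=0)
    with open(created[-1]) as f:
        last_content = f.read()

print([os.path.basename(p) for p in created])
```

To mimic the bash script, you would call generate_stream("./sum_input_data/") with the default count and delay.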

Gathering everything into one piece

The final version of our project contains two files: realtime_sum.py, which processes the stream using Pathway, and generating_stream.sh, which generates the stream.

Here is realtime_sum.py:

import pathway as pw
t = pw.csv.read('./sum_input_data/', ["value"], poll_new_objects=True)
t = t.select(value=pw.apply_with_type(int, int, t.value))
t = t.reduce(sum=pw.reducers.sum(t.value))
pw.csv.write(t, "output_stream.csv")
pw.run()

Don't forget to run the computation with pw.run(), otherwise nothing will happen. The result is quite compact, considering that the sum is updated in realtime whenever an update is received!

And the generating_stream.sh:

#!/bin/bash
src="./sum_input_data/"
rm -r $src
mkdir -p $src
sleep 10
for LOOP_ID in {1..30}
do
    filename=$src"input_table_$LOOP_ID.csv"
    printf "value\n$LOOP_ID\n" >> $filename
    sleep 1
done

The input connector requires the input CSV directory to exist, so the script generating the stream should be launched first. The input connector then watches the directory and updates the results every time a new CSV file is added, and the output CSV connector automatically updates the output CSV file in real time.

Understanding the output

The output of this project is a CSV file output_stream.csv in which all the successive updates of the sum values are displayed:

sum,time,diff
1,1,1
1,2,-1
3,2,1
3,3,-1
6,3,1
...

Let's explain the meaning of those lines. As explained in the article about our connectors, each update is represented by up to two rows. Each row contains:

  • the columns of the output table, here we only have sum.
  • time, which represents the logical clock at which the update has happened. In practice, each commit increments the time by one.
  • diff, which indicates whether the row represents an addition or a deletion. An update is represented by two rows: one to remove the old value, one to add the new value. Those two rows have the same logical clock to ensure the atomicity of the operation.

Hence, the first line represents the insertion (diff=1) of a new value (sum=1), which has happened at time 1. Then a new CSV file is created containing the new value 2, changing the value from 1 to 3, resulting in both the deletion (diff=-1) of the previous value (sum=1) which has happened at time 2 AND an addition (diff=1) of a new value (sum=3) at the same time (time=2).
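The update format described above can be mimicked in a few lines of plain Python. This is only a sketch of the semantics, not Pathway's engine: the function sum_updates is hypothetical, and it assumes each commit is a list of integer values and that the logical time increases by one per commit.

```python
def sum_updates(commits):
    """Yield (sum, time, diff) rows for a running sum over successive commits."""
    total = None
    for time, values in enumerate(commits, start=1):
        new_total = (total or 0) + sum(values)
        if total is not None:
            # Retract the previous sum at the same logical time...
            yield (total, time, -1)
        # ...and insert the new one, so the update is atomic.
        yield (new_total, time, 1)
        total = new_total


# Three commits carrying the values 1, 2 and 3, as in the example stream.
rows = list(sum_updates([[1], [2], [3]]))

print("sum,time,diff")
for row in rows:
    print(",".join(str(x) for x in row))
```

The first commit produces a single insertion, and every later commit produces a deletion and an insertion sharing the same time.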

The output file is updated until the process is killed.

Note: depending on how your OS handles files, the updates may not be visible as soon as they are made. On Linux, the changes in this example may be held in a buffer which may not be flushed before the computation terminates.

Debugging with static data

If you have tried to come up with the Pathway implementation by yourself, you may have realized that working with streaming data is not the most convenient way to see what's wrong with your code. Fortunately, Pathway provides a static mode, a "batch" mode in which static and finite data is manipulated. One of its main advantages is the ability to trigger a computation on the whole data and to print the results without using an output connector, thanks to pw.debug.compute_and_print! If you want to learn more about both modes, read our article about them.

Let's try to see what the static mode can do to help us!

Small reminder: using the static mode only changes how the data is accessed, i.e. the input/output connectors.

First, we need static data. In our case, the static CSV input connector works the same as its streaming counterpart: we can use the same input as for the live example, but we need to have the generation done before executing our Pathway sample. To simplify we can use the following script:

#!/bin/bash
src="./sum_input_data/"
rm -r $src
mkdir -p $src
for LOOP_ID in {1..10}
do
    filename=$src"input_table_$LOOP_ID.csv"
    printf "value\n$LOOP_ID\n" >> $filename
done

It's the same script as before but without the sleep calls and producing a smaller number of values.

Now, we can check whether our data is correctly read by using the CSV input connector and pw.debug.compute_and_print to print the data!

import pathway as pw
# The flag poll_new_objects is set to False to use the static version
t = pw.csv.read('./sum_input_data/', ["value"], poll_new_objects=False)
pw.debug.compute_and_print(t)

This should print the table! Now you can check at every step if everything works as expected:

import pathway as pw
t = pw.csv.read('./sum_input_data/', ["value"], poll_new_objects=False)
pw.debug.compute_and_print(t)
t = t.select(value=pw.apply_with_type(int, int, t.value))
pw.debug.compute_and_print(t)
t = t.reduce(sum=pw.reducers.sum(t.value))
pw.debug.compute_and_print(t)

There is no need for an output connector or for pw.run() in the static mode. We can also use a static output connector to write the data into a CSV file:

import pathway as pw
t = pw.csv.read('./sum_input_data/', ["value"], poll_new_objects=False)
pw.debug.compute_and_print(t)
t = t.select(value=pw.apply_with_type(int, int, t.value))
pw.debug.compute_and_print(t)
t = t.reduce(sum=pw.reducers.sum(t.value))
pw.csv.write(t, "output_stream.csv")

The final value of t is written in the output file.

⚠️ Note that the computation is triggered for each call to pw.debug.compute_and_print. It means that the previous code sample could be seen as the successive and independent executions of the three following code samples:

import pathway as pw
t = pw.csv.read('./sum_input_data/', ["value"], poll_new_objects=False)
pw.debug.compute_and_print(t)

t = pw.csv.read('./sum_input_data/', ["value"], poll_new_objects=False)
t = t.select(value=pw.apply_with_type(int, int, t.value))
pw.debug.compute_and_print(t)

t = pw.csv.read('./sum_input_data/', ["value"], poll_new_objects=False)
t = t.select(value=pw.apply_with_type(int, int, t.value))
t = t.reduce(sum=pw.reducers.sum(t.value))
pw.csv.write(t, "output_stream.csv")

When the code gets complex, the computation can become very long: keep the data sample small and limit the calls to pw.debug.compute_and_print as much as possible.

Tests

You may want to build tests to be sure your application is correct. Pathway is a Python framework and is thus compatible with standard Python tooling. You can use your favorite Python testing framework, such as pytest!
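As an illustration, here is a sketch of a pytest-style test that checks the content of an output file without running Pathway itself. It replays the sum,time,diff rows to find the value still present at the end. The helper final_sum and the sample output are assumptions made for this example; in a real test you would read the file written by your pipeline.

```python
import csv
import io


def final_sum(output_csv_text):
    """Replay (sum, time, diff) rows and return the value still present at the end."""
    counts = {}
    for row in csv.DictReader(io.StringIO(output_csv_text)):
        value = int(row["sum"])
        counts[value] = counts.get(value, 0) + int(row["diff"])
    remaining = [value for value, count in counts.items() if count > 0]
    assert len(remaining) == 1, "expected exactly one live row"
    return remaining[0]


def test_final_sum_matches_inputs():
    # Hypothetical output captured after three input files (values 1, 2, 3).
    output = "sum,time,diff\n1,1,1\n1,2,-1\n3,2,1\n3,3,-1\n6,3,1\n"
    assert final_sum(output) == sum([1, 2, 3])


# pytest would discover the test by its name; it can also be called directly.
test_final_sum_matches_inputs()
```

Comparing the replayed result with a sum computed independently from the inputs gives a simple end-to-end check of the pipeline.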

Deploy your code in production

Now that your application has been debugged and tested, you may want to test it for real. In practice, since Pathway is a Python framework, you can follow the same processes you are used to.

Single machine using multiprocessing

While it is a requirement for efficiency, doing multiprocessing and multithreading with Python can be exhausting. Pathway natively supports multiprocessing and multithreading: no extra library and no GIL! You only need to use the spawn command and specify the number of processes and the number of threads per process using the --processes (-n) and --threads (-t) options! To launch your application with 2 processes, each having 3 threads, you can do as follows:

pathway spawn --processes 2 --threads 3 python realtime_sum.py

Distributed processing

It is also possible to run Pathway in a distributed fashion on more than one machine. However, this feature is only available in Pathway for Enterprise. To see how it works, you can take a look at our dedicated example (coming soon).

To go further

Congrats, you now have all the tools to do realtime data processing using Pathway!

Doing a sum was a bit too simple, right? Why don't you try something more challenging and test our example on how to build a linear regression on data sent to Kafka?

If you want to learn more about Pathway, you can read our tutorials about the key concepts behind Pathway, the differences between the streaming mode and the static mode, or the survival guide of Pathway.

Olivier Ruas

Algorithm and Data Processing Magician