Your first realtime app with Pathway

Build your first data streaming app that computes a sum in realtime using Pathway.

Pathway is a framework for realtime data processing that can connect to many different data sources. For the sake of simplicity, you will create a small streaming application that tracks CSV files added to a folder and computes a sum over the values in those files. The sum will be output to a CSV file. More precisely, every time a new value is added, the change to the sum will be appended to that file.

To see how to run Pathway with distributed streaming message brokers, such as Kafka, see our tutorials.

Below you will learn how to create such an application step by step: starting with the installation of Pathway, going through development and debugging, and ending with deployment in a production environment.

Ready? You'll see, Pathway makes things really easy for you.

To kick off the realtime sum example even quicker, check out our configurable project template and start developing instantly.

The solution with Pathway

Using Pathway, the described problem can be easily solved with the following code:

import pathway as pw

class InputSchema(pw.Schema):
    value: int


t = pw.io.csv.read(
    './sum_input_data/',
    schema=InputSchema,
    mode="streaming"
)
t = t.reduce(sum=pw.reducers.sum(t.value))
pw.io.csv.write(t, "output_stream.csv")
pw.run()

Quite compact, right? Pathway provides everything to make your realtime data processing easy.

Let's see in practice the steps to make the app work for you:

Installation of Pathway

You can download the current Pathway release, now available under a free-to-use license:

pip install -U pathway

on a Python 3.10+ installation, and you are ready to roll!

⚠️ Pathway is available on macOS and Linux. Pathway is currently not supported on Windows.

Realtime sum using Pathway

The first step to using Pathway is to import it:

import pathway as pw

Now, you can use Pathway to do realtime streaming data processing!

To solve the problem, you need two things:

  1. Input and output connectors to read the input stream and to output our results.
  2. A processing pipeline to compute the sum of incoming values.

Connect Pathway to your Stream

First, you need two connectors: one to read the input stream and one for the output.

For the input, consider a stream in the form of CSV files: each new update is contained in its own CSV file. You need to specify the directory to listen to for updates, the schema of the table, and the fact that you are in streaming mode. Assuming all the values are stored in a column value, accessing the stream is easy:

class InputSchema(pw.Schema):
    value: int

t = pw.io.csv.read(
  './sum_input_data/',
  schema=InputSchema,
  mode="streaming"
)

Every time a new CSV file is added into ./sum_input_data/, its content will be automatically added to the table t. With the schema parameter, the input values are cast to the corresponding types.
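For instance, the file produced for the value 1 by the generating script used later in this tutorial is a two-line CSV, a header followed by a single value:

```csv
value
1
```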

💡 If you don't want to generate CSV files, you can directly use our stream generator to obtain this dataset:

t = pw.demo.range_stream()

For the output connector, it's even easier: you just need to specify the table you want to output and the path of the CSV file to which the output will be written:

pw.io.csv.write(t, "output_stream.csv")

Every time the table t is updated, the changes will be automatically appended to output_stream.csv.

Process the Data

To compute the sum, which will then be maintained up to date as new values arrive, sum the values using a reduce with the sum reducer pw.reducers.sum:

t = t.reduce(sum=pw.reducers.sum(t.value))

That's it!
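To build intuition for what this maintains, here is a plain-Python sketch (not Pathway's actual implementation) of the update stream produced by an incrementally maintained sum: each new value retracts the previous sum and inserts the new one. This is exactly the (sum, time, diff) format you will encounter when interpreting the output later on.

```python
def sum_updates(values):
    """Simulate the update stream of an incrementally maintained sum.

    Yields (sum, time, diff) tuples: each new value retracts the previous
    sum (diff=-1) and inserts the new one (diff=+1) at the same time.
    """
    total = 0
    for time, value in enumerate(values, start=1):
        if time > 1:
            yield (total, time, -1)  # retract the old sum
        total += value
        yield (total, time, 1)       # insert the new sum

print(list(sum_updates([1, 2, 3])))
# [(1, 1, 1), (1, 2, -1), (3, 2, 1), (3, 3, -1), (6, 3, 1)]
```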

If you need more information on how to manipulate the data with Pathway, check out our First-steps guide.

SQL version

If you do not feel confident enough to manipulate your tables with Pathway's Python API just yet, you can use SQL queries directly in Pathway:

sql_query = "SELECT SUM(value) AS sum_values FROM t"
t = pw.sql(sql_query, t=t)

You can learn more from our dedicated article about the SQL API.

Generate a Live Data Stream

You can skip this section if you use our stream generator pw.demo.range_stream()

To try our example live, you need to periodically insert new CSV files into a given directory. This can easily be done with a bash script that writes a new value every second:

for LOOP_ID in {1..30}
do
    filename="./sum_input_data/input_table_$LOOP_ID.csv"
    printf "value\n$LOOP_ID\n" >> $filename
    sleep 1
done

With the CSV connector, the addition of a single CSV file triggers a commit (see more about how Pathway's connectors work).
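If you prefer Python to bash, a hypothetical equivalent generator (the file names and directory mirror the bash script above) could look like this:

```python
import time
from pathlib import Path

def generate_stream(src="./sum_input_data", n=30, delay=1.0):
    """Write one CSV file per value, pausing between files to simulate a live stream."""
    directory = Path(src)
    directory.mkdir(parents=True, exist_ok=True)
    for loop_id in range(1, n + 1):
        # Each file contains a header line followed by a single value.
        (directory / f"input_table_{loop_id}.csv").write_text(f"value\n{loop_id}\n")
        time.sleep(delay)

# generate_stream()  # writes 30 files, one per second
```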

Putting everything together

The final version of your project contains two files: realtime_sum.py which processes the stream using Pathway, and generating_stream.sh which generates the stream.

Here is realtime_sum.py:

realtime_sum.py
import pathway as pw

class InputSchema(pw.Schema):
    value: int


t = pw.io.csv.read(
  './sum_input_data/',
  schema=InputSchema,
  mode="streaming"
)
t = t.reduce(sum=pw.reducers.sum(t.value))
pw.io.csv.write(t, "output_stream.csv")
pw.run()

Don't forget to run the computation with pw.run(), otherwise the computation graph will be built but no data will be ingested: there will be no computation. The result is quite compact, considering that the sum is updated in realtime whenever an update is received!

And here is the script generating_stream.sh to create the input stream:

generating_stream.sh
#!/bin/bash
src="./sum_input_data/"
rm -rf $src
mkdir -p $src

sleep 10

for LOOP_ID in {1..30}
do
    filename=$src"input_table_$LOOP_ID.csv"
    printf "value\n$LOOP_ID\n" >> $filename
    sleep 1
done

The input connector requires the input CSV directory to exist, so you should launch the script generating the stream first. The input connector will then pick up each new CSV file as it is added, and the output CSV connector will update the output file automatically and in realtime.

Interpret the Output

The output of this project is a CSV file output_stream.csv in which all the successive updates of the sum values are displayed:

sum,time,diff
1,1,1
1,2,-1
3,2,1
3,3,-1
6,3,1
...

Let's explain the meaning of those lines. Each row contains:

  • the columns of the output table; here you only have sum.
  • time, which represents the time at which the update happened. In practice, this is a timestamp.
  • diff, which indicates whether the row is an addition or a deletion. An update is represented by two rows: one removing the old value and one adding the new value. Both rows carry the same time to ensure the atomicity of the operation.

Hence, the first line represents the insertion (diff=1) of a new value (sum=1), which happened at time 1. Then a new CSV file containing the value 2 arrives, changing the sum from 1 to 3. This results in both the deletion (diff=-1) of the previous value (sum=1) at time 2 AND the addition (diff=1) of the new value (sum=3) at the same time (time=2).
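As a sketch of how a downstream consumer could interpret this stream, the following plain-Python helper (hypothetical, not a Pathway API) replays the (sum, time, diff) rows and returns the values still present after all additions and retractions are applied:

```python
import csv
import io

def current_sums(output_csv_text):
    """Replay an output stream of (sum, time, diff) rows and return the
    sums still present after all additions and retractions are applied."""
    alive = []
    for row in csv.DictReader(io.StringIO(output_csv_text)):
        if int(row["diff"]) == 1:
            alive.append(int(row["sum"]))  # addition
        else:
            alive.remove(int(row["sum"]))  # retraction
    return alive

stream = "sum,time,diff\n1,1,1\n1,2,-1\n3,2,1\n3,3,-1\n6,3,1\n"
print(current_sums(stream))  # [6]
```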

If you want more information about connectors and their output, check our article about them.

The output file will be updated until the process is killed.

Note: depending on how your OS handles files, the updates may not be visible as soon as they are made. On Linux, the changes in this example may be cached in a buffer which may not be committed before the computation terminates.

Use Static Data to Debug

If you have tried to come up with the Pathway implementation on your own, you may have realized that streaming data is not the most convenient setting for figuring out what's wrong with your code. Fortunately, Pathway provides a static mode to work in a simulated "batch" mode in which static and finite data is manipulated. One of its main advantages is being able to trigger a computation on the whole data and print the results without an output connector, thanks to pw.debug.compute_and_print! If you want to learn more about both modes, read our article about them.

Let's see what the static mode can do to help you!

Small reminder: using the static mode only changes how the data is accessed, i.e. the input/output connectors.

First, you need static data. In our case, the static CSV input connector works the same as its streaming counterpart: you can use the same input as for the live example, but the files must be generated before you execute your Pathway code. To simplify things, you can use the following script:

generating_stream.sh
#!/bin/bash
src="./sum_input_data/"
rm -rf $src
mkdir -p $src

for LOOP_ID in {1..10}
do
    filename=$src"input_table_$LOOP_ID.csv"
    printf "value\n$LOOP_ID\n" >> $filename
done

It's the same script as before, but without the sleep calls and producing fewer values.

Now, you can check if the data is correctly read by using the CSV input connector and pw.debug.compute_and_print to print the data:

static_sum.py
import pathway as pw

class InputSchema(pw.Schema):
    value: int


# The mode is set to 'static' to use the static version
t = pw.io.csv.read(
  './sum_input_data/',
  schema=InputSchema,
  mode="static"
)
pw.debug.compute_and_print(t)

This should print the table! The extra column is the id column, which contains the row indexes.

Now you can check at every step if everything works as expected:

static_sum.py
import pathway as pw

class InputSchema(pw.Schema):
    value: int

t = pw.io.csv.read(
  './sum_input_data/',
  schema=InputSchema,
  mode="static"
)
pw.debug.compute_and_print(t)
t = t.reduce(sum=pw.reducers.sum(t.value))
pw.debug.compute_and_print(t)

💡 There is no need for an output connector nor pw.run() in the static mode if you use pw.debug.compute_and_print: it calls pw.run() internally, so the computation is triggered at each call. You can also use a static output connector to print the data into a CSV file, but in that case do not forget pw.run():

static_sum.py
import pathway as pw

class InputSchema(pw.Schema):
    value: int


t = pw.io.csv.read(
  './sum_input_data/',
  schema=InputSchema,
  mode="static"
)
pw.debug.compute_and_print(t)
t = t.reduce(sum=pw.reducers.sum(t.value))
pw.debug.compute_and_print(t)
pw.io.csv.write(t, "output_stream.csv")

pw.run()

The final value of t is written in the output file.

⚠️ Note that, as the computation is triggered for each call to pw.debug.compute_and_print, the previous code sample can be seen as successive and independent executions of the three following code samples:

import pathway as pw

class InputSchema(pw.Schema):
    value: int


t = pw.io.csv.read(
  './sum_input_data/',
  schema=InputSchema,
  mode="static"
)
pw.debug.compute_and_print(t)
t = pw.io.csv.read(
  './sum_input_data/',
  schema=InputSchema,
  mode="static"
)
t = t.reduce(sum=pw.reducers.sum(t.value))
pw.debug.compute_and_print(t)
t = pw.io.csv.read(
  './sum_input_data/',
  schema=InputSchema,
  mode="static"
)
t = t.reduce(sum=pw.reducers.sum(t.value))
pw.io.csv.write(t, "output_stream.csv")

When the code gets complex, the computation can become very long: keep the data sample small and limit the calls to pw.debug.compute_and_print as much as possible.

Tests

You may want to build tests to be sure your application is correct. Pathway is a Python framework and is thus compatible with standard Python tooling. You can use your favorite Python testing framework, such as pytest!
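As a minimal sketch (the helper and test below are assumptions mirroring the generator script, not Pathway APIs), a pytest test could check that the generated input files add up to the value the pipeline should converge to:

```python
# test_realtime_sum.py -- run with `pytest test_realtime_sum.py`
from pathlib import Path

def write_inputs(directory: Path, n: int) -> None:
    """Mimic generating_stream.sh: one CSV file per value, header included."""
    directory.mkdir(parents=True, exist_ok=True)
    for loop_id in range(1, n + 1):
        (directory / f"input_table_{loop_id}.csv").write_text(f"value\n{loop_id}\n")

def test_expected_sum(tmp_path):
    # pytest's tmp_path fixture provides a fresh temporary directory.
    write_inputs(tmp_path, 10)
    values = [int(f.read_text().splitlines()[1]) for f in tmp_path.glob("*.csv")]
    assert sum(values) == 55  # 1 + 2 + ... + 10
```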

Production code deployment

Now that your application has been debugged and tested, you may want to run it for real. In practice, since Pathway is a Python framework, you can follow the usual deployment process.

Single machine using multiprocessing

Multiprocessing and multithreading are a requirement for efficiency, but doing them in Python can be exhausting. Pathway natively supports multiprocessing and multithreading: no extra library, no GIL! You only need to use the spawn command and specify the number of processes and the number of threads per process with the --processes (-n) and --threads (-t) options. To launch your application with 2 processes, each with 3 threads, run:

pathway spawn --processes 2 --threads 3 python realtime_sum.py

Distributed processing

It is also possible to run Pathway with distributed processing on more than one machine. However, this feature is only available with Pathway for Enterprise. To see how it works, you can take a look at our dedicated example (coming soon).

To go further

Congrats, you now have all the tools to make realtime data processing a reality using Pathway!

Doing a sum was a bit too simple, right? Why don't you try something more challenging, like this example on how to build a linear regression on data sent to Kafka?

If you want to learn more about Pathway, you can read our tutorials about the key concepts behind Pathway, the differences between the streaming mode and the static mode, or the First-steps guide of Pathway.