Your first real-time pipeline: a step-by-step guide

In this article, you will learn how to start your journey with Pathway.

To build your first real-time pipeline with Pathway, you need to follow these steps:

  1. Install with pip
  2. Import Pathway
  3. Connect to your data sources with input connectors
  4. Create your pipeline
  5. Send the results with output connectors
  6. Run your pipeline

Step 1: Install with pip

Install the current Pathway release, which is available under a free-to-use license, on a Python 3.10+ installation:

pip install -U pathway

You are then ready to roll!

⚠️ Pathway is available on macOS and Linux. Pathway is currently not supported on Windows. Windows users may want to use Windows Subsystem for Linux (WSL), Docker, or a VM.
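If you want to check these requirements from a script, the sketch below encodes the two constraints stated above (Python 3.10+ and no native Windows). It uses only the standard library; the `pathway_preflight` helper is a hypothetical name introduced for illustration and is not part of Pathway.

```python
import platform
import sys


def pathway_preflight(version: tuple[int, int], system: str) -> list[str]:
    """Return the reasons this environment falls outside the stated requirements."""
    problems = []
    if version < (3, 10):
        problems.append("Python 3.10 or newer is required")
    if system == "Windows":
        problems.append("native Windows is not supported; consider WSL, Docker, or a VM")
    return problems


# Check the interpreter and operating system this script is running on.
issues = pathway_preflight(sys.version_info[:2], platform.system())
print(issues or "environment looks compatible")
```

Inside WSL, `platform.system()` reports `Linux`, so the check passes there.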

You can also try these steps in an online notebook environment like Colab.

To jumpstart a Pathway project, check out our cookiecutter template.

Step 2: Import Pathway

To use Pathway, you need to import it:

import pathway as pw

Step 3: Connect to your data

Before building your pipeline, you need to connect to your data sources by using input connectors. Input connectors pull the data from the data sources to the Pathway tables.

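Pathway provides a wide range of input connectors. For example, you can easily connect to Kafka using the Pathway Kafka connector:

# Schema of the incoming messages: a single integer column named "value".
class ExampleSchema(pw.Schema):
    value: int


# rdkafka_settings is assumed to be a dict of Kafka client settings defined earlier.
input_table = pw.io.kafka.read(
    rdkafka_settings,
    topic="topic_name",
    format="json",
    schema=ExampleSchema,
)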

💡 To connect to a data source, you need to specify the schema of the incoming data.
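To illustrate what declaring a schema means for JSON messages such as `{"value": 1}`, here is a minimal pure-Python sketch that checks a message against the type hints of a class. It does not use Pathway, and Pathway's own handling of malformed or incomplete messages may differ; `ExampleFields` and `conforms` are hypothetical names introduced only for this illustration.

```python
import json
from typing import get_type_hints


class ExampleFields:
    # Hypothetical plain-Python stand-in for a schema: one int column "value".
    value: int


def conforms(message: str, fields: type) -> bool:
    """Return True if the JSON message holds every declared field with its type."""
    record = json.loads(message)
    if not isinstance(record, dict):
        return False
    hints = get_type_hints(fields)
    # type(...) is used instead of isinstance so that True/False do not pass as int.
    return all(
        name in record and type(record[name]) is expected
        for name, expected in hints.items()
    )


print(conforms('{"value": 1}', ExampleFields))      # True
print(conforms('{"value": "one"}', ExampleFields))  # False
print(conforms('{"other": 1}', ExampleFields))      # False
```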

You can learn more about the connectors and see the full list of input connectors here.

If you don't have any data source ready but still want to try out Pathway, you can use the pw.demo library to generate an artificial data source.

Step 4: Create your pipeline

Now that you have data, you can process it as you want! Joins, temporal windows, filtering... You can have a glimpse of the available operations in our basic operations guide.

For simplicity, let's start with a simple sum:

sum_table = input_table.reduce(sum=pw.reducers.sum(input_table.value))

The sum_table contains a single column sum with a single entry that is the sum of all the values in the table input_table. This value is automatically updated by Pathway when new values are added to the input_table table.
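To make the idea of an automatically updated value concrete, here is a minimal pure-Python sketch of a sum that is maintained incrementally as values arrive. It only illustrates the concept and is not how Pathway is implemented; the `RunningSum` class and its `add` method are hypothetical names.

```python
class RunningSum:
    """Keeps a sum up to date as values arrive, without rescanning past values."""

    def __init__(self) -> None:
        self.total = 0

    def add(self, value: int) -> int:
        # Fold the new value into the existing result and return the updated sum.
        self.total += value
        return self.total


running = RunningSum()
# Feed the values one by one and record the sum after each arrival.
history = [running.add(v) for v in (1, 2)]
print(history)  # [1, 3]
```

With Pathway, you do not write this bookkeeping yourself: you declare the reduction once and the result is kept up to date for you.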

Step 5: Output your results

Now that your pipeline is ready, you need to specify what to do with the results. Pathway uses output connectors to output the data outside of Pathway.

Let's output the results in a new CSV file by using the pw.io.csv connector.

pw.io.csv.write(sum_table, "output_stream.csv")

Step 6: Run your pipeline

Now that everything is ready, you can run your pipeline and let Pathway handle the updates. To launch the computation on streaming data, you need to add pw.run():

pw.run()

Don't forget to call pw.run(): without it, the pipeline is built but no data is ingested, so no computation happens.

That's it!

With pw.run(), the computation is launched. Each update in the input data streams will automatically trigger the update of the whole pipeline. Pathway will poll for new updates until the process is terminated: the computation runs forever until the process gets killed. This is the normal behavior of Pathway.

If you want to test your pipeline on static and finite data, Pathway also provides a static mode. You can learn more about both modes in our dedicated article.

Understanding the output

Consider the previous example, a simple sum over the values received on the Kafka topic, with the following input:

{"value":1}
{"value":2}

We assume the first value was received at time 1, and the second at time 2. Here is the resulting CSV file output_stream.csv in which all the successive updates of the sum values are displayed:

sum,time,diff
1,1,1
1,2,-1
3,2,1

Let's explain the meaning of those lines. Each row contains:

  • the columns of the output table. Here, there is only sum.
  • time, which represents the time at which the update has happened. In practice, the time is a timestamp.
  • diff, which indicates whether the row is an addition (diff=1) or a deletion (diff=-1). An update is represented by two rows: one to remove the old value, one to add the new value. Those two rows have the same time to ensure the atomicity of the operation.

Hence, the first line represents the insertion (diff=1) of a new value (sum=1), which has happened at time 1. Then the value 2 is received, changing the sum from 1 to 3. This results in both the deletion (diff=-1) of the previous value (sum=1) AND the addition (diff=1) of the new value (sum=3), both at the same time (time=2).
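If you need the current value rather than the full history, you can replay the update stream yourself. The following pure-Python sketch reads the CSV content shown above and applies each diff to recover the state of the table at the end of every timestamp. It relies only on the standard library; the `replay` function is a hypothetical helper, not a Pathway API.

```python
import csv
import io
from collections import Counter

OUTPUT = """sum,time,diff
1,1,1
1,2,-1
3,2,1
"""


def replay(csv_text: str) -> dict[int, list[int]]:
    """Return the sums present in the table at the end of each timestamp."""
    state: Counter[int] = Counter()
    snapshots: dict[int, list[int]] = {}
    rows = sorted(csv.DictReader(io.StringIO(csv_text)), key=lambda r: int(r["time"]))
    for row in rows:
        # diff=1 adds one copy of the row, diff=-1 removes one.
        state[int(row["sum"])] += int(row["diff"])
        # Overwrite the snapshot so that only the state reached after the
        # last row of a given timestamp is kept.
        snapshots[int(row["time"])] = sorted(v for v, n in state.items() if n > 0)
    return snapshots


snapshots = replay(OUTPUT)
print(snapshots)  # {1: [1], 2: [3]}
```

The deletion and the addition at time 2 are applied together, so the intermediate empty state is never exposed in the snapshots.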

Bonus: using the CLI

In addition to running your script directly with python, you can use the Pathway CLI to launch your computation:

pathway spawn python realtime_sum.py

With the CLI, you can use more advanced features such as multiprocessing.

Single machine using multiprocessing

Multiprocessing and multithreading are often required for efficiency, but setting them up in Python can be tedious. Pathway natively provides both: no extra library and no GIL! You only need to use the CLI and specify the number of processes and the number of threads per process with the --processes (-n) and --threads (-t) options. To launch your application with 2 processes, each having 3 threads, you can do as follows:

pathway spawn --processes 2 --threads 3 python realtime_sum.py

Next steps

Congratulations, you are now ready to build more advanced pipelines. Why not try to connect to CSV files using the CSV connector or do a linear regression on Kafka? You can also take a look at the Pathway basic operations guide, which describes the elements of Pathway you are most likely to use from day one. If you want to learn more about how Pathway works, you can read our article about Pathway concepts. If you encounter an issue, don't hesitate to check out our get-help page.