Switching from Batch to Streaming

Switching easily from batch to streaming is a core feature of Pathway. In this article, you will see how to adapt a static pipeline so that it runs on streaming data.

You built your pipeline on static data. Maybe it was simpler to develop and test that way, or maybe you came from Pandas.

Either way, your pipeline is ready, tested, and validated. Now you want to go one step further and make it run in real time.

Good news: Pathway makes this extremely easy, as it is a unified batch and streaming data processing framework.

In a nutshell, the only thing you need to do is to change your data sources to connect to your streaming data sources, and Pathway will take care of the rest!

Batch to Streaming Code in Pathway

Starting with static code

Let's start with writing some code working on static, batch data. It's easier as it allows you to focus on your code logic without worrying about time and consistency. You can also test on static data to make sure the pipeline works as intended.

Let's write a simple example that reads CSV files, sums all the values, and then outputs the results in a new CSV file.

import pathway as pw

# WRITE SOME STATIC CODE

# read data
class InputSchema(pw.Schema):
    value: int

t = pw.io.csv.read(
    './sum_input_data/',
    schema=InputSchema,
    mode="static",
)

# process data
t = t.reduce(pw.reducers.sum(t.value))

# write data
pw.io.csv.write(t, "output.csv")

# run
pw.run()

To try this code sample, you only need CSV files with a single column named value.
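
If you do not have suitable input files at hand, the following sketch generates some using only the Python standard library. The helper `write_sample_csv`, the file names, and the numbers are hypothetical and chosen for illustration; only the directory `./sum_input_data/` and the column name `value` come from the example above.

```python
import csv
from pathlib import Path


def write_sample_csv(directory, filename, values):
    """Write `values` to a one-column CSV file whose header is `value`."""
    directory = Path(directory)
    directory.mkdir(parents=True, exist_ok=True)
    path = directory / filename
    with path.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["value"])  # header must match the field in InputSchema
        writer.writerows([v] for v in values)
    return path


# Hypothetical sample data: two files in the directory read by the pipeline.
input_dir = Path("./sum_input_data")
write_sample_csv(input_dir, "values_1.csv", [1, 2, 3])
write_sample_csv(input_dir, "values_2.csv", [10, 20])
```

With these two files in place, the static pipeline should report a sum of 36.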

Simply change the connectors

Now, you would like to be able to add CSV files and have the sum automatically updated: you want to go from a static data source to a streaming one.

In Pathway, this change is very easy: you simply need to change the connectors to connect to your streaming data sources.

Not all connectors work in both static and streaming modes. You can take a look at the list of connectors and pick the best-suited ones for your project. You can learn how to use them in the Input/Output API.

The CSV input connector used in our example works in both modes: you only need to set the mode parameter to "streaming":

t = pw.io.csv.read(
    './sum_input_data/',
    schema=InputSchema,
    mode="streaming",
)

And that's it!

The rest of the implementation remains unchanged:

import pathway as pw

# WRITE SOME STREAMING CODE

# read data
class InputSchema(pw.Schema):
    value: int

t = pw.io.csv.read(
    './sum_input_data/',
    schema=InputSchema,
    mode="streaming",
)

# process data
t = t.reduce(pw.reducers.sum(t.value))

# write data
pw.io.csv.write(t, "output.csv")

# run
pw.run()

What else?

Congratulations, your former static project is now a real-time data processing one!

But what does this change for you in practice?

Not much: Pathway will handle everything for you. You don't have to worry about the new temporal nature of your project: Pathway will manage late and out-of-order data points for you.

In a streaming system, inputs arrive as a never-ending data stream, so Pathway computes an output with the available data and then revises the result whenever new data arrives. This is the only real change when switching to streaming: the output is no longer a static result but a data stream. Whenever a new update arrives (a new CSV file in our example), Pathway ingests the new values and automatically updates the results. You can take a look at our first-steps article to understand the new results.
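
To make the idea of an output stream more concrete, here is a plain-Python sketch of how a running sum can be expressed as a sequence of updates. It does not use Pathway and is not how Pathway is implemented. The function `sum_updates` is hypothetical, and the `time` values are simple counters, whereas Pathway uses its own timestamps. The `time` and `diff` columns mirror the ones Pathway adds to its CSV output, where a `diff` of 1 marks an inserted row and a `diff` of -1 marks a retracted one.

```python
def sum_updates(batches):
    """Yield (value, time, diff) rows describing how a running sum evolves.

    Each batch stands for one update, for example one new CSV file.
    """
    total = None
    for time, batch in enumerate(batches, start=1):
        new_total = (total or 0) + sum(batch)
        if new_total == total:
            continue  # the result did not change, so nothing is emitted
        if total is not None:
            yield (total, time, -1)  # retract the previous result
        yield (new_total, time, 1)  # insert the revised result
        total = new_total


# Three hypothetical updates arriving one after the other.
updates = list(sum_updates([[1, 2, 3], [10, 20], [4]]))
for row in updates:
    print(row)
# (6, 1, 1)
# (6, 2, -1)
# (36, 2, 1)
# (36, 3, -1)
# (40, 3, 1)
```

Reading the rows from top to bottom, the latest row with a `diff` of 1 always holds the current sum.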

You can also try the pipeline with an artificial input stream generated using pw.demo.range_stream.
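
A minimal sketch of such a run is shown below. It assumes that Pathway is installed and that `pw.demo.range_stream` accepts an `nb_rows` argument and produces a table with a single column named `value`, as described in the Pathway API documentation at the time of writing. The function name `run_range_stream_demo`, the output file name `output_stream.csv`, and the column name `total` are hypothetical.

```python
def run_range_stream_demo(nb_rows=10, output_path="output_stream.csv"):
    """Sum a generated stream of values and write every update to a CSV file."""
    # Imported here so that the snippet can be loaded without Pathway installed.
    import pathway as pw

    # Artificial input stream with a single column, `value` (assumption, see above).
    t = pw.demo.range_stream(nb_rows=nb_rows)

    # Same processing step as in the CSV pipeline, with an explicit column name.
    t = t.reduce(total=pw.reducers.sum(t.value))

    # Each revision of the sum is appended to the output file.
    pw.io.csv.write(t, output_path)

    # Start the computation.
    pw.run()
```

Calling `run_range_stream_demo()` starts the pipeline. The output file should then contain one row per revision of the sum rather than a single final value.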

Conclusion

As you can see, Pathway makes the transition from batch to streaming as easy as possible. Using Pathway, you can focus on building your pipeline on static data, without worrying about the temporal nature of your data. Once ready, simply change your connectors to connect to your live data sources and let Pathway manage everything for you.