Best Practices for Efficient Pathway Pipelines
To make the most of Pathway, follow these best practices to optimize the performance and the memory usage of your streaming pipelines.
- Use built-in transformations instead of Python UDFs
- Annotate your UDFs
- Mark deterministic UDFs
- Favor stateless transformations
- Use windowing strategies for aggregations
- Manage memory usage with temporal behaviors
- Use asof_now_join when possible
- Optimize for low latency with multiprocessing
- Retrieve column names and data types with schema
- Start with static batch processing
- Don't mix pw.debug.compute_and_print with streaming sources
- Monolith or multiple pipelines?
Use built-in transformations instead of Python UDFs
Pathway provides a rich set of built-in transformations that are optimized for performance. Whenever possible, use these instead of Python user-defined functions (UDFs): built-in operations are executed directly in Rust and are therefore much more efficient than Python UDFs, which force each row through the Python interpreter.
Annotate your UDFs
If you do need to use UDFs, always annotate them with the expected input and output types. This helps Pathway determine the result type in advance, optimizing execution and preventing unnecessary type inference overhead.
```python
@pw.udf
def add_one(x: int) -> int:
    return x + 1
```
Mark deterministic UDFs
If your UDF produces the same output given the same input (i.e., it is deterministic), mark it as such. This allows Pathway to skip storing its results, reducing memory usage.
```python
@pw.udf(deterministic=True)
def normalize_value(x: float) -> float:
    return x / 100.0
```
You can learn more about determinism in UDFs here.
Favor stateless transformations
Since Pathway operates on incremental data updates, prefer stateless transformations when possible. Stateful operations should be carefully designed to avoid unnecessary memory usage.
Use windowing strategies for aggregations
For aggregation tasks, consider windowing strategies like tumbling or sliding windows to control memory usage efficiently.
```python
agg_table = table.windowby(
    table.time,  # the event-time column used to assign rows to windows
    window=pw.temporal.tumbling(duration=60),
).reduce(value_sum=pw.reducers.sum(pw.this.value))
```
Manage memory usage with temporal behaviors
To limit memory usage in windows and temporal joins, make use of Pathway's temporal behaviors. This helps prevent unbounded memory growth by defining how long data should be kept.
```python
windowed_table = table.windowby(
    table.time,  # the event-time column used to assign rows to windows
    window=pw.temporal.sliding(duration=10, hop=4),
    behavior=pw.temporal.common_behavior(cutoff=60),
).reduce(
    window_start=pw.this._pw_window_start,
    window_end=pw.this._pw_window_end,
    n_logs=pw.reducers.count(),
)
```
Using the `cutoff` parameter, this behavior prevents the sliding windows from being updated after 60 seconds. In this case, it means that at most 6 windows can be updated at the same time.
Use asof_now_join when possible
Joins in Pathway store both input tables in memory, which can lead to high memory consumption. If you only need to join entries from one table with the current state of another, use `asof_now_join`, which stores only one side of the join.
```python
# Select the columns you need from the join; `value` is illustrative.
result = table1.asof_now_join(table2, table1.key == table2.key).select(
    table1.key, table2.value
)
```
Optimize for low latency with multiprocessing
If you experience high latency due to heavy UDFs, consider using multiprocessing to parallelize execution. You can start your Pathway pipeline with multiple worker processes; for example, the following command starts 8 Pathway workers:
```shell
pathway spawn -n 8 python main.py
```
This approach helps distribute the workload efficiently across multiple CPU cores, improving throughput and reducing latency.
Retrieve column names and data types with schema
You can retrieve your table's schema to understand its column names and data types using the `schema` property.
This helps when debugging or designing transformations.
```python
print(table.schema)
```
You can also use `table.schema.generate_class()` to get a schema definition that you can paste back into your code.
Wrapping Pathway transformations in functions and annotating arguments and results with `table: pw.Table[YourSchema]` boosts code readability. Decorating such functions with `@pw.table_transformer` provides additional automated type-checking.
```python
class MySchema(pw.Schema):
    column1: int
    column2: str

@pw.table_transformer
def process_table(table: pw.Table[MySchema]) -> pw.Table[MySchema]:
    return table.with_columns(column1=pw.this.column1 + 1)
```
Start with static batch processing
Write your main pipeline using static batch mode first, before switching to streaming. Use tools like `pw.debug.table_from_markdown` and `pw.debug.table_to_dicts` to develop and test your transformations. Structure your code so that transformations are separate from `pw.io` connectors, allowing for better maintainability and automated testing.
```python
def transform_data(table: pw.Table) -> pw.Table:
    return table.with_columns(new_col=pw.this.old_col * 2)

static_table = pw.debug.table_from_markdown("""
    | old_col
 1  | 1
 2  | 2
""")

result = transform_data(static_table)
print(pw.debug.table_to_dicts(result))
```
Don't mix pw.debug.compute_and_print with streaming sources
The function `pw.debug.compute_and_print` works only in static mode. By default, input connectors are in streaming mode, so if you try to use this function with a streaming source, you won't get any output. To use `compute_and_print`, set `mode="static"` explicitly:
```python
# InputSchema is your pw.Schema class describing the CSV columns.
static_table = pw.io.csv.read("data.csv", schema=InputSchema, mode="static")
pw.debug.compute_and_print(static_table)
```
Monolith or multiple pipelines?
When running multiple independent pipelines on a single node, both a monolithic approach (one `pw.run` with multiple pipelines) and a distributed approach (separate scripts, each with its own `pw.run`) have their merits.
- Multiple pipelines approach:
- More flexibility in managing, debugging, and upgrading individual pipelines.
- Avoids restarting an entire system for minor changes.
- Single monolithic pipeline approach:
- Easier to monitor.
- Reduces redundant work by input source connectors.
- Ensures better data consistency between tables in the same pipeline.
If pipelines need to communicate, consider using Kafka-compatible solutions or S3 with Delta Table format for efficient data exchange. If you later need to switch approaches, the refactoring process is usually straightforward but may require DevOps adjustments for monitoring tools like OpenTelemetry and Grafana.
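As a sketch of the Delta Table option, one pipeline can append its output to a Delta Table on S3 while another consumes it as a stream. The bucket URI, schema, and exact connector parameters below are illustrative assumptions, and credentials configuration is omitted:

```python
import pathway as pw

class EventSchema(pw.Schema):
    key: str
    value: float

# Pipeline A appends its output table to a Delta Table on S3...
pw.io.deltalake.write(events_table, "s3://my-bucket/events")

# ...and pipeline B, running as a separate script, consumes it as a stream.
events = pw.io.deltalake.read(
    "s3://my-bucket/events", schema=EventSchema, mode="streaming"
)
```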