Best Practices for Efficient Pathway Pipelines

To make the most of Pathway, follow these best practices to optimize the performance and memory usage of your streaming pipelines.

Use built-in transformations instead of Python UDFs

Pathway provides a rich set of built-in transformations that are optimized for performance. Whenever possible, use these instead of Python user-defined functions (UDFs): built-in operations run directly in Rust and are therefore more efficient than Python UDFs.
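For instance, computing a derived column with a built-in expression avoids the Python round-trip entirely. A minimal sketch, assuming a table with hypothetical price and quantity columns:

import pathway as pw

# Preferred: a built-in expression, evaluated natively by the engine
result = table.select(total=pw.this.price * pw.this.quantity)

# Slower equivalent: a Python UDF invoked for every row
@pw.udf
def compute_total(price: float, quantity: int) -> float:
    return price * quantity

result_udf = table.select(total=compute_total(pw.this.price, pw.this.quantity))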

Annotate your UDFs

If you do need to use UDFs, always annotate them with the expected input and output types. This helps Pathway determine the result type in advance, optimizing execution and preventing unnecessary type inference overhead.

@pw.udf
def add_one(x: int) -> int:
    return x + 1

Mark deterministic UDFs

If your UDF produces the same output given the same input (i.e., it is deterministic), mark it as such. By default, Pathway stores UDF results so that it can retract them consistently later; a deterministic UDF can simply be re-evaluated, so marking it lets Pathway skip storing the results, reducing memory usage.

@pw.udf(deterministic=True)
def normalize_value(x: float) -> float:
    return x / 100.0

You can learn more about determinism in UDFs in the Pathway documentation.

Favor stateless transformations

Since Pathway operates on incremental data updates, prefer stateless transformations when possible; stateful operations keep data in memory and should be carefully designed to avoid unnecessary memory usage.
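Row-wise operations such as filter and select are stateless. A minimal sketch, with hypothetical column names:

import pathway as pw

# Stateless, row-wise transformations: each update is processed independently,
# so nothing needs to be kept in memory between updates.
positive = table.filter(pw.this.value > 0)
scaled = positive.select(scaled=pw.this.value * 2)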

Use windowing strategies for aggregations

For aggregation tasks, consider windowing strategies like tumbling or sliding windows to control memory usage efficiently.

agg_table = table.windowby(
    pw.this.time,  # event-time column used for windowing
    window=pw.temporal.tumbling(duration=60),
).reduce(total=pw.reducers.sum(pw.this.value))

Manage memory usage with temporal behaviors

To limit memory usage in windows and temporal joins, make use of Pathway's temporal behaviors. This helps prevent unbounded memory growth by defining how long data should be kept.

windowed_table = table.windowby(
    pw.this.time,  # event-time column used for windowing
    window=pw.temporal.sliding(duration=10, hop=4),
    behavior=pw.temporal.common_behavior(cutoff=60),
).reduce(
    window_start=pw.this._pw_window_start,
    window_end=pw.this._pw_window_end,
    n_logs=pw.reducers.count(),
)

Using the cutoff parameter, this behavior stops updating each sliding window 60 seconds after its end time: data arriving later than that is ignored and the window's memory can be freed, which bounds the number of windows that can be updated at any given time.

Use asof_now_join when possible

Joins in Pathway store both tables in memory, which can lead to high memory consumption. If you only need to join entries from one table with the current state of another, use asof_now_join, which stores only one side of the join.

result = table1.asof_now_join(
    table2, table1.key == table2.key
).select(table1.key, table2.value)  # select the output columns (value is hypothetical)

Optimize for low latency with multiprocessing

If you experience high latency due to heavy UDFs, consider using multiprocessing to parallelize execution. You can start your Pathway pipeline with multiple worker processes using the pathway spawn command; for example, the following starts 8 Pathway workers:

pathway spawn -n 8 python main.py

This approach helps distribute the workload efficiently across multiple CPU cores, improving throughput and reducing latency.
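The script itself needs no changes: main.py is a regular Pathway program that ends with pw.run(), and each spawned worker executes its share of the dataflow. A minimal sketch, using a demo source and a placeholder output path:

# main.py
import pathway as pw

table = pw.demo.range_stream()  # demo streaming source with a single `value` column
result = table.select(value=pw.this.value + 1)
pw.io.jsonlines.write(result, "output.jsonl")  # placeholder output path

pw.run()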

Retrieve column names and data types with schema

You can retrieve your table's schema to understand its column names and data types using the schema property. This helps when debugging or designing transformations.

print(table.schema)

You can also use table.schema.generate_class() to get a schema definition that you can paste back into your code. Wrapping Pathway transformations in functions and annotating their arguments and results (e.g., table: pw.Table[YourSchema]) boosts code readability. Decorating such functions with @pw.table_transformer provides additional automated type-checking.

class MySchema(pw.Schema):
    column1: int
    column2: str

@pw.table_transformer
def process_table(table: pw.Table[MySchema]) -> pw.Table[MySchema]:
    return table.with_columns(column1=pw.this.column1 + 1)
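To go the other way, from an existing table back to a pastable schema definition, a minimal sketch (assuming generate_class returns the definition as text):

# Print a schema class definition that can be pasted back into the code
print(table.schema.generate_class())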

Start with static batch processing

Write your main pipeline using static batch mode first before switching to streaming. Use tools like pw.debug.table_from_markdown and pw.debug.table_to_dicts to develop and test your transformations. Structure your code so that transformations are separate from pw.io, allowing for better maintainability and automated testing.

def transform_data(table: pw.Table) -> pw.Table:
    return table.with_columns(new_col=pw.this.old_col * 2)

static_table = pw.debug.table_from_markdown("""
old_col
1
2
""")

result = transform_data(static_table)
print(pw.debug.table_to_dicts(result))

Don't mix pw.debug.compute_and_print with streaming sources

The function pw.debug.compute_and_print works only in static mode. By default, input connectors are in streaming mode, so if you try to use this function with a streaming source, you won't get any output. To use compute_and_print, set mode="static" explicitly:

class InputSchema(pw.Schema):
    value: int  # hypothetical column of data.csv

static_table = pw.io.csv.read("data.csv", schema=InputSchema, mode="static")
pw.debug.compute_and_print(static_table)

Monolith or multiple pipelines?

When running multiple independent pipelines on a single node, both a monolithic approach (one pw.run executing all pipelines) and a distributed approach (separate scripts, each with its own pw.run instance) have their merits.

  • Multiple pipelines approach:
    • More flexibility in managing, debugging, and upgrading individual pipelines.
    • Avoids restarting an entire system for minor changes.
  • Single monolithic pipeline approach:
    • Easier to monitor.
    • Reduces redundant work by input source connectors.
    • Ensures better data consistency between tables in the same pipeline.

If pipelines need to communicate, consider using Kafka-compatible solutions or S3 with Delta Table format for efficient data exchange. If you later need to switch approaches, the refactoring process is usually straightforward but may require DevOps adjustments for monitoring tools like OpenTelemetry and Grafana.
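As a minimal sketch of the Delta Table exchange mentioned above, one pipeline can publish a table while another consumes it as a stream; the bucket path and schema are placeholders, and S3 credentials/connection settings are omitted:

import pathway as pw

class ExchangeSchema(pw.Schema):
    key: str
    value: float

# Pipeline A: publish its output as a Delta Table (placeholder S3 path)
pw.io.deltalake.write(result_table, "s3://my-bucket/shared-table")

# Pipeline B, in a separate script: consume the same Delta Table as a stream
shared = pw.io.deltalake.read("s3://my-bucket/shared-table", schema=ExchangeSchema)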