Dynamic Worker Scaling

Programs built with Pathway can scale their workload by splitting heavy pipelines into parallel units of execution, which yields higher throughput for computationally intensive workloads.

To run computations in parallel, use the pathway spawn command and specify the number of workers.

This way:

  • Each worker executes in parallel.
  • Workers can be implemented either as processes or as threads.
  • By increasing the number of workers, you can split computations into more parallel parts and achieve better throughput.
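
For example, assuming your program lives in a file named pipeline.py, the following command runs it with four parallel workers:

pathway spawn -n 4 python pipeline.py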

When you start your program this way, it runs continuously with exactly N workers for its entire lifetime. This works well for setups with a consistent load profile, but has limitations when the load is volatile: if the system load is low, resources may be underutilized, while if the load increases, the fixed number of workers may become a bottleneck. In other words, the worker count doesn't adapt automatically: you have a static execution model.

In many real-world scenarios, it is desirable for the number of workers to be dynamic: under low load, the number of active workers should decrease to save resources, while under high load, the number of workers should increase proportionally to handle the demand. To support this use case, Pathway provides built-in autoscaling mechanisms that allow worker counts to grow and shrink automatically based on workload.

How Dynamic Scaling Works

Understanding the basics of the scaling mechanism is useful for setting it up efficiently.

When you launch a Pathway computation via pathway spawn, an orchestrator creates and manages the workers. Based on the information it receives from the workers, the orchestrator can adjust their number.

Each worker, if configured, tracks its own load profile. The tracking is based on the time spent on computation over a sliding window (2 minutes by default, configurable by the user), as well as the worker's idle time and the intervals the scheduler expected the computations to take. Once a full window has elapsed, a pattern of excessive idle time leads to worker termination, with an exit code informing the orchestrator that the engine should be scaled down. Conversely, if computations consistently fall behind, the orchestrator determines that the computation should be restarted with a larger number of workers.
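
As a rough mental model (this is not the engine's actual code; the statistics and thresholds below are purely hypothetical), the per-window decision can be sketched as follows:

# Hypothetical sketch of the per-window scaling decision.
# The real statistics and thresholds live inside the Pathway engine.
def scaling_decision(idle_fraction: float, behind_fraction: float) -> str:
    # idle_fraction: share of the tracking window the worker spent idle.
    # behind_fraction: share of computations that took longer than the scheduler expected.
    if idle_fraction > 0.5:
        return "scale_down"  # worker exits; its exit code tells the orchestrator to shrink
    if behind_fraction > 0.5:
        return "scale_up"  # orchestrator restarts the computation with more workers
    return "keep"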

After receiving a signal, the orchestrator adjusts the number of workers and restarts the computation. The sharding and work-splitting mechanisms are updated for the new worker count.

Data persistence is required to ensure that computations resume from the point reached when the decision to change the number of workers was made. Accordingly, scaling is configured within the data persistence settings.

Last but not least, please note that the described procedure implies a full restart of the computation graph. Persistence mitigates this, but does not eliminate restart costs.

Worker Count Adjustment Rules

As mentioned, the number of workers can be recalculated by the orchestration process. The following rules illustrate how this adjustment works:

  • Increasing workers: The orchestrator doubles the current number of workers.
    • Example: If you have 4 workers and scaling up is required, the system will increase the count to 8.
  • Decreasing workers: The orchestrator halves the current number of workers.
    • Example: If you have 4 workers and scaling down is required, the system will reduce the count to 2.

In any case, you can't have fewer than one worker. Therefore, even if the pipeline is very light and only one worker is running, the orchestrator cannot reduce the number further.

The scaling process adjusts only the number of processes; threads are not used for dynamic scaling. If your initial configuration uses thread workers, or both threads and processes, scaling will only change the process count. For example, if you launched the computation with one process containing two workers, upscaling will lead to two processes with two workers each. Downscaling from this initial configuration, on the other hand, is not possible, since the number of processes is already one.
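
To make the rule concrete, here is a tiny hypothetical helper (not part of the Pathway API) that mirrors the doubling and halving of the process count:

def next_process_count(processes: int, scale_up: bool) -> int:
    # Scaling acts on processes only; the threads per process stay fixed.
    if scale_up:
        return processes * 2
    return max(1, processes // 2)  # never fewer than one process

assert next_process_count(1, scale_up=True) == 2   # 1 process x 2 threads -> 2 processes x 2 threads
assert next_process_count(1, scale_up=False) == 1  # already at the minimum, no downscaling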

License Limitations

You need a Pathway license for scaling to work. You can obtain your free Pathway Scale license here. The page contains instructions for getting the license and using it in your pipeline.

Please note that the free scaling license constrains the maximum worker count to 8. With the free tier, once the system is already running 8 workers, it won't scale up, even if the computation falls behind. Likewise, a scale-up will never bring the worker count above 8.
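
One way to read this constraint, again as a purely illustrative sketch rather than the actual implementation:

FREE_TIER_MAX_WORKERS = 8  # cap imposed by the free Pathway Scale license

def can_scale_up(current_workers: int) -> bool:
    # Scaling up doubles the worker count, so it is blocked
    # whenever doubling would push the count past the cap.
    return current_workers * 2 <= FREE_TIER_MAX_WORKERS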

Configuring and Running

With these restrictions in mind, you are ready to configure and run auto-scaling.

First, you need to create a persistence configuration to preserve the computation state and progress between worker restarts; without it, the computation will start over from the beginning after each restart. A simple version looks as follows:

import pathway as pw
from pathway.internals import api

your_persistent_storage_path = "/tmp/Pathway-Cache"  # replace with your own storage path

persistence_config = pw.persistence.Config(
    backend=pw.persistence.Backend.filesystem(your_persistent_storage_path),
    persistence_mode=api.PersistenceMode.OPERATOR_PERSISTING,
)

Please note the persistence_mode parameter: in high-load scenarios, it is crucial to use OPERATOR_PERSISTING. This mode lets the system dump only the state of its internal computation structures, avoiding the heavy recomputation that could otherwise occur when upscaling becomes necessary.

However, this configuration does not yet include scaling settings; in this form, scaling remains disabled. You enable it by setting the worker_scaling_enabled flag:

import pathway as pw
from pathway.internals import api

your_persistent_storage_path = "/tmp/Pathway-Cache"  # replace with your own storage path

persistence_config = pw.persistence.Config(
    backend=pw.persistence.Backend.filesystem(your_persistent_storage_path),
    persistence_mode=api.PersistenceMode.OPERATOR_PERSISTING,
    worker_scaling_enabled=True,
)

With this setting, the program will track the workload and be capable of scaling up and down. By default, statistics are computed over a two-minute window. You can change this by specifying the number of milliseconds in the workload_tracking_window_ms parameter:

import pathway as pw
from pathway.internals import api

your_persistent_storage_path = "/tmp/Pathway-Cache"  # replace with your own storage path

persistence_config = pw.persistence.Config(
    backend=pw.persistence.Backend.filesystem(your_persistent_storage_path),
    persistence_mode=api.PersistenceMode.OPERATOR_PERSISTING,
    worker_scaling_enabled=True,
    workload_tracking_window_ms=300000,  # 5 minutes
)

Keep in mind that this value should not be too small. At startup, data sources may not begin producing data immediately; it can take several seconds before they deliver their first inputs. During this initial period, the graph will be underloaded simply because there is nothing to compute yet. Therefore, make sure the window accommodates this startup delay and is at least 20-30 seconds long.

Once you have configured the persistence settings, pass the object as the persistence_config parameter of pw.run:

pw.run(persistence_config=persistence_config)

Finally, you can spawn the execution using a console command, for example: pathway spawn -n 2 python pipeline.py. This command starts the pipeline with two workers initially, each worker running as a separate process.
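
Putting all the pieces together, a complete pipeline.py could look roughly like the sketch below. The input connector, schema, aggregation, and paths are illustrative stand-ins for your own pipeline:

# pipeline.py: a minimal, illustrative pipeline wired up for dynamic scaling.
import pathway as pw
from pathway.internals import api

class InputSchema(pw.Schema):
    value: int

# Any streaming source works; here, a directory of CSV files watched in streaming mode.
events = pw.io.csv.read("./input_events/", schema=InputSchema, mode="streaming")

# A toy aggregation standing in for your actual computation.
totals = events.reduce(total=pw.reducers.sum(pw.this.value))
pw.io.csv.write(totals, "./totals.csv")

persistence_config = pw.persistence.Config(
    backend=pw.persistence.Backend.filesystem("/tmp/Pathway-Cache"),
    persistence_mode=api.PersistenceMode.OPERATOR_PERSISTING,
    worker_scaling_enabled=True,
)
pw.run(persistence_config=persistence_config)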

Conclusion

To manage a dynamic number of workers, follow these steps:

  1. Configure persistence
    It is strongly recommended to use operator persistence (OPERATOR_PERSISTING) to ensure computation state is safely stored between worker restarts and the restarts are as fast as possible.
  2. Enable worker scaling
    In your persistence configuration, set the worker_scaling_enabled flag to True. By default, scaling is disabled.
  3. Adjust the workload tracking window
    Set the appropriate workload_tracking_window_ms to control how the orchestrator evaluates workload patterns or leave the default, which is two minutes.
  4. Start the computation
    Launch your pipeline with pathway spawn. You can also specify the initial number of workers at startup:
    pathway spawn -n <initial_worker_count> python pipeline.py
    

If you have any questions, feel free to reach out on Discord or open an issue on our GitHub.