pw.asynchronous

Helper methods and classes used along with udf_async() and AsyncTransformer().

Typical use:

import pathway as pw
import asyncio
@pw.udf_async(retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(max_retries=5))
async def concat(left: str, right: str) -> str:
  await asyncio.sleep(0.1)
  return left+right
t1 = pw.debug.table_from_markdown('''
age  owner  pet
 10  Alice  dog
  9    Bob  dog
  8  Alice  cat
  7    Bob  dog''')
t2 = t1.select(col = concat(t1.owner, t1.pet))
pw.debug.compute_and_print(t2, include_id=False)
col
Alicecat
Alicedog
Bobdog
Bobdog

class pw.asynchronous.AsyncRetryStrategy

[source]
Class representing strategy of delays or backoffs for the retries.

class pw.asynchronous.CacheStrategy

[source]
Base class used to represent caching strategy.

class pw.asynchronous.DefaultCache(name=None)

[source]
The default caching strategy. Persistence layer will be used if enabled. Otherwise, cache will be disabled.

class pw.asynchronous.ExponentialBackoffRetryStrategy(max_retries=3, initial_delay=1000, backoff_factor=2, jitter_ms=300)

[source]
Retry strategy with exponential backoff with jitter and maximum retries.

class pw.asynchronous.FixedDelayRetryStrategy(max_retries=3, delay_ms=1000)

[source]
Retry strategy with fixed delay and maximum retries.

class pw.asynchronous.NoRetryStrategy

[source]

pw.asynchronous.async_options(capacity=None, retry_strategy=None, cache_strategy=None)

sourceDecorator applying async options to a provided function. Regular function will be wrapper to run in async executor.

  • Parameters
    • capacity (int | None) – Maximum number of concurrent operations. Defaults to None, indicating no specific limit.
    • retry_strategy (AsyncRetryStrategy | None) – Strategy for handling retries in case of failures. Defaults to None.
    • cache_strategy (CacheStrategy | None) – Defines the caching mechanism. If set to None and a persistency is enabled, operations will be cached using the persistence layer. Defaults to None.
  • Returns
    Coroutine

pw.asynchronous.coerce_async(func)

sourceWraps a regular function to be executed in async executor. It acts as a noop if the provided function is already a coroutine.

pw.asynchronous.with_cache_strategy(func, cache_strategy)

sourceReturns an asynchronous function with applied cache strategy. Regular function will be wrapper to run in async executor.

  • Parameters
    cache_strategy (CacheStrategy) – Defines the caching mechanism.
  • Returns
    Coroutine

pw.asynchronous.with_capacity(func, capacity)

sourceLimits the number of simultaneous calls of the specified function. Regular function will be wrapper to run in async executor.

  • Parameters
    capacity (int) – Maximum number of concurrent operations.
  • Returns
    Coroutine

pw.asynchronous.with_retry_strategy(func, retry_strategy)

sourceReturns an asynchronous function with applied retry strategy. Regular function will be wrapper to run in async executor.

  • Parameters
    retry_strategy (AsyncRetryStrategy) – Defines how failures will be handled.
  • Returns
    Coroutine