User-defined functions (UDFs) in Pathway

Pathway supports a wide range of expressions that allow you to operate on individual rows. However, not all operations can be expressed that way. To address this problem, Pathway allows you to write a user-defined function (UDF) in Python. Such function is then applied to each row of your data individually in the same way as the predefined expressions. UDFs can be customized in various ways and all of them are explored in this guide.

Simple UDFs

In the beginning, let's consider a simple case. You want to write a function that increments a number by 1. Just write a regular function and decorate it with pw.udf.

import pathway as pw


@pw.udf
def inc(x: int) -> int:
    return x + 1

and that's everything you need. Now you can use it as an ordinary Pathway expression, as in the example shown below.

table = pw.debug.table_from_markdown(
    """
    value
      1
      2
     13
"""
)

result = table.with_columns(value_inc=inc(table.value))
pw.debug.compute_and_print(result)
[2024-04-19T19:48:55]:INFO:Preparing Pathway computation


            | value | value_inc
^X1MXHYY... | 1     | 2
^YYY4HAB... | 2     | 3
^Z3QWT29... | 13    | 14

It works! The printed table contains two columns - value and value_inc with the result of inc function.

Note that we annotated the return type of inc (int in this case). It is important information for pathway as it is used to infer the type of value_inc column. Let's make sure that the return type is correct by printing the schema of the result table.

print(result.schema)
id          | value | value_inc
ANY_POINTER | INT   | INT      

It is correct. The return type can also be set in a decorator:

@pw.udf(return_type=int)
def inc_2(x: int):
    return x + 1


result_2 = table.with_columns(value_inc=inc_2(table.value))
pw.debug.compute_and_print(result_2)
print(result_2.schema)
[2024-04-19T19:48:55]:INFO:Preparing Pathway computation


            | value | value_inc
^X1MXHYY... | 1     | 2
^YYY4HAB... | 2     | 3
^Z3QWT29... | 13    | 14
id          | value | value_inc
ANY_POINTER | INT   | INT      

In this case, it is also set correctly. If a UDF is not annotated and the return_type is not set, Pathway can't determine the return type of the column and sets it as Any. It is an undesirable situation as many expressions can't be called on columns of type Any. For example, you can't add a column with type Any to a column of type int (you also can't add Any to Any), but you can add a column of type int to a column of type int.

@pw.udf
def inc_3(x: int):
    return x + 1


result_3 = table.with_columns(value_inc=inc_3(table.value))
pw.debug.compute_and_print(result_3)
print(result_3.schema)
[2024-04-19T19:48:55]:INFO:Preparing Pathway computation


            | value | value_inc
^X1MXHYY... | 1     | 2
^YYY4HAB... | 2     | 3
^Z3QWT29... | 13    | 14
id          | value | value_inc
ANY_POINTER | INT   | ANY      

As you can see, this time the type of value_inc column is Any.

Python functions can also be called on data by using pw.apply/pw.apply_with_type functions.

result_4 = table.with_columns(
    value_inc=pw.apply_with_type(lambda x: x + 1, int, table.value)
)
pw.debug.compute_and_print(result_4)
print(result_4.schema)
[2024-04-19T19:48:55]:INFO:Preparing Pathway computation


            | value | value_inc
^X1MXHYY... | 1     | 2
^YYY4HAB... | 2     | 3
^Z3QWT29... | 13    | 14
id          | value | value_inc
ANY_POINTER | INT   | INT      

Remark: to keep the examples as simple as possible, the code pieces in this guide use table_from_markdown to define the example tables and compute_and_print to run the computations. Those functions use Pathway in the static mode. However, Pathway is a streaming data processing system and can work on dynamically changing data. See Pathway modes for more info on this topic.

Also note that the inc function is only present in this guide for demonstration purposes. It is possible to get the same result using Pathway native operations and this is the recommended way as then the computations are performed in Rust, not Python.

The UDFs are useful for more complicated solutions that cannot be fully expressed in Pathway but the functions in the guide are kept simple to focus on UDFs usage and configuration options.

result_5 = table.with_columns(value_inc=table.value + 1)
pw.debug.compute_and_print(result_5)
[2024-04-19T19:48:55]:INFO:Preparing Pathway computation


            | value | value_inc
^X1MXHYY... | 1     | 2
^YYY4HAB... | 2     | 3
^Z3QWT29... | 13    | 14

Calling library functions

The UDF mechanism allows you to also call external functions. You can, for example, use scipy to compute quantiles of the normal distribution.

from scipy.stats import norm

table = pw.debug.table_from_markdown(
    """
    q
    0.02
    0.5
    0.84
    0.99
"""
)
quantiles = table.with_columns(value=pw.apply_with_type(norm.ppf, float, table.q))
pw.debug.compute_and_print(quantiles)
[2024-04-19T19:48:55]:INFO:Preparing Pathway computation


            | q    | value
^X1MXHYY... | 0.02 | -2.053748910631823
^YYY4HAB... | 0.5  | 0.0
^Z3QWT29... | 0.84 | 0.994457883209753
^3CZ78B4... | 0.99 | 2.3263478740408408

Using norm.ppf is convenient because it is a function, not a method. To use an external method it is better to create a wrapper. Below you can find an example of such a wrapper that extracts the pandas.Timestamp.is_leap_year property and puts its value to the column leap_year.

import pandas as pd

table = pw.debug.table_from_markdown(
    """
    date_str
    2023-01-01T12:00:00
    2024-01-01T12:00:00
    2025-01-01T12:00:00
"""
).select(date_time=pw.this.date_str.dt.strptime("%Y-%m-%dT%H:%M:%S"))


@pw.udf
def is_leap_year(date_time: pd.Timestamp) -> bool:
    return date_time.is_leap_year


result = table.with_columns(leap_year=is_leap_year(pw.this.date_time))
pw.debug.compute_and_print(result)
[2024-04-19T19:48:55]:INFO:Preparing Pathway computation


            | date_time           | leap_year
^X1MXHYY... | 2023-01-01 12:00:00 | False
^YYY4HAB... | 2024-01-01 12:00:00 | True
^Z3QWT29... | 2025-01-01 12:00:00 | False

Propagating Nones

By default, Pathway UDFs are called on all rows, however it may not always be desired. In particular, if you have a function that requires values to be non-optional but your data has some missing entries, you may want to return None immediately instead of calling a function. In Pathway, you can enable such mechanism with the propagate_none parameter of pw.udf. By default, it is set to False. Setting it to True makes Pathway to look at the inputs of the UDF, and if at least one of the arguments is None, then the function is not called, and None is returned instead.

table = pw.debug.table_from_markdown(
    """
     a |  b
    12 |  3
     3 |
    23 | 42
       | 12
"""
)


@pw.udf(propagate_none=True)
def mul(a: int, b: int) -> int:
    return a * b


result = table.with_columns(c=mul(pw.this.a, pw.this.b))
pw.debug.compute_and_print(result)
[2024-04-19T19:48:55]:INFO:Preparing Pathway computation


            | a  | b  | c
^3CZ78B4... |    | 12 |
^YYY4HAB... | 3  |    |
^X1MXHYY... | 12 | 3  | 36
^Z3QWT29... | 23 | 42 | 966

If propagate_none was not set, the mul function would error.

Determinism

Pathway assumes that a UDF is not deterministic unless told otherwise. In this context, being deterministic means that the function always returns the same value for the same arguments. Pathway requires this information for consistency reasons. If you're sure that your function is deterministic, you can set deterministic=True as it usually improves the speed and memory requirements of the computation. However sometimes the function may be nondeterministic in a non-obvious way. For example, some linear algebra operations on floating point numbers that use multithreading under the hood can return slightly different results across runs. Such functions are not deterministic. If this explanation is enough for you, feel free to skip to the next section. If you want to learn more about how Pathway handles non-deterministic functions, dive in.

To maintain consistency, it'll memoize the result of a UDF call until the corresponding input row is deleted. Being able to produce deleting rows is the reason why Pathway has to store the results of UDFs. The values in the inserted and deleted entries have to be the same so that they can cancel out. If a UDF is non-deterministic, it can produce a different value and the entries can't cancel out as they are not equal. To get the same values at row deletion as at insertion, the results have to be remembered. When the row is deleted, there is no more need to remember the result of the call and it can be discarded.

When you're sure that the function is deterministic, you can avoid storing the results by setting deterministic=True in the pw.udf decorator. It'll usually improve the speed and memory requirements of the computation (especially for fast functions). It is recommended to set it always when the function is deterministic.

If the function is slow, setting deterministic=False might result in a faster execution, but it's not recommended if the function is deterministic. It's better to use caching. Caching can help with slow functions even if you call the function with each argument only once. It because Pathway has to evaluate the function also on deletion and when it is cached, the value can be taken from cache instead of evaluating it.

Let's see the effects of deterministic parameter in practice. To do that, let's simulate a stream. It contains special columns: id that sets the id of the row (a deletion has to have the same id as the insertion it removes), __time__ that simulates the arrival time to the engine and __diff__ that tells whether the entry is an insertion () or deletion (). At time two rows are inserted. At time one row is upserted (with an upsert represented as a deletion and insertion with the same time and id).

table = pw.debug.table_from_markdown(
    """
    id | a | b | __time__ | __diff__
     1 | 2 | 3 |     2    |     1
     2 | 4 | 1 |     2    |     1
     1 | 2 | 3 |     4    |    -1
     1 | 3 | 3 |     4    |     1
"""
)

You apply a UDF with default parameters (deterministic=False) first.

@pw.udf
def add(a: int, b: int) -> int:
    print(f"add({a}, {b})")
    return a + b


result_default = table.select(pw.this.a, pw.this.b, c=add(pw.this.a, pw.this.b))
pw.debug.compute_and_print(result_default)
[2024-04-19T19:48:55]:INFO:Preparing Pathway computation


add(2, 3)
add(4, 1)
add(3, 3)
            | a | b | c
^YYY4HAB... | 3 | 3 | 6
^Z3QWT29... | 4 | 1 | 5

As you can see from the printed messages, the function is called three times. It is because the function was not called on deletion.

This time, let's tell Pathway that the function is deterministic.

@pw.udf(deterministic=True)
def add_deterministic(a: int, b: int) -> int:
    print(f"add_deterministic({a}, {b})")
    return a + b


result_default = table.select(
    pw.this.a, pw.this.b, c=add_deterministic(pw.this.a, pw.this.b)
)
pw.debug.compute_and_print(result_default)
[2024-04-19T19:48:55]:INFO:Preparing Pathway computation


add_deterministic(2, 3)
add_deterministic(2, 3)
add_deterministic(3, 3)
add_deterministic(4, 1)
            | a | b | c
^YYY4HAB... | 3 | 3 | 6
^Z3QWT29... | 4 | 1 | 5

This time, the function was called four times (once for each entry) and there was no need to remember anything! The function is truly deterministic and the result is consistent.

UDFs should not be used for side effects

UDFs are Python functions so you can capture non-local variables and modify them inside the functions. From the UDF it is also possible to call external services and modify their state. This is, however, strongly discouraged. There's no guarantee that Pathway will run a UDF exactly once for each row. If the function is non-deterministic it might not always be called (see above). Also if caching is set, the functions will be called less frequently.

Note that we sometimes produce side effects in this tutorial by using the print function. However, we use it to show the behavior of the system, not to use the printed messages in some other computation.

If you want to produce side effects, pw.io.subscribe should be used instead.

Caching

If the function you call is expensive and you call it frequently, you may want to cache its results. To do this, you can set cache_strategy in pw.udf decorator. Currently, the supported caching strategies are DiskCache and InMemoryCache. The DiskCache requires the persistence to be enabled. It caches the results in the persistent storage. As a consequence, the results can be reused after the program restart. The InMemoryStorage caches results in memory. As a result, it does not need persistence config but the results are not available after the computations restart.

Let's first run the example without caching:


table = pw.debug.table_from_markdown(
    """
    value | __time__
      1   |     2
      2   |     2
     13   |     2
      1   |     2
      2   |     2
      1   |     2
"""
)


@pw.udf(deterministic=True)
def inc_no_cache(x: int) -> int:
    print(f"inc({x})")
    return x + 1


result_no_cache = table.with_columns(value_inc=inc_no_cache(table.value))
pw.debug.compute_and_print(result_no_cache)
[2024-04-19T19:48:55]:INFO:Preparing Pathway computation


inc(2)
inc(13)
inc(1)
inc(1)
inc(2)
inc(1)
            | value | value_inc
^X1MXHYY... | 1     | 2
^3CZ78B4... | 1     | 2
^3S2X6B2... | 1     | 2
^YYY4HAB... | 2     | 3
^3HN31E1... | 2     | 3
^Z3QWT29... | 13    | 14

As you can see from printed messages, the UDF was called 6 times.

Let's use InMemoryCache this time:

@pw.udf(deterministic=True, cache_strategy=pw.udfs.InMemoryCache())
def inc_in_memory_cache(x: int) -> int:
    print(f"inc({x})")
    return x + 1


result_in_memory_cache = table.with_columns(value_inc=inc_in_memory_cache(table.value))
pw.debug.compute_and_print(result_in_memory_cache)
[2024-04-19T19:48:55]:INFO:Preparing Pathway computation


inc(2)
inc(13)
inc(1)
            | value | value_inc
^X1MXHYY... | 1     | 2
^3CZ78B4... | 1     | 2
^3S2X6B2... | 1     | 2
^YYY4HAB... | 2     | 3
^3HN31E1... | 2     | 3
^Z3QWT29... | 13    | 14

This time, the function was called only three times. Other results were extracted from the cache. If you run that piece of code as a separate program, it'd compute the results from scratch at every restart (because the results are stored in memory). In a notebook you have to restart the runtime to see the effect.

This behavior might be problematic if you want to keep the results between restarts. This is where DiskCache comes in. It stores the results of the calls in persistent storage. In the example, it is located in the ./Cache directory. To read more about setting up persistence see the persistence guide.

@pw.udf(deterministic=True, cache_strategy=pw.udfs.DiskCache())
def inc_disk_cache(x: int) -> int:
    print(f"inc({x})")
    return x + 1


persistence_config = pw.persistence.Config.simple_config(
    pw.persistence.Backend.filesystem("./Cache"),
    persistence_mode=pw.PersistenceMode.UDF_CACHING,
)
result_disk_cache = table.with_columns(value_inc=inc_disk_cache(table.value))
pw.debug.compute_and_print(result_disk_cache, persistence_config=persistence_config)
[2024-04-19T19:48:55]:INFO:Preparing Pathway computation


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(1) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(1) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(2) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(2) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(3) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(3) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(4) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(4) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(5) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(5) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(6) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(6) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(7) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(7) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(8) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(8) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(9) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(9) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(10) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(10) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(11) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(11) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(12) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(12) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(13) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(13) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(14) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(14) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(15) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(15) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(16) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(16) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(17) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(17) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(18) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(18) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(19) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(19) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(20) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(20) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(21) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(21) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(22) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(22) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(23) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(23) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(24) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(24) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(25) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(25) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(26) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(26) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(27) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(27) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(28) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(28) }


            | value | value_inc
^X1MXHYY... | 1     | 2
^3CZ78B4... | 1     | 2
^3S2X6B2... | 1     | 2
^YYY4HAB... | 2     | 3
^3HN31E1... | 2     | 3
^Z3QWT29... | 13    | 14

If, instead of printing output on the screen, you want to use one of the output connectors, you need to put persistence_config in pw.run, like this:

pw.io.csv.write(result_disk_cache, "result_disk_cache.csv")
pw.run(persistence_config=persistence_config)
Output()


[2024-04-19T19:48:55]:INFO:Preparing Pathway computation


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(1) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(1) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(2) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(2) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(3) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(3) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(4) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(4) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(5) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(5) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(6) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(6) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(7) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(7) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(8) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(8) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(9) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(9) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(10) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(10) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(11) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(11) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(12) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(12) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(13) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(13) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(14) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(14) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(15) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(15) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(16) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(16) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(17) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(17) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(18) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(18) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(19) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(19) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(20) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(20) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(21) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(21) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(22) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(22) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(23) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(23) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(24) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(24) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(25) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(25) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(26) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(26) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(27) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(27) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(28) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(28) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(29) }


[2024-04-19T19:48:55]:INFO:Merge the current state with block: StoredMetadata { frontiers: OffsetAntichainCollection { antichains: {} }, storage_types: {}, last_advanced_timestamp: Timestamp(29) }


[2024-04-19T19:48:55]:INFO:FileWriter-0: Done writing 6 entries, closing data sink. Current batch writes took: 0 ms. All writes so far took: 0 ms.

Asynchronous UDFs

By default, Pathway UDFs are synchronous and blocking. If one worker is used, only one UDF call is active at a time and it has to finish for the next UDF call to start. If more workers are used, the maximal number of UDFs that have started and haven't finished is equal to the number of workers. It is a good situation for CPU bound tasks. If you want, however, to execute I/O bound tasks, like calling external services, it is better to have more than one task started per worker. Pathway provides asynchronous UDFs for it.

Asynchronous UDFs can be defined in Pathway using Python coroutines with the async/await keywords. The asynchronous UDFs are asynchronous within a single batch. In this context, we define a batch as all entries with equal processing times assigned. The UDFs are started for all entries in the batch and the execution of further batches is blocked until all UDFs for a given batch have finished. Thanks to that, the processing time of the entries remains unchanged and the output remains consistent. If you require a fully asynchronous non-blocking mechanism take a look at AsyncTransformer.

To define an asynchronous UDF it is enough to decorate a coroutine with pw.udf. Let's start with a simple example.

import asyncio


@pw.udf
async def inc_async(x: float) -> float:
    print(f"inc_async({x}) starting")
    await asyncio.sleep(x)
    print(f"inc_async({x}) finishing")
    return x + 1


table = pw.debug.table_from_markdown(
    """
    value
     0.2
     0.6
     2.0
     1.2
"""
)

result = table.select(value=inc_async(pw.this.value))
pw.debug.compute_and_print(result)
[2024-04-19T19:48:55]:INFO:Preparing Pathway computation


inc_async(0.6) starting
inc_async(2.0) starting
inc_async(0.2) starting
inc_async(1.2) starting


inc_async(0.2) finishing




inc_async(0.6) finishing


inc_async(1.2) finishing


inc_async(2.0) finishing
            | value
^X1MXHYY... | 1.2
^YYY4HAB... | 1.6
^3CZ78B4... | 2.2
^Z3QWT29... | 3.0

From the printed messages, you can see that the calls are executed asynchronously.

Note that accidentally you created a sleepsort. Values in the finishing messages are sorted! As an exercise, you can try sorting also other values.

As a more advanced example, you can create a UDF that queries REST Countries service to get the capital of a country. It uses requests library that on its own is not asynchronous. However, if you set executor=pw.udfs.async_executor() even though requests.get is not a coroutine, the function find_capital will be executed in a ThreadPoolExecutor, so it'll be possible to have more than one function started at once.

import requests


@pw.udf(executor=pw.udfs.async_executor())
def find_capital(country: str) -> str:
    result = requests.get(
        f"https://restcountries.com/v3.1/name/{country}?fields=capital",
        timeout=1,
    )
    result.raise_for_status()
    return result.json()[0]["capital"][0]


countries = pw.debug.table_from_markdown(
    """
    country
    Poland
    Germany
    Austria
    USA
    France
"""
)
countries_with_capitals = countries.with_columns(capital=find_capital(pw.this.country))
pw.debug.compute_and_print(countries_with_capitals)
[2024-04-19T19:48:57]:INFO:Preparing Pathway computation


            | country | capital
^Z3QWT29... | Austria | Vienna
^3HN31E1... | France  | Paris
^YYY4HAB... | Germany | Berlin
^X1MXHYY... | Poland  | Warsaw
^3CZ78B4... | USA     | Washington, D.C.

AsyncExecutor

It is possible to control the behavior of asynchronous UDFs using the parameters of async_executor:

  • capacity - the maximum number of concurrent operations,
  • timeout - the maximum time (in seconds) to wait for the function result,
  • retry_strategy - the strategy for handling retries in case of failures. The available strategies are ExponentialBackoffRetryStrategy and FixedDelayRetryStrategy. The exponential backoff strategy increases the waiting time between retries exponentially by multiplying the waiting time by backoff_factor. The fixed delay strategy does not increase the waiting time between retries. Both strategies add a random jitter to the waiting times.

When both timeout and retry_strategy are used, all retries have to finish within a specified timeout. You can see the application of a retry strategy in the example below. The UDF has a 10% chance of failing. It fails two times but the retry strategy executes the function with the arguments that failed again.

import random

random.seed(2)


@pw.udf(
    executor=pw.udfs.async_executor(
        retry_strategy=pw.udfs.FixedDelayRetryStrategy(max_retries=10, delay_ms=100),
    )
)
async def inc_async(x: float) -> float:
    print(f"inc_async({x})")
    if random.random() < 0.1:
        raise ValueError("err")
    await asyncio.sleep(x)
    return x + 1


table = pw.debug.table_from_markdown(
    """
    value
     0.2
     0.6
     2.0
     1.2
"""
)

result = table.select(value=inc_async(pw.this.value))
pw.debug.compute_and_print(result)
[2024-04-19T19:48:58]:INFO:Preparing Pathway computation


inc_async(0.6)
inc_async(2.0)
inc_async(0.2)
inc_async(1.2)
inc_async(2.0)
inc_async(0.2)


            | value
^X1MXHYY... | 1.2
^YYY4HAB... | 1.6
^3CZ78B4... | 2.2
^Z3QWT29... | 3.0

Of course, the retry strategy does not have to be used only to mitigate the effects of runtime errors. It can, for example, be used to query a service multiple times in the case of its temporary unavailability.

The parameters that can be used with regular UDFs can also be used with asynchronous UDFs. For instance, you can cache its results or set that it is deterministic.

@pw.udf(deterministic=True, cache_strategy=pw.udfs.InMemoryCache())
async def inc_async(x: float) -> float:
    print(f"inc_async({x})")
    await asyncio.sleep(x)
    return x + 1


table = pw.debug.table_from_markdown(
    """
    value
     0.2
     0.6
     2.0
     1.2
     0.6
     1.2
"""
)

result = table.select(value=inc_async(pw.this.value))
pw.debug.compute_and_print(result)
[2024-04-19T19:49:00]:INFO:Preparing Pathway computation


inc_async(0.6)
inc_async(2.0)
inc_async(0.2)
inc_async(1.2)


            | value
^X1MXHYY... | 1.2
^YYY4HAB... | 1.6
^3HN31E1... | 1.6
^3CZ78B4... | 2.2
^3S2X6B2... | 2.2
^Z3QWT29... | 3.0

Conclusions

In this guide, you've learned how to define Python functions (UDFs) to process data in Pathway. The functions process a single row in a single call. It is possible to define the behavior of the functions by using UDF's parameters, like deterministic, propagate_none, cache_strategy, executor, etc. A friendly reminder - if your function is deterministic, set deterministic=True as it'll help with performance.

pythonudffunctionapplytransformationcachetimeout