AsyncTransformer
One way of transforming data in Pathway, when simple transformations are not enough, is using UDFs.
However, if the flexibility of the UDFs is still not enough, you can use even more general and flexible AsyncTransformer
, useful especially for asynchronous computations.
AsyncTransformer
is a different mechanism than UDFs.
It acts on the whole Pathway Table and returns a new Table.
In contrast to UDFs, it is fully asynchronous.
It starts the invoke
method for every row that arrives,
without waiting for the previous batches to finish.
When the call is finished, its result is returned to the engine with a new processing time.
To write an AsyncTransformer
you need to inherit from pw.AsyncTransformer
and implement the invoke
method (it is a coroutine). The names of the arguments of the method have to be the same as the columns in the input table. You can use additional arguments but then you have to specify their default value (it might be useful if you want to use the same AsyncTransformer
on multiple Pathway tables with different sets of columns). You have to use all columns from the input table. The order of columns/arguments doesn't matter as they are passed to the method as keyword arguments.
You also need to define the schema of a table that is produced. The invoke
method has to return a dictionary containing values to put in all columns of the output table. The keys in the dictionary has to match fields from the output schema.
Let's create a simple AsyncTransformer
that produces a Table with two output columns - value
and ret
.
import pathway as pw
import asyncio
class OutputSchema(pw.Schema):
value: int
ret: int
class SimpleAsyncTransformer(pw.AsyncTransformer, output_schema=OutputSchema):
async def invoke(self, value: int) -> dict:
await asyncio.sleep(value / 10)
return dict(value=value, ret=value + 1)
Let's use the transformer on the example input table.
The result table containing only successful calls can be retrieved from the successful
property of the transformer.
table = pw.debug.table_from_markdown(
"""
value
12
6
2
2
6
"""
)
result = SimpleAsyncTransformer(input_table=table).successful
pw.debug.compute_and_print(result)
| value | ret
^Z3QWT29... | 2 | 3
^3CZ78B4... | 2 | 3
^YYY4HAB... | 6 | 7
^3HN31E1... | 6 | 7
^X1MXHYY... | 12 | 13
The result is correct. Now let's take a look at the output times:
pw.debug.compute_and_print_update_stream(result)
| value | ret | __time__ | __diff__
^Z3QWT29... | 2 | 3 | 1731735736386 | 1
^3CZ78B4... | 2 | 3 | 1731735736588 | 1
^YYY4HAB... | 6 | 7 | 1731735736588 | 1
^3HN31E1... | 6 | 7 | 1731735736988 | 1
^X1MXHYY... | 12 | 13 | 1731735736988 | 1
Even though all values have equal processing times initially,
the output times are different between the rows.
It is the effect of AsyncTransformer
not waiting for other rows to finish.
Thanks to that some rows can be processed downstream quicker.
If you want some rows to wait for some other rows to finish,
take a look at the instance
parameter.
Failing calls
The invoke
method is usually written by an external user (like you) and it can contain bugs (unless you write bug-free code).
When the invoke
call raises an exception or times out (see the next section for that), its output won't be included in the successful
table.
The failed rows are put in the table accessible by the failed
property.
Let's define a new AsyncTransformer
to check that.
Maybe we don't like the value and we fail our function whenever we get it as an argument.
class SometimesFailingAsyncTransformer(pw.AsyncTransformer, output_schema=OutputSchema):
async def invoke(self, value: int) -> dict:
if value == 12:
raise ValueError("incorrect value")
return dict(value=value, ret=value + 1)
t = SometimesFailingAsyncTransformer(input_table=table)
pw.debug.compute_and_print(t.successful)
pw.debug.compute_and_print(t.failed)
| value | ret
^Z3QWT29... | 2 | 3
^3CZ78B4... | 2 | 3
^YYY4HAB... | 6 | 7
^3HN31E1... | 6 | 7
| value | ret
^X1MXHYY... | |
In the failed table you only get the ids of failed rows (other columns contain None
).
Because the invoke
call failed it was impossible to return any values.
You can check which values have failed by joining with the input table:
failed = t.failed.join(table, pw.left.id == pw.right.id, id=pw.left.id).select(
pw.right.value
)
pw.debug.compute_and_print(failed)
| value
^X1MXHYY... | 12
Now, you can see that the failed row actually has in the value
column.
Controlling AsyncTransformer behavior
It is possible to control the behavior of AsyncTransformer
using parameters similar to those in UDFs.
They can be passed to with_options
method.
The available options are:
capacity
- the maximum number of concurrent operations,timeout
- the maximum time (in seconds) to wait for the function result,retry_strategy
- the strategy for handling retries in case of failures. The same strategies as for asynchronous UDFs can be used. Examples:ExponentialBackoffRetryStrategy
,FixedDelayRetryStrategy
,cache_strategy
- the caching strategy. The same strategies as for UDFs can be used. Examples:DiskCache
,InMemoryCache
.
In the following example, you add a timeout to the SimpleAsyncTransformer
defined above.
It is set to seconds.
t = SimpleAsyncTransformer(input_table=table).with_options(timeout=0.9)
pw.debug.compute_and_print(t.successful)
failed = t.failed.join(table, pw.left.id == pw.right.id).select(pw.right.value)
pw.debug.compute_and_print(failed)
| value | ret
^Z3QWT29... | 2 | 3
^3CZ78B4... | 2 | 3
^YYY4HAB... | 6 | 7
^3HN31E1... | 6 | 7
| value
^MFBX8ZW... | 12
Recall that the transformer sleeps the invoke
method for a time passed as the method argument divided by .
That's why calls with value
less than were successful, and calls with value
greater than failed.
AsyncTransformer consistency
By default, AsyncTransformer
preserves order for a given key.
It means that if some row is still executed by AsyncTransformer
and its update starts being executed and finishes earlier than the original row,
it'll wait for the completion of the original row processing before being returned to the engine.
The update cannot have an earlier time assigned than the original row as it would break the correctness of the computations.
Let's analyze this case by computing the sums of entries from the stream.
You want to compute the sum for each group
independently.
table = pw.debug.table_from_markdown(
"""
group | value | __time__
1 | 2 | 2
1 | 3 | 2
2 | 1 | 2
1 | -3 | 4
2 | 2 | 4
"""
)
sums = table.groupby(pw.this.group).reduce(
pw.this.group, value=pw.reducers.sum(pw.this.value)
)
pw.debug.compute_and_print_update_stream(sums)
| group | value | __time__ | __diff__
^YYY4HAB... | 1 | 5 | 2 | 1
^Z3QWT29... | 2 | 1 | 2 | 1
^YYY4HAB... | 1 | 5 | 4 | -1
^Z3QWT29... | 2 | 1 | 4 | -1
^YYY4HAB... | 1 | 2 | 4 | 1
^Z3QWT29... | 2 | 3 | 4 | 1
The sums computed in time are and .
They are deleted in time and replaced with sums and .
Let's modify SimpleAsyncTransformer
to propagate the group
column as well and apply it to the sums
table.
class OutputWithGroupSchema(pw.Schema):
group: int
value: int
ret: int
class GroupAsyncTransformer(pw.AsyncTransformer, output_schema=OutputWithGroupSchema):
async def invoke(self, value: int, group: int) -> dict:
await asyncio.sleep(value / 10)
return dict(group=group, value=value, ret=value + 1)
result = GroupAsyncTransformer(input_table=sums).successful
pw.debug.compute_and_print_update_stream(result)
| group | value | ret | __time__ | __diff__
^Z3QWT29... | 2 | 1 | 2 | 1731735740298 | 1
^Z3QWT29... | 2 | 1 | 2 | 1731735740398 | -1
^Z3QWT29... | 2 | 3 | 4 | 1731735740398 | 1
^YYY4HAB... | 1 | 5 | 6 | 1731735740598 | 1
^YYY4HAB... | 1 | 5 | 6 | 1731735740800 | -1
^YYY4HAB... | 1 | 2 | 3 | 1731735740800 | 1
All rows reach GroupAsyncTransformer
at approximately the same time.
In group , the value at time is , and at time is .
The first value is processed faster and returned to the engine.
When a call for the next value finishes, the old value is removed and a new value is returned to the engine.
The situation for group is different.
The value at time is greater than the value at time ().
Because of that, the second call to invoke
finishes earlier and has to wait for the first call to finish.
When the first call finishes, they are both returned to the engine.
The value from the second call is newer and immediately replaces the old value.
Partial consistency
Sometimes, the consistency for rows with a single key might not be enough for you.
If you want to guarantee an order within a group of records, you can use the instance
parameter of the AsyncTransformer
.
Rows within a single instance
are ordered.
It means that the results for rows with higher initial processing times can't overtake the results for rows with lower initial processing times.
All results within a single instance with equal processing times wait for all rows with this time to finish.
Using the instance
parameter does not block new calls from starting. Only the results of the calls get synchronized.
To demonstrate the synchronization, we create a new table with more data:
table = pw.debug.table_from_markdown(
"""
group | value | __time__
1 | 2 | 2
1 | 3 | 2
2 | 1 | 2
3 | 1 | 2
4 | 3 | 2
1 | -3 | 4
2 | 3 | 4
3 | 1 | 4
4 | -1 | 4
"""
)
sums = table.groupby(pw.this.group).reduce(
pw.this.group, value=pw.reducers.sum(pw.this.value)
)
pw.debug.compute_and_print_update_stream(sums)
| group | value | __time__ | __diff__
^YYY4HAB... | 1 | 5 | 2 | 1
^Z3QWT29... | 2 | 1 | 2 | 1
^3CZ78B4... | 3 | 1 | 2 | 1
^3HN31E1... | 4 | 3 | 2 | 1
^YYY4HAB... | 1 | 5 | 4 | -1
^Z3QWT29... | 2 | 1 | 4 | -1
^3CZ78B4... | 3 | 1 | 4 | -1
^3HN31E1... | 4 | 3 | 4 | -1
^YYY4HAB... | 1 | 2 | 4 | 1
^Z3QWT29... | 2 | 4 | 4 | 1
^3CZ78B4... | 3 | 2 | 4 | 1
^3HN31E1... | 4 | 2 | 4 | 1
Now, you have four groups, with one row for each group.
You want to guarantee consistency separately for even and odd groups.
To do that, you need to set the instance
of GroupAsyncTransformer
appropriately.
result = GroupAsyncTransformer(input_table=sums, instance=pw.this.group % 2).successful
pw.debug.compute_and_print_update_stream(result)
| group | value | ret | __time__ | __diff__
^Z3QWT29... | 2 | 1 | 2 | 1731735741030 | 1
^3HN31E1... | 4 | 3 | 4 | 1731735741030 | 1
^Z3QWT29... | 2 | 1 | 2 | 1731735741332 | -1
^3HN31E1... | 4 | 3 | 4 | 1731735741332 | -1
^Z3QWT29... | 2 | 4 | 5 | 1731735741332 | 1
^3HN31E1... | 4 | 2 | 3 | 1731735741332 | 1
^YYY4HAB... | 1 | 5 | 6 | 1731735741432 | 1
^3CZ78B4... | 3 | 1 | 2 | 1731735741432 | 1
^YYY4HAB... | 1 | 5 | 6 | 1731735741532 | -1
^3CZ78B4... | 3 | 1 | 2 | 1731735741532 | -1
^YYY4HAB... | 1 | 2 | 3 | 1731735741532 | 1
^3CZ78B4... | 3 | 2 | 3 | 1731735741532 | 1
The updates for groups are bundled together. Group at time could finish earlier, but it waits for group . Groups are also dependent on each other. Group could finish quicker, but it waits for group to finish.
You can have a look at how the updates would proceed if no instance
was specified:
result = GroupAsyncTransformer(input_table=sums).successful
pw.debug.compute_and_print_update_stream(result)
| group | value | ret | __time__ | __diff__
^Z3QWT29... | 2 | 1 | 2 | 1731735741658 | 1
^3CZ78B4... | 3 | 2 | 3 | 1731735741758 | 1
^3HN31E1... | 4 | 3 | 4 | 1731735741858 | 1
^Z3QWT29... | 2 | 1 | 2 | 1731735741958 | -1
^3HN31E1... | 4 | 3 | 4 | 1731735741958 | -1
^Z3QWT29... | 2 | 4 | 5 | 1731735741958 | 1
^3HN31E1... | 4 | 2 | 3 | 1731735741958 | 1
^YYY4HAB... | 1 | 5 | 6 | 1731735742058 | 1
^YYY4HAB... | 1 | 5 | 6 | 1731735742158 | -1
^YYY4HAB... | 1 | 2 | 3 | 1731735742158 | 1
As you can see, only ordering within a group is preserved.
Full consistency
By using the instance
parameter, it is possible to make the output preserve the temporal ordering of the input.
It is enough to set instance
to the same value for all rows, for example by using a constant.
Then results for rows with a given time will wait for all previous times to finish before being returned to the engine.
Rows with a given time are returned all at once and have the same time assigned.
The new calls are not blocked from starting. Only the results get synchronized.
Let's use constant instance
in the example from the previous section.
result = GroupAsyncTransformer(input_table=sums, instance=0).successful
pw.debug.compute_and_print_update_stream(result)
| group | value | ret | __time__ | __diff__
^YYY4HAB... | 1 | 5 | 6 | 1731735742284 | 1
^Z3QWT29... | 2 | 1 | 2 | 1731735742284 | 1
^3CZ78B4... | 3 | 1 | 2 | 1731735742284 | 1
^3HN31E1... | 4 | 3 | 4 | 1731735742284 | 1
^YYY4HAB... | 1 | 5 | 6 | 1731735742784 | -1
^Z3QWT29... | 2 | 1 | 2 | 1731735742784 | -1
^3CZ78B4... | 3 | 1 | 2 | 1731735742784 | -1
^3HN31E1... | 4 | 3 | 4 | 1731735742784 | -1
^YYY4HAB... | 1 | 2 | 3 | 1731735742784 | 1
^Z3QWT29... | 2 | 4 | 5 | 1731735742784 | 1
^3CZ78B4... | 3 | 2 | 3 | 1731735742784 | 1
^3HN31E1... | 4 | 2 | 3 | 1731735742784 | 1
All rows are returned at the same time. There are also no updates because calls for time are finished later than calls for time . You can play with the data to make time finish before time and see that the update happens once.
Failing calls consistency
If the instance
parameter is used and the call for a given instance fails, the instance is in the failed state from this time.
AsyncTransformer
requires all calls with a given (instance, processing time)
pair to finish successfully.
If at least one call fails, returning other rows could leave the instance in an inconsistent state.
Let's take a look at what happens if group
fails at time .
class SuspiciousGroupAsyncTransformer(
pw.AsyncTransformer, output_schema=OutputWithGroupSchema
):
async def invoke(self, value: int, group: int) -> dict:
if group == 4 and value == 2:
raise ValueError("err")
await asyncio.sleep(value / 10)
return dict(group=group, value=value, ret=value + 1)
result = SuspiciousGroupAsyncTransformer(
input_table=sums, instance=pw.this.group % 2
).successful
pw.debug.compute_and_print_update_stream(result)
| group | value | ret | __time__ | __diff__
^Z3QWT29... | 2 | 1 | 2 | 1731735742898 | 1
^3HN31E1... | 4 | 3 | 4 | 1731735742898 | 1
^Z3QWT29... | 2 | 1 | 2 | 1731735743200 | -1
^3HN31E1... | 4 | 3 | 4 | 1731735743200 | -1
^YYY4HAB... | 1 | 5 | 6 | 1731735743300 | 1
^3CZ78B4... | 3 | 1 | 2 | 1731735743300 | 1
^YYY4HAB... | 1 | 5 | 6 | 1731735743400 | -1
^3CZ78B4... | 3 | 1 | 2 | 1731735743400 | -1
^YYY4HAB... | 1 | 2 | 3 | 1731735743400 | 1
^3CZ78B4... | 3 | 2 | 3 | 1731735743400 | 1
New values for the even instance (groups ) coming from the entries at time are not inserted because group fails and hence the whole instance fails. None of the entries in the odd instance (groups ) fail so it is updated normally.
Conclusions
In this guide, you've learned how to create your own AsyncTransformer
when you need to process the data asynchronously in Pathway.
You know how to control its behavior by setting parameters like timeout
, cache_strategy
and retry_strategy
.
You can control the tradeoff between the speed and the consistency of the results.
Now, you also understand the difference between asynchronous UDFs and AsyncTransformer.
The former is asynchronous only within a single batch of data
and can return values only to a single column,
while the latter is fully asynchronous and can return multiple columns.
It also allows for specifying the consistency level by using the instance
parameter.