Writing a Simple Stateful Reducer in Pathway
Pathway natively supports aggregation using a wide range of reducers, e.g., `sum`, `count`, or `max`. However, these might not cover all the ways you need to aggregate values. In this tutorial, you learn how to write reducers implementing custom logic.

For example, let's implement a custom stateful `stddev` reducer that computes the standard deviation.
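As a quick refresher, a built-in reducer is applied inside `reduce`. The sketch below (with a made-up toy table, not part of this tutorial's data) shows the pattern that the custom reducer will plug into:

```python
import pathway as pw

# Toy table, made up purely to illustrate a built-in reducer.
purchases = pw.debug.table_from_markdown(
    """
    price
    10.5
    20.0
    30.5
    """
)

totals = purchases.reduce(total=pw.reducers.sum(pw.this.price))
pw.debug.compute_and_print(totals)
```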
```python
import pathway as pw

SHOW_DEBUG = False


class StdDevAccumulator(pw.BaseCustomAccumulator):
    def __init__(self, cnt, sum, sum_sq):
        self.cnt = cnt
        self.sum = sum
        self.sum_sq = sum_sq

    @classmethod
    def from_row(cls, row):
        [val] = row
        if SHOW_DEBUG:
            print("from_row()")
        return cls(1, val, val**2)

    def update(self, other):
        self.cnt += other.cnt
        self.sum += other.sum
        self.sum_sq += other.sum_sq
        if SHOW_DEBUG:
            print("update()")

    def compute_result(self) -> float:
        mean = self.sum / self.cnt
        mean_sq = self.sum_sq / self.cnt
        if SHOW_DEBUG:
            print("compute_result()")
        return mean_sq - mean**2


stddev = pw.reducers.udf_reducer(StdDevAccumulator)
```
Above, the `pw.BaseCustomAccumulator` class is used as a base for `StdDevAccumulator`, which describes the logic of the underlying accumulator. The accumulator class requires a few methods:

- `from_row`, which constructs an accumulator from the values of a single row of a table (here, a single value, since our reducer applies to a single column),
- `update`, which updates one accumulator with the contents of another accumulator,
- `compute_result`, which produces the output based on the accumulator state,
- `retract`, an optional method, which processes negative updates,
- `neutral`, an optional method, which returns the state corresponding to consuming 0 rows.
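`retract` is implemented later in this tutorial. For completeness, here is a minimal sketch of what `neutral` could look like for this accumulator, assuming (as the description above suggests) that it is a classmethod returning the state after consuming 0 rows; this subclass is not used in the rest of the tutorial:

```python
class StdDevAccumulatorWithNeutral(StdDevAccumulator):
    @classmethod
    def neutral(cls):
        # State corresponding to consuming 0 rows: all running sums are zero.
        return cls(0, 0.0, 0.0)
```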
Now, let's see the reducer in action.
```python
temperature_data = pw.debug.table_from_markdown(
    """
    date       | temperature
    2023-06-06 | 28.0
    2023-06-07 | 23.1
    2023-06-08 | 24.5
    2023-06-09 | 26.0
    2023-06-10 | 28.3
    2023-06-11 | 25.7
    """
)

temperature_statistics = temperature_data.reduce(
    avg=pw.reducers.avg(pw.this.temperature), stddev=stddev(pw.this.temperature)
)

pw.debug.compute_and_print(temperature_statistics)
```
```
            | avg                | stddev
^PWSRT42... | 25.933333333333334 | 3.3355555555555156
```
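As a sanity check, you can reproduce these numbers in plain Python with the formula used in `compute_result` (the mean of squares minus the squared mean, i.e., the population variance):

```python
temps = [28.0, 23.1, 24.5, 26.0, 28.3, 25.7]
mean = sum(temps) / len(temps)
mean_sq = sum(t * t for t in temps) / len(temps)
print(mean, mean_sq - mean**2)  # ≈ 25.9333 and ≈ 3.3356
```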
However, with this logic, our reducer does not process negative updates smartly: it restarts the computation from scratch whenever a negative update is encountered. You can see this in action by enabling debug information and processing a table where a row removal happens. Let's insert several values at time 0, and then, at time 2, remove one already inserted value and add another.
```python
SHOW_DEBUG = True

temperature_data_with_updates = pw.debug.table_from_markdown(
    """
    date       | temperature | __time__ | __diff__
    2023-06-06 | 28.0        | 0        | 1
    2023-06-07 | 23.1        | 0        | 1
    2023-06-08 | 24.5        | 0        | 1
    2023-06-09 | 26.0        | 0        | 1
    2023-06-10 | 28.3        | 0        | 1
    2023-06-11 | 25.7        | 0        | 1
    2023-06-11 | 25.7        | 2        | -1
    2023-06-11 | 25.9        | 2        | 1
    """
)

temperature_statistics_with_updates = temperature_data_with_updates.reduce(
    avg=pw.reducers.avg(pw.this.temperature), stddev=stddev(pw.this.temperature)
)

pw.debug.compute_and_print(temperature_statistics_with_updates)
```
```
from_row()
from_row()
update()
from_row()
update()
from_row()
update()
from_row()
update()
from_row()
update()
from_row()
from_row()
update()
from_row()
update()
from_row()
update()
from_row()
update()
from_row()
update()
compute_result()
compute_result()
            | avg               | stddev
^PWSRT42... | 25.96666666666667 | 3.3255555555555247
```
This can be alleviated by extending our reducer with a method for processing negative updates.
```python
class ImprovedStdDevAccumulator(StdDevAccumulator):
    def retract(self, other):
        self.cnt -= other.cnt
        self.sum -= other.sum
        self.sum_sq -= other.sum_sq
        if SHOW_DEBUG:
            print("retract()")


improved_stddev = pw.reducers.udf_reducer(ImprovedStdDevAccumulator)
```
And now you can test the improved reducer in action.
```python
temperature_statistics_improved = temperature_data_with_updates.reduce(
    avg=pw.reducers.avg(pw.this.temperature),
    stddev=improved_stddev(pw.this.temperature),
)

pw.debug.compute_and_print(temperature_statistics_improved)
```
```
from_row()
from_row()
update()
from_row()
update()
from_row()
update()
from_row()
update()
from_row()
update()
from_row()
update()
from_row()
retract()
compute_result()
compute_result()
            | avg               | stddev
^PWSRT42... | 25.96666666666667 | 3.3255555555555247
```
In the example above, 12 calls to `from_row()` and 10 calls to `update()` are replaced with 8 calls to `from_row()`, 6 calls to `update()`, and 1 call to `retract()`.

This comes from the fact that the former reducer:

- had to call `from_row()` for each row of the table, wrapping each single value into a separate `StdDevAccumulator` object,
- had to call `update()` for each row of the table except the first consumed,
- had to restart from scratch after the update to the table, and thus had to pay the cost twice.

The latter reducer aggregated the table at time 0 in the same way as the former one, but processed the update at time 2 differently:

- it wrapped both the deletion and the insertion with `from_row()` calls,
- it called `retract()` once and `update()` once.
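Finally, note that `compute_result` above returns the mean of squares minus the squared mean, which is the (population) variance rather than the standard deviation itself. If you want the reducer to output the standard deviation proper, a small variation, sketched below and not part of the original reducer, can take the square root of that value:

```python
import math


class TrueStdDevAccumulator(ImprovedStdDevAccumulator):
    def compute_result(self) -> float:
        # Clamp tiny negative values that may appear due to floating-point
        # error before taking the square root.
        variance = max(super().compute_result(), 0.0)
        return math.sqrt(variance)


true_stddev = pw.reducers.udf_reducer(TrueStdDevAccumulator)
```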