pw.reducers
Reducers are used in reduce to compute the aggregated results obtained by a groupby.
Typical use:
import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA | 1
valA | 2
valB | 4
valB | 4
valB | 7
''')
result = t.groupby(t.colA).reduce(sum=pw.reducers.sum(t.colB))
pw.debug.compute_and_print(result, include_id=False)
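For intuition, the groupby/reduce above corresponds to the following plain-Python aggregation over a static list of rows (a sketch; Pathway maintains the same result incrementally as the table changes):

```python
# Plain-Python sketch of the groupby/sum above.
rows = [("valA", -1), ("valA", 1), ("valA", 2),
        ("valB", 4), ("valB", 4), ("valB", 7)]

sums: dict[str, int] = {}
for key, value in rows:
    sums[key] = sums.get(key, 0) + value

print(sums)  # {'valA': 2, 'valB': 15}
```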
pw.reducers.any(arg)
Returns any of the aggregated values. Values are consistent across application to many columns.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
| colA | colB | colD
1 | valA | -1 | 4
2 | valA | 1 | 7
3 | valA | 2 | -3
4 | valB | 4 | 2
5 | valB | 5 | 6
6 | valB | 7 | 1
''')
result = t.groupby(t.colA).reduce(
    any_B=pw.reducers.any(t.colB),
    any_D=pw.reducers.any(t.colD),
)
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.argmax(arg)
Returns the index of the maximum aggregated value.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA | 1
valA | 2
valB | 4
valB | 4
valB | 7
''')
pw.debug.compute_and_print(t)
result = t.groupby(t.colA).reduce(argmax=pw.reducers.argmax(t.colB), max=pw.reducers.max(t.colB))
pw.debug.compute_and_print(result, include_id=False)
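The semantics can be mimicked in plain Python: treat each group as a mapping from row id to value and pick the key holding the largest value. The ids below are made up for illustration; in Pathway the returned index is the row's actual id.

```python
# Hypothetical row ids for group valA; argmax returns the id of the row
# holding the maximum value, not the value itself.
group_valA = {"id1": -1, "id2": 1, "id3": 2}

argmax_id = max(group_valA, key=group_valA.get)
print(argmax_id, group_valA[argmax_id])  # id3 2
```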
pw.reducers.argmin(arg)
Returns the index of the minimum aggregated value.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA | 1
valA | 2
valB | 4
valB | 4
valB | 7
''')
pw.debug.compute_and_print(t)
result = t.groupby(t.colA).reduce(argmin=pw.reducers.argmin(t.colB), min=pw.reducers.min(t.colB))
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.avg(expression)
Returns the average of the aggregated values.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA | 1
valA | 2
valB | 4
valB | 4
valB | 7
''')
result = t.groupby(t.colA).reduce(avg=pw.reducers.avg(t.colB))
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.count(*args)
Returns the number of aggregated elements.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA | 1
valA | 2
valB | 4
valB | 4
valB | 7
''')
result = t.groupby(t.colA).reduce(count=pw.reducers.count())
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.earliest(expression)
Returns the earliest of the aggregated values (the one with the lowest processing time).
Example:
import pathway as pw
t = pw.debug.table_from_markdown(
'''
a | b | __time__
1 | 2 | 2
2 | 3 | 2
1 | 4 | 4
2 | 2 | 6
1 | 1 | 8
'''
)
res = t.groupby(pw.this.a).reduce(
    pw.this.a,
    earliest=pw.reducers.earliest(pw.this.b),
)
pw.debug.compute_and_print_update_stream(res, include_id=False)
pw.debug.compute_and_print(res, include_id=False)
pw.reducers.latest(expression)
Returns the latest of the aggregated values (the one with the greatest processing time).
Example:
import pathway as pw
t = pw.debug.table_from_markdown(
'''
a | b | __time__
1 | 2 | 2
2 | 3 | 2
1 | 4 | 4
2 | 2 | 6
1 | 1 | 8
'''
)
res = t.groupby(pw.this.a).reduce(
    pw.this.a,
    latest=pw.reducers.latest(pw.this.b),
)
pw.debug.compute_and_print_update_stream(res, include_id=False)
pw.debug.compute_and_print(res, include_id=False)
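In plain Python terms, earliest and latest pick the first and the last value seen for a group when its updates are ordered by processing time. For group a == 1 in the table above:

```python
# (processing_time, b) updates for group a == 1, taken from the table above.
updates = [(2, 2), (4, 4), (8, 1)]
updates.sort(key=lambda u: u[0])  # order by processing time

earliest = updates[0][1]   # value with the lowest processing time -> 2
latest = updates[-1][1]    # value with the greatest processing time -> 1
print(earliest, latest)    # 2 1
```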
pw.reducers.max(arg)
Returns the maximum of the aggregated values.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA | 1
valA | 2
valB | 4
valB | 4
valB | 7
''')
result = t.groupby(t.colA).reduce(max=pw.reducers.max(t.colB))
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.min(arg)
Returns the minimum of the aggregated values.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA | 1
valA | 2
valB | 4
valB | 4
valB | 7
''')
result = t.groupby(t.colA).reduce(min=pw.reducers.min(t.colB))
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.ndarray(expression, *, skip_nones=False)
Returns an array containing all the aggregated values. The order of values inside the array is consistent across application to many columns. If the optional argument skip_nones is set to True, any Nones among the aggregated values are omitted from the result.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
| colA | colB | colD
1 | valA | -1 | 4
2 | valA | 1 | 7
3 | valA | 2 | -3
4 | valB | 4 |
5 | valB | 4 | 6
6 | valB | 7 | 1
''')
result = t.groupby(t.colA).reduce(
    array_B=pw.reducers.ndarray(t.colB),
    array_D=pw.reducers.ndarray(t.colD, skip_nones=True),
)
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.sorted_tuple(arg, *, skip_nones=False)
Returns a sorted tuple containing all the aggregated values. If the optional argument skip_nones is set to True, any Nones among the aggregated values are omitted from the result.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
| colA | colB | colD
1 | valA | -1 | 4
2 | valA | 1 | 7
3 | valA | 2 | -3
4 | valB | 4 |
5 | valB | 4 | 6
6 | valB | 7 | 1
''')
result = t.groupby(t.colA).reduce(
    tuple_B=pw.reducers.sorted_tuple(t.colB),
    tuple_D=pw.reducers.sorted_tuple(t.colD, skip_nones=True),
)
pw.debug.compute_and_print(result, include_id=False)
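The skip_nones behavior can be sketched in plain Python. This is a model of the contract, not the actual implementation:

```python
def sorted_tuple_model(values, skip_nones=False):
    # Mimics pw.reducers.sorted_tuple: optionally drop Nones, then sort.
    if skip_nones:
        values = [v for v in values if v is not None]
    return tuple(sorted(values))

# colD values for group valB in the table above include a None.
print(sorted_tuple_model([None, 6, 1], skip_nones=True))  # (1, 6)
```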
pw.reducers.stateful_many(combine_many)
Decorator used to create custom stateful reducers.
A function wrapped with it receives the previous state and a list of updates that arrived at a specific processing time, and has to return a new state. Updates are grouped into batches (all updates in a batch share the same processing time, and the function is called once per batch); batches enter the function in order of increasing processing time.
Example:
Create a table where the __time__ column simulates the processing time assigned to entries when they enter pathway:
import pathway as pw
table = pw.debug.table_from_markdown(
'''
a | b | __time__
3 | 1 | 2
4 | 1 | 2
13 | 2 | 2
16 | 2 | 4
2 | 2 | 6
4 | 1 | 6
'''
)
Now create a custom stateful reducer. It computes a weird sum: even entries are incremented by 1 before being added, while odd entries are added unchanged.
@pw.reducers.stateful_many
def weird_sum(state: int | None, rows: list[tuple[list[int], int]]) -> int:
    if state is None:
        state = 0
    for row, cnt in rows:
        value = row[0]
        if value % 2 == 0:
            state += value + 1
        else:
            state += value
    return state
state is None when the function is called for the first time for a given group; to compute the weird sum, you should then initialize it to 0.
row is a list of the values passed to the reducer. When the reducer is called as weird_sum(pw.this.a), the list has a single element: the value from the column a.
cnt tells whether the row is an insertion (cnt == 1) or a deletion (cnt == -1).
You can now use the reducer in the reduce operator and compute the result:
result = table.groupby(pw.this.b).reduce(pw.this.b, s=weird_sum(pw.this.a))
pw.debug.compute_and_print(result, include_id=False)
weird_sum is called 2 times for group 1 (at processing times 2 and 6) and 3 times for group 2 (at processing times 2, 4 and 6).
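To see how the state evolves, here is a plain-Python replay of the three batches for group 2 (times 2, 4 and 6), using the same combine logic without the decorator:

```python
def weird_sum_step(state, rows):
    # Same body as weird_sum above, callable directly for illustration.
    if state is None:
        state = 0
    for row, cnt in rows:
        value = row[0]
        if value % 2 == 0:
            state += value + 1
        else:
            state += value
    return state

# Batches for group b == 2: a=13 at time 2, a=16 at time 4, a=2 at time 6.
state = None
for batch in [[([13], 1)], [([16], 1)], [([2], 1)]]:
    state = weird_sum_step(state, batch)
print(state)  # 13 + (16 + 1) + (2 + 1) = 33
```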
pw.reducers.stateful_single(combine_single)
Decorator used to create custom stateful reducers.
A function wrapped with it receives the previous state and a single update, and has to return a new state. The function is called with entries in order of increasing processing time. If multiple entries share the same processing time, their order is unspecified.
This reducer can only be used on tables receiving insertions only (no updates or deletions). If you need to handle updates or deletions, see stateful_many.
Example:
Create a table where the __time__ column simulates the processing time assigned to entries when they enter pathway:
import pathway as pw
table = pw.debug.table_from_markdown(
'''
a | b | __time__
3 | 1 | 2
4 | 1 | 2
13 | 2 | 2
16 | 2 | 4
2 | 2 | 6
4 | 1 | 6
'''
)
Create a custom stateful reducer. It computes a weird sum: even entries are incremented by 1 before being added, while odd entries are added unchanged.
@pw.reducers.stateful_single
def weird_sum(state: int | None, value) -> int:
    if state is None:
        state = 0
    if value % 2 == 0:
        state += value + 1
    else:
        state += value
    return state
state is None when the function is called for the first time for a given group; to compute the weird sum, you should then initialize it to 0.
You can now use the reducer in the reduce operator and compute the result:
result = table.groupby(pw.this.b).reduce(pw.this.b, s=weird_sum(pw.this.a))
pw.debug.compute_and_print(result, include_id=False)
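As with stateful_many, the state transitions can be replayed in plain Python. For group b == 1 the entries a = 3, 4, 4 arrive in order of processing time:

```python
def weird_sum_step(state, value):
    # Same body as weird_sum above, one value at a time.
    if state is None:
        state = 0
    if value % 2 == 0:
        state += value + 1
    else:
        state += value
    return state

state = None
for value in [3, 4, 4]:  # group b == 1, in processing-time order
    state = weird_sum_step(state, value)
print(state)  # 3 + (4 + 1) + (4 + 1) = 13
```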
pw.reducers.sum(arg)
Returns the sum of the aggregated values. Can handle int, float, and array values.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA | 1
valA | 2
valB | 4
valB | 4
valB | 7
''')
result = t.groupby(t.colA).reduce(sum=pw.reducers.sum(t.colB))
pw.debug.compute_and_print(result, include_id=False)
import numpy as np
import pandas as pd
np_table = pw.debug.table_from_pandas(
    pd.DataFrame(
        {
            "data": [
                np.array([1, 2, 3]),
                np.array([4, 5, 6]),
                np.array([7, 8, 9]),
            ]
        }
    )
)
result = np_table.reduce(data_sum=pw.reducers.sum(np_table.data))
pw.debug.compute_and_print(result, include_id=False)
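Summing an array column is elementwise; the plain numpy equivalent of the reduction above is:

```python
import numpy as np

arrays = [np.array([1, 2, 3]), np.array([4, 5, 6]), np.array([7, 8, 9])]
# Elementwise sum, like pw.reducers.sum applied to an array column.
total = sum(arrays)
print(total)  # [12 15 18]
```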
pw.reducers.tuple(arg, *, skip_nones=False)
Returns a tuple containing all the aggregated values. The order of values inside the tuple is consistent across application to many columns. If the optional argument skip_nones is set to True, any Nones among the aggregated values are omitted from the result.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
| colA | colB | colC | colD
1 | valA | -1 | 5 | 4
2 | valA | 1 | 5 | 7
3 | valA | 2 | 5 | -3
4 | valB | 4 | 10 | 2
5 | valB | 4 | 10 | 6
6 | valB | 7 | 10 | 1
''')
result = t.groupby(t.colA).reduce(
    tuple_B=pw.reducers.tuple(t.colB),
    tuple_C=pw.reducers.tuple(t.colC),
    tuple_D=pw.reducers.tuple(t.colD),
)
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.udf_reducer(reducer_cls)
Decorator for defining stateful reducers. Requires a custom accumulator as an argument.
The custom accumulator should implement from_row, update and compute_result. Optionally, neutral and retract can be provided for more efficient processing on streams with changing data.
import pathway as pw
class CustomAvgAccumulator(pw.BaseCustomAccumulator):
    def __init__(self, sum, cnt):
        self.sum = sum
        self.cnt = cnt

    @classmethod
    def from_row(cls, row):
        [val] = row
        return CustomAvgAccumulator(val, 1)

    def update(self, other):
        self.sum += other.sum
        self.cnt += other.cnt

    def compute_result(self) -> float:
        return self.sum / self.cnt
custom_avg = pw.reducers.udf_reducer(CustomAvgAccumulator)
t1 = pw.debug.table_from_markdown('''
age | owner | pet | price
10 | Alice | dog | 100
9 | Bob | cat | 80
8 | Alice | cat | 90
7 | Bob | dog | 70
''')
t2 = t1.groupby(t1.owner).reduce(t1.owner, avg_price=custom_avg(t1.price))
pw.debug.compute_and_print(t2, include_id=False)
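The accumulator protocol can also be exercised directly in plain Python. The standalone class below mirrors the accumulator above without the Pathway base class, replaying the from_row/update/compute_result calls for Alice's rows (prices 100 and 90):

```python
# Standalone model of the accumulator above (no Pathway base class).
class AvgAcc:
    def __init__(self, sum, cnt):
        self.sum = sum
        self.cnt = cnt

    @classmethod
    def from_row(cls, row):
        [val] = row
        return cls(val, 1)

    def update(self, other):
        self.sum += other.sum
        self.cnt += other.cnt

    def compute_result(self) -> float:
        return self.sum / self.cnt

acc = AvgAcc.from_row([100])       # first row for Alice
acc.update(AvgAcc.from_row([90]))  # merge in the second row
print(acc.compute_result())        # 95.0
```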
pw.reducers.unique(arg)
Returns the aggregated value if all the values are identical. If the values are not identical, an exception is raised.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
| colA | colB | colD
1 | valA | 1 | 3
2 | valA | 1 | 3
3 | valA | 1 | 3
4 | valB | 2 | 4
5 | valB | 2 | 5
6 | valB | 2 | 6
''')
result = t.groupby(t.colA).reduce(unique_B=pw.reducers.unique(t.colB))
pw.debug.compute_and_print(result, include_id=False)
result = t.groupby(t.colA).reduce(unique_D=pw.reducers.unique(t.colD))
try:
    pw.debug.compute_and_print(result, include_id=False)
except Exception as e:
    print(type(e))
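The contract can be modeled in plain Python (illustrative only; the real reducer raises its own exception type):

```python
def unique_model(values):
    # Mimics pw.reducers.unique: return the single common value, or raise.
    first = values[0]
    if any(v != first for v in values):
        raise ValueError("values are not identical")
    return first

print(unique_model([2, 2, 2]))  # 2
try:
    unique_model([4, 5, 6])
except ValueError as e:
    print(type(e))  # <class 'ValueError'>
```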