pw.reducers
Reducers are used inside reduce to compute aggregated results over the groups produced by a groupby.
Typical use:
import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA | 1
valA | 2
valB | 4
valB | 4
valB | 7
''')
result = t.groupby(t.colA).reduce(sum=pw.reducers.sum(t.colB))
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.any(arg)
Returns any one of the aggregated values. The choice is consistent when the reducer is applied to several columns.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
| colA | colB | colD
1 | valA | -1 | 4
2 | valA | 1 | 7
3 | valA | 2 | -3
4 | valB | 4 | 2
5 | valB | 5 | 6
6 | valB | 7 | 1
''')
result = t.groupby(t.colA).reduce(
any_B=pw.reducers.any(t.colB),
any_D=pw.reducers.any(t.colD),
)
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.argmax(arg)
Returns the index of the maximum aggregated value.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA | 1
valA | 2
valB | 4
valB | 4
valB | 7
''')
pw.debug.compute_and_print(t)
result = t.groupby(t.colA).reduce(argmax=pw.reducers.argmax(t.colB), max=pw.reducers.max(t.colB))
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.argmin(arg)
Returns the index of the minimum aggregated value.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA | 1
valA | 2
valB | 4
valB | 4
valB | 7
''')
pw.debug.compute_and_print(t)
result = t.groupby(t.colA).reduce(argmin=pw.reducers.argmin(t.colB), min=pw.reducers.min(t.colB))
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.avg(expression)
Returns the average of the aggregated values.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA | 1
valA | 2
valB | 4
valB | 4
valB | 7
''')
result = t.groupby(t.colA).reduce(avg=pw.reducers.avg(t.colB))
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.count(*args)
Returns the number of aggregated elements.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA | 1
valA | 2
valB | 4
valB | 4
valB | 7
''')
result = t.groupby(t.colA).reduce(count=pw.reducers.count())
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.earliest(expression)
Returns the earliest of the aggregated values (the one with the lowest processing time).
Example:
import pathway as pw
t = pw.debug.table_from_markdown(
'''
a | b | __time__
1 | 2 | 2
2 | 3 | 2
1 | 4 | 4
2 | 2 | 6
1 | 1 | 8
'''
)
res = t.groupby(pw.this.a).reduce(
pw.this.a,
earliest=pw.reducers.earliest(pw.this.b),
)
pw.debug.compute_and_print_update_stream(res, include_id=False)
pw.debug.compute_and_print(res, include_id=False)
pw.reducers.latest(expression)
Returns the latest of the aggregated values (the one with the greatest processing time).
Example:
import pathway as pw
t = pw.debug.table_from_markdown(
'''
a | b | __time__
1 | 2 | 2
2 | 3 | 2
1 | 4 | 4
2 | 2 | 6
1 | 1 | 8
'''
)
res = t.groupby(pw.this.a).reduce(
pw.this.a,
latest=pw.reducers.latest(pw.this.b),
)
pw.debug.compute_and_print_update_stream(res, include_id=False)
pw.debug.compute_and_print(res, include_id=False)
pw.reducers.max(arg)
Returns the maximum of the aggregated values.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA | 1
valA | 2
valB | 4
valB | 4
valB | 7
''')
result = t.groupby(t.colA).reduce(max=pw.reducers.max(t.colB))
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.min(arg)
Returns the minimum of the aggregated values.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA | 1
valA | 2
valB | 4
valB | 4
valB | 7
''')
result = t.groupby(t.colA).reduce(min=pw.reducers.min(t.colB))
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.ndarray(expression, *, skip_nones=False)
Returns an array containing all the aggregated values. The order of values inside the array is consistent when the reducer is applied to several columns. If the optional argument skip_nones is set to True, any None values among the aggregated values are omitted from the result.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
| colA | colB | colD
1 | valA | -1 | 4
2 | valA | 1 | 7
3 | valA | 2 | -3
4 | valB | 4 |
5 | valB | 4 | 6
6 | valB | 7 | 1
''')
result = t.groupby(t.colA).reduce(
array_B=pw.reducers.ndarray(t.colB),
array_D=pw.reducers.ndarray(t.colD, skip_nones=True),
)
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.sorted_tuple(arg, *, skip_nones=False)
Returns a sorted tuple containing all the aggregated values. If the optional argument skip_nones is set to True, any None values among the aggregated values are omitted from the result.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
| colA | colB | colD
1 | valA | -1 | 4
2 | valA | 1 | 7
3 | valA | 2 | -3
4 | valB | 4 |
5 | valB | 4 | 6
6 | valB | 7 | 1
''')
result = t.groupby(t.colA).reduce(
tuple_B=pw.reducers.sorted_tuple(t.colB),
tuple_D=pw.reducers.sorted_tuple(t.colD, skip_nones=True),
)
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.sum(arg)
Returns the sum of the aggregated values. Can handle int, float, and array values.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA | 1
valA | 2
valB | 4
valB | 4
valB | 7
''')
result = t.groupby(t.colA).reduce(sum=pw.reducers.sum(t.colB))
pw.debug.compute_and_print(result, include_id=False)
import pandas as pd
np_table = pw.debug.table_from_pandas(
pd.DataFrame(
{
"data": [
np.array([1, 2, 3]),
np.array([4, 5, 6]),
np.array([7, 8, 9]),
]
}
)
)
result = np_table.reduce(data_sum=pw.reducers.sum(np_table.data))
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.tuple(arg, *, skip_nones=False)
Returns a tuple containing all the aggregated values. The order of values inside the tuple is consistent when the reducer is applied to several columns. If the optional argument skip_nones is set to True, any None values among the aggregated values are omitted from the result.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
| colA | colB | colC | colD
1 | valA | -1 | 5 | 4
2 | valA | 1 | 5 | 7
3 | valA | 2 | 5 | -3
4 | valB | 4 | 10 | 2
5 | valB | 4 | 10 | 6
6 | valB | 7 | 10 | 1
''')
result = t.groupby(t.colA).reduce(
tuple_B=pw.reducers.tuple(t.colB),
tuple_C=pw.reducers.tuple(t.colC),
tuple_D=pw.reducers.tuple(t.colD),
)
pw.debug.compute_and_print(result, include_id=False)
pw.reducers.udf_reducer(reducer_cls)
Decorator for defining custom reducers. Requires a custom accumulator as an argument.
The custom accumulator should implement `from_row`, `update` and `compute_result`.
Optionally, `neutral` and `retract` can be provided for more efficient processing on streams with changing data.
import pathway as pw
class CustomAvgAccumulator(pw.BaseCustomAccumulator):
def __init__(self, sum, cnt):
self.sum = sum
self.cnt = cnt
@classmethod
def from_row(self, row):
[val] = row
return CustomAvgAccumulator(val, 1)
def update(self, other):
self.sum += other.sum
self.cnt += other.cnt
def compute_result(self) -> float:
return self.sum / self.cnt
custom_avg = pw.reducers.udf_reducer(CustomAvgAccumulator)
t1 = pw.debug.table_from_markdown('''
age | owner | pet | price
10 | Alice | dog | 100
9 | Bob | cat | 80
8 | Alice | cat | 90
7 | Bob | dog | 70
''')
t2 = t1.groupby(t1.owner).reduce(t1.owner, avg_price=custom_avg(t1.price))
pw.debug.compute_and_print(t2, include_id=False)
pw.reducers.unique(arg)
Returns the aggregated value if all values are identical. If the values are not identical, an exception is raised.
Example:
import pathway as pw
t = pw.debug.table_from_markdown('''
| colA | colB | colD
1 | valA | 1 | 3
2 | valA | 1 | 3
3 | valA | 1 | 3
4 | valB | 2 | 4
5 | valB | 2 | 5
6 | valB | 2 | 6
''')
result = t.groupby(t.colA).reduce(unique_B=pw.reducers.unique(t.colB))
pw.debug.compute_and_print(result, include_id=False)
result = t.groupby(t.colA).reduce(unique_D=pw.reducers.unique(t.colD))
try:
pw.debug.compute_and_print(result, include_id=False)
except Exception as e:
print(type(e))