pw.reducers

Reducers are used in `reduce` to compute aggregated results over the groups produced by `groupby`.

Typical use:

import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA |  1
valA |  2
valB |  4
valB |  4
valB |  7
''')
result = t.groupby(t.colA).reduce(sum=pw.reducers.sum(t.colB))
pw.debug.compute_and_print(result, include_id=False)
sum
2
15

pw.reducers.any(arg)

Returns any of the aggregated values. The choice is consistent across columns: when `any` is applied to several columns in the same `reduce`, the returned values all come from the same row.

Example:

import pathway as pw
t = pw.debug.table_from_markdown('''
   | colA | colB | colD
1  | valA | -1   |  4
2  | valA |  1   |  7
3  | valA |  2   | -3
4  | valB |  4   |  2
5  | valB |  5   |  6
6  | valB |  7   |  1
''')
result = t.groupby(t.colA).reduce(
    any_B=pw.reducers.any(t.colB),
    any_D=pw.reducers.any(t.colD),
)
pw.debug.compute_and_print(result, include_id=False)
any_B | any_D
2     | -3
7     | 1

pw.reducers.argmax(arg)

Returns the index (row id) of the row holding the maximum aggregated value.

Example:

import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA |  1
valA |  2
valB |  4
valB |  4
valB |  7
''')
pw.debug.compute_and_print(t)
            | colA | colB
^X1MXHYY... | valA | -1
^YYY4HAB... | valA | 1
^Z3QWT29... | valA | 2
^3CZ78B4... | valB | 4
^3HN31E1... | valB | 4
^3S2X6B2... | valB | 7
result = t.groupby(t.colA).reduce(argmax=pw.reducers.argmax(t.colB), max=pw.reducers.max(t.colB))
pw.debug.compute_and_print(result, include_id=False)
argmax      | max
^Z3QWT29... | 2
^3S2X6B2... | 7

pw.reducers.argmin(arg)

Returns the index (row id) of the row holding the minimum aggregated value.

Example:

import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA |  1
valA |  2
valB |  4
valB |  4
valB |  7
''')
pw.debug.compute_and_print(t)
            | colA | colB
^X1MXHYY... | valA | -1
^YYY4HAB... | valA | 1
^Z3QWT29... | valA | 2
^3CZ78B4... | valB | 4
^3HN31E1... | valB | 4
^3S2X6B2... | valB | 7
result = t.groupby(t.colA).reduce(argmin=pw.reducers.argmin(t.colB), min=pw.reducers.min(t.colB))
pw.debug.compute_and_print(result, include_id=False)
argmin      | min
^X1MXHYY... | -1
^3CZ78B4... | 4

pw.reducers.avg(expression)

Returns the average of the aggregated values.

Example:

import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA |  1
valA |  2
valB |  4
valB |  4
valB |  7
''')
result = t.groupby(t.colA).reduce(avg=pw.reducers.avg(t.colB))
pw.debug.compute_and_print(result, include_id=False)
avg
0.6666666666666666
5.0

pw.reducers.count(*args)

Returns the number of aggregated elements.

Example:

import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA |  1
valA |  2
valB |  4
valB |  4
valB |  7
''')
result = t.groupby(t.colA).reduce(count=pw.reducers.count())
pw.debug.compute_and_print(result, include_id=False)
count
3
3

pw.reducers.max(arg)

Returns the maximum of the aggregated values.

Example:

import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA |  1
valA |  2
valB |  4
valB |  4
valB |  7
''')
result = t.groupby(t.colA).reduce(max=pw.reducers.max(t.colB))
pw.debug.compute_and_print(result, include_id=False)
max
2
7

pw.reducers.min(arg)

Returns the minimum of the aggregated values.

Example:

import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA |  1
valA |  2
valB |  4
valB |  4
valB |  7
''')
result = t.groupby(t.colA).reduce(min=pw.reducers.min(t.colB))
pw.debug.compute_and_print(result, include_id=False)
min
-1
4

pw.reducers.ndarray(expression, *, skip_nones=False)

Returns an array containing all the aggregated values. The order of values inside the array is consistent across columns: arrays built from different columns of the same group list their values in the same row order. If the optional argument `skip_nones` is set to `True`, any `None` values among the aggregated values are omitted from the result.

Example:

import pathway as pw
t = pw.debug.table_from_markdown('''
   | colA | colB | colD
1  | valA | -1   |  4
2  | valA |  1   |  7
3  | valA |  2   | -3
4  | valB |  4   |
5  | valB |  4   |  6
6  | valB |  7   |  1
''')
result = t.groupby(t.colA).reduce(
    array_B=pw.reducers.ndarray(t.colB),
    array_D=pw.reducers.ndarray(t.colD, skip_nones=True),
)
pw.debug.compute_and_print(result, include_id=False)
array_B    | array_D
[4 4 7]    | [6 1]
[-1  1  2] | [ 4  7 -3]

pw.reducers.sorted_tuple(arg, *, skip_nones=False)

Returns a sorted tuple containing all the aggregated values. If the optional argument `skip_nones` is set to `True`, any `None` values among the aggregated values are omitted from the result.

Example:

import pathway as pw
t = pw.debug.table_from_markdown('''
   | colA | colB | colD
1  | valA | -1   |  4
2  | valA |  1   |  7
3  | valA |  2   | -3
4  | valB |  4   |
5  | valB |  4   |  6
6  | valB |  7   |  1
''')
result = t.groupby(t.colA).reduce(
    tuple_B=pw.reducers.sorted_tuple(t.colB),
    tuple_D=pw.reducers.sorted_tuple(t.colD, skip_nones=True),
)
pw.debug.compute_and_print(result, include_id=False)
tuple_B    | tuple_D
(-1, 1, 2) | (-3, 4, 7)
(4, 4, 7)  | (1, 6)

pw.reducers.sum(arg)

Returns the sum of the aggregated values. Can handle int, float, and array values.

Example:

import pathway as pw
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA |  1
valA |  2
valB |  4
valB |  4
valB |  7
''')
result = t.groupby(t.colA).reduce(sum=pw.reducers.sum(t.colB))
pw.debug.compute_and_print(result, include_id=False)
sum
2
15
import numpy as np  # needed for np.array below; previously missing
import pandas as pd

# A single-column table whose cells are numpy arrays; pw.reducers.sum adds them
# element-wise across all rows.
np_table = pw.debug.table_from_pandas(
    pd.DataFrame(
        {
            "data": [
                np.array([1, 2, 3]),
                np.array([4, 5, 6]),
                np.array([7, 8, 9]),
            ]
        }
    )
)
result = np_table.reduce(data_sum=pw.reducers.sum(np_table.data))
pw.debug.compute_and_print(result, include_id=False)
data_sum
[12 15 18]

pw.reducers.tuple(arg, *, skip_nones=False)

Returns a tuple containing all the aggregated values. The order of values inside the tuple is consistent across columns: tuples built from different columns of the same group list their values in the same row order. If the optional argument `skip_nones` is set to `True`, any `None` values among the aggregated values are omitted from the result.

Example:

import pathway as pw
t = pw.debug.table_from_markdown('''
   | colA | colB | colC | colD
1  | valA | -1   |   5  |  4
2  | valA |  1   |   5  |  7
3  | valA |  2   |   5  | -3
4  | valB |  4   |  10  |  2
5  | valB |  4   |  10  |  6
6  | valB |  7   |  10  |  1
''')
result = t.groupby(t.colA).reduce(
    tuple_B=pw.reducers.tuple(t.colB),
    tuple_C=pw.reducers.tuple(t.colC),
    tuple_D=pw.reducers.tuple(t.colD),
)
pw.debug.compute_and_print(result, include_id=False)
tuple_B    | tuple_C      | tuple_D
(-1, 1, 2) | (5, 5, 5)    | (4, 7, -3)
(4, 4, 7)  | (10, 10, 10) | (2, 6, 1)

pw.reducers.udf_reducer(reducer_cls)

Decorator for defining custom reducers. Requires a custom accumulator class as an argument. The custom accumulator should implement `from_row`, `update`, and `compute_result`. Optionally, `neutral` and `retract` can be provided for more efficient processing on streams with changing data.

Example:

import pathway as pw
class CustomAvgAccumulator(pw.BaseCustomAccumulator):
    """Accumulator computing the average of a single column.

    It keeps a running sum and a count of values; the result is sum / cnt.
    """

    def __init__(self, sum, cnt):
        self.sum = sum
        self.cnt = cnt

    @classmethod
    def from_row(cls, row):
        """Build an accumulator from a row holding exactly one value."""
        # The reducer is applied to a single column, so unpack exactly one value.
        [val] = row
        # Use ``cls`` so subclasses get an instance of their own type.
        return cls(val, 1)

    def update(self, other):
        """Merge another accumulator's partial sum and count into this one."""
        self.sum += other.sum
        self.cnt += other.cnt

    def compute_result(self) -> float:
        """Return the average of all values accumulated so far."""
        return self.sum / self.cnt
custom_avg = pw.reducers.udf_reducer(CustomAvgAccumulator)
t1 = pw.debug.table_from_markdown('''
age | owner | pet | price
10  | Alice | dog | 100
9   | Bob   | cat | 80
8   | Alice | cat | 90
7   | Bob   | dog | 70
''')
t2 = t1.groupby(t1.owner).reduce(t1.owner, avg_price=custom_avg(t1.price))
pw.debug.compute_and_print(t2, include_id=False)
owner | avg_price
Alice | 95.0
Bob   | 75.0

pw.reducers.unique(arg)

Returns the aggregated value if all values are identical. If the values are not identical, an exception is raised.

Example:

import pathway as pw
t = pw.debug.table_from_markdown('''
   | colA | colB | colD
1  | valA |  1   |  3
2  | valA |  1   |  3
3  | valA |  1   |  3
4  | valB |  2   |  4
5  | valB |  2   |  5
6  | valB |  2   |  6
''')
result = t.groupby(t.colA).reduce(unique_B=pw.reducers.unique(t.colB))
pw.debug.compute_and_print(result, include_id=False)
unique_B
1
2
result = t.groupby(t.colA).reduce(unique_D=pw.reducers.unique(t.colD))
try:
  pw.debug.compute_and_print(result, include_id=False)
except Exception as e:
  print(type(e))
<class 'pathway.engine.EngineError'>