pw.reducers

Reducers are used in `reduce` to compute aggregated results over the groups obtained by a `groupby`.

Typical use:

import pathway as pw
# Build the example table from a markdown-style string.
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA |  1
valA |  2
valB |  4
valB |  4
valB |  7
''')
# Group rows by colA and reduce each group to the sum of its colB values.
result = t.groupby(t.colA).reduce(sum=pw.reducers.sum(t.colB))
# Compute and print the reduced table with include_id=False.
pw.debug.compute_and_print(result, include_id=False)

pw.reducers.any(arg)

Returns any of the aggregated values. Values are consistent across application to many columns.

Example:

import pathway as pw
# Build the example table; the first (unnamed) column is presumably the row id — confirm.
t = pw.debug.table_from_markdown('''
   | colA | colB | colD
1  | valA | -1   |  4
2  | valA |  1   |  7
3  | valA |  2   | -3
4  | valB |  4   |  2
5  | valB |  5   |  6
6  | valB |  7   |  1
''')
# Apply the `any` reducer to two columns of the same grouping.
result = t.groupby(t.colA).reduce(
    any_B=pw.reducers.any(t.colB),
    any_D=pw.reducers.any(t.colD),
)
# Compute and print the reduced table with include_id=False.
pw.debug.compute_and_print(result, include_id=False)

pw.reducers.argmax(arg)

Returns the index of the maximum aggregated value.

Example:

import pathway as pw
# Build the example table from a markdown-style string.
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA |  1
valA |  2
valB |  4
valB |  4
valB |  7
''')
# Print the input table first, presumably so the indices returned by argmax
# can be matched against the rows — confirm the default of include_id.
pw.debug.compute_and_print(t)
# For each colA group, compute both the index of the maximum colB and the maximum itself.
result = t.groupby(t.colA).reduce(argmax=pw.reducers.argmax(t.colB), max=pw.reducers.max(t.colB))
pw.debug.compute_and_print(result, include_id=False)

pw.reducers.argmin(arg)

Returns the index of the minimum aggregated value.

Example:

import pathway as pw
# Build the example table from a markdown-style string.
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA |  1
valA |  2
valB |  4
valB |  4
valB |  7
''')
# Print the input table first, presumably so the indices returned by argmin
# can be matched against the rows — confirm the default of include_id.
pw.debug.compute_and_print(t)
# For each colA group, compute both the index of the minimum colB and the minimum itself.
result = t.groupby(t.colA).reduce(argmin=pw.reducers.argmin(t.colB), min=pw.reducers.min(t.colB))
pw.debug.compute_and_print(result, include_id=False)

pw.reducers.avg(expression)

Returns the average of the aggregated values.

Example:

import pathway as pw
# Build the example table from a markdown-style string.
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA |  1
valA |  2
valB |  4
valB |  4
valB |  7
''')
# Group rows by colA and reduce each group to the average of its colB values.
result = t.groupby(t.colA).reduce(avg=pw.reducers.avg(t.colB))
# Compute and print the reduced table with include_id=False.
pw.debug.compute_and_print(result, include_id=False)

pw.reducers.count(*args)

Returns the number of aggregated elements.

Example:

import pathway as pw
# Build the example table from a markdown-style string.
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA |  1
valA |  2
valB |  4
valB |  4
valB |  7
''')
# Group rows by colA; count() is called without arguments and yields the
# number of aggregated elements in each group.
result = t.groupby(t.colA).reduce(count=pw.reducers.count())
# Compute and print the reduced table with include_id=False.
pw.debug.compute_and_print(result, include_id=False)

pw.reducers.earliest(expression)

Returns the earliest of the aggregated values (the one with the lowest processing time).

Example:

import pathway as pw
# Build the example table; the __time__ column presumably sets the processing
# time of each row — confirm against the table_from_markdown documentation.
t = pw.debug.table_from_markdown(
    '''
    a | b | __time__
    1 | 2 |     2
    2 | 3 |     2
    1 | 4 |     4
    2 | 2 |     6
    1 | 1 |     8
'''
)
# Group by column a, keeping a and the earliest value of b in each group.
res = t.groupby(pw.this.a).reduce(
    pw.this.a,
    earliest=pw.reducers.earliest(pw.this.b),
)
# Print the result twice: once as an update stream and once as a regular table.
pw.debug.compute_and_print_update_stream(res, include_id=False)
pw.debug.compute_and_print(res, include_id=False)

pw.reducers.latest(expression)

Returns the latest of the aggregated values (the one with the greatest processing time).

Example:

import pathway as pw
# Build the example table; the __time__ column presumably sets the processing
# time of each row — confirm against the table_from_markdown documentation.
t = pw.debug.table_from_markdown(
    '''
    a | b | __time__
    1 | 2 |     2
    2 | 3 |     2
    1 | 4 |     4
    2 | 2 |     6
    1 | 1 |     8
'''
)
# Group by column a, keeping a and the latest value of b in each group.
res = t.groupby(pw.this.a).reduce(
    pw.this.a,
    latest=pw.reducers.latest(pw.this.b),
)
# Print the result twice: once as an update stream and once as a regular table.
pw.debug.compute_and_print_update_stream(res, include_id=False)
pw.debug.compute_and_print(res, include_id=False)

pw.reducers.max(arg)

Returns the maximum of the aggregated values.

Example:

import pathway as pw
# Build the example table from a markdown-style string.
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA |  1
valA |  2
valB |  4
valB |  4
valB |  7
''')
# Group rows by colA and reduce each group to the maximum of its colB values.
result = t.groupby(t.colA).reduce(max=pw.reducers.max(t.colB))
# Compute and print the reduced table with include_id=False.
pw.debug.compute_and_print(result, include_id=False)

pw.reducers.min(arg)

Returns the minimum of the aggregated values.

Example:

import pathway as pw
# Build the example table from a markdown-style string.
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA |  1
valA |  2
valB |  4
valB |  4
valB |  7
''')
# Group rows by colA and reduce each group to the minimum of its colB values.
result = t.groupby(t.colA).reduce(min=pw.reducers.min(t.colB))
# Compute and print the reduced table with include_id=False.
pw.debug.compute_and_print(result, include_id=False)

pw.reducers.ndarray(expression, *, skip_nones=False)

Returns an array containing all the aggregated values. The order of values inside the array is consistent across application to many columns. If the optional argument `skip_nones` is set to True, any None values among the aggregated values are omitted from the result.

Example:

import pathway as pw
# Build the example table; row 4 leaves colD empty, presumably parsed as None — confirm.
t = pw.debug.table_from_markdown('''
   | colA | colB | colD
1  | valA | -1   |  4
2  | valA |  1   |  7
3  | valA |  2   | -3
4  | valB |  4   |
5  | valB |  4   |  6
6  | valB |  7   |  1
''')
# Collect each group's values into arrays; array_D passes skip_nones=True so
# that None values are omitted from its result.
result = t.groupby(t.colA).reduce(
    array_B=pw.reducers.ndarray(t.colB),
    array_D=pw.reducers.ndarray(t.colD, skip_nones=True),
)
# Compute and print the reduced table with include_id=False.
pw.debug.compute_and_print(result, include_id=False)

pw.reducers.sorted_tuple(arg, *, skip_nones=False)

Returns a sorted tuple containing all the aggregated values. If the optional argument `skip_nones` is set to True, any None values among the aggregated values are omitted from the result.

Example:

import pathway as pw
# Build the example table; row 4 leaves colD empty, presumably parsed as None — confirm.
t = pw.debug.table_from_markdown('''
   | colA | colB | colD
1  | valA | -1   |  4
2  | valA |  1   |  7
3  | valA |  2   | -3
4  | valB |  4   |
5  | valB |  4   |  6
6  | valB |  7   |  1
''')
# Collect each group's values into sorted tuples; tuple_D passes
# skip_nones=True so that None values are omitted from its result.
result = t.groupby(t.colA).reduce(
    tuple_B=pw.reducers.sorted_tuple(t.colB),
    tuple_D=pw.reducers.sorted_tuple(t.colD, skip_nones=True),
)
# Compute and print the reduced table with include_id=False.
pw.debug.compute_and_print(result, include_id=False)

pw.reducers.sum(arg)

Returns the sum of the aggregated values. Can handle int, float, and array values.

Example:

# numpy is required for the array example below (np.array); all imports are
# placed at the top of the script.
import numpy as np
import pandas as pd
import pathway as pw

# Scalar case: group rows by colA and sum the colB values of each group.
t = pw.debug.table_from_markdown('''
colA | colB
valA | -1
valA |  1
valA |  2
valB |  4
valB |  4
valB |  7
''')
result = t.groupby(t.colA).reduce(sum=pw.reducers.sum(t.colB))
pw.debug.compute_and_print(result, include_id=False)

# Array case: build a table whose single "data" column holds numpy arrays.
np_table = pw.debug.table_from_pandas(
    pd.DataFrame(
        {
            "data": [
                np.array([1, 2, 3]),
                np.array([4, 5, 6]),
                np.array([7, 8, 9]),
            ]
        }
    )
)
# reduce is called on the table directly (no groupby), summing the "data"
# column over all rows.
result = np_table.reduce(data_sum=pw.reducers.sum(np_table.data))
pw.debug.compute_and_print(result, include_id=False)

pw.reducers.tuple(arg, *, skip_nones=False)

Returns a tuple containing all the aggregated values. The order of values inside the tuple is consistent across application to many columns. If the optional argument `skip_nones` is set to True, any None values among the aggregated values are omitted from the result.

Example:

import pathway as pw
# Build the example table; colC holds the same value for every row of a group.
t = pw.debug.table_from_markdown('''
   | colA | colB | colC | colD
1  | valA | -1   |   5  |  4
2  | valA |  1   |   5  |  7
3  | valA |  2   |   5  | -3
4  | valB |  4   |  10  |  2
5  | valB |  4   |  10  |  6
6  | valB |  7   |  10  |  1
''')
# Collect the values of three columns into one tuple per group and column.
result = t.groupby(t.colA).reduce(
    tuple_B=pw.reducers.tuple(t.colB),
    tuple_C=pw.reducers.tuple(t.colC),
    tuple_D=pw.reducers.tuple(t.colD),
)
# Compute and print the reduced table with include_id=False.
pw.debug.compute_and_print(result, include_id=False)

pw.reducers.udf_reducer(reducer_cls)

Decorator for defining custom reducers. Requires a custom accumulator as an argument. The custom accumulator should implement `from_row`, `update` and `compute_result`. Optionally, `neutral` and `retract` can be provided for more efficient processing on streams with changing data.

import pathway as pw


class CustomAvgAccumulator(pw.BaseCustomAccumulator):
    """Custom accumulator that keeps a running sum and count to compute an average."""

    def __init__(self, sum, cnt):
        """Store the partial sum and the number of values it covers."""
        self.sum = sum
        self.cnt = cnt

    @classmethod
    def from_row(cls, row):
        """Create an accumulator from a row that holds exactly one value."""
        # Unpacking into a one-element list fails loudly if the row does not
        # contain exactly one value.
        [val] = row
        return cls(val, 1)

    def update(self, other):
        """Merge another accumulator's sum and count into this one."""
        self.sum += other.sum
        self.cnt += other.cnt

    def compute_result(self) -> float:
        """Return the average of all accumulated values."""
        return self.sum / self.cnt


# Turn the accumulator class into a reducer usable inside reduce().
custom_avg = pw.reducers.udf_reducer(CustomAvgAccumulator)
t1 = pw.debug.table_from_markdown('''
age | owner | pet | price
10  | Alice | dog | 100
9   | Bob   | cat | 80
8   | Alice | cat | 90
7   | Bob   | dog | 70
''')
# Group by owner and compute each owner's average price with the custom reducer.
t2 = t1.groupby(t1.owner).reduce(t1.owner, avg_price=custom_avg(t1.price))
pw.debug.compute_and_print(t2, include_id=False)

pw.reducers.unique(arg)

Returns the aggregated value if all values are identical. If the values are not identical, an exception is raised.

Example:

import pathway as pw
# Build the example table: colB is identical within each colA group, while
# colD differs within the valB group (4, 5, 6).
t = pw.debug.table_from_markdown('''
   | colA | colB | colD
1  | valA |  1   |  3
2  | valA |  1   |  3
3  | valA |  1   |  3
4  | valB |  2   |  4
5  | valB |  2   |  5
6  | valB |  2   |  6
''')
# colB values are identical within every group, so unique yields that value.
result = t.groupby(t.colA).reduce(unique_B=pw.reducers.unique(t.colB))
pw.debug.compute_and_print(result, include_id=False)
# colD values are not identical within the valB group; the computation is
# wrapped in try/except and the type of the caught exception is printed.
result = t.groupby(t.colA).reduce(unique_D=pw.reducers.unique(t.colD))
try:
  pw.debug.compute_and_print(result, include_id=False)
except Exception as e:
  print(type(e))