pathway.stdlib.temporal package

class pw.temporal.AsofJoinResult(side_data, mode, defaults, direction)

Result of an ASOF join of two tables.

Example:

import pathway as pw

t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)

t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    how=pw.JoinMode.LEFT,
    defaults={t2.val: -1},
).select(
    pw.this.shard_key,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
shard_key | t  | val_left | val_right | sum
0         | 1  | 1        | -1        | 0
0         | 4  | 2        | 6         | 8
0         | 5  | 3        | 6         | 9
0         | 6  | 4        | 6         | 10
0         | 7  | 5        | 6         | 11
0         | 11 | 6        | 9         | 15
0         | 12 | 7        | 9         | 16
1         | 5  | 8        | 7         | 15
1         | 7  | 9        | 7         | 16

class pw.temporal.Direction(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

class pw.temporal.IntervalJoinResult(left_bucketed, right_bucketed, earlier_part_filtered, later_part_filtered, table_substitution, mode)

Result of an interval join between tables.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
join_result = t1.interval_join_inner(t2, t1.t, t2.t, pw.temporal.interval(-2, 1))
isinstance(join_result, pw.temporal.IntervalJoinResult)
True
pw.debug.compute_and_print(
    join_result.select(left_t=t1.t, right_t=t2.t), include_id=False
)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4

select(*args, **kwargs)

Computes a result of an interval join.
  • Parameters
    • args (ColumnReference) – Column references.
    • kwargs (Any) – Column expressions with their new assigned names.
  • Returns
    Table – Created table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_inner(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2

class pw.temporal.Window()

class pw.temporal.WindowJoinResult(join_result, left_original, right_original, left_new, right_new)

Result of a window join between tables.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
join_result = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.tumbling(2))
isinstance(join_result, pw.temporal.WindowJoinResult)
True
pw.debug.compute_and_print(
    join_result.select(left_t=t1.t, right_t=t2.t), include_id=False
)
left_t | right_t
       | 5
1      |
2      | 2
3      | 2
7      | 6
7      | 7
13     |
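
The outer window join above can be mimicked in plain Python to make the bucketing explicit. This is only a sketch of the semantics, not Pathway's implementation: the function name window_join_outer and the bucket layout are hypothetical, and it assumes tumbling windows of the form [k * duration, (k + 1) * duration), which is consistent with the output shown above.

```python
from collections import defaultdict


def window_join_outer(left_times, right_times, duration):
    """Pair up times that fall into the same tumbling window (outer join)."""
    # Bucket index k stands for the window [k * duration, (k + 1) * duration).
    buckets = defaultdict(lambda: ([], []))
    for t in left_times:
        buckets[t // duration][0].append(t)
    for t in right_times:
        buckets[t // duration][1].append(t)

    rows = []
    for _, (lefts, rights) in sorted(buckets.items()):
        if lefts and rights:
            # Both sides present: emit every left/right combination.
            rows.extend((l, r) for l in lefts for r in rights)
        elif lefts:
            rows.extend((l, None) for l in lefts)   # unmatched left rows
        else:
            rows.extend((None, r) for r in rights)  # unmatched right rows
    return rows


rows = window_join_outer([1, 2, 3, 7, 13], [2, 5, 6, 7], duration=2)
```

The rows are the same pairs as in the printed table, with None standing in for the empty cells; only the ordering differs, because the sketch walks the windows in time order.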

select(*args, **kwargs)

Computes a result of a window join.
  • Parameters
    • args (ColumnReference) – Column references.
    • kwargs (Any) – Column expressions with their new assigned names.
  • Returns
    Table – Created table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   |        | 5
1   | 1      |
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
1   | 13     |
2   | 1      |
2   | 2      | 2
2   | 2      | 3
3   | 4      |
4   |        | 3

Functions

pw.temporal.asof_join(self, other, self_time, other_time, *on, how, defaults={}, direction=Direction.BACKWARD)

Perform an ASOF join of two tables.
  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • how (JoinMode) – mode of the join (LEFT, RIGHT, FULL)
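    • defaults (Dict[ColumnReference, Any]) – dictionary mapping each column to its default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST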

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    how=pw.JoinMode.LEFT,
    defaults={t2.val: -1},
).select(
    pw.this.shard_key,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
shard_key | t  | val_left | val_right | sum
0         | 1  | 1        | -1        | 0
0         | 4  | 2        | 6         | 8
0         | 5  | 3        | 6         | 9
0         | 6  | 4        | 6         | 10
0         | 7  | 5        | 6         | 11
0         | 11 | 6        | 9         | 15
0         | 12 | 7        | 9         | 16
1         | 5  | 8        | 7         | 15
1         | 7  | 9        | 7         | 16
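
To make the backward matching in the example concrete, here is a plain-Python sketch of a left ASOF join. It is not Pathway's implementation; the function name asof_left_backward and the (key, time, value) tuple layout are hypothetical. One detail is inferred from the output above rather than documented: the left row at t=7 is paired with the right row at t=3, not the right row at t=7, so the sketch only looks at right rows with a strictly smaller time.

```python
from bisect import bisect_left
from collections import defaultdict


def asof_left_backward(left, right, default=None):
    """left/right: iterables of (key, time, value). Returns (key, t, left_val, right_val)."""
    # Index the right rows by key, sorted by time.
    by_key = defaultdict(list)
    for key, t, val in right:
        by_key[key].append((t, val))
    for rows in by_key.values():
        rows.sort()

    out = []
    for key, t, val in left:
        rows = by_key.get(key, [])
        times = [rt for rt, _ in rows]
        # Index of the last right row with time strictly below t
        # (assumption inferred from the example output, see lead-in).
        i = bisect_left(times, t) - 1
        right_val = rows[i][1] if i >= 0 else default
        out.append((key, t, val, right_val))
    return out


left = [(0, 1, 1), (0, 4, 2), (0, 5, 3), (0, 6, 4), (0, 7, 5),
        (0, 11, 6), (0, 12, 7), (1, 5, 8), (1, 7, 9)]
right = [(1, 2, 7), (1, 8, 3), (0, 2, 0), (0, 3, 6), (0, 7, 2),
         (0, 8, 3), (0, 9, 9), (0, 13, 7), (0, 14, 4)]
result = asof_left_backward(left, right, default=-1)
```

With default=-1, result holds the same (shard_key, t, val_left, val_right) combinations as the table printed above.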

pw.temporal.asof_join_left(self, other, self_time, other_time, *on, defaults={}, direction=Direction.BACKWARD)

Perform a left ASOF join of two tables.
  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • defaults (Dict[ColumnReference, Any]) – dictionary mapping each column to its default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_left(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t2.val: -1},
).select(
    pw.this.shard_key,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
shard_key | t  | val_left | val_right | sum
0         | 1  | 1        | -1        | 0
0         | 4  | 2        | 6         | 8
0         | 5  | 3        | 6         | 9
0         | 6  | 4        | 6         | 10
0         | 7  | 5        | 6         | 11
0         | 11 | 6        | 9         | 15
0         | 12 | 7        | 9         | 16
1         | 5  | 8        | 7         | 15
1         | 7  | 9        | 7         | 16

pw.temporal.asof_join_outer(self, other, self_time, other_time, *on, defaults={}, direction=Direction.BACKWARD)

Perform an outer ASOF join of two tables.
  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • defaults (Dict[ColumnReference, Any]) – dictionary mapping each column to its default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_outer(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t1.val: -1, t2.val: -1},
).select(
    pw.this.shard_key,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
shard_key | t  | val_left | val_right | sum
0         | 1  | 1        | -1        | 0
0         | 2  | 1        | 0         | 1
0         | 3  | 1        | 6         | 7
0         | 4  | 2        | 6         | 8
0         | 5  | 3        | 6         | 9
0         | 6  | 4        | 6         | 10
0         | 7  | 5        | 2         | 7
0         | 7  | 5        | 6         | 11
0         | 8  | 5        | 3         | 8
0         | 9  | 5        | 9         | 14
0         | 11 | 6        | 9         | 15
0         | 12 | 7        | 9         | 16
0         | 13 | 7        | 7         | 14
0         | 14 | 7        | 4         | 11
1         | 2  | -1       | 7         | 6
1         | 5  | 8        | 7         | 15
1         | 7  | 9        | 7         | 16
1         | 8  | 9        | 3         | 12

pw.temporal.asof_join_right(self, other, self_time, other_time, *on, defaults={}, direction=Direction.BACKWARD)

Perform a right ASOF join of two tables.
  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • defaults (Dict[ColumnReference, Any]) – dictionary mapping each column to its default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_right(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t1.val: -1},
).select(
    pw.this.shard_key,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
shard_key | t  | val_left | val_right | sum
0         | 2  | 1        | 0         | 1
0         | 3  | 1        | 6         | 7
0         | 7  | 5        | 2         | 7
0         | 8  | 5        | 3         | 8
0         | 9  | 5        | 9         | 14
0         | 13 | 7        | 7         | 14
0         | 14 | 7        | 4         | 11
1         | 2  | -1       | 7         | 6
1         | 8  | 9        | 3         | 12

pw.temporal.interval(lower_bound, upper_bound)

Allows testing whether two times are within a certain distance.

NOTE: Usually used as an argument of .interval_join().

  • Parameters
    • lower_bound (Union[int, float, timedelta]) – a lower bound on other_time - self_time.
    • upper_bound (Union[int, float, timedelta]) – an upper bound on other_time - self_time.
  • Returns
    Window – object to pass as an argument to .interval_join()

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4
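
The condition encoded by the interval can be written out directly. The sketch below is plain Python rather than Pathway code, and the helper name interval_join_inner is hypothetical; it keeps a pair when lower_bound <= other_time - self_time <= upper_bound, as described in the parameters above.

```python
def interval_join_inner(left_times, right_times, lower_bound, upper_bound):
    """Return all (left_t, right_t) pairs whose time difference lies in the interval."""
    return sorted(
        (lt, rt)
        for lt in left_times
        for rt in right_times
        # The difference is taken as other_time - self_time; both bounds are inclusive.
        if lower_bound <= rt - lt <= upper_bound
    )


pairs = interval_join_inner([3, 4, 5, 11], [0, 1, 4, 7], -2, 1)
# pairs == [(3, 1), (3, 4), (4, 4), (5, 4)]
```

The left time 11 has no partner because no right time falls within [9, 12], so it is absent from the result, as in the table above.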

pw.temporal.interval_join(self, other, self_time, other_time, interval, *on, how=JoinMode.INNER)

Performs an interval join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined.
  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • interval – the allowed range of the time difference other_time - self_time, created with pw.temporal.interval(lower_bound, upper_bound).
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • how (JoinMode) – decides whether to run interval_join_inner, interval_join_left, interval_join_right or interval_join_outer. Default is INNER.
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b, how=pw.JoinMode.INNER
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2

pw.temporal.interval_join_inner(self, other, self_time, other_time, interval, *on)

Performs an interval join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined.
  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • interval – the allowed range of the time difference other_time - self_time, created with pw.temporal.interval(lower_bound, upper_bound).
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_inner(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_inner(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2

pw.temporal.interval_join_left(self, other, self_time, other_time, interval, *on)

Performs an interval left join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined. Rows from the left side that haven’t been matched with the right side are returned with missing values on the right side replaced with None.
  • Parameters
    • other (Table) – the right side of the join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • interval – the allowed range of the time difference other_time - self_time, created with pw.temporal.interval(lower_bound, upper_bound).
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_left(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4
11     |
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_left(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
1 | 11     |
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2
3 | 4      |

pw.temporal.interval_join_outer(self, other, self_time, other_time, interval, *on)

Performs an interval outer join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined. Rows that haven’t been matched with the other side are returned with missing values on the other side replaced with None.
  • Parameters
    • other (Table) – the right side of the join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • interval – the allowed range of the time difference other_time - self_time, created with pw.temporal.interval(lower_bound, upper_bound).
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_outer(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | 0
       | 7
3      | 1
3      | 4
4      | 4
5      | 4
11     |
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_outer(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
  |        | 0
  |        | 2
  |        | 7
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
1 | 11     |
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2
3 | 4      |
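
The way unmatched rows are padded in the keyed example can be sketched in plain Python. This is an illustration of the semantics only, with a hypothetical function name and a hypothetical (key, time) tuple layout. As in the select above, the first output column comes from the left table, so it is None for unmatched right rows.

```python
def interval_join_outer(left, right, lower_bound, upper_bound):
    """left/right: lists of (key, time). Returns (left_key, left_t, right_t) rows."""
    rows = []
    matched_right = set()
    for a, lt in left:
        # Right rows with an equal key and a time difference inside the interval.
        hits = [
            j for j, (b, rt) in enumerate(right)
            if a == b and lower_bound <= rt - lt <= upper_bound
        ]
        if hits:
            for j in hits:
                rows.append((a, lt, right[j][1]))
                matched_right.add(j)
        else:
            rows.append((a, lt, None))  # unmatched left row
    for j, (_, rt) in enumerate(right):
        if j not in matched_right:
            rows.append((None, None, rt))  # unmatched right row
    return rows


left = [(1, 3), (1, 4), (1, 5), (1, 11), (2, 2), (2, 3), (3, 4)]
right = [(1, 0), (1, 1), (1, 4), (1, 7), (2, 0), (2, 2), (4, 2)]
rows = interval_join_outer(left, right, -2, 1)
```

Dropping the final loop gives the left join, and dropping the else branch gives the right join.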

pw.temporal.interval_join_right(self, other, self_time, other_time, interval, *on)

Performs an interval right join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined. Rows from the right side that haven’t been matched with the left side are returned with missing values on the left side replaced with None.
  • Parameters
    • other (Table) – the right side of the join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • interval – the allowed range of the time difference other_time - self_time, created with pw.temporal.interval(lower_bound, upper_bound).
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_right(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | 0
       | 7
3      | 1
3      | 4
4      | 4
5      | 4
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_right(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
  |        | 0
  |        | 2
  |        | 7
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2

pw.temporal.intervals_over(*, at, lower_bound, upper_bound)

Allows grouping together elements within a window.

Windows are created for each time t in at, by taking values with times within [t+lower_bound, t+upper_bound].

  • Parameters
    • lower_bound (Union[int, float, timedelta]) – lower bound for interval
    • upper_bound (Union[int, float, timedelta]) – upper bound for interval
    • at (ColumnReference) – column of times for which windows are to be created
  • Returns
    Window – object to pass as an argument to .windowby()

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
    | t |  v
1   | 1 |  10
2   | 2 |  1
3   | 4 |  3
4   | 8 |  2
5   | 9 |  4
6   | 10|  8
7   | 1 |  9
8   | 2 |  16
''')
probes = pw.debug.table_from_markdown(
'''
t
2
4
6
8
10
''')
result = (
    pw.temporal.windowby(t, t.t, window=pw.temporal.intervals_over(
        at=probes.t, lower_bound=-2, upper_bound=1
     ))
    .reduce(pw.this._pw_window_location, v=pw.reducers.tuple(pw.this.v))
)
pw.debug.compute_and_print(result, include_id=False)
_pw_window_location | v
2                   | (16, 1, 10, 9)
4                   | (3, 16, 1)
6                   | (3,)
8                   | (4, 2)
10                  | (4, 8, 2)
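
The window definition [t+lower_bound, t+upper_bound] can be checked by hand with a few lines of plain Python. The sketch below is not Pathway code and its helper name is hypothetical. Each probe time collects the values whose times fall inside its closed interval, and the values are sorted because the order inside the printed tuples is not something this sketch tries to reproduce.

```python
def intervals_over(rows, at, lower_bound, upper_bound):
    """rows: list of (time, value). Returns {probe_time: sorted values in its window}."""
    return {
        p: sorted(v for t, v in rows if p + lower_bound <= t <= p + upper_bound)
        for p in at
    }


rows = [(1, 10), (2, 1), (4, 3), (8, 2), (9, 4), (10, 8), (1, 9), (2, 16)]
windows = intervals_over(rows, at=[2, 4, 6, 8, 10], lower_bound=-2, upper_bound=1)
# windows[2] == [1, 9, 10, 16], windows[6] == [3]
```

A row can belong to several windows: the row with t=2 and v=16 appears under both probe 2 and probe 4.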

pw.temporal.session(*, predicate=None, max_gap=None)

Allows grouping together elements within a window across an ordered time-like data column by locally grouping adjacent elements, based either on a maximum time difference or on a custom predicate.

NOTE: Usually used as an argument of .windowby(). Exactly one of the arguments predicate or max_gap should be provided.

  • Parameters
    • predicate (Optional[Callable[[Any, Any], bool]]) – function taking two adjacent entries that returns a boolean saying whether the two entries should be grouped
    • max_gap (Union[int, float, timedelta, None]) – two adjacent entries a and b (with a before b) will be grouped if b - a < max_gap
  • Returns
    Window – object to pass as an argument to .windowby()

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
    | shard |  t |  v
1   | 0     |  1 |  10
2   | 0     |  2 |  1
3   | 0     |  4 |  3
4   | 0     |  8 |  2
5   | 0     |  9 |  4
6   | 0     |  10|  8
7   | 1     |  1 |  9
8   | 1     |  2 |  16
''')
result = t.windowby(
    t.t, window=pw.temporal.session(predicate=lambda a, b: abs(a-b) <= 1), shard=t.shard
).reduce(
pw.this._pw_shard,
pw.this._pw_window_start,
pw.this._pw_window_end,
min_t=pw.reducers.min(pw.this.t),
max_v=pw.reducers.max(pw.this.v),
count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)
_pw_shard | _pw_window_start | _pw_window_end | min_t | max_v | count
0         | 1                | 2              | 1     | 10    | 2
0         | 4                | 4              | 4     | 3     | 1
0         | 8                | 10             | 8     | 8     | 3
1         | 1                | 2              | 1     | 16    | 2

pw.temporal.sliding(hop, duration=None, ratio=None, offset=None)

Allows grouping together elements within a window of a given length sliding across an ordered time-like data column according to a specified interval (hop), starting from a given offset.

NOTE: Usually used as an argument of .windowby(). Exactly one of the arguments duration or ratio should be provided.

  • Parameters
    • hop (Union[int, float, timedelta]) – step between the starts of consecutive windows
    • duration (Union[int, float, timedelta, None]) – length of the window
    • ratio (Optional[int]) – used as an alternative way to specify duration as hop * ratio
    • offset (Union[int, float, datetime, None]) – beginning of the first window
  • Returns
    Window – object to pass as an argument to .windowby()

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
       | shard | t
   1   | 0     |  12
   2   | 0     |  13
   3   | 0     |  14
   4   | 0     |  15
   5   | 0     |  16
   6   | 0     |  17
   7   | 1     |  10
   8   | 1     |  11
''')
result = t.windowby(
    t.t, window=pw.temporal.sliding(duration=10, hop=3), shard=t.shard
).reduce(
  pw.this._pw_shard,
  pw.this._pw_window_start,
  pw.this._pw_window_end,
  min_t=pw.reducers.min(pw.this.t),
  max_t=pw.reducers.max(pw.this.t),
  count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)
_pw_shard | _pw_window_start | _pw_window_end | min_t | max_t | count
0         | 3                | 13             | 12    | 12    | 1
0         | 6                | 16             | 12    | 15    | 4
0         | 9                | 19             | 12    | 17    | 6
0         | 12               | 22             | 12    | 17    | 6
0         | 15               | 25             | 15    | 17    | 3
1         | 3                | 13             | 10    | 11    | 2
1         | 6                | 16             | 10    | 11    | 2
1         | 9                | 19             | 10    | 11    | 2

pw.temporal.tumbling(duration, offset=None)

Allows grouping together elements within a window of a given length tumbling across an ordered time-like data column, starting from a given offset.

NOTE: Usually used as an argument of .windowby().

  • Parameters
    • duration (Union[int, float, timedelta]) – length of the window
    • offset (Union[int, float, datetime, None]) – beginning of the first window
  • Returns
    Window – object to pass as an argument to .windowby()

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
       | shard | t
   1   | 0     |  12
   2   | 0     |  13
   3   | 0     |  14
   4   | 0     |  15
   5   | 0     |  16
   6   | 0     |  17
   7   | 1     |  12
   8   | 1     |  13
''')
result = t.windowby(
    t.t, window=pw.temporal.tumbling(duration=5), shard=t.shard
).reduce(
  pw.this._pw_shard,
  pw.this._pw_window_start,
  pw.this._pw_window_end,
  min_t=pw.reducers.min(pw.this.t),
  max_t=pw.reducers.max(pw.this.t),
  count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)
_pw_shard | _pw_window_start | _pw_window_end | min_t | max_t | count
0         | 10               | 15             | 12    | 14    | 3
0         | 15               | 20             | 15    | 17    | 3
1         | 10               | 15             | 12    | 13    | 2
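
The window boundaries in the output above can be reproduced with plain arithmetic. The following is a minimal sketch in plain Python, not Pathway's implementation. `tumbling_window` is a hypothetical helper, and the sketch assumes that with `offset=None` the windows are aligned to 0, which is consistent with the example output.

```python
import math


def tumbling_window(t, duration, offset=0):
    """Return (start, end) of the tumbling window containing t.

    Hypothetical helper: windows are half-open [start, end) and aligned
    to `offset` (assumed to be 0 when no offset is given).
    """
    index = math.floor((t - offset) / duration)
    start = offset + index * duration
    return start, start + duration


# Same (shard, t) rows as in the example above.
rows = [(0, 12), (0, 13), (0, 14), (0, 15), (0, 16), (0, 17), (1, 12), (1, 13)]

groups = {}
for shard, t in rows:
    start, end = tumbling_window(t, duration=5)
    groups.setdefault((shard, start, end), []).append(t)

# One summary row per (shard, window): min_t, max_t, count.
summary = sorted(
    (shard, start, end, min(ts), max(ts), len(ts))
    for (shard, start, end), ts in groups.items()
)
for row in summary:
    print(row)
```

Each time value lands in exactly one window, which is why the counts per shard add up to the number of input rows.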

pw.temporal.window_join(self, other, self_time, other_time, window, *on, how=JoinMode.INNER)

Performs a window join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == as its top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • how (JoinMode) – decides whether to run window_join_inner, window_join_left, window_join_right or window_join_outer. Default is INNER.
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
2      | 2
3      | 2
7      | 6
7      | 7
t4 = t1.window_join(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
1      | 2
2      | 2
2      | 2
3      | 2
7      | 6
7      | 7
7      | 7
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
2   | 2      | 2
2   | 2      | 3

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2
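
The multiplicities in the `sliding(1, 2)` example above follow from counting the windows that both records share. The sketch below is plain Python rather than Pathway code. It assumes that `sliding(1, 2)` means a hop of 1 and a duration of 2, and that window starts are multiples of the hop; `sliding_windows` is a hypothetical helper.

```python
from collections import Counter


def sliding_windows(t, hop, duration):
    """Return the starts of all windows [start, start + duration) containing t.

    Hypothetical helper: window starts are assumed to be multiples of `hop`.
    """
    starts = []
    start = (t // hop) * hop  # latest window start that is <= t
    while start + duration > t:
        starts.append(start)
        start -= hop
    return starts


left = [1, 2, 3, 7, 13]
right = [2, 5, 6, 7]

# A pair is emitted once for every window that contains both records.
pairs = Counter()
for l in left:
    for r in right:
        shared = set(sliding_windows(l, 1, 2)) & set(sliding_windows(r, 1, 2))
        if shared:
            pairs[(l, r)] += len(shared)

for (l, r), n in sorted(pairs.items()):
    print(l, r, "x", n)
```

The pairs (2, 2) and (7, 7) share two windows each, so they appear twice in the printed table.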

pw.temporal.window_join_inner(self, other, self_time, other_time, window, *on)

Performs a window join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == as its top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
2      | 2
3      | 2
7      | 6
7      | 7
t4 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
1      | 2
2      | 2
2      | 2
3      | 2
7      | 6
7      | 7
7      | 7
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
2   | 2      | 2
2   | 2      | 3

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_inner(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_inner(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2
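
The session behaviour described above (sessions built from the records of both sides, then joined within each session) can be sketched in plain Python. This is an illustration under simplifying assumptions, not Pathway's implementation: `session_join` is a hypothetical helper, there is no join key, and the predicate is only checked between neighbouring times after sorting.

```python
def session_join(left, right, predicate):
    """Inner-join two lists of times session by session (plain-Python sketch)."""
    # Tag each time with its side and sort both sides together.
    events = sorted([(t, "L") for t in left] + [(t, "R") for t in right])

    sessions, current = [], []
    for event in events:
        # Start a new session when the predicate fails for neighbouring times.
        if current and not predicate(current[-1][0], event[0]):
            sessions.append(current)
            current = []
        current.append(event)
    if current:
        sessions.append(current)

    # Within a session, every left record is paired with every right record.
    result = []
    for session in sessions:
        lefts = [t for t, side in session if side == "L"]
        rights = [t for t, side in session if side == "R"]
        result.extend((l, r) for l in lefts for r in rights)
    return sorted(result)


pairs = session_join(
    [0, 5, 10, 15, 17], [-3, 2, 3, 6, 16], lambda a, b: abs(a - b) <= 2
)
print(pairs)
```

With the data of the first session example, the records 0, 2, 3, 5 and 6 form one session and 15, 16 and 17 form another, which gives the eight pairs printed in that example. The records -3 and 10 end up alone in their sessions and produce no pairs in an inner join.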

pw.temporal.window_join_left(self, other, self_time, other_time, window, *on)

Performs a window left join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

Rows from the left side that did not match any record on the right side in a given window are returned with the missing right-side values replaced with None. The multiplicity of such rows equals the number of windows they belong to in which they have no match.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == as its top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
1      |
2      | 2
3      | 2
7      | 6
7      | 7
13     |
t4 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
1      |
1      | 2
2      | 2
2      | 2
3      |
3      | 2
7      | 6
7      | 7
7      | 7
13     |
13     |
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      |
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
1   | 13     |
2   | 1      |
2   | 2      | 2
2   | 2      | 3
3   | 4      |

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_left(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
10     |
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_left(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2
2   | 7      |
3   | 4      |
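
The rule for unmatched left rows (one padded row per window without a match) can be checked against the `sliding(1, 2)` example of this section. The following is a plain-Python sketch, not Pathway code; `sliding_left_join` is a hypothetical helper that assumes window starts are multiples of the hop.

```python
def sliding_left_join(left, right, hop, duration):
    """Left window join over sliding windows [start, start + duration).

    Plain-Python sketch; window starts are assumed to be multiples of `hop`.
    """
    rows = []
    for l in left:
        start = (l // hop) * hop  # latest window start that is <= l
        while start + duration > l:
            matches = [r for r in right if start <= r < start + duration]
            if matches:
                rows.extend((l, r) for r in matches)
            else:
                # No partner in this window: emit one padded row for it.
                rows.append((l, None))
            start -= hop
    return rows


rows = sliding_left_join([1, 2, 3, 7, 13], [2, 5, 6, 7], hop=1, duration=2)
# Sort with None first within each left value, like the printed table.
rows.sort(key=lambda row: (row[0], row[1] is not None, row[1] or 0))
for row in rows:
    print(row)
```

The record 13 belongs to two windows and has no match in either, so it appears twice with None on the right side.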

pw.temporal.window_join_outer(self, other, self_time, other_time, window, *on)

Performs a window outer join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

Rows from either side that did not match any record on the other side in a given window are returned with the missing values of the other side replaced with None. The multiplicity of such rows equals the number of windows they belong to in which they have no match.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == as its top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | 5
1      |
2      | 2
3      | 2
7      | 6
7      | 7
13     |
t4 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
       | 5
       | 5
       | 6
1      |
1      | 2
2      | 2
2      | 2
3      |
3      | 2
7      | 6
7      | 7
7      | 7
13     |
13     |
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   |        | 5
1   | 1      |
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
1   | 13     |
2   | 1      |
2   | 2      | 2
2   | 2      | 3
3   | 4      |
4   |        | 3

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_outer(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | -3
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
10     |
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_outer(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   |        | 10
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2
2   | 7      |
3   | 4      |
4   |        | 3
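
In an outer join either side of a row can be missing, which is why the keyed examples in this section build the key with `pw.coalesce(t1.a, t2.b)`. The sketch below reproduces the keyed tumbling example in plain Python. It is an illustration only: `coalesce` and `bucket` are hypothetical helpers, and windows are assumed to be aligned to 0.

```python
from collections import defaultdict


def coalesce(*values):
    """Return the first value that is not None (the idea behind pw.coalesce)."""
    return next((v for v in values if v is not None), None)


def bucket(rows, duration):
    """Group (key, t) rows by (key, tumbling window start)."""
    buckets = defaultdict(list)
    for key, t in rows:
        buckets[(key, (t // duration) * duration)].append((key, t))
    return buckets


left = [(1, 1), (1, 2), (1, 3), (1, 7), (1, 13), (2, 1), (2, 2), (3, 4)]
right = [(1, 2), (1, 5), (1, 6), (1, 7), (2, 2), (2, 3), (4, 3)]
lb, rb = bucket(left, 2), bucket(right, 2)

rows = []
for group in sorted(set(lb) | set(rb)):
    # An empty side is padded with a single all-None record.
    ls = lb.get(group) or [(None, None)]
    rs = rb.get(group) or [(None, None)]
    for a, lt in ls:
        for b, rt in rs:
            rows.append((coalesce(a, b), lt, rt))

for row in rows:
    print(row)
```

Selecting only `t1.a` as the key would leave it empty for rows that exist solely on the right side, such as the row with key 4.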

pw.temporal.window_join_right(self, other, self_time, other_time, window, *on)

Performs a window right join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

Rows from the right side that did not match any record on the left side in a given window are returned with the missing left-side values replaced with None. The multiplicity of such rows equals the number of windows they belong to in which they have no match.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == as its top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_right(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | 5
2      | 2
3      | 2
7      | 6
7      | 7
t4 = t1.window_join_right(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
       | 5
       | 5
       | 6
1      | 2
2      | 2
2      | 2
3      | 2
7      | 6
7      | 7
7      | 7
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_right(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t2.b, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   |        | 5
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
2   | 2      | 2
2   | 2      | 3
4   |        | 3

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_right(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | -3
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_right(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t2.b, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   |        | 10
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2
4   |        | 3
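
Conceptually, a right window join mirrors a left window join: swapping the two sides, running a left join, and swapping the columns back gives the same rows. The plain-Python sketch below shows this for the unkeyed tumbling example of this section. Both functions are hypothetical helpers, not Pathway's implementation, and windows are assumed to be aligned to 0.

```python
def tumbling_left_join(left, right, duration):
    """Left window join over tumbling windows (plain-Python sketch)."""
    rows = []
    for l in left:
        start = (l // duration) * duration  # window is [start, start + duration)
        matches = [r for r in right if start <= r < start + duration]
        if matches:
            rows.extend((l, r) for r in matches)
        else:
            rows.append((l, None))
    return rows


def tumbling_right_join(left, right, duration):
    """Right join: left-join with the sides swapped, then swap the columns back."""
    return [(l, r) for r, l in tumbling_left_join(right, left, duration)]


rows = tumbling_right_join([1, 2, 3, 7, 13], [2, 5, 6, 7], duration=2)
print(rows)
```

The right record 5 has no left partner in its window, so it is kept with None on the left side, while the unmatched left records 1 and 13 are dropped.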

pw.temporal.windowby(self, time_expr, *, window, shard=None)

Creates a GroupedTable by windowing the table (based on time_expr and window), optionally sharded with shard.
  • Parameters
    • time_expr (ColumnExpression) – Column expression used for windowing
    • window (Window) – type of window to use
    • shard (Optional[ColumnExpression]) – optional column expression to act as a shard key

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
    | shard |  t |  v
1   | 0     |  1 |  10
2   | 0     |  2 |  1
3   | 0     |  4 |  3
4   | 0     |  8 |  2
5   | 0     |  9 |  4
6   | 0     |  10|  8
7   | 1     |  1 |  9
8   | 1     |  2 |  16
''')
result = t.windowby(
    t.t, window=pw.temporal.session(predicate=lambda a, b: abs(a-b) <= 1), shard=t.shard
).reduce(
    pw.this.shard,
    min_t=pw.reducers.min(pw.this.t),
    max_v=pw.reducers.max(pw.this.v),
    count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)
shard | min_t | max_v | count
0     | 1     | 10    | 2
0     | 4     | 3     | 1
0     | 8     | 8     | 3
1     | 1     | 16    | 2
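
The grouping performed in the example above can be followed step by step in plain Python. The sketch is an illustration, not Pathway's implementation: `summarize` and `session_windowby` are hypothetical helpers, and the predicate is assumed to be checked between neighbouring times within each shard after sorting.

```python
from itertools import groupby


def summarize(session):
    """Reduce one session of (shard, t, v) rows to (shard, min_t, max_v, count)."""
    return (
        session[0][0],
        min(t for _, t, _ in session),
        max(v for _, _, v in session),
        len(session),
    )


def session_windowby(rows, predicate):
    """Split each shard into sessions and reduce every session to one row."""
    result = []
    for _, group in groupby(sorted(rows), key=lambda row: row[0]):
        session = []
        for row in group:
            # Close the session when the predicate fails for neighbouring times.
            if session and not predicate(session[-1][1], row[1]):
                result.append(summarize(session))
                session = []
            session.append(row)
        result.append(summarize(session))
    return result


rows = [  # (shard, t, v), as in the example above
    (0, 1, 10), (0, 2, 1), (0, 4, 3), (0, 8, 2), (0, 9, 4), (0, 10, 8),
    (1, 1, 9), (1, 2, 16),
]
result = session_windowby(rows, lambda a, b: abs(a - b) <= 1)
for row in result:
    print(row)
```

In shard 0 the gap between 2 and 4 and the gap between 4 and 8 both exceed 1, so the shard splits into three sessions.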

Submodules

pathway.stdlib.temporal.utils module

pw.temporal.utils.check_joint_types(parameters)

Checks whether all parameters have types that allow the function to be executed. If parameters are {'a': (a, TimeEventType), 'b': (b, IntervalType)}, then the following pairs of types are allowed for (a, b): (int, int), (float, float), (datetime.datetime, datetime.timedelta).
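
The allowed pairs listed above can be expressed as a small lookup. The following sketch is plain Python and does not call or reproduce `check_joint_types`; `ALLOWED_PAIRS` and `types_compatible` are hypothetical names used only to illustrate the rule.

```python
import datetime

# (time type, interval type) pairs listed in the description above.
ALLOWED_PAIRS = [
    (int, int),
    (float, float),
    (datetime.datetime, datetime.timedelta),
]


def types_compatible(time_value, interval_value):
    """Return True if the two values form one of the allowed type pairs."""
    return any(
        isinstance(time_value, time_type)
        and isinstance(interval_value, interval_type)
        for time_type, interval_type in ALLOWED_PAIRS
    )


print(types_compatible(3, 2))
print(types_compatible(3.0, 2))
print(
    types_compatible(datetime.datetime(2023, 1, 1), datetime.timedelta(days=1))
)
```

Mixing an integer time with a float interval, or a number with a timedelta, does not match any pair and is rejected by this sketch.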