TEST

pathway.stdlib.temporal package

class pw.temporal.AsofJoinResult(side_data, mode, defaults, direction, )

[source]

Result of an ASOF join of two tables

Example:

import pathway as pw

t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)

t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    how=pw.JoinMode.LEFT,
    defaults={t2.val: -1},
).select(
    pw.this.instance,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)

class pw.temporal.AsofNowJoinResult(original_left, left, right, join_result, table_substitution, mode, id)

[source]

Result of an asof now join between tables.

select(*args, **kwargs)

sourceComputes a result of an asof now join.

  • Parameters
  • Returns
    Table – Created table.

Example:

import pathway as pw
data = pw.debug.table_from_markdown(
    '''
    id | value | instance | __time__ | __diff__
     2 |   4   |    1     |     4    |     1
     2 |   4   |    1     |    10    |    -1
     5 |   5   |    1     |    10    |     1
     7 |   2   |    2     |    14    |     1
     7 |   2   |    2     |    22    |    -1
    11 |   3   |    2     |    26    |     1
     5 |   5   |    1     |    30    |    -1
    14 |   9   |    1     |    32    |     1
    '''
)
queries = pw.debug.table_from_markdown(
    '''
    value | instance | __time__
      1   |    1     |     2
      2   |    1     |     6
      4   |    1     |    12
      5   |    2     |    16
     10   |    1     |    26
    '''
)
result = queries.asof_now_join(
    data, pw.left.instance == pw.right.instance
).select(query=pw.left.value, ans=pw.right.value)
pw.debug.compute_and_print_update_stream(result, include_id=False)

class pw.temporal.CommonBehavior(delay, cutoff, keep_results)

[source]

Defines temporal behavior of windows and temporal joins.

class pw.temporal.Direction(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

[source]

class pw.temporal.ExactlyOnceBehavior(shift)

[source]

class pw.temporal.Interval(lower_bound, upper_bound)

[source]

class pw.temporal.IntervalJoinResult(left, right, table_substitution, )

[source]

Result of an interval join between tables.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
join_result = t1.interval_join_inner(t2, t1.t, t2.t, pw.temporal.interval(-2, 1))
isinstance(join_result, pw.temporal.IntervalJoinResult)
pw.debug.compute_and_print(
    join_result.select(left_t=t1.t, right_t=t2.t), include_id=False
)

abstract select(*args, **kwargs)

sourceComputes a result of an interval join.

  • Parameters
    • args (ColumnReference) – Column references.
    • kwargs (Any) – Column expressions with their new assigned names.
  • Returns
    Table – Created table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_inner(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)

class pw.temporal.Window

[source]

class pw.temporal.WindowJoinResult(join_result, left_original, right_original, left_new, right_new)

[source]

Result of a window join between tables.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
join_result = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.tumbling(2))
isinstance(join_result, pw.temporal.WindowJoinResult)
pw.debug.compute_and_print(
    join_result.select(left_t=t1.t, right_t=t2.t), include_id=False
)

select(*args, **kwargs)

sourceComputes a result of a window join. args: ColumnReference args: Column references. kwargs: Any kwargs: Column expressions with their new assigned names.

  • Returns
    Table – Created table.

Example:

import pathway as pw t1 = pw.debug.table_from_markdown( … ‘’’ … | a | t … 1 | 1 | 1 … 2 | 1 | 2 … 3 | 1 | 3 … 4 | 1 | 7 … 5 | 1 | 13 … 6 | 2 | 1 … 7 | 2 | 2 … 8 | 3 | 4 … ‘’’ … ) t2 = pw.debug.table_from_markdown( … ‘’’ … | b | t … 1 | 1 | 2 … 2 | 1 | 5 … 3 | 1 | 6 … 4 | 1 | 7 … 5 | 2 | 2 … 6 | 2 | 3 … 7 | 4 | 3 … ‘’’ … ) t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select( … key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t … ) pw.debug.compute_and_print(t3, include_id=False) key | left_t | right_t 1 | | 5 1 | 1 | 1 | 2 | 2 1 | 3 | 2 1 | 7 | 6 1 | 7 | 7 1 | 13 | 2 | 1 | 2 | 2 | 2 2 | 2 | 3 3 | 4 | 4 | | 3

pw.temporal.asof_join(self, other, self_time, other_time, *on, how, behavior=None, defaults={}, direction=Direction.BACKWARD, left_instance=None, right_instance=None)

sourcePerform an ASOF join of two tables.

  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join - features like delaying entries or ignoring late entries.
    • how (JoinMode) – mode of the join (LEFT, RIGHT, FULL)
    • defaults (dict[ColumnReference, Any]) – dictionary column-> default value. Entries in the resulting table that not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    how=pw.JoinMode.LEFT,
    defaults={t2.val: -1},
).select(
    pw.this.instance,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)

Setting behavior allows to control temporal behavior of an asof join. Then, each side of the asof join keeps track of the maximal already seen time (self_time and other_time). In the context of asof_join the arguments of behavior are defined as follows:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records that have times lower or equal to the above threshold. When cutoff is not set, the asof join will remember all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Examples without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      2   |      2     |     4
      3   |      5     |     6
      4   |      1     |     8
      5   |      7     |    14
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      42  |      1     |     2
       8  |      4     |    10
'''
)
result_without_cutoff = t1.asof_join(
    t2, t1.event_time, t2.event_time, how=pw.JoinMode.LEFT
).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)

result_without_cutoff = t1.asof_join(
    t2,
    t1.event_time,
    t2.event_time,
    how=pw.JoinMode.LEFT,
    behavior=pw.temporal.common_behavior(cutoff=2),
).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)

The record with value=4 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (1 <= 5-2).

pw.temporal.asof_join_left(self, other, self_time, other_time, *on, behavior=None, defaults={}, direction=Direction.BACKWARD, left_instance=None, right_instance=None)

sourcePerform a left ASOF join of two tables.

  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join - features like delaying entries or ignoring late entries.
    • defaults (dict[ColumnReference, Any]) – dictionary column-> default value. Entries in the resulting table that not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_left(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t2.val: -1},
).select(
    pw.this.instance,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)

Setting behavior allows to control temporal behavior of an asof join. Then, each side of the asof join keeps track of the maximal already seen time (self_time and other_time). In the context of asof_join the arguments of behavior are defined as follows:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records that have times lower or equal to the above threshold. When cutoff is not set, the asof join will remember all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Examples without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      2   |      2     |     4
      3   |      5     |     6
      4   |      1     |     8
      5   |      7     |    14
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      42  |      1     |     2
       8  |      4     |    10
'''
)
result_without_cutoff = t1.asof_join_left(t2, t1.event_time, t2.event_time).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)

result_without_cutoff = t1.asof_join_left(
    t2,
    t1.event_time,
    t2.event_time,
    behavior=pw.temporal.common_behavior(cutoff=2),
).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)

The record with value=4 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (1 <= 5-2).

pw.temporal.asof_join_outer(self, other, self_time, other_time, *on, behavior=None, defaults={}, direction=Direction.BACKWARD, left_instance=None, right_instance=None)

sourcePerform an outer ASOF join of two tables.

  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join - features like delaying entries or ignoring late entries.
    • defaults (dict[ColumnReference, Any]) – dictionary column-> default value. Entries in the resulting table that not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_outer(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t1.val: -1, t2.val: -1},
).select(
    pw.this.instance,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)

pw.temporal.asof_join_right(self, other, self_time, other_time, *on, behavior=None, defaults={}, direction=Direction.BACKWARD, left_instance=None, right_instance=None)

sourcePerform a right ASOF join of two tables.

  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join - features like delaying entries or ignoring late entries.
    • defaults (dict[ColumnReference, Any]) – dictionary column-> default value. Entries in the resulting table that not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_right(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t1.val: -1},
).select(
    pw.this.instance,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)

Setting behavior allows to control temporal behavior of an asof join. Then, each side of the asof join keeps track of the maximal already seen time (self_time and other_time). In the context of asof_join the arguments of behavior are defined as follows:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records that have times lower or equal to the above threshold. When cutoff is not set, the asof join will remember all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Examples without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      42  |      1     |     2
       8  |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      2   |      2     |     4
      3   |      5     |     6
      4   |      1     |     8
      5   |      7     |    14
'''
)
result_without_cutoff = t1.asof_join_right(t2, t1.event_time, t2.event_time).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
result_without_cutoff = t1.asof_join_right(
    t2,
    t1.event_time,
    t2.event_time,
    behavior=pw.temporal.common_behavior(cutoff=2),
).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)

The record with value=4 from table t2 was not joined because its event_time was less than the maximal already seen time minus cutoff (1 <= 5-2).

pw.temporal.asof_now_join(self, other, *on, how=JoinMode.INNER, id=None, left_instance=None, right_instance=None)

sourcePerforms asof now join of self with other using join expressions. Each row of self is joined with rows from other at a given processing time. Rows from self are not stored. They are joined with rows of other at their processing time. If other is updated in the future, rows from self from the past won’t be updated. Rows from other are stored. They can be joined with future rows of self.

  • Parameters
    • other (Table) – the right side of a join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
    • how (JoinMode) – by default, inner join is performed. Possible values are JoinMode.{INNER,LEFT} which correspond to inner and left join respectively.
  • Returns
    AsofNowJoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
data = pw.debug.table_from_markdown(
    '''
    id | value | instance | __time__ | __diff__
     2 |   4   |    1     |     4    |     1
     2 |   4   |    1     |    10    |    -1
     5 |   5   |    1     |    10    |     1
     7 |   2   |    2     |    14    |     1
     7 |   2   |    2     |    22    |    -1
    11 |   3   |    2     |    26    |     1
     5 |   5   |    1     |    30    |    -1
    14 |   9   |    1     |    32    |     1
    '''
)
queries = pw.debug.table_from_markdown(
    '''
    value | instance | __time__
      1   |    1     |     2
      2   |    1     |     6
      4   |    1     |    12
      5   |    2     |    16
     10   |    1     |    26
    '''
)
result = queries.asof_now_join(
    data, pw.left.instance == pw.right.instance, how=pw.JoinMode.LEFT
).select(query=pw.left.value, ans=pw.right.value)
pw.debug.compute_and_print_update_stream(result, include_id=False)

pw.temporal.asof_now_join_inner(self, other, *on, id=None, left_instance=None, right_instance=None)

sourcePerforms asof now join of self with other using join expressions. Each row of self is joined with rows from other at a given processing time. Rows from self are not stored. They are joined with rows of other at their processing time. If other is updated in the future, rows from self from the past won’t be updated. Rows from other are stored. They can be joined with future rows of self.

  • Parameters
    • other (Table) – the right side of a join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
  • Returns
    AsofNowJoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
data = pw.debug.table_from_markdown(
    '''
    id | value | instance | __time__ | __diff__
     2 |   4   |    1     |     4    |     1
     2 |   4   |    1     |    10    |    -1
     5 |   5   |    1     |    10    |     1
     7 |   2   |    2     |    14    |     1
     7 |   2   |    2     |    22    |    -1
    11 |   3   |    2     |    26    |     1
     5 |   5   |    1     |    30    |    -1
    14 |   9   |    1     |    32    |     1
    '''
)
queries = pw.debug.table_from_markdown(
    '''
    value | instance | __time__
      1   |    1     |     2
      2   |    1     |     6
      4   |    1     |    12
      5   |    2     |    16
     10   |    1     |    26
    '''
)
result = queries.asof_now_join_inner(
    data, pw.left.instance == pw.right.instance
).select(query=pw.left.value, ans=pw.right.value)
pw.debug.compute_and_print_update_stream(result, include_id=False)

pw.temporal.asof_now_join_left(self, other, *on, id=None, left_instance=None, right_instance=None)

sourcePerforms asof now join of self with other using join expressions. Each row of self is joined with rows from other at a given processing time. If there are no matching rows in other, missing values on the right side are replaced with None. Rows from self are not stored. They are joined with rows of other at their processing time. If other is updated in the future, rows from self from the past won’t be updated. Rows from other are stored. They can be joined with future rows of self.

  • Parameters
    • other (Table) – the right side of a join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
  • Returns
    AsofNowJoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
data = pw.debug.table_from_markdown(
    '''
    id | value | instance | __time__ | __diff__
     2 |   4   |    1     |     4    |     1
     2 |   4   |    1     |    10    |    -1
     5 |   5   |    1     |    10    |     1
     7 |   2   |    2     |    14    |     1
     7 |   2   |    2     |    22    |    -1
    11 |   3   |    2     |    26    |     1
     5 |   5   |    1     |    30    |    -1
    14 |   9   |    1     |    32    |     1
    '''
)
queries = pw.debug.table_from_markdown(
    '''
    value | instance | __time__
      1   |    1     |     2
      2   |    1     |     6
      4   |    1     |    12
      5   |    2     |    16
     10   |    1     |    26
    '''
)
result = queries.asof_now_join_left(
    data, pw.left.instance == pw.right.instance
).select(query=pw.left.value, ans=pw.right.value)
pw.debug.compute_and_print_update_stream(result, include_id=False)

pw.temporal.common_behavior(delay=None, cutoff=None, keep_results=True)

sourceCreates an instance of CommonBehavior, which contains a basic configuration of a behavior of temporal operators (like windowby or asof_join). Each temporal operator tracks its own time (defined as a maximum time that arrived to the operator) and this configuration tells it that some of its inputs or outputs may be delayed or ignored. The decisions are based on the current time of the operator and the time associated with an input/output entry. Additionally, it allows the operator to free up memory by removing parts of internal state that cannot interact with any future input entries.

Remark: for the sake of temporal behavior, the current time of each operator is updated only after it processes all the data that arrived on input. In other words, if several new input entries arrived to the system simultaneously, each of those entries will be processed using last recorded time, and the recorded time is upda

  • Parameters
    • delay (Union[int, float, timedelta, None]) – Optional.
      For windows, delays initial output by delay with respect to the beginning of the window. Setting it to None does not enable delaying mechanism.
      For interval joins and asof joins, it delays the time the record is joined by delay.
      Using delay is useful when updates are too frequent.
    • cutoff (Union[int, float, timedelta, None]) – Optional.
      For windows, stops updating windows which end earlier than maximal seen time minus cutoff. Setting cutoff to None does not enable cutoff mechanism.
      For interval joins and asof joins, it ignores entries that are older than maximal seen time minus cutoff. This parameter is also used to clear memory. It allows to release memory used by entries that won’t change.
    • keep_results (bool) – If set to True, keeps all results of the operator. If set to False, keeps only results that are newer than maximal seen time minus cutoff. Can’t be set to False, when cutoff is None.

pw.temporal.exactly_once_behavior(shift=None)

sourceCreates an instance of class ExactlyOnceBehavior, indicating that each non empty window should produce exactly one output.

  • Parameters
    shift (Union[int, float, timedelta, None]) – optional, defines the moment in time (window end + shift) in which the window stops accepting the data and sends the results to the output. Setting it to None is interpreted as shift=0.

Remark:

note that setting a non-zero shift and demanding exactly one output results in
the output being delivered only when the time in the time column reaches
`window end + shift`.

pw.temporal.inactivity_detection(event_time_column, allowed_inactivity_period, refresh_rate=pw.Duration(seconds=1), instance=None)

sourceDetects periods of inactivity in a given table and identifies when activity resumes.

This function monitors a stream of events defined by a timestamp column and detects inactivity periods that exceed a specified threshold. Additionally, it identifies the first event that resumes activity after each period of inactivity.

Note: Assumes that the ingested timestamps (event_time_column) follow current UTC time and that the latency of the system is negligible compared to the allowed_inactivity_period.

  • Parameters
    • event_time_column (ColumnReference) – A reference to the column containing UTC timestamps of events in the monitored table.
    • allowed_inactivity_period (Duration) – The maximum allowed period of inactivity. If no events occur within this duration, an inactivity period is flagged.
    • refresh_rate (Duration) – The frequency at which the current time is refreshed for inactivity detection. Defaults to 1 second.
    • instance (ColumnReference | None) – The inactivity periods are computed separately per each instance value
  • Returns
    Tuple of tables
    • inactivities (pw.Table): A table containing timestamps (inactive_t) where periods of inactivity begin (i.e., the last timestamp before inactivity was detected).
    • resumed_activities (pw.Table): A table containing the earliest timestamps (resumed_t) of resumed activity following each period of inactivity.

pw.temporal.interval(lower_bound, upper_bound)

sourceAllows testing whether two times are within a certain distance.

NOTE: Usually used as an argument of .interval_join().

  • Parameters
    • lower_bound (int | float | timedelta) – a lower bound on other_time - self_time.
    • upper_bound (int | float | timedelta) – an upper bound on other_time - self_time.
  • Returns
    Window – object to pass as an argument to .interval_join()

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)

pw.temporal.interval_join(self, other, self_time, other_time, interval, *on, behavior=None, how=JoinMode.INNER, left_instance=None, right_instance=None)

sourcePerforms an interval join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (pw.ColumnExpression[int | float | datetime]) – time expression in self.
    • other_time (pw.ColumnExpression[int | float | datetime]) – time expression in other.
    • lower_bound – a lower bound on time difference between other_time and self_time.
    • upper_bound – an upper bound on time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines a temporal behavior of a join - features like delaying entries or ignoring late entries. You can see examples below or read more in the temporal behavior of interval join tutorial .
    • how (JoinMode) – decides whether to run interval_join_inner, interval_join_left, interval_join_right or interval_join_outer. Default is INNER.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b, how=pw.JoinMode.INNER
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)

Setting behavior allows to control temporal behavior of an interval join. Then, each side of the interval join keeps track of the maximal already seen time (self_time and other_time). The arguments of behavior mean in the context of an interval join what follows:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records that have times lower or equal to the above threshold. When cutoff is not set, interval join will remember all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
result_with_cutoff = t1.interval_join(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)

The record with value=5 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time minus cutoff (4 <= 10-6).

pw.temporal.interval_join_inner(self, other, self_time, other_time, interval, *on, behavior=None, left_instance=None, right_instance=None)

sourcePerforms an interval join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • lower_bound – a lower bound on time difference between other_time and self_time.
    • upper_bound – an upper bound on time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines temporal behavior of a join - features like delaying entries or ignoring late entries.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_inner(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_inner(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)

Setting behavior allows to control temporal behavior of an interval join. Then, each side of the interval join keeps track of the maximal already seen time (self_time and other_time). The arguments of behavior mean in the context of an interval join what follows:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records that have times lower or equal to the above threshold. When cutoff is not set, interval join will remember all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join_inner(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
result_with_cutoff = t1.interval_join_inner(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)

The record with value=5 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time minus cutoff (4 <= 10-6).

pw.temporal.interval_join_left(self, other, self_time, other_time, interval, *on, behavior=None, left_instance=None, right_instance=None)

sourcePerforms an interval left join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined. Rows from the left side that haven’t been matched with the right side are returned with missing values on the right side replaced with None.

  • Parameters
    • other (Table) – the right side of the join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • lower_bound – a lower bound on time difference between other_time and self_time.
    • upper_bound – an upper bound on time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines temporal behavior of a join - features like delaying entries or ignoring late entries.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_left(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_left(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)

Setting behavior allows to control temporal behavior of an interval join. Then, each side of the interval join keeps track of the maximal already seen time (self_time and other_time). The arguments of behavior mean in the context of an interval join what follows:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records that have times lower or equal to the above threshold. When cutoff is not set, interval join will remember all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join_left(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
result_with_cutoff = t1.interval_join_left(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)

The record with value=5 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time minus cutoff (4 <= 10-6).

Notice also the entries with __diff__=-1. They’re deletion entries caused by the arrival of matching entries on the right side of the join. The matches caused the removal of entries without values in the fields from the right side and insertion of entries with values in these fields.

pw.temporal.interval_join_outer(self, other, self_time, other_time, interval, *on, behavior=None, left_instance=None, right_instance=None)

sourcePerforms an interval outer join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined. Rows that haven’t been matched with the other side are returned with missing values on the other side replaced with None.

  • Parameters
    • other (Table) – the right side of the join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • lower_bound – a lower bound on time difference between other_time and self_time.
    • upper_bound – an upper bound on time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines temporal behavior of a join - features like delaying entries or ignoring late entries.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_outer(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_outer(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)

Setting behavior allows to control temporal behavior of an interval join. Then, each side of the interval join keeps track of the maximal already seen time (self_time and other_time). The arguments of behavior mean in the context of an interval join what follows:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records that have times lower or equal to the above threshold. When cutoff is not set, interval join will remember all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join_outer(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
result_with_cutoff = t1.interval_join_outer(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)

The record with value=5 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time minus cutoff (4 <= 10-6).

Notice also the entries with __diff__=-1. They’re deletion entries caused by the arrival of matching entries on the right side of the join. The matches caused the removal of entries without values in the fields from the right side and insertion of entries with values in these fields.

pw.temporal.interval_join_right(self, other, self_time, other_time, interval, *on, behavior=None, left_instance=None, right_instance=None)

sourcePerforms an interval right join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined. Rows from the right side that haven’t been matched with the left side are returned with missing values on the left side replaced with None.

  • Parameters
    • other (Table) – the right side of the join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • lower_bound – a lower bound on time difference between other_time and self_time.
    • upper_bound – an upper bound on time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines temporal behavior of a join - features like delaying entries or ignoring late entries.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_right(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_right(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)

Setting behavior allows to control temporal behavior of an interval join. Then, each side of the interval join keeps track of the maximal already seen time (self_time and other_time). The arguments of behavior mean in the context of an interval join what follows:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records that have times lower or equal to the above threshold. When cutoff is not set, interval join will remember all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join_right(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
result_with_cutoff = t1.interval_join_right(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)

The record with value=5 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time minus cutoff (4 <= 10-6).

pw.temporal.intervals_over(*, at, lower_bound, upper_bound, is_outer=True)

sourceAllows grouping together elements within a window.

Windows are created for each time t in at, by taking values with times within [t+lower_bound, t+upper_bound].

Note: If a tuple reducer will be used on grouped elements within a window, values in the tuple will be sorted according to their time column.

  • Parameters
    • lower_bound (int | float | timedelta) – lower bound for interval
    • upper_bound (int | float | timedelta) – upper bound for interval
    • at (ColumnReference) – column of times for which windows are to be created
    • is_outer (bool) – decides whether empty windows should return None or be omitted
  • Returns
    Window – object to pass as an argument to .windowby()

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
    | t |  v
1   | 1 |  10
2   | 2 |  1
3   | 4 |  3
4   | 8 |  2
5   | 9 |  4
6   | 10|  8
7   | 1 |  9
8   | 2 |  16
''')
probes = pw.debug.table_from_markdown(
'''
t
2
4
6
8
10
''')
result = (
    pw.temporal.windowby(t, t.t, window=pw.temporal.intervals_over(
        at=probes.t, lower_bound=-2, upper_bound=1
     ))
    .reduce(pw.this._pw_window_location, v=pw.reducers.sorted_tuple(pw.this.v))
)
pw.debug.compute_and_print(result, include_id=False)

pw.temporal.session(*, predicate=None, max_gap=None)

sourceAllows grouping together elements within a window across ordered time-like data column by locally grouping adjacent elements either based on a maximum time difference or using a custom predicate.

NOTE: Usually used as an argument of .windowby(). Exactly one of the arguments predicate or max_gap should be provided.

  • Parameters
    • predicate (Callable[[Any, Any], bool] | None) – function taking two adjacent entries that returns a boolean saying whether the two entries should be grouped
    • max_gap (int | float | timedelta | None) – Two adjacent entries will be grouped if b - a < max_gap
  • Returns
    Window – object to pass as an argument to .windowby()

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
    | instance |  t |  v
1   | 0        |  1 |  10
2   | 0        |  2 |  1
3   | 0        |  4 |  3
4   | 0        |  8 |  2
5   | 0        |  9 |  4
6   | 0        |  10|  8
7   | 1        |  1 |  9
8   | 1        |  2 |  16
''')
result = t.windowby(
    t.t, window=pw.temporal.session(predicate=lambda a, b: abs(a-b) <= 1), instance=t.instance
).reduce(
pw.this._pw_instance,
pw.this._pw_window_start,
pw.this._pw_window_end,
min_t=pw.reducers.min(pw.this.t),
max_v=pw.reducers.max(pw.this.v),
count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)

pw.temporal.sliding(hop, duration=None, ratio=None, origin=None)

sourceAllows grouping together elements within a window of a given length sliding across ordered time-like data column according to a specified interval (hop) starting from a given origin.

NOTE: Usually used as an argument of .windowby(). Exactly one of the arguments hop or ratio should be provided.

  • Parameters
    • hop (int | float | timedelta) – frequency of a window
    • duration (int | float | timedelta | None) – length of the window
    • ratio (int | None) – used as an alternative way to specify duration as hop * ratio
    • origin (int | float | datetime | None) – a point in time at which the first window begins
  • Returns
    Window – object to pass as an argument to .windowby()

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
       | instance | t
   1   | 0        |  12
   2   | 0        |  13
   3   | 0        |  14
   4   | 0        |  15
   5   | 0        |  16
   6   | 0        |  17
   7   | 1        |  10
   8   | 1        |  11
''')
result = t.windowby(
    t.t, window=pw.temporal.sliding(duration=10, hop=3), instance=t.instance
).reduce(
  pw.this._pw_instance,
  pw.this._pw_window_start,
  pw.this._pw_window_end,
  min_t=pw.reducers.min(pw.this.t),
  max_t=pw.reducers.max(pw.this.t),
  count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)

pw.temporal.tumbling(duration, origin=None)

sourceAllows grouping together elements within a window of a given length tumbling across ordered time-like data column starting from a given origin.

NOTE: Usually used as an argument of .windowby().

  • Parameters
    • duration (int | float | timedelta) – length of the window
    • origin (int | float | datetime | None) – a point in time at which the first window begins
  • Returns
    Window – object to pass as an argument to .windowby()

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
       | instance | t
   1   | 0        |  12
   2   | 0        |  13
   3   | 0        |  14
   4   | 0        |  15
   5   | 0        |  16
   6   | 0        |  17
   7   | 1        |  12
   8   | 1        |  13
''')
result = t.windowby(
    t.t, window=pw.temporal.tumbling(duration=5), instance=t.instance
).reduce(
  pw.this._pw_instance,
  pw.this._pw_window_start,
  pw.this._pw_window_end,
  min_t=pw.reducers.min(pw.this.t),
  max_t=pw.reducers.max(pw.this.t),
  count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)

pw.temporal.utc_now(refresh_rate=datetime.timedelta(seconds=60))

sourceProvides a continuously updating stream of the current UTC time.

This function generates a real-time feed of the current UTC timestamp, refreshing at a specified interval.

  • Parameters
    refresh_rate (timedelta) – The interval at which the current UTC time is refreshed. Defaults to 60 seconds.
  • Returns
    A table containing a stream of the current UTC timestamps, updated according to the specified refresh rate.

pw.temporal.window_join(self, other, self_time, other_time, window, *on, how=JoinMode.INNER, left_instance=None, right_instance=None)

sourcePerforms a window join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == on the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • how (JoinMode) – decides whether to run window_join_inner, window_join_left, window_join_right or window_join_outer. Default is INNER.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t4 = t1.window_join(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)

pw.temporal.window_join_inner(self, other, self_time, other_time, window, *on, left_instance=None, right_instance=None)

sourcePerforms a window join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == on the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t4 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_inner(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_inner(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)

pw.temporal.window_join_left(self, other, self_time, other_time, window, *on, left_instance=None, right_instance=None)

sourcePerforms a window left join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

Rows from the left side that didn’t match with any record on the right side in a given window, are returned with missing values on the right side replaced with None. The multiplicity of such rows equals the number of windows they belong to and don’t have a match in them.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == on the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t4 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_left(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_left(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)

pw.temporal.window_join_outer(self, other, self_time, other_time, window, *on, left_instance=None, right_instance=None)

sourcePerforms a window outer join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

Rows from both sides that didn’t match with any record on the other side in a given window, are returned with missing values on the other side replaced with None. The multiplicity of such rows equals the number of windows they belong to and don’t have a match in them.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == on the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t4 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_outer(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_outer(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)

pw.temporal.window_join_right(self, other, self_time, other_time, window, *on, left_instance=None, right_instance=None)

sourcePerforms a window right join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

Rows from the right side that didn’t match with any record on the left side in a given window, are returned with missing values on the left side replaced with None. The multiplicity of such rows equals the number of windows they belong to and don’t have a match in them.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == on the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_right(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t4 = t1.window_join_right(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_right(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t2.b, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_right(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_right(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t2.b, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)

pw.temporal.windowby(self, time_expr, *, window, behavior=None, instance=None)

sourceCreate a GroupedTable by windowing the table (based on expr and window), optionally with instance argument.

  • Parameters
    • time_expr (pw.ColumnExpression[int | float | datetime]) – Column expression used for windowing
    • window (Window) – type window to use
    • instance (ColumnExpression | None) – optional column expression to act as a shard key

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
    | instance |  t |  v
1   | 0        |  1 |  10
2   | 0        |  2 |  1
3   | 0        |  4 |  3
4   | 0        |  8 |  2
5   | 0        |  9 |  4
6   | 0        |  10|  8
7   | 1        |  1 |  9
8   | 1        |  2 |  16
''')
result = t.windowby(
    t.t, window=pw.temporal.session(predicate=lambda a, b: abs(a-b) <= 1), instance=t.instance
).reduce(
pw.this.instance,
min_t=pw.reducers.min(pw.this.t),
max_v=pw.reducers.max(pw.this.v),
count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)

Submodules

pathway.stdlib.temporal.temporal_behavior module

class pw.temporal.temporal_behavior.Behavior

[source]

A superclass of all classes defining temporal behavior: its subclasses allow to configure several temporal operators to delay outputs, ignore late entries, and clean the memory.

class pw.temporal.temporal_behavior.CommonBehavior(delay, cutoff, keep_results)

[source]

Defines temporal behavior of windows and temporal joins.

class pw.temporal.temporal_behavior.ExactlyOnceBehavior(shift)

[source]

pw.temporal.temporal_behavior.common_behavior(delay=None, cutoff=None, keep_results=True)

sourceCreates an instance of CommonBehavior, which contains a basic configuration of a behavior of temporal operators (like windowby or asof_join). Each temporal operator tracks its own time (defined as a maximum time that arrived to the operator) and this configuration tells it that some of its inputs or outputs may be delayed or ignored. The decisions are based on the current time of the operator and the time associated with an input/output entry. Additionally, it allows the operator to free up memory by removing parts of internal state that cannot interact with any future input entries.

Remark: for the sake of temporal behavior, the current time of each operator is updated only after it processes all the data that arrived on input. In other words, if several new input entries arrived to the system simultaneously, each of those entries will be processed using last recorded time, and the recorded time is upda

  • Parameters
    • delay (Union[int, float, timedelta, None]) – Optional.
      For windows, delays initial output by delay with respect to the beginning of the window. Setting it to None does not enable delaying mechanism.
      For interval joins and asof joins, it delays the time the record is joined by delay.
      Using delay is useful when updates are too frequent.
    • cutoff (Union[int, float, timedelta, None]) – Optional.
      For windows, stops updating windows which end earlier than maximal seen time minus cutoff. Setting cutoff to None does not enable cutoff mechanism.
      For interval joins and asof joins, it ignores entries that are older than maximal seen time minus cutoff. This parameter is also used to clear memory. It allows to release memory used by entries that won’t change.
    • keep_results (bool) – If set to True, keeps all results of the operator. If set to False, keeps only results that are newer than maximal seen time minus cutoff. Can’t be set to False, when cutoff is None.

pw.temporal.temporal_behavior.exactly_once_behavior(shift=None)

sourceCreates an instance of class ExactlyOnceBehavior, indicating that each non empty window should produce exactly one output.

  • Parameters
    shift (Union[int, float, timedelta, None]) – optional, defines the moment in time (window end + shift) in which the window stops accepting the data and sends the results to the output. Setting it to None is interpreted as shift=0.

Remark:

note that setting a non-zero shift and demanding exactly one output results in
the output being delivered only when the time in the time column reaches
`window end + shift`.

pathway.stdlib.temporal.time_utils module

class pw.temporal.time_utils.TimestampSchema

[source]

class pw.temporal.time_utils.TimestampSubject(refresh_rate)

[source]

close()

sourceSends a sentinel message.

Should be called to indicate that no new messages will be sent.

commit()

sourceSends a commit message.

end()

sourceJoins a thread running run().

Should not be called directly.

next(**kwargs)

sourceSends a message to the enigne.

The arguments should be compatible with the schema passed to read(). Values for all fields should be passed to this method unless they have a default value specified in the schema.

Example:

import pathway as pw
import pandas as pd

class InputSchema(pw.Schema):
    a: pw.DateTimeNaive
    b: bytes
    c: int

class InputSubject(pw.io.python.ConnectorSubject):
    def run(self):
        self.next(a=pd.Timestamp("2021-03-21T18:34:12"), b="abc".encode(), c=3)
        self.next(a=pd.Timestamp("2022-04-01T11:12:12"), b="def".encode(), c=42)

t = pw.io.python.read(InputSubject(), schema=InputSchema)
pw.debug.compute_and_print(t, include_id=False)

next_bytes(message)

sourceSends a message.

  • Parameters
    message (bytes) – a message represented as bytes.

next_json(message)

sourceSends a message.

  • Parameters
    message (dict) – Dict representing json.

next_str(message)

sourceSends a message.

  • Parameters
    message (str) – a message represented as a string.

on_persisted_run()

sourceThis method is called by Rust core to notify that the state will be persisted in this run.

on_stop()

sourceCalled after the end of the run() function.

seek(state)

sourceCalled by Rust core on start to resume reading from the last stopping point.

start()

sourceRuns a separate thread with function feeding data into buffer.

Should not be called directly.

pw.temporal.time_utils.inactivity_detection(event_time_column, allowed_inactivity_period, refresh_rate=pw.Duration(seconds=1), instance=None)

sourceDetects periods of inactivity in a given table and identifies when activity resumes.

This function monitors a stream of events defined by a timestamp column and detects inactivity periods that exceed a specified threshold. Additionally, it identifies the first event that resumes activity after each period of inactivity.

Note: Assumes that the ingested timestamps (event_time_column) follow current UTC time and that the latency of the system is negligible compared to the allowed_inactivity_period.

  • Parameters
    • event_time_column (ColumnReference) – A reference to the column containing UTC timestamps of events in the monitored table.
    • allowed_inactivity_period (Duration) – The maximum allowed period of inactivity. If no events occur within this duration, an inactivity period is flagged.
    • refresh_rate (Duration) – The frequency at which the current time is refreshed for inactivity detection. Defaults to 1 second.
    • instance (ColumnReference | None) – The inactivity periods are computed separately per each instance value
  • Returns
    Tuple of tables
    • inactivities (pw.Table): A table containing timestamps (inactive_t) where periods of inactivity begin (i.e., the last timestamp before inactivity was detected).
    • resumed_activities (pw.Table): A table containing the earliest timestamps (resumed_t) of resumed activity following each period of inactivity.

pw.temporal.time_utils.utc_now(refresh_rate=datetime.timedelta(seconds=60))

sourceProvides a continuously updating stream of the current UTC time.

This function generates a real-time feed of the current UTC timestamp, refreshing at a specified interval.

  • Parameters
    refresh_rate (timedelta) – The interval at which the current UTC time is refreshed. Defaults to 60 seconds.
  • Returns
    A table containing a stream of the current UTC timestamps, updated according to the specified refresh rate.

pathway.stdlib.temporal.utils module

pw.temporal.utils.check_joint_types(parameters)

sourceChecks if all parameters have types that allow to execute a function. If parameters are {‘a’: (a, TimeEventType), ‘b’: (b, IntervalType)} then the following pairs of types are allowed for (a, b): (int, int), (float, float), (datetime.datetime, datetime.timedelta)