Temporal

This section covers the temporal helper functions in pw.temporal. They let you join tables on time proximity, group rows into time windows, and define custom sessions. Each function is followed by an example of its use.

Functions

pw.temporal.interval(lower_bound, upper_bound)

Allows testing whether the difference between two times, other_time - self_time, lies between lower_bound and upper_bound.

NOTE: Usually used as an argument of .interval_join().

  • Parameters
    • lower_bound (Union[int, float, timedelta]) – a lower bound on other_time - self_time.
    • upper_bound (Union[int, float, timedelta]) – an upper bound on other_time - self_time.
  • Returns
    Window – object to pass as an argument to .interval_join()

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4
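
The join condition can also be modelled in plain Python. The following is a minimal sketch, not Pathway's implementation: the helper name interval_join_sketch is hypothetical, and both bounds are assumed to be inclusive, which is consistent with the output above.

```python
def interval_join_sketch(left_times, right_times, lower_bound, upper_bound):
    """Pair each left time with every right time whose offset lies in the interval."""
    pairs = []
    for self_time in left_times:
        for other_time in right_times:
            # Assumption: both bounds are inclusive, as the example output suggests.
            if lower_bound <= other_time - self_time <= upper_bound:
                pairs.append((self_time, other_time))
    return pairs


pairs = interval_join_sketch([3, 4, 5, 11], [0, 1, 4, 7], -2, 1)
print(pairs)  # [(3, 1), (3, 4), (4, 4), (5, 4)]
```

The left time 11 produces no pair because no right time falls within [9, 12].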

pw.temporal.intervals_over(*, at, lower_bound, upper_bound)

Allows grouping together elements within a window.

Windows are created for each time t in at, by taking values with times within [t+lower_bound, t+upper_bound].

  • Parameters
    • lower_bound (Union[int, float, timedelta]) – lower bound for interval
    • upper_bound (Union[int, float, timedelta]) – upper bound for interval
    • at (ColumnReference) – column of times for which windows are to be created
  • Returns
    Window – object to pass as an argument to .windowby()

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
    | t |  v
1   | 1 |  10
2   | 2 |  1
3   | 4 |  3
4   | 8 |  2
5   | 9 |  4
6   | 10|  8
7   | 1 |  9
8   | 2 |  16
''')
probes = pw.debug.table_from_markdown(
'''
t
2
4
6
8
10
''')
result = (
    pw.temporal.windowby(t, t.t, window=pw.temporal.intervals_over(
        at=probes.t, lower_bound=-2, upper_bound=1
     ))
    .reduce(pw.this._pw_window_location, v=pw.reducers.tuple(pw.this.v))
)
pw.debug.compute_and_print(result, include_id=False)
_pw_window_location | v
2                   | (16, 1, 10, 9)
4                   | (3, 16, 1)
6                   | (3,)
8                   | (4, 2)
10                  | (4, 8, 2)
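
To make the window definition concrete, here is a plain-Python sketch of the same grouping. It is illustrative only: the helper name intervals_over_sketch is hypothetical, and values are sorted within each window because the order inside the tuples above is not something this sketch tries to reproduce.

```python
def intervals_over_sketch(rows, at, lower_bound, upper_bound):
    """rows: list of (time, value). Returns {probe_time: sorted values in its window}."""
    windows = {}
    for probe in at:
        lo, hi = probe + lower_bound, probe + upper_bound
        # A row belongs to the window when its time lies in [lo, hi].
        windows[probe] = sorted(v for t, v in rows if lo <= t <= hi)
    return windows


rows = [(1, 10), (2, 1), (4, 3), (8, 2), (9, 4), (10, 8), (1, 9), (2, 16)]
windows = intervals_over_sketch(rows, at=[2, 4, 6, 8, 10], lower_bound=-2, upper_bound=1)
for probe, values in windows.items():
    print(probe, values)
# 2 [1, 9, 10, 16]
# 4 [1, 3, 16]
# 6 [3]
# 8 [2, 4]
# 10 [2, 4, 8]
```

Note that windows may overlap, so a single row (for example t=2) can contribute to several windows.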

pw.temporal.session(*, predicate=None, max_gap=None)

Allows grouping elements of an ordered, time-like data column into sessions by merging adjacent elements, based either on a maximum time difference or on a custom predicate.

NOTE: Usually used as an argument of .windowby(). Exactly one of the arguments predicate or max_gap should be provided.

  • Parameters
    • predicate (Optional[Callable[[Any, Any], bool]]) – function taking two adjacent entries that returns a boolean saying whether the two entries should be grouped
    • max_gap (Union[int, float, timedelta, None]) – two adjacent entries a and b (with a before b) will be grouped if b - a < max_gap
  • Returns
    Window – object to pass as an argument to .windowby()

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
    | shard |  t |  v
1   | 0     |  1 |  10
2   | 0     |  2 |  1
3   | 0     |  4 |  3
4   | 0     |  8 |  2
5   | 0     |  9 |  4
6   | 0     |  10|  8
7   | 1     |  1 |  9
8   | 1     |  2 |  16
''')
result = t.windowby(
    t.t,
    window=pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 1),
    shard=t.shard,
).reduce(
    pw.this._pw_shard,
    pw.this._pw_window_start,
    pw.this._pw_window_end,
    min_t=pw.reducers.min(pw.this.t),
    max_v=pw.reducers.max(pw.this.v),
    count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)
_pw_shard | _pw_window_start | _pw_window_end | min_t | max_v | count
0         | 1                | 2              | 1     | 10    | 2
0         | 4                | 4              | 4     | 3     | 1
0         | 8                | 10             | 8     | 8     | 3
1         | 1                | 2              | 1     | 16    | 2
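
The max_gap variant is not shown in the example above. The sketch below models its rule (group adjacent entries while b - a < max_gap) in plain Python for a single shard. The helper name sessions_by_max_gap is hypothetical and this is not Pathway's implementation.

```python
def sessions_by_max_gap(times, max_gap):
    """Split sorted times into sessions; adjacent a, b share a session if b - a < max_gap."""
    sessions = []
    for t in sorted(times):
        if sessions and t - sessions[-1][-1] < max_gap:
            sessions[-1].append(t)  # close enough to the previous entry: same session
        else:
            sessions.append([t])  # gap too large (or first entry): start a new session
    return sessions


sessions = sessions_by_max_gap([1, 2, 4, 8, 9, 10], max_gap=2)
print(sessions)  # [[1, 2], [4], [8, 9, 10]]
```

For these integer times, max_gap=2 yields the same sessions as the predicate abs(a - b) <= 1 used for shard 0 above.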

pw.temporal.sliding(hop, duration=None, ratio=None, offset=None)

Allows grouping elements into windows of a given length (duration) that slide across an ordered, time-like data column, with a new window starting every hop, beginning from a given offset.

NOTE: Usually used as an argument of .windowby(). Exactly one of the arguments duration or ratio should be provided.

  • Parameters
    • hop (Union[int, float, timedelta]) – distance between the starts of consecutive windows
    • duration (Union[int, float, timedelta, None]) – length of the window
    • ratio (Optional[int]) – used as an alternative way to specify duration as hop * ratio
    • offset (Union[int, float, datetime, None]) – beginning of the first window
  • Returns
    Window – object to pass as an argument to .windowby()

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
       | shard | t
   1   | 0     |  12
   2   | 0     |  13
   3   | 0     |  14
   4   | 0     |  15
   5   | 0     |  16
   6   | 0     |  17
   7   | 1     |  10
   8   | 1     |  11
''')
result = t.windowby(
    t.t, window=pw.temporal.sliding(duration=10, hop=3), shard=t.shard
).reduce(
  pw.this._pw_shard,
  pw.this._pw_window_start,
  pw.this._pw_window_end,
  min_t=pw.reducers.min(pw.this.t),
  max_t=pw.reducers.max(pw.this.t),
  count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)
_pw_shard | _pw_window_start | _pw_window_end | min_t | max_t | count
0         | 3                | 13             | 12    | 12    | 1
0         | 6                | 16             | 12    | 15    | 4
0         | 9                | 19             | 12    | 17    | 6
0         | 12               | 22             | 12    | 17    | 6
0         | 15               | 25             | 15    | 17    | 3
1         | 3                | 13             | 10    | 11    | 2
1         | 6                | 16             | 10    | 11    | 2
1         | 9                | 19             | 10    | 11    | 2
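
Because sliding windows overlap, one element can fall into several windows. The sketch below computes the window starts that contain a given time. It is a plain-Python model, not Pathway's implementation: the helper name sliding_window_starts is hypothetical, windows are assumed to be half-open [start, start + duration), and a missing offset is assumed to align window starts to multiples of hop, which is consistent with the output above.

```python
import math


def sliding_window_starts(t, hop, duration, offset=0):
    """Return the starts of all windows [start, start + duration) that contain t."""
    starts = []
    # Latest window start that is not after t.
    k = math.floor((t - offset) / hop)
    # Walk backwards while the window still covers t (start > t - duration).
    while offset + k * hop > t - duration:
        starts.append(offset + k * hop)
        k -= 1
    return sorted(starts)


print(sliding_window_starts(12, hop=3, duration=10))  # [3, 6, 9, 12]
print(sliding_window_starts(17, hop=3, duration=10))  # [9, 12, 15]
print(sliding_window_starts(10, hop=3, duration=10))  # [3, 6, 9]
```

With duration=10 and hop=3, each time belongs to three or four windows, which is why the counts in the table above add up to more than the number of input rows.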

pw.temporal.tumbling(duration, offset=None)

Allows grouping elements into consecutive, non-overlapping windows of a given length (duration) across an ordered, time-like data column, starting from a given offset.

NOTE: Usually used as an argument of .windowby().

  • Parameters
    • duration (Union[int, float, timedelta]) – length of the window
    • offset (Union[int, float, datetime, None]) – beginning of the first window
  • Returns
    Window – object to pass as an argument to .windowby()

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
       | shard | t
   1   | 0     |  12
   2   | 0     |  13
   3   | 0     |  14
   4   | 0     |  15
   5   | 0     |  16
   6   | 0     |  17
   7   | 1     |  12
   8   | 1     |  13
''')
result = t.windowby(
    t.t, window=pw.temporal.tumbling(duration=5), shard=t.shard
).reduce(
  pw.this._pw_shard,
  pw.this._pw_window_start,
  pw.this._pw_window_end,
  min_t=pw.reducers.min(pw.this.t),
  max_t=pw.reducers.max(pw.this.t),
  count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)
_pw_shard | _pw_window_start | _pw_window_end | min_t | max_t | count
0         | 10               | 15             | 12    | 14    | 3
0         | 15               | 20             | 15    | 17    | 3
1         | 10               | 15             | 12    | 13    | 2
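
Each time falls into exactly one tumbling window, so the assignment reduces to a single floor operation. The following is a minimal sketch, assuming half-open windows [start, start + duration) and, when no offset is given, windows aligned to multiples of duration (both consistent with the output above). The helper name tumbling_window is hypothetical.

```python
def tumbling_window(t, duration, offset=0):
    """Return (start, end) of the single window [start, end) that contains t."""
    # Floor t to the nearest window boundary at or below it.
    start = (t - offset) // duration * duration + offset
    return start, start + duration


print(tumbling_window(12, duration=5))  # (10, 15)
print(tumbling_window(14, duration=5))  # (10, 15)
print(tumbling_window(15, duration=5))  # (15, 20)
```

The time 15 lands in the second window rather than the first because the window end is treated as exclusive.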

pw.temporal.windowby(self, time_expr, *, window, shard=None)

Creates a GroupedTable by windowing the table (based on time_expr and window), optionally sharded with shard.

  • Parameters
    • time_expr (ColumnExpression) – column expression used for windowing
    • window (Window) – type of window to use
    • shard (Optional[ColumnExpression]) – optional column expression to act as a shard key

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
    | shard |  t |  v
1   | 0     |  1 |  10
2   | 0     |  2 |  1
3   | 0     |  4 |  3
4   | 0     |  8 |  2
5   | 0     |  9 |  4
6   | 0     |  10|  8
7   | 1     |  1 |  9
8   | 1     |  2 |  16
''')
result = t.windowby(
    t.t,
    window=pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 1),
    shard=t.shard,
).reduce(
    pw.this.shard,
    min_t=pw.reducers.min(pw.this.t),
    max_v=pw.reducers.max(pw.this.v),
    count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)
shard | min_t | max_v | count
0     | 1     | 10    | 2
0     | 4     | 3     | 1
0     | 8     | 8     | 3
1     | 1     | 16    | 2
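
Conceptually, the example above partitions rows by shard, builds session windows within each shard, and then reduces every (shard, window) group to one output row. The plain-Python sketch below models that pipeline under those assumptions. The helper name windowby_session_sketch is hypothetical and this is not how Pathway evaluates the query internally.

```python
from collections import defaultdict


def windowby_session_sketch(rows, predicate):
    """rows: iterable of (shard, t, v). Returns a list of (shard, min_t, max_v, count)."""
    # Step 1: partition rows by shard key.
    by_shard = defaultdict(list)
    for shard, t, v in rows:
        by_shard[shard].append((t, v))

    result = []
    for shard in sorted(by_shard):
        # Step 2: build sessions from time-ordered rows within the shard.
        groups = []
        for t, v in sorted(by_shard[shard]):
            if groups and predicate(groups[-1][-1][0], t):
                groups[-1].append((t, v))
            else:
                groups.append([(t, v)])
        # Step 3: reduce each (shard, window) group to one row.
        for group in groups:
            min_t = min(t for t, _ in group)
            max_v = max(v for _, v in group)
            result.append((shard, min_t, max_v, len(group)))
    return result


rows = [
    (0, 1, 10), (0, 2, 1), (0, 4, 3), (0, 8, 2),
    (0, 9, 4), (0, 10, 8), (1, 1, 9), (1, 2, 16),
]
result = windowby_session_sketch(rows, lambda a, b: abs(a - b) <= 1)
print(result)  # [(0, 1, 10, 2), (0, 4, 3, 1), (0, 8, 8, 3), (1, 1, 16, 2)]
```

Rows from different shards never share a window, even when their times are adjacent.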