Temporal
This section covers the temporal helper functions of pw.temporal. They let you join tables on time intervals, group data into time windows, and define custom sessions. Each function is followed by an example of its use.
Functions
pw.temporal.interval(lower_bound, upper_bound)
Allows testing whether two times are within a certain distance.
NOTE: Usually used as an argument of .interval_join().
- Parameters
  - lower_bound (Union[int, float, timedelta]) – a lower bound on other_time - self_time.
  - upper_bound (Union[int, float, timedelta]) – an upper bound on other_time - self_time.
- Returns
  Window – object to pass as an argument to .interval_join()
Examples:
import pathway as pw

t1 = pw.debug.table_from_markdown(
    '''
      | t
    1 | 3
    2 | 4
    3 | 5
    4 | 11
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    1 | 0
    2 | 1
    3 | 4
    4 | 7
    '''
)
t3 = t1.interval_join(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
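To make the matching rule concrete, here is a minimal pure-Python sketch of the condition that pw.temporal.interval(-2, 1) expresses on the two tables above. It does not call Pathway. The helper name interval_pairs is hypothetical, and the sketch assumes that both bounds are inclusive.

```python
def interval_pairs(left_times, right_times, lower_bound, upper_bound):
    """Return (left, right) pairs whose difference right - left lies in the interval.

    Assumption: both bounds are inclusive.
    """
    return [
        (a, b)
        for a in left_times
        for b in right_times
        if lower_bound <= b - a <= upper_bound
    ]


# Same times as in tables t1 and t2 above.
pairs = interval_pairs([3, 4, 5, 11], [0, 1, 4, 7], lower_bound=-2, upper_bound=1)
for left_t, right_t in pairs:
    print(left_t, right_t)
```

The nested loop is quadratic and only meant to show the condition; it says nothing about how the join is evaluated internally.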
pw.temporal.intervals_over(*, at, lower_bound, upper_bound)
Allows grouping together elements within a window.
Windows are created for each time t in at, by taking values with times within [t+lower_bound, t+upper_bound].
- Parameters
  - lower_bound (Union[int, float, timedelta]) – lower bound for interval
  - upper_bound (Union[int, float, timedelta]) – upper bound for interval
  - at (ColumnReference) – column of times for which windows are to be created
- Returns
  Window – object to pass as an argument to .windowby()
Examples:
import pathway as pw

t = pw.debug.table_from_markdown(
    '''
      | t  | v
    1 | 1  | 10
    2 | 2  | 1
    3 | 4  | 3
    4 | 8  | 2
    5 | 9  | 4
    6 | 10 | 8
    7 | 1  | 9
    8 | 2  | 16
    '''
)
probes = pw.debug.table_from_markdown(
    '''
    t
    2
    4
    6
    8
    10
    '''
)
result = pw.temporal.windowby(
    t,
    t.t,
    window=pw.temporal.intervals_over(at=probes.t, lower_bound=-2, upper_bound=1),
).reduce(pw.this._pw_window_location, v=pw.reducers.tuple(pw.this.v))
pw.debug.compute_and_print(result, include_id=False)
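The window rule stated above, values with times within [t+lower_bound, t+upper_bound] for each t in at, can be sketched in plain Python as follows. This is an illustration of the rule only, not Pathway code. The helper name windows_around is hypothetical, and the values are sorted inside each tuple purely to make the output deterministic.

```python
def windows_around(rows, at, lower_bound, upper_bound):
    """Map each probe time t to the values whose time is in [t + lower, t + upper]."""
    return {
        t: tuple(
            sorted(
                value
                for time, value in rows
                if t + lower_bound <= time <= t + upper_bound
            )
        )
        for t in at
    }


# (t, v) pairs and probe times taken from the example above.
rows = [(1, 10), (2, 1), (4, 3), (8, 2), (9, 4), (10, 8), (1, 9), (2, 16)]
grouped = windows_around(rows, at=[2, 4, 6, 8, 10], lower_bound=-2, upper_bound=1)
for location, values in grouped.items():
    print(location, values)
```

Note that a single row can land in several windows, for example the row with t=4 belongs to the windows around both 4 and 6.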
pw.temporal.session(*, predicate=None, max_gap=None)
Allows grouping together elements within a window across an ordered time-like data column, by locally grouping adjacent elements either based on a maximum time difference or using a custom predicate.
NOTE: Usually used as an argument of .windowby(). Exactly one of the arguments predicate or max_gap should be provided.
- Parameters
  - predicate (Optional[Callable[[Any, Any], bool]]) – function taking two adjacent entries that returns a boolean saying whether the two entries should be grouped
  - max_gap (Union[int, float, timedelta, None]) – two adjacent entries a and b will be grouped if b - a < max_gap
- Returns
  Window – object to pass as an argument to .windowby()
Examples:
import pathway as pw

t = pw.debug.table_from_markdown(
    '''
      | shard | t  | v
    1 | 0     | 1  | 10
    2 | 0     | 2  | 1
    3 | 0     | 4  | 3
    4 | 0     | 8  | 2
    5 | 0     | 9  | 4
    6 | 0     | 10 | 8
    7 | 1     | 1  | 9
    8 | 1     | 2  | 16
    '''
)
result = t.windowby(
    t.t,
    window=pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 1),
    shard=t.shard,
).reduce(
    pw.this._pw_shard,
    pw.this._pw_window_start,
    pw.this._pw_window_end,
    min_t=pw.reducers.min(pw.this.t),
    max_v=pw.reducers.max(pw.this.v),
    count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)
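The grouping of adjacent entries can be sketched without Pathway as below. The function sessionize is a hypothetical helper that follows the two rules stated above: a custom predicate on adjacent entries, or the condition b - a < max_gap. It works on a single shard and is a sketch of the idea, not the library's implementation.

```python
def sessionize(times, predicate=None, max_gap=None):
    """Split sorted times into sessions of adjacent entries.

    Exactly one of predicate or max_gap must be given.
    """
    if (predicate is None) == (max_gap is None):
        raise ValueError("provide exactly one of predicate or max_gap")
    if predicate is None:
        # Rule for max_gap: adjacent entries a, b are grouped if b - a < max_gap.
        predicate = lambda a, b: b - a < max_gap

    sessions = []
    for t in sorted(times):
        if sessions and predicate(sessions[-1][-1], t):
            sessions[-1].append(t)  # t continues the current session
        else:
            sessions.append([t])  # t starts a new session
    return sessions


# Times of shard 0 from the example above.
times = [1, 2, 4, 8, 9, 10]
by_predicate = sessionize(times, predicate=lambda a, b: abs(a - b) <= 1)
by_gap = sessionize(times, max_gap=2)
print(by_predicate)
print(by_gap)
```

On integer times the predicate abs(a - b) <= 1 and max_gap=2 give the same sessions, which is why the two calls agree here.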
pw.temporal.sliding(hop, duration=None, ratio=None, offset=None)
Allows grouping together elements within a window of a given length sliding across an ordered time-like data column according to a specified interval (hop), starting from a given offset.
NOTE: Usually used as an argument of .windowby(). Exactly one of the arguments duration or ratio should be provided.
- Parameters
  - hop (Union[int, float, timedelta]) – interval between the starts of consecutive windows
  - duration (Union[int, float, timedelta, None]) – length of the window
  - ratio (Optional[int]) – used as an alternative way to specify duration as hop * ratio
  - offset (Union[int, float, datetime, None]) – beginning of the first window
- Returns
  Window – object to pass as an argument to .windowby()
Examples:
import pathway as pw

t = pw.debug.table_from_markdown(
    '''
      | shard | t
    1 | 0     | 12
    2 | 0     | 13
    3 | 0     | 14
    4 | 0     | 15
    5 | 0     | 16
    6 | 0     | 17
    7 | 1     | 10
    8 | 1     | 11
    '''
)
result = t.windowby(
    t.t, window=pw.temporal.sliding(duration=10, hop=3), shard=t.shard
).reduce(
    pw.this._pw_shard,
    pw.this._pw_window_start,
    pw.this._pw_window_end,
    min_t=pw.reducers.min(pw.this.t),
    max_t=pw.reducers.max(pw.this.t),
    count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)
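Because sliding windows overlap, one element usually belongs to several of them. The sketch below lists the windows that contain a given time, in plain Python. It rests on assumptions that are not stated in this section: windows are half-open, [start, start + duration), and their starts lie on the grid origin + k * hop. Both sliding_windows and its origin parameter are hypothetical names used for illustration only.

```python
from collections import Counter


def sliding_windows(t, hop, duration, origin=0):
    """Return every (start, end) window that contains time t.

    Assumptions: windows are half-open [start, end) and start on origin + k * hop.
    """
    if hop <= 0 or duration <= 0:
        raise ValueError("hop and duration must be positive")
    # Latest grid point that is not after t.
    start = origin + ((t - origin) // hop) * hop
    windows = []
    # Walk back one hop at a time while the window still covers t.
    while start + duration > t:
        windows.append((start, start + duration))
        start -= hop
    return sorted(windows)


# Times of shard 0 from the example above, with duration=10 and hop=3.
times = [12, 13, 14, 15, 16, 17]
counts = Counter(w for t in times for w in sliding_windows(t, hop=3, duration=10))
for window, count in sorted(counts.items()):
    print(window, count)
```

If ratio were given instead of duration, the same sketch would be called with duration=hop * ratio.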
pw.temporal.tumbling(duration, offset=None)
Allows grouping together elements within a window of a given length tumbling across an ordered time-like data column, starting from a given offset.
NOTE: Usually used as an argument of .windowby().
- Parameters
  - duration (Union[int, float, timedelta]) – length of the window
  - offset (Union[int, float, datetime, None]) – beginning of the first window
- Returns
  Window – object to pass as an argument to .windowby()
Examples:
import pathway as pw

t = pw.debug.table_from_markdown(
    '''
      | shard | t
    1 | 0     | 12
    2 | 0     | 13
    3 | 0     | 14
    4 | 0     | 15
    5 | 0     | 16
    6 | 0     | 17
    7 | 1     | 12
    8 | 1     | 13
    '''
)
result = t.windowby(
    t.t, window=pw.temporal.tumbling(duration=5), shard=t.shard
).reduce(
    pw.this._pw_shard,
    pw.this._pw_window_start,
    pw.this._pw_window_end,
    min_t=pw.reducers.min(pw.this.t),
    max_t=pw.reducers.max(pw.this.t),
    count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)
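Tumbling windows do not overlap, so each element falls into exactly one of them. A minimal pure-Python sketch of that assignment follows. It assumes half-open windows [start, start + duration) aligned to origin + k * duration, neither of which is spelled out in this section, and the names tumbling_window and origin are hypothetical.

```python
from collections import Counter


def tumbling_window(t, duration, origin=0):
    """Return the single (start, end) window that contains time t.

    Assumptions: windows are half-open [start, end) and start on origin + k * duration.
    """
    if duration <= 0:
        raise ValueError("duration must be positive")
    start = origin + ((t - origin) // duration) * duration
    return (start, start + duration)


# Times of shard 0 from the example above, with duration=5.
times = [12, 13, 14, 15, 16, 17]
counts = Counter(tumbling_window(t, duration=5) for t in times)
for window, count in sorted(counts.items()):
    print(window, count)
```

A tumbling window behaves like a sliding window whose hop equals its duration.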
pw.temporal.windowby(self, time_expr, *, window, shard=None)
Creates a GroupedTable by windowing the table (based on time_expr and window), optionally sharded with shard.
- Parameters
  - time_expr (ColumnExpression) – column expression used for windowing
  - window (Window) – type of window to use
  - shard (Optional[ColumnExpression]) – optional column expression to act as a shard key
Examples:
import pathway as pw

t = pw.debug.table_from_markdown(
    '''
      | shard | t  | v
    1 | 0     | 1  | 10
    2 | 0     | 2  | 1
    3 | 0     | 4  | 3
    4 | 0     | 8  | 2
    5 | 0     | 9  | 4
    6 | 0     | 10 | 8
    7 | 1     | 1  | 9
    8 | 1     | 2  | 16
    '''
)
result = t.windowby(
    t.t,
    window=pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 1),
    shard=t.shard,
).reduce(
    pw.this.shard,
    min_t=pw.reducers.min(pw.this.t),
    max_v=pw.reducers.max(pw.this.v),
    count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)
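The overall shape of a windowed aggregation, shard first, then window, then reduce, can be sketched in plain Python as below. This is a rough model of the example above rather than Pathway's implementation: the helper windowby_session is a hypothetical name, rows are plain dictionaries, and only session windows with a predicate are handled.

```python
from itertools import groupby

rows = [
    {"shard": 0, "t": 1, "v": 10},
    {"shard": 0, "t": 2, "v": 1},
    {"shard": 0, "t": 4, "v": 3},
    {"shard": 0, "t": 8, "v": 2},
    {"shard": 0, "t": 9, "v": 4},
    {"shard": 0, "t": 10, "v": 8},
    {"shard": 1, "t": 1, "v": 9},
    {"shard": 1, "t": 2, "v": 16},
]


def windowby_session(rows, predicate):
    """Group rows by shard, then split each shard into sessions of adjacent rows."""
    groups = []
    ordered = sorted(rows, key=lambda r: (r["shard"], r["t"]))
    for shard, shard_rows in groupby(ordered, key=lambda r: r["shard"]):
        session = []
        for row in shard_rows:
            # Close the current session when the predicate rejects the new row.
            if session and not predicate(session[-1]["t"], row["t"]):
                groups.append((shard, session))
                session = []
            session.append(row)
        groups.append((shard, session))
    return groups


# The reduce step: one output row (shard, min_t, max_v, count) per group.
result = [
    (shard, min(r["t"] for r in g), max(r["v"] for r in g), len(g))
    for shard, g in windowby_session(rows, lambda a, b: abs(a - b) <= 1)
]
for row in result:
    print(row)
```

Sharding keeps the two rows of shard 1 out of the sessions of shard 0, even though their times 1 and 2 coincide with times in shard 0.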