Table API

The Pathway programming framework is organized around working with data tables. This page is the reference for the Pathway Table class.

class pw.Table(columns, context, schema=None)

Collection of named columns over identical universes.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
isinstance(t1, pw.Table)
True

property C: ColumnNamespace

Returns the namespace of all the columns of a joinable. Allows accessing columns whose names would otherwise collide with reserved method names (such as filter in the example below).

Example:

import pathway as pw
tab = pw.debug.table_from_markdown('''
age | owner | pet | filter
10  | Alice | dog | True
9   | Bob   | dog | True
8   | Alice | cat | False
7   | Bob   | dog | True
''')
isinstance(tab.C.age, pw.ColumnReference)
True
pw.debug.compute_and_print(tab.filter(tab.C.filter), include_id=False)
age | owner | pet | filter
7   | Bob   | dog | True
9   | Bob   | dog | True
10  | Alice | dog | True

asof_join(other, self_time, other_time, *on, how, defaults={}, direction=Direction.BACKWARD)

Perform an ASOF join of two tables.
  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • how (JoinMode) – mode of the join (LEFT, RIGHT, FULL)
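    • defaults (dict[ColumnReference, Any]) – dictionary column -> default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST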

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    how=pw.JoinMode.LEFT,
    defaults={t2.val: -1},
).select(
    pw.this.shard_key,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
shard_key | t  | val_left | val_right | sum
0         | 1  | 1        | -1        | 0
0         | 4  | 2        | 6         | 8
0         | 5  | 3        | 6         | 9
0         | 6  | 4        | 6         | 10
0         | 7  | 5        | 6         | 11
0         | 11 | 6        | 9         | 15
0         | 12 | 7        | 9         | 16
1         | 5  | 8        | 7         | 15
1         | 7  | 9        | 7         | 16
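
The matching rule behind the output above can be sketched in plain Python. This is not Pathway's implementation: the function name asof_left_backward and the (key, time, value) tuple layout are hypothetical, and the tie-breaking (a left row only sees right rows with a strictly smaller time) is an assumption inferred from the row t=7 in the example, which pairs with the right row at t=3 rather than the one at t=7.

```python
from bisect import bisect_left


def asof_left_backward(left, right, default=None):
    """Pair each left row with the latest earlier right row of the same key.

    Rows are (key, time, value) tuples; this layout is illustrative only.
    """
    right_by_key = {}
    for key, time, value in right:
        right_by_key.setdefault(key, []).append((time, value))
    for rows in right_by_key.values():
        rows.sort()

    result = []
    for key, time, value in left:
        rows = right_by_key.get(key, [])
        times = [t for t, _ in rows]
        # Number of right rows whose time is strictly smaller than `time`.
        count = bisect_left(times, time)
        right_value = rows[count - 1][1] if count > 0 else default
        result.append((key, time, value, right_value))
    return result


# Same data as t1 and t2 above, written as (K, t, val).
left = [(0, 1, 1), (0, 4, 2), (0, 5, 3), (0, 6, 4), (0, 7, 5),
        (0, 11, 6), (0, 12, 7), (1, 5, 8), (1, 7, 9)]
right = [(1, 2, 7), (1, 8, 3), (0, 2, 0), (0, 3, 6), (0, 7, 2),
         (0, 8, 3), (0, 9, 9), (0, 13, 7), (0, 14, 4)]
asof_rows = asof_left_backward(left, right, default=-1)
for row in asof_rows:
    print(row)
```

With default=-1 the fourth element of each tuple reproduces the val_right column printed above.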

asof_join_left(other, self_time, other_time, *on, defaults={}, direction=Direction.BACKWARD)

Perform a left ASOF join of two tables.
  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • defaults (dict[ColumnReference, Any]) – dictionary column -> default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_left(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t2.val: -1},
).select(
    pw.this.shard_key,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
shard_key | t  | val_left | val_right | sum
0         | 1  | 1        | -1        | 0
0         | 4  | 2        | 6         | 8
0         | 5  | 3        | 6         | 9
0         | 6  | 4        | 6         | 10
0         | 7  | 5        | 6         | 11
0         | 11 | 6        | 9         | 15
0         | 12 | 7        | 9         | 16
1         | 5  | 8        | 7         | 15
1         | 7  | 9        | 7         | 16

asof_join_outer(other, self_time, other_time, *on, defaults={}, direction=Direction.BACKWARD)

Perform an outer ASOF join of two tables.
  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • defaults (dict[ColumnReference, Any]) – dictionary column -> default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_outer(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t1.val: -1, t2.val: -1},
).select(
    pw.this.shard_key,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
shard_key | t  | val_left | val_right | sum
0         | 1  | 1        | -1        | 0
0         | 2  | 1        | 0         | 1
0         | 3  | 1        | 6         | 7
0         | 4  | 2        | 6         | 8
0         | 5  | 3        | 6         | 9
0         | 6  | 4        | 6         | 10
0         | 7  | 5        | 2         | 7
0         | 7  | 5        | 6         | 11
0         | 8  | 5        | 3         | 8
0         | 9  | 5        | 9         | 14
0         | 11 | 6        | 9         | 15
0         | 12 | 7        | 9         | 16
0         | 13 | 7        | 7         | 14
0         | 14 | 7        | 4         | 11
1         | 2  | -1       | 7         | 6
1         | 5  | 8        | 7         | 15
1         | 7  | 9        | 7         | 16
1         | 8  | 9        | 3         | 12

asof_join_right(other, self_time, other_time, *on, defaults={}, direction=Direction.BACKWARD)

Perform a right ASOF join of two tables.
  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • defaults (dict[ColumnReference, Any]) – dictionary column -> default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_right(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t1.val: -1},
).select(
    pw.this.shard_key,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
shard_key | t  | val_left | val_right | sum
0         | 2  | 1        | 0         | 1
0         | 3  | 1        | 6         | 7
0         | 7  | 5        | 2         | 7
0         | 8  | 5        | 3         | 8
0         | 9  | 5        | 9         | 14
0         | 13 | 7        | 7         | 14
0         | 14 | 7        | 4         | 11
1         | 2  | -1       | 7         | 6
1         | 8  | 9        | 3         | 12
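
For the right join the roles are swapped: every row of other is kept and looks backward into self. The plain-Python sketch below is only an illustration, not Pathway's implementation. The function name asof_right_backward and the tuple layout are hypothetical, and the tie-breaking (a right row may pair with a left row of equal time) is an assumption inferred from the row t=7 in the output above.

```python
from bisect import bisect_right


def asof_right_backward(left, right, default=None):
    """Pair each right row with the latest left row of the same key
    whose time is not greater than the right row's time."""
    left_by_key = {}
    for key, time, value in left:
        left_by_key.setdefault(key, []).append((time, value))
    for rows in left_by_key.values():
        rows.sort()

    result = []
    for key, time, value in right:
        rows = left_by_key.get(key, [])
        times = [t for t, _ in rows]
        # Number of left rows whose time is <= `time`.
        count = bisect_right(times, time)
        left_value = rows[count - 1][1] if count > 0 else default
        result.append((key, time, left_value, value))
    return sorted(result)


# Same data as t1 and t2 above, written as (K, t, val).
left = [(0, 1, 1), (0, 4, 2), (0, 5, 3), (0, 6, 4), (0, 7, 5),
        (0, 11, 6), (0, 12, 7), (1, 5, 8), (1, 7, 9)]
right = [(1, 2, 7), (1, 8, 3), (0, 2, 0), (0, 3, 6), (0, 7, 2),
         (0, 8, 3), (0, 9, 9), (0, 13, 7), (0, 14, 4)]
right_rows = asof_right_backward(left, right, default=-1)
for row in right_rows:
    print(row)
```

Each printed tuple is (shard_key, t, val_left, val_right) and follows the same order as the table above.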

asof_now_join(other, *on, how=JoinMode.INNER, id=None, left_instance=None, right_instance=None)

Performs asof now join of self with other using join expressions. Each row of self is joined with rows from other at a given processing time. Rows from self are not stored. They are joined with rows of other at their processing time. If other is updated in the future, rows from self from the past won’t be updated. Rows from other are stored. They can be joined with future rows of self.
  • Parameters
    • other (Table) – the right side of a join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (Optional[ColumnReference]) – optional argument for id of result, can be only self.id or other.id
    • how (JoinMode) – by default, inner join is performed. Possible values are JoinMode.{INNER,LEFT} which correspond to inner and left join respectively.
  • Returns
    AsofNowJoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
data = pw.debug.table_from_markdown(
    '''
    id | value | instance | __time__ | __diff__
     2 |   4   |    1     |     4    |     1
     2 |   4   |    1     |    10    |    -1
     5 |   5   |    1     |    10    |     1
     7 |   2   |    2     |    14    |     1
     7 |   2   |    2     |    22    |    -1
    11 |   3   |    2     |    26    |     1
     5 |   5   |    1     |    30    |    -1
    14 |   9   |    1     |    32    |     1
    '''
)
queries = pw.debug.table_from_markdown(
    '''
    value | instance | __time__
      1   |    1     |     2
      2   |    1     |     6
      4   |    1     |    12
      5   |    2     |    16
     10   |    1     |    26
    '''
)
result = queries.asof_now_join(
    data, pw.left.instance == pw.right.instance, how=pw.JoinMode.LEFT
).select(query=pw.left.value, ans=pw.right.value)
pw.debug.compute_and_print_update_stream(result, include_id=False)
query | ans | __time__ | __diff__
1     |     | 2        | 1
2     | 4   | 6        | 1
4     | 5   | 12       | 1
5     | 2   | 16       | 1
10    | 5   | 26       | 1
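
The "rows from self are not stored, rows from other are stored" behavior can be replayed with a small plain-Python simulation. It is a sketch under stated assumptions, not Pathway's engine: the function name asof_now_join_left and the tuple layouts are hypothetical, and applying data updates before queries that share the same time is an assumption (the example data does not exercise that case for a single instance).

```python
from collections import defaultdict


def asof_now_join_left(data_updates, queries):
    """Replay updates and queries in processing-time order.

    data_updates: (value, instance, time, diff) tuples, diff is +1 or -1.
    queries: (value, instance, time) tuples.
    """
    events = []
    for value, instance, time, diff in data_updates:
        events.append((time, 0, "data", value, instance, diff))
    for value, instance, time in queries:
        events.append((time, 1, "query", value, instance, 0))
    # Assumption: at equal times, data updates are applied first.
    events.sort(key=lambda event: (event[0], event[1]))

    stored = defaultdict(list)  # only the data side is remembered
    answers = []
    for time, _, kind, value, instance, diff in events:
        if kind == "data":
            if diff > 0:
                stored[instance].append(value)
            else:
                stored[instance].remove(value)
        else:
            # A query is answered once, against the current state, and never revisited.
            for match in stored[instance] or [None]:
                answers.append((value, match, time))
    return answers


data_updates = [(4, 1, 4, 1), (4, 1, 10, -1), (5, 1, 10, 1), (2, 2, 14, 1),
                (2, 2, 22, -1), (3, 2, 26, 1), (5, 1, 30, -1), (9, 1, 32, 1)]
queries = [(1, 1, 2), (2, 1, 6), (4, 1, 12), (5, 2, 16), (10, 1, 26)]
answers = asof_now_join_left(data_updates, queries)
for query, ans, time in answers:
    print(query, ans, time)
```

The (query, ans, time) triples match the update stream printed above; later deletions in data (for example at time 30) do not retract earlier answers.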

asof_now_join_inner(other, *on, id=None, left_instance=None, right_instance=None)

Performs asof now join of self with other using join expressions. Each row of self is joined with rows from other at a given processing time. Rows from self are not stored. They are joined with rows of other at their processing time. If other is updated in the future, rows from self from the past won’t be updated. Rows from other are stored. They can be joined with future rows of self.
  • Parameters
    • other (Table) – the right side of a join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (Optional[ColumnReference]) – optional argument for id of result, can be only self.id or other.id
  • Returns
    AsofNowJoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
data = pw.debug.table_from_markdown(
    '''
    id | value | instance | __time__ | __diff__
     2 |   4   |    1     |     4    |     1
     2 |   4   |    1     |    10    |    -1
     5 |   5   |    1     |    10    |     1
     7 |   2   |    2     |    14    |     1
     7 |   2   |    2     |    22    |    -1
    11 |   3   |    2     |    26    |     1
     5 |   5   |    1     |    30    |    -1
    14 |   9   |    1     |    32    |     1
    '''
)
queries = pw.debug.table_from_markdown(
    '''
    value | instance | __time__
      1   |    1     |     2
      2   |    1     |     6
      4   |    1     |    12
      5   |    2     |    16
     10   |    1     |    26
    '''
)
result = queries.asof_now_join_inner(
    data, pw.left.instance == pw.right.instance
).select(query=pw.left.value, ans=pw.right.value)
pw.debug.compute_and_print_update_stream(result, include_id=False)
query | ans | __time__ | __diff__
2     | 4   | 6        | 1
4     | 5   | 12       | 1
5     | 2   | 16       | 1
10    | 5   | 26       | 1

asof_now_join_left(other, *on, id=None, left_instance=None, right_instance=None)

Performs asof now join of self with other using join expressions. Each row of self is joined with rows from other at a given processing time. If there are no matching rows in other, missing values on the right side are replaced with None. Rows from self are not stored. They are joined with rows of other at their processing time. If other is updated in the future, rows from self from the past won’t be updated. Rows from other are stored. They can be joined with future rows of self.
  • Parameters
    • other (Table) – the right side of a join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (Optional[ColumnReference]) – optional argument for id of result, can be only self.id or other.id
  • Returns
    AsofNowJoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
data = pw.debug.table_from_markdown(
    '''
    id | value | instance | __time__ | __diff__
     2 |   4   |    1     |     4    |     1
     2 |   4   |    1     |    10    |    -1
     5 |   5   |    1     |    10    |     1
     7 |   2   |    2     |    14    |     1
     7 |   2   |    2     |    22    |    -1
    11 |   3   |    2     |    26    |     1
     5 |   5   |    1     |    30    |    -1
    14 |   9   |    1     |    32    |     1
    '''
)
queries = pw.debug.table_from_markdown(
    '''
    value | instance | __time__
      1   |    1     |     2
      2   |    1     |     6
      4   |    1     |    12
      5   |    2     |    16
     10   |    1     |    26
    '''
)
result = queries.asof_now_join_left(
    data, pw.left.instance == pw.right.instance
).select(query=pw.left.value, ans=pw.right.value)
pw.debug.compute_and_print_update_stream(result, include_id=False)
query | ans | __time__ | __diff__
1     |     | 2        | 1
2     | 4   | 6        | 1
4     | 5   | 12       | 1
5     | 2   | 16       | 1
10    | 5   | 26       | 1

cast_to_types(**kwargs)

Casts columns to types.

concat(*others)

Concatenates self with every other ∈ others.

Semantics:

  • result.columns == self.columns == other.columns
  • result.id == self.id ∪ other.id

If self.id and other.id collide, an exception is thrown.

Requires:

  • other.columns == self.columns
  • self.id disjoint with other.id
  • Parameters
    other – the other table.
  • Returns
    Table – The concatenated table. Ids of rows from the original tables are preserved.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''')
pw.universes.promise_are_pairwise_disjoint(t1, t2)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 1
11  | Alice | 30
12  | Tom   | 40
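
The semantics listed above (ids are preserved, colliding ids are an error) can be modeled with dictionaries keyed by row id. This is a minimal plain-Python sketch, not Pathway code: the helper name concat_tables is hypothetical, and ValueError is an assumed exception type, since the text above only says that an exception is thrown.

```python
def concat_tables(*tables):
    """Concatenate tables modeled as dicts mapping row id -> row dict."""
    result = {}
    for table in tables:
        for row_id, row in table.items():
            if row_id in result:
                # Assumed exception type; the reference only promises "an exception".
                raise ValueError(f"id collision on row id {row_id!r}")
            result[row_id] = row
    return result


t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {
    11: {"age": 11, "owner": "Alice", "pet": 30},
    12: {"age": 12, "owner": "Tom", "pet": 40},
}
t3 = concat_tables(t1, t2)
print(sorted(t3))  # ids of both inputs, unchanged

try:
    concat_tables(t1, t1)
    collided = False
except ValueError:
    collided = True
print(collided)
```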

concat_reindex(*tables)

Concatenate contents of several tables.

This is similar to PySpark union. All tables must have the same schema. Each row is reindexed.

  • Parameters
    tables (Table) – List of tables to concatenate. All tables must have the same schema.
  • Returns
    Table – The concatenated table. It will have new, synthetic ids.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.table_from_markdown('''
  | pet
1 | Manul
8 | Octopus
''')
t3 = t1.concat_reindex(t2)
pw.debug.compute_and_print(t3, include_id=False)
pet
Cat
Dog
Manul
Octopus

copy()

Returns a copy of a table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.copy()
pw.debug.compute_and_print(t2, include_id=False)
age | owner | pet
7   | Bob   | dog
8   | Alice | cat
9   | Bob   | dog
10  | Alice | dog
t1 is t2
False

diff(timestamp, *values)

Compute the difference between the values in the `values` columns and the previous values according to the order defined by the column `timestamp`.
  • Parameters
    • timestamp (ColumnReference) – The column reference to the timestamp column on which the order is computed.
    • *values (ColumnReference) – Variable-length argument representing the column references to the values columns.
  • Returns
    Table – A new table where each column is replaced with a new column containing the difference and whose name is the concatenation of diff_ and the former name.
  • Raises
    ValueError – If the columns are not ColumnReference.

NOTE: The difference for the “first” row (the row with the lowest value in the timestamp column) is None.

Example:

import pathway as pw
table = pw.debug.table_from_markdown('''
timestamp | values
1         | 1
2         | 2
3         | 4
4         | 7
5         | 11
6         | 16
''')
table += table.diff(pw.this.timestamp, pw.this.values)
pw.debug.compute_and_print(table, include_id=False)
timestamp | values | diff_values
1         | 1      |
2         | 2      | 1
3         | 4      | 2
4         | 7      | 3
5         | 11     | 4
6         | 16     | 5
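
As a rough plain-Python equivalent of the computation above (a sketch only; the helper name diff_column and the list-of-dicts layout are hypothetical and unrelated to Pathway internals), rows are ordered by the timestamp and each value is compared with its predecessor:

```python
def diff_column(rows, timestamp, value):
    """Add a diff_<value> entry holding the change since the previous row."""
    ordered = sorted(rows, key=lambda row: row[timestamp])
    result = []
    previous = None
    for row in ordered:
        # The first row in timestamp order has no predecessor, so its diff is None.
        delta = None if previous is None else row[value] - previous[value]
        result.append({**row, f"diff_{value}": delta})
        previous = row
    return result


rows = [
    {"timestamp": 1, "values": 1},
    {"timestamp": 2, "values": 2},
    {"timestamp": 3, "values": 4},
    {"timestamp": 4, "values": 7},
    {"timestamp": 5, "values": 11},
    {"timestamp": 6, "values": 16},
]
with_diff = diff_column(rows, "timestamp", "values")
print([row["diff_values"] for row in with_diff])
```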

difference(other)

Restrict self universe to keys not appearing in the other table.
  • Parameters
    other (Table) – table with ids to remove from self.
  • Returns
    Table – table with restricted universe, with the same set of columns

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age  | owner  | pet
1 | 10   | Alice  | 1
2 | 9    | Bob    | 1
3 | 8    | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
  | cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.difference(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
10  | Alice | 1
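
Only the ids of other matter here; its columns are ignored. A plain-Python sketch with tables modeled as dicts keyed by row id (a hypothetical representation, not Pathway's) makes this explicit:

```python
t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {2: {"cost": 100}, 3: {"cost": 200}, 4: {"cost": 300}}

# Keep the rows of t1 whose id does not appear in t2; columns of t1 are untouched.
t3 = {row_id: row for row_id, row in t1.items() if row_id not in t2}
print(t3)
```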

empty(**kwargs)

Creates an empty table with a schema specified by kwargs.
  • Parameters
    kwargs (DType) – Dict whose keys are column names and values are column types.
  • Returns
    Table – Created empty table.

Example:

import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
pw.debug.compute_and_print(t1, include_id=False)
age | pet

filter(filter_expression)

Filter a table according to filter condition.
  • Parameters
    filter_expression – ColumnExpression that specifies the filtering condition.
  • Returns
    Table – Result has the same schema as self and its ids are a subset of self.id.

Example:

import pathway as pw
vertices = pw.debug.table_from_markdown('''
label outdegree
    1         3
    7         0
''')
filtered = vertices.filter(vertices.outdegree == 0)
pw.debug.compute_and_print(filtered, include_id=False)
label | outdegree
7     | 0

flatten(*args, **kwargs)

Performs a flatmap operation on a column or expression given as a first argument. Datatype of this column or expression has to be iterable. Other columns specified in the method arguments are duplicated as many times as the length of the iterable.

It is possible to get ids of source rows by using table.id column, e.g. table.flatten(table.column_to_be_flattened, original_id = table.id).

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | pet  |  age
1 | Dog  |   2
7 | Cat  |   5
''')
t2 = t1.flatten(t1.pet)
pw.debug.compute_and_print(t2, include_id=False)
pet
C
D
a
g
o
t
t3 = t1.flatten(t1.pet, t1.age)
pw.debug.compute_and_print(t3, include_id=False)
pet | age
C   | 5
D   | 2
a   | 5
g   | 2
o   | 2
t   | 5
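
The flatmap behavior shown above can be sketched in plain Python. The helper name flatten_rows and the list-of-dicts layout are hypothetical; the sketch only mirrors the visible result, where each element of the iterable becomes its own row and the other requested columns are repeated.

```python
def flatten_rows(rows, column, *others):
    """Emit one row per element of row[column], copying the `others` columns."""
    result = []
    for row in rows:
        for item in row[column]:
            new_row = {column: item}
            for other in others:
                new_row[other] = row[other]
            result.append(new_row)
    return result


rows = [{"pet": "Dog", "age": 2}, {"pet": "Cat", "age": 5}]
flat = flatten_rows(rows, "pet", "age")
# Sort by the flattened value to mirror the ordering of the printed table.
flat_sorted = sorted((row["pet"], row["age"]) for row in flat)
print(flat_sorted)
```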

from_columns(**kwargs)

Build a table from columns.

All columns must have the same ids. Columns’ names must be pairwise distinct.

Example:

import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
t2 = pw.Table.empty(foo=float, bar=float).with_universe_of(t1)
t3 = pw.Table.from_columns(t1.pet, qux=t2.foo)
pw.debug.compute_and_print(t3, include_id=False)
pet | qux

groupby(*args, id=None, sort_by=None, _filter_out_results_of_forgetting=False, instance=None)

Groups table by columns from args.

NOTE: Usually followed by .reduce() that aggregates the result and returns a table.

  • Parameters
    • args (ColumnReference) – columns to group by.
    • id (Optional[ColumnReference]) – if provided, the column used to set the ids of the rows of the result
    • instance (Optional[ColumnReference]) – optional argument describing partitioning of the data into separate instances
  • Returns
    GroupedTable – Groupby object.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.groupby(t1.pet, t1.owner).reduce(t1.owner, t1.pet, ageagg=pw.reducers.sum(t1.age))
pw.debug.compute_and_print(t2, include_id=False)
owner | pet | ageagg
Alice | cat | 8
Alice | dog | 10
Bob   | dog | 16
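
For readers new to the groupby followed by reduce pattern, the aggregation in the example corresponds roughly to the following plain-Python loop. It is an illustration of the result only, with a hypothetical list-of-dicts layout, and says nothing about how Pathway evaluates it.

```python
from collections import defaultdict

rows = [
    {"age": 10, "owner": "Alice", "pet": "dog"},
    {"age": 9, "owner": "Bob", "pet": "dog"},
    {"age": 8, "owner": "Alice", "pet": "cat"},
    {"age": 7, "owner": "Bob", "pet": "dog"},
]

# Group by (owner, pet) and sum the ages within each group.
totals = defaultdict(int)
for row in rows:
    totals[(row["owner"], row["pet"])] += row["age"]

grouped = sorted((owner, pet, total) for (owner, pet), total in totals.items())
for owner, pet, ageagg in grouped:
    print(owner, pet, ageagg)
```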

having(*indexers)

Removes rows so that indexed.ix(indexer) is possible when some rows are missing, for each indexer in indexers.

property id: ColumnReference

Get a reference to the pseudocolumn containing the ids of a table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.select(ids = t1.id)
t2.typehints()['ids']
<class 'pathway.engine.Pointer'>
pw.debug.compute_and_print(t2.select(test=t2.id == t2.ids), include_id=False)
test
True
True
True
True

interpolate(timestamp, *values, mode=InterpolateMode.LINEAR)

Interpolates missing values in a column using the previous and next values based on a timestamps column.
  • Parameters
    • timestamp (ColumnReference) – Reference to the column containing timestamps.
    • *values (ColumnReference) – References to the columns containing values to be interpolated.
    • mode (InterpolateMode, optional) – The interpolation mode. Currently, only InterpolateMode.LINEAR is supported. Default is InterpolateMode.LINEAR.
  • Returns
    Table – A new table with the interpolated values.
  • Raises
    ValueError – If the columns are not ColumnReference or if the interpolation mode is not supported.

NOTE:

  • The interpolation is performed based on linear interpolation between the previous and next values.
  • If a value is missing at the beginning or end of the column, no interpolation is performed.

Example:

import pathway as pw
table = pw.debug.table_from_markdown('''
timestamp | values_a | values_b
1         | 1        | 10
2         |          |
3         | 3        |
4         |          |
5         |          |
6         | 6        | 60
''')
table = table.interpolate(pw.this.timestamp, pw.this.values_a, pw.this.values_b)
pw.debug.compute_and_print(table, include_id=False)
timestamp | values_a | values_b
1         | 1        | 10
2         | 2.0      | 20.0
3         | 3        | 30.0
4         | 4.0      | 40.0
5         | 5.0      | 50.0
6         | 6        | 60
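
The linear rule behind the numbers above is v = v0 + (v1 - v0) * (t - t0) / (t1 - t0), where (t0, v0) and (t1, v1) are the nearest known points before and after t. A minimal plain-Python sketch follows; the helper name interpolate_linear is hypothetical, it assumes the timestamps are already sorted, and it is not Pathway's implementation.

```python
def interpolate_linear(timestamps, values):
    """Fill None entries that lie between two known values; others stay None."""
    known = [i for i, value in enumerate(values) if value is not None]
    result = list(values)
    for left, right in zip(known, known[1:]):
        t0, t1 = timestamps[left], timestamps[right]
        v0, v1 = values[left], values[right]
        for i in range(left + 1, right):
            result[i] = v0 + (v1 - v0) * (timestamps[i] - t0) / (t1 - t0)
    return result


timestamps = [1, 2, 3, 4, 5, 6]
values_a = interpolate_linear(timestamps, [1, None, 3, None, None, 6])
values_b = interpolate_linear(timestamps, [10, None, None, None, None, 60])
edge_case = interpolate_linear(timestamps, [None, 2, None, 4, None, None])
print(values_a)
print(values_b)
print(edge_case)  # leading and trailing gaps are left as None
```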

intersect(*tables)

Restrict self universe to keys appearing in all of the tables.
  • Parameters
    tables (Table) – tables keys of which are used to restrict universe.
  • Returns
    Table – table with restricted universe, with the same set of columns

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age  | owner  | pet
1 | 10   | Alice  | 1
2 | 9    | Bob    | 1
3 | 8    | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
  | cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.intersect(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
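
Because intersect accepts several tables, a row survives only if its id appears in every one of them. A plain-Python sketch with tables as dicts keyed by row id (the helper name intersect_ids and the extra table t_extra are hypothetical, added purely for illustration):

```python
def intersect_ids(table, *others):
    """Keep rows of `table` whose id is present in all of `others`."""
    return {
        row_id: row
        for row_id, row in table.items()
        if all(row_id in other for other in others)
    }


t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {2: {"cost": 100}, 3: {"cost": 200}, 4: {"cost": 300}}
t_extra = {3: {"flag": True}}  # hypothetical third table

both = intersect_ids(t1, t2)
all_three = intersect_ids(t1, t2, t_extra)
print(sorted(both), sorted(all_three))
```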

interval_join(other, self_time, other_time, interval, *on, behavior=None, how=JoinMode.INNER, left_instance=None, right_instance=None)

Performs an interval join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined.
  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
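    • interval – the interval of allowed time differences, built in the examples below as pw.temporal.interval(lower_bound, upper_bound):
      • lower_bound – a lower bound on time difference between other_time and self_time.
      • upper_bound – an upper bound on time difference between other_time and self_time.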
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (Optional[CommonBehavior]) – defines a temporal behavior of a join - features like delaying entries or ignoring late entries. You can see examples below or read more in the temporal behavior of interval join tutorial.
    • how (JoinMode) – decides whether to run interval_join_inner, interval_join_left, interval_join_right or interval_join_outer. Default is INNER.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b, how=pw.JoinMode.INNER
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2
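
The join condition self_time + lower_bound <= other_time <= self_time + upper_bound, combined with the equality in on, can be checked by hand with a brute-force plain-Python sketch. The helper name interval_join_pairs and the (key, time) tuple layout are hypothetical; a nested loop is used for clarity and is not how Pathway computes the join.

```python
def interval_join_pairs(left, right, lower_bound, upper_bound):
    """Return (key, left_time, right_time) for all rows that join.

    left and right are lists of (key, time) tuples.
    """
    return sorted(
        (left_key, left_time, right_time)
        for left_key, left_time in left
        for right_key, right_time in right
        if left_key == right_key
        and left_time + lower_bound <= right_time <= left_time + upper_bound
    )


# Same data as the second example above: (a, t) and (b, t).
left = [(1, 3), (1, 4), (1, 5), (1, 11), (2, 2), (2, 3), (3, 4)]
right = [(1, 0), (1, 1), (1, 4), (1, 7), (2, 0), (2, 2), (4, 2)]
pairs = interval_join_pairs(left, right, -2, 1)
for pair in pairs:
    print(pair)
```

The printed triples are the (a, left_t, right_t) rows of the second table above.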

Setting behavior lets you control the temporal behavior of an interval join. Each side of the interval join then keeps track of the maximal already seen time (self_time and other_time). In the context of an interval join, the arguments of behavior have the following meaning:

  • delay - buffers results until their time is greater than or equal to the maximal already seen time minus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records that have times less than or equal to the above threshold. When cutoff is not set, interval join will remember all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
5          | 42          | 1        | 0         | 2          | 10       | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          | 8           | 2        | 8         | 10         | 14       | 1
2          | 10          | 2        | 2         | 4          | 30       | 1
result_with_cutoff = t1.interval_join(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          | 8           | 2        | 8         | 10         | 14       | 1

The record with value=5 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time minus cutoff (4 <= 10-6).
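
The late-record rule described above can be traced with a short plain-Python sketch. It models only the "ignore late records" part of cutoff for one side of the join (no delay, garbage collection, or keep_results), and it assumes that the maximal seen time is updated record by record in arrival order; the helper name drop_late_records is hypothetical.

```python
def drop_late_records(records, cutoff):
    """Return the values that are not late.

    records: (value, event_time) tuples in arrival order.
    A record is late if event_time <= max_seen_time - cutoff.
    """
    max_seen = None
    kept = []
    for value, event_time in records:
        if max_seen is not None and event_time <= max_seen - cutoff:
            continue  # too old compared to what this side has already seen
        kept.append(value)
        max_seen = event_time if max_seen is None else max(max_seen, event_time)
    return kept


# (value, event_time) of t1 and t2 above, ordered by __time__.
t1_records = [(1, 0), (2, 2), (3, 4), (4, 8), (5, 0), (6, 4)]
t2_records = [(42, 2), (8, 10), (10, 4)]
kept_t1 = drop_late_records(t1_records, cutoff=6)
kept_t2 = drop_late_records(t2_records, cutoff=6)
print(kept_t1)
print(kept_t2)
```

The values dropped by the sketch (5 from t1 and 10 from t2) are the two records discussed in the paragraph above.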

interval_join_inner(other, self_time, other_time, interval, *on, behavior=None, left_instance=None, right_instance=None)

Performs an interval join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined.
  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • interval – a time interval, created with pw.temporal.interval(lower_bound, upper_bound), where lower_bound and upper_bound are the lower and upper bounds on the time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (Optional[CommonBehavior]) – defines temporal behavior of a join - features like delaying entries or ignoring late entries.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
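
The matching condition can be modelled in a few lines of plain Python. This is only an illustrative sketch, not Pathway's implementation: interval_join_inner_model is a hypothetical helper, and the inputs are the time columns of the first example below.

```python
def interval_join_inner_model(left_times, right_times, lower_bound, upper_bound):
    """Pair every left time with each right time that falls inside its interval."""
    return [
        (left_t, right_t)
        for left_t in left_times
        for right_t in right_times
        # The join condition quoted in the description above.
        if left_t + lower_bound <= right_t <= left_t + upper_bound
    ]


pairs = interval_join_inner_model([3, 4, 5, 11], [0, 1, 4, 7], -2, 1)
print(pairs)
```

The left time 11 produces no pair because no right time lies in [9, 12].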

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_inner(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_inner(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2

Setting behavior controls the temporal behavior of an interval join. When it is set, each side of the interval join keeps track of the maximal time seen so far (in self_time and other_time, respectively). In the context of an interval join, the arguments of behavior have the following meaning:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records whose times are less than or equal to that threshold. When cutoff is not set, the interval join remembers all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.
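
The cutoff rule can be sketched in plain Python. This is a simplified model under stated assumptions, not Pathway's implementation: split_by_cutoff is a hypothetical helper, records are processed one by one in arrival order (batching by __time__ is ignored), and each side of the join is handled separately. The inputs are the (value, event_time) pairs of the example that follows.

```python
def split_by_cutoff(records, cutoff):
    """Split (value, event_time) records, given in arrival order, into kept and ignored."""
    max_seen = None
    kept, ignored = [], []
    for value, event_time in records:
        # A record is ignored when it is at or below the threshold max_seen - cutoff.
        if max_seen is not None and event_time <= max_seen - cutoff:
            ignored.append(value)
            continue
        kept.append(value)
        max_seen = event_time if max_seen is None else max(max_seen, event_time)
    return kept, ignored


left_records = [(1, 0), (2, 2), (3, 4), (4, 8), (5, 0), (6, 4)]
right_records = [(42, 2), (8, 10), (10, 4)]
left_kept, left_ignored = split_by_cutoff(left_records, cutoff=6)
right_kept, right_ignored = split_by_cutoff(right_records, cutoff=6)
print(left_ignored, right_ignored)
```

With cutoff=6, the model ignores value=5 on the left (0 <= 8-6) and value=10 on the right (4 <= 10-6).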

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join_inner(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
5          | 42          | 1        | 0         | 2          | 10       | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          | 8           | 2        | 8         | 10         | 14       | 1
2          | 10          | 2        | 2         | 4          | 30       | 1
result_with_cutoff = t1.interval_join_inner(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          | 8           | 2        | 8         | 10         | 14       | 1

The record with value=5 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time minus cutoff (4 <= 10-6).

interval_join_left(other, self_time, other_time, interval, *on, behavior=None, left_instance=None, right_instance=None)

Performs an interval left join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined. Rows from the left side that haven’t been matched with the right side are returned with missing values on the right side replaced with None.
  • Parameters
    • other (Table) – the right side of the join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • interval – a time interval, created with pw.temporal.interval(lower_bound, upper_bound), where lower_bound and upper_bound are the lower and upper bounds on the time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (Optional[CommonBehavior]) – defines temporal behavior of a join - features like delaying entries or ignoring late entries.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
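
The padding of unmatched left rows can be modelled in plain Python. This is an illustrative sketch rather than Pathway's implementation: interval_join_left_model is a hypothetical helper, and the inputs are the time columns of the first example below.

```python
def interval_join_left_model(left_times, right_times, lower_bound, upper_bound):
    """Return (left_t, right_t) pairs, keeping every left time at least once."""
    rows = []
    for left_t in left_times:
        matches = [
            right_t
            for right_t in right_times
            if left_t + lower_bound <= right_t <= left_t + upper_bound
        ]
        # An unmatched left row is kept once, padded with None on the right.
        rows.extend((left_t, right_t) for right_t in (matches or [None]))
    return rows


rows = interval_join_left_model([3, 4, 5, 11], [0, 1, 4, 7], -2, 1)
print(rows)
```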

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_left(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4
11     |
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_left(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
1 | 11     |
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2
3 | 4      |

Setting behavior controls the temporal behavior of an interval join. When it is set, each side of the interval join keeps track of the maximal time seen so far (in self_time and other_time, respectively). In the context of an interval join, the arguments of behavior have the following meaning:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records whose times are less than or equal to that threshold. When cutoff is not set, the interval join remembers all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join_left(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
2          |             | 2        | 2         |            | 4        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
4          |             | 2        | 8         |            | 8        | 1
5          | 42          | 1        | 0         | 2          | 10       | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          |             | 2        | 8         |            | 14       | -1
4          | 8           | 2        | 8         | 10         | 14       | 1
2          |             | 2        | 2         |            | 30       | -1
2          | 10          | 2        | 2         | 4          | 30       | 1
result_with_cutoff = t1.interval_join_left(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
2          |             | 2        | 2         |            | 4        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
4          |             | 2        | 8         |            | 8        | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          |             | 2        | 8         |            | 14       | -1
4          | 8           | 2        | 8         | 10         | 14       | 1

The record with value=5 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time minus cutoff (4 <= 10-6).

Notice also the entries with __diff__=-1. They’re deletion entries caused by the arrival of matching entries on the right side of the join. The matches caused the removal of entries without values in the fields from the right side and insertion of entries with values in these fields.
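
To see how insertions and deletions combine, the update stream printed for result_without_cutoff above can be folded into a final table in plain Python. This is a sketch for illustration only; it keeps just the left_value and right_value columns and assumes a row is present when its __diff__ values sum to a positive number.

```python
from collections import Counter

# (left_value, right_value, __diff__) entries in the order they were printed.
update_stream = [
    (1, 42, 1), (2, None, 1), (3, 42, 1), (4, None, 1), (5, 42, 1),
    (6, 42, 1), (4, None, -1), (4, 8, 1), (2, None, -1), (2, 10, 1),
]

counts = Counter()
for left_value, right_value, diff in update_stream:
    counts[(left_value, right_value)] += diff

# Rows whose insertions outnumber their deletions remain in the table.
final_rows = sorted(row for row, count in counts.items() if count > 0)
print(final_rows)
```

The None-padded rows for left_value 2 and 4 cancel out, leaving only their matched versions.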

interval_join_outer(other, self_time, other_time, interval, *on, behavior=None, left_instance=None, right_instance=None)

Performs an interval outer join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined. Rows that haven’t been matched with the other side are returned with missing values on the other side replaced with None.
  • Parameters
    • other (Table) – the right side of the join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • interval – a time interval, created with pw.temporal.interval(lower_bound, upper_bound), where lower_bound and upper_bound are the lower and upper bounds on the time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (Optional[CommonBehavior]) – defines temporal behavior of a join - features like delaying entries or ignoring late entries.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
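
The combination of an equality condition with the interval condition can be modelled in plain Python. This is an illustrative sketch, not Pathway's implementation: interval_join_outer_model is a hypothetical helper, rows are (key, time) pairs taken from the second example below, and the first output field mirrors select(t1.a, ...), so it is None for rows that come only from the right side.

```python
def interval_join_outer_model(left_rows, right_rows, lower_bound, upper_bound):
    """Rows are (key, time) pairs; returns (left_key, left_time, right_time) triples."""
    result = []
    matched_right = set()
    for left_key, left_t in left_rows:
        found = False
        for index, (right_key, right_t) in enumerate(right_rows):
            in_interval = left_t + lower_bound <= right_t <= left_t + upper_bound
            if left_key == right_key and in_interval:
                result.append((left_key, left_t, right_t))
                matched_right.add(index)
                found = True
        if not found:
            # Unmatched left row: pad the right side with None.
            result.append((left_key, left_t, None))
    for index, (_, right_t) in enumerate(right_rows):
        if index not in matched_right:
            # Unmatched right row: pad the left side with None.
            result.append((None, None, right_t))
    return result


left_rows = [(1, 3), (1, 4), (1, 5), (1, 11), (2, 2), (2, 3), (3, 4)]
right_rows = [(1, 0), (1, 1), (1, 4), (1, 7), (2, 0), (2, 2), (4, 2)]
outer_rows = interval_join_outer_model(left_rows, right_rows, -2, 1)
for row in outer_rows:
    print(row)
```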

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_outer(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | 0
       | 7
3      | 1
3      | 4
4      | 4
5      | 4
11     |
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_outer(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
  |        | 0
  |        | 2
  |        | 7
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
1 | 11     |
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2
3 | 4      |

Setting behavior controls the temporal behavior of an interval join. When it is set, each side of the interval join keeps track of the maximal time seen so far (in self_time and other_time, respectively). In the context of an interval join, the arguments of behavior have the following meaning:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records whose times are less than or equal to that threshold. When cutoff is not set, the interval join remembers all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join_outer(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
2          |             | 2        | 2         |            | 4        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
4          |             | 2        | 8         |            | 8        | 1
5          | 42          | 1        | 0         | 2          | 10       | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          |             | 2        | 8         |            | 14       | -1
4          | 8           | 2        | 8         | 10         | 14       | 1
2          |             | 2        | 2         |            | 30       | -1
2          | 10          | 2        | 2         | 4          | 30       | 1
result_with_cutoff = t1.interval_join_outer(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
2          |             | 2        | 2         |            | 4        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
4          |             | 2        | 8         |            | 8        | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          |             | 2        | 8         |            | 14       | -1
4          | 8           | 2        | 8         | 10         | 14       | 1

The record with value=5 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time minus cutoff (4 <= 10-6).

Notice also the entries with __diff__=-1. They’re deletion entries caused by the arrival of matching entries on the right side of the join. The matches caused the removal of entries without values in the fields from the right side and insertion of entries with values in these fields.

interval_join_right(other, self_time, other_time, interval, *on, behavior=None, left_instance=None, right_instance=None)

Performs an interval right join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined. Rows from the right side that haven’t been matched with the left side are returned with missing values on the left side replaced with None.
  • Parameters
    • other (Table) – the right side of the join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • interval – a time interval, created with pw.temporal.interval(lower_bound, upper_bound), where lower_bound and upper_bound are the lower and upper bounds on the time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (Optional[CommonBehavior]) – defines temporal behavior of a join - features like delaying entries or ignoring late entries.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
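
One way to reason about the right join is as a left join with the sides swapped and the interval mirrored, since l + lower_bound <= r <= l + upper_bound is equivalent to r - upper_bound <= l <= r - lower_bound. The plain-Python sketch below illustrates that identity on the time columns of the first example below; left_model and right_model are hypothetical helpers, and nothing here is claimed about how Pathway implements the operator.

```python
def left_model(left_times, right_times, lower_bound, upper_bound):
    """Left interval join on bare time values, padding unmatched rows with None."""
    rows = []
    for left_t in left_times:
        matches = [
            right_t
            for right_t in right_times
            if left_t + lower_bound <= right_t <= left_t + upper_bound
        ]
        rows.extend((left_t, right_t) for right_t in (matches or [None]))
    return rows


def right_model(left_times, right_times, lower_bound, upper_bound):
    """Right interval join expressed through left_model with swapped sides."""
    # Swap the sides and use the mirrored interval (-upper_bound, -lower_bound).
    swapped = left_model(right_times, left_times, -upper_bound, -lower_bound)
    # Flip each pair back into (left_t, right_t) order.
    return [(left_t, right_t) for right_t, left_t in swapped]


right_join_rows = right_model([3, 4, 5, 11], [0, 1, 4, 7], -2, 1)
print(right_join_rows)
```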

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_right(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | 0
       | 7
3      | 1
3      | 4
4      | 4
5      | 4
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_right(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
  |        | 0
  |        | 2
  |        | 7
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2

Setting behavior controls the temporal behavior of an interval join. When it is set, each side of the interval join keeps track of the maximal time seen so far (in self_time and other_time, respectively). In the context of an interval join, the arguments of behavior have the following meaning:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records whose times are less than or equal to that threshold. When cutoff is not set, the interval join remembers all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join_right(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
5          | 42          | 1        | 0         | 2          | 10       | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          | 8           | 2        | 8         | 10         | 14       | 1
2          | 10          | 2        | 2         | 4          | 30       | 1
result_with_cutoff = t1.interval_join_right(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          | 8           | 2        | 8         | 10         | 14       | 1

The record with value=5 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time minus cutoff (4 <= 10-6).

ix(expression, *, optional=False, context=None)

Reindexes the table using expression values as keys. Uses keys from context, or tries to infer the proper context from the expression. If optional is True, None in the expression values results in None values in the result columns. Keys that are missing from the table result in a RuntimeError.

Context can be anything that allows for select or reduce, or the pathway.this construct (the latter returns a delayed operation and should only be used when ix is used inside a join().select() or groupby().reduce() sequence).

  • Returns
    Reindexed table with the same set of columns.
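
Conceptually, ix behaves like a keyed lookup. The sketch below models it with plain dictionaries; it is only an analogy under stated assumptions, not Pathway's implementation. ix_model is a hypothetical helper, tables are represented as dicts from row id to row, and the data comes from the example below.

```python
def ix_model(table, keys, optional=False):
    """table: dict id -> row; keys: dict result_id -> id to look up in table."""
    result = {}
    for result_id, key in keys.items():
        if key is None and optional:
            # With optional=True, a None key yields a None result.
            result[result_id] = None
        else:
            # A key absent from `table` raises KeyError in this model; the text
            # above says Pathway raises RuntimeError in that case.
            result[result_id] = table[key]
    return result


animals = {
    1: {"epithet": "upupa", "genus": "epops"},
    2: {"epithet": "acherontia", "genus": "atropos"},
    3: {"epithet": "bubo", "genus": "scandiacus"},
    4: {"epithet": "dynastes", "genus": "hercules"},
}
birds = {2: "hoopoe", 4: "owl"}

# Each bird row looks up the animal row that shares its id.
looked_up = ix_model(animals, {bird_id: bird_id for bird_id in birds})
latin = {birds[bird_id]: row["genus"] for bird_id, row in looked_up.items()}
print(latin)
```

The result has one row per key in the context (here, the two rows of birds), not one row per row of the indexed table.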

Example:

import pathway as pw
t_animals = pw.debug.table_from_markdown('''
  | epithet    | genus
1 | upupa      | epops
2 | acherontia | atropos
3 | bubo       | scandiacus
4 | dynastes   | hercules
''')
t_birds = pw.debug.table_from_markdown('''
  | desc
2 | hoopoe
4 | owl
''')
ret = t_birds.select(t_birds.desc, latin=t_animals.ix(t_birds.id).genus)
pw.debug.compute_and_print(ret, include_id=False)
desc   | latin
hoopoe | atropos
owl    | hercules

ix_ref(*args, optional=False, context=None, instance=None)

Reindexes the table using expressions as primary keys. Uses keys from context, or tries to infer the proper context from the expression. If optional is True, None in the expression values results in None values in the result columns. Keys that are missing from the table result in a RuntimeError.

Context can be anything that allows for select or reduce, or the pathway.this construct (the latter returns a delayed operation and should only be used when ix is used inside a join().select() or groupby().reduce() sequence).

  • Parameters
    args (Union[ColumnExpression, None, int, float, str, bytes, bool, Pointer, datetime, timedelta, ndarray, Json, dict[str, Any], tuple[Any, ...]]) – Column references.
  • Returns
    Row – indexed row.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
name   | pet
Alice  | dog
Bob    | cat
Carole | cat
David  | dog
''')
t2 = t1.with_id_from(pw.this.name)
t2 = t2.select(*pw.this, new_value=pw.this.ix_ref("Alice").pet)
pw.debug.compute_and_print(t2, include_id=False)
name   | pet | new_value
Alice  | dog | dog
Bob    | cat | dog
Carole | cat | dog
David  | dog | dog

Tables obtained by a groupby/reduce scheme always have primary keys:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
name   | pet
Alice  | dog
Bob    | cat
Carole | cat
David  | cat
''')
t2 = t1.groupby(pw.this.pet).reduce(pw.this.pet, count=pw.reducers.count())
t3 = t1.select(*pw.this, new_value=t2.ix_ref(t1.pet).count)
pw.debug.compute_and_print(t3, include_id=False)
name   | pet | new_value
Alice  | dog | 1
Bob    | cat | 3
Carole | cat | 3
David  | cat | 3
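
The groupby/reduce lookup above can be mirrored in plain Python, with a Counter standing in for the reduced table and the grouping value playing the role of its primary key. This is an analogy for illustration only, not how Pathway computes the result.

```python
from collections import Counter

rows = [("Alice", "dog"), ("Bob", "cat"), ("Carole", "cat"), ("David", "cat")]

# Stand-in for t1.groupby(pw.this.pet).reduce(...): one entry per pet,
# keyed by the grouping value.
count_by_pet = Counter(pet for _, pet in rows)

# Stand-in for t2.ix_ref(t1.pet).count: look up each row's pet by that key.
with_counts = [(name, pet, count_by_pet[pet]) for name, pet in rows]
print(with_counts)
```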

Single-row tables can be accessed via ix_ref():

import pathway as pw
t1 = pw.debug.table_from_markdown('''
name   | pet
Alice  | dog
Bob    | cat
Carole | cat
David  | cat
''')
t2 = t1.reduce(count=pw.reducers.count())
t3 = t1.select(*pw.this, new_value=t2.ix_ref(context=t1).count)
pw.debug.compute_and_print(t3, include_id=False)
name   | pet | new_value
Alice  | dog | 4
Bob    | cat | 4
Carole | cat | 4
David  | cat | 4

join(other, *on, id=None, how=JoinMode.INNER, left_instance=None, right_instance=None)

Join self with other using the given join expression.
  • Parameters
    • other (Joinable) – the right side of the join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (Optional[ColumnReference]) – optional argument for id of result, can be only self.id or other.id
    • how (JoinMode) – by default, an inner join is performed. The possible values JoinMode.{INNER,LEFT,RIGHT,OUTER} correspond to inner, left, right and outer join respectively.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.
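
The effect of passing several equality conditions can be sketched in plain Python: a pair of rows is joined only if every condition holds. This is an illustrative model, not Pathway's implementation. inner_join_model is a hypothetical helper, it assumes the compared columns share a name on both sides, and the data comes from the example below.

```python
t1_rows = [
    {"age": 10, "owner": "Alice", "pet": 1},
    {"age": 9, "owner": "Bob", "pet": 1},
    {"age": 8, "owner": "Alice", "pet": 2},
]
t2_rows = [
    {"age": 10, "owner": "Alice", "pet": 3, "size": "M"},
    {"age": 9, "owner": "Bob", "pet": 1, "size": "L"},
    {"age": 8, "owner": "Tom", "pet": 1, "size": "XL"},
]


def inner_join_model(left, right, keys):
    """Return (left_row, right_row) pairs that agree on every column in keys."""
    return [
        (left_row, right_row)
        for left_row in left
        for right_row in right
        if all(left_row[key] == right_row[key] for key in keys)
    ]


joined = inner_join_model(t1_rows, t2_rows, keys=("pet", "owner"))
# Counterpart of .select(age=t1.age, owner_name=t2.owner, size=t2.size).
selected = [
    {"age": left_row["age"], "owner_name": right_row["owner"], "size": right_row["size"]}
    for left_row, right_row in joined
]
print(selected)
```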

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join(
    t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L

join_inner(other, *on, id=None, left_instance=None, right_instance=None)

Inner-joins two tables or join results.
  • Parameters
    • other (Joinable) – the right side of the join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (Optional[ColumnReference]) – optional argument for id of result, can be only self.id or other.id
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join_inner(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(
    age=t1.age, owner_name=t2.owner, size=t2.size
)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L

join_left(other, *on, id=None, left_instance=None, right_instance=None)

Left-joins two tables or join results.
  • Parameters
    • other (Joinable) – Table or join result.
    • *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2
    • id (Optional[ColumnReference]) – optional id column of the result
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Remarks: args cannot contain the id column of either table, because the result table has an id column with auto-generated ids; an id column can still be selected by assigning it to a column with a defined name (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • rows from the right side that were not matched with the left side are skipped
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(t1.join_left(t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(t1.b + t2.d, t2.id)),
include_id=False)
a  | t2_c | s
11 | 11   | 322
12 | 12   | 324
13 |      |
13 |      |
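
The behavior rules above can be modeled in plain Python. This is a minimal sketch and not Pathway's implementation: rows are plain dicts, and `left_join` and `require` are hypothetical helper names. `require` imitates how `pw.require` is used in the example, where the value becomes None as soon as one of the guard arguments is None.

```python
def left_join(left, right, left_key, right_key):
    """Pair each left row with every matching right row, or with None if none match."""
    pairs = []
    for lrow in left:
        matches = [rrow for rrow in right if rrow[right_key] == lrow[left_key]]
        if matches:
            pairs.extend((lrow, rrow) for rrow in matches)
        else:
            pairs.append((lrow, None))  # unmatched left rows are kept
    return pairs


def require(compute, *guards):
    """Return compute() only when no guard is None (hypothetical stand-in for pw.require)."""
    return None if any(guard is None for guard in guards) else compute()


t1 = [{"a": 11, "b": 111}, {"a": 12, "b": 112}, {"a": 13, "b": 113}, {"a": 13, "b": 114}]
t2 = [{"c": 11, "d": 211}, {"c": 12, "d": 212}, {"c": 14, "d": 213}, {"c": 14, "d": 214}]

result = [
    (lrow["a"], rrow["c"] if rrow else None, require(lambda: lrow["b"] + rrow["d"], rrow))
    for lrow, rrow in left_join(t1, t2, "a", "c")
]
print(result)
```

The two rows with `a == 13` have no partner in `t2`, so their right-hand values and the guarded sum are None, which corresponds to the empty cells in the printed table.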

join_outer(other, *on, id=None, left_instance=None, right_instance=None)

Outer-joins two tables or join results.
  • Parameters
    • other (Joinable) – Table or join result.
    • *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2
    • id (Optional[ColumnReference]) – optional id column of the result
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Remarks: args cannot contain the id column of either table, because the result table has an id column with auto-generated ids; an input id can still be selected by assigning it to a column with a defined name (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(t1.join_outer(t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(t1.b + t2.d, t1.id, t2.id)),
include_id=False)
a  | t2_c | s
   | 14   |
   | 14   |
11 | 11   | 322
12 | 12   | 324
13 |      |
13 |      |
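
For comparison with the left join, the outer-join rules can be sketched in plain Python as follows. The sketch is illustrative only and says nothing about how Pathway computes the join: `outer_join` is a hypothetical helper, rows are dicts, and the order of the returned rows is arbitrary.

```python
def outer_join(left, right, left_key, right_key):
    """Return (left_row, right_row) pairs; the missing side of an unmatched row is None."""
    pairs = []
    matched_right = set()  # positions of right rows that found a partner
    for lrow in left:
        hits = [i for i, rrow in enumerate(right) if rrow[right_key] == lrow[left_key]]
        matched_right.update(hits)
        if hits:
            pairs.extend((lrow, right[i]) for i in hits)
        else:
            pairs.append((lrow, None))
    # Right rows that never matched are emitted with an empty left side.
    pairs.extend((None, rrow) for i, rrow in enumerate(right) if i not in matched_right)
    return pairs


t1 = [{"a": 11, "b": 111}, {"a": 12, "b": 112}, {"a": 13, "b": 113}, {"a": 13, "b": 114}]
t2 = [{"c": 11, "d": 211}, {"c": 12, "d": 212}, {"c": 14, "d": 213}, {"c": 14, "d": 214}]

rows = [
    (
        lrow["a"] if lrow else None,
        rrow["c"] if rrow else None,
        lrow["b"] + rrow["d"] if lrow and rrow else None,
    )
    for lrow, rrow in outer_join(t1, t2, "a", "c")
]
print(rows)
```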

join_right(other, *on, id=None, left_instance=None, right_instance=None)

Right-joins two tables or join results.
  • Parameters
    • other (Joinable) – Table or join result.
    • *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2
    • id (Optional[ColumnReference]) – optional id column of the result
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Remarks: args cannot contain the id column of either table, because the result table has an id column with auto-generated ids; an input id can still be selected by assigning it to a column with a defined name (passed in kwargs).

Behavior:

  • rows from the left side that were not matched with the right side are skipped
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(t1.join_right(t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(pw.coalesce(t1.b,0) + t2.d,t1.id)),
include_id=False)
a  | t2_c | s
   | 14   |
   | 14   |
11 | 11   | 322
12 | 12   | 324
  • Returns
    OuterJoinResult object

plot(plotting_function, sorting_col=None)

Plots the contents of the table, e.g. in a Jupyter notebook. If the table depends only on bounded data sources, the plot is generated right away. Otherwise (in a streaming scenario), the plot updates automatically after pw.run() is called.
  • Parameters
    • self (pw.Table) – a table serving as a source of data
    • plotting_function (Callable[[ColumnDataSource], Plot]) – function for creating plot from ColumnDataSource
  • Returns
    pn.Column – visualization which can be displayed immediately or passed as a dashboard widget

Example:

import pathway as pw
import pandas as pd
from bokeh.plotting import figure
def func(source):
    plot = figure(height=400, width=400, title="CPU usage over time")
    plot.scatter('a', 'b', source=source, line_width=3, line_alpha=0.6)
    return plot
viz = pw.debug.table_from_pandas(pd.DataFrame({"a":[1,2,3],"b":[3,1,2]})).plot(func)
type(viz)
<class 'panel.layout.base.Column'>

pointer_from(*args, optional=False, instance=None)

Pseudo-random hash of its argument. Produces pointer types. Applied column-wise.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
g = t1.groupby(t1.owner).reduce(refcol = t1.pointer_from(t1.owner)) # g.id == g.refcol
pw.debug.compute_and_print(g.select(test = (g.id == g.refcol)), include_id=False)
test
True
True

promise_universe_is_equal_to(other)

Asserts to Pathway that the universe of self is equal to the universe of other.

Semantics: Used in situations where Pathway cannot deduce that two universes are equal.

  • Returns
    self

NOTE: The assertion works in place.

Example:

import pathway as pw
import pytest
t1 = pw.debug.table_from_markdown(
    '''
  | age | owner | pet
1 | 8   | Alice | cat
2 | 9   | Bob   | dog
3 | 15  | Alice | tortoise
4 | 99  | Bob   | seahorse
'''
).filter(pw.this.age<30)
t2 = pw.debug.table_from_markdown(
    '''
  | age | owner
1 | 11  | Alice
2 | 12  | Tom
3 | 7   | Eve
'''
)
t3 = t2.filter(pw.this.age > 10)
with pytest.raises(
    ValueError,
    match=r'Universe of the argument of Table.update_cells\(\) needs '
    + 'to be a subset of the universe of the updated table.',
):
    t1.update_cells(t3)

t1 = t1.promise_universe_is_equal_to(t2)
result = t1.update_cells(t3)
pw.debug.compute_and_print(result, include_id=False)
age | owner | pet
11  | Alice | cat
12  | Tom   | dog
15  | Alice | tortoise

promise_universe_is_subset_of(other)

Asserts to Pathway that the universe of self is a subset of the universe of other.

Semantics: Used in situations where Pathway cannot deduce that one universe is a subset of another.

  • Returns
    self

NOTE: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 30

promise_universes_are_disjoint(other)

Asserts to Pathway that the universe of self is disjoint from the universe of other.

Semantics: Used in situations where Pathway cannot deduce that the universes are disjoint.

  • Returns
    self

NOTE: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 1
11  | Alice | 30
12  | Tom   | 40
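
Why disjointness matters for concat can be seen in a small plain-Python model. This is a sketch under stated assumptions, not Pathway code: a table is modeled as a dict from row id to row tuple, and the hypothetical `concat` helper checks the condition explicitly instead of relying on a promise.

```python
def concat(first, second):
    """Union of two id -> row mappings; refuses to merge if any id is shared."""
    overlap = first.keys() & second.keys()
    if overlap:
        raise ValueError(f"universes are not disjoint, shared ids: {sorted(overlap)}")
    return {**first, **second}


t1 = {1: (10, "Alice", 1), 2: (9, "Bob", 1), 3: (8, "Alice", 2)}
t2 = {11: (11, "Alice", 30), 12: (12, "Tom", 40)}

t3 = concat(t1, t2)
print(sorted(t3.values()))
```

If two rows shared an id, the result could not keep both of them under that id, which is the situation the promise rules out.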

reduce(*args, **kwargs)

Reduce a table to a single row.

Equivalent to self.groupby().reduce(*args, **kwargs).

  • Parameters
    • args (ColumnReference) – reducer to reduce the table with
    • kwargs (ColumnExpression) – reducer to reduce the table with. Its key is the new name of a column.
  • Returns
    Table – Reduced table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.reduce(ageagg=pw.reducers.argmin(t1.age))
pw.debug.compute_and_print(t2, include_id=False) 
ageagg
^...
t3 = t2.select(t1.ix(t2.ageagg).age, t1.ix(t2.ageagg).pet)
pw.debug.compute_and_print(t3, include_id=False)
age | pet
7   | dog
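
The two steps of the example (reduce to the id of the youngest row, then look that row up) can be mirrored in plain Python. This is only an illustrative sketch: the row ids `r1` to `r4` are made up, and a dict lookup stands in for `t1.ix(...)`.

```python
# Hypothetical row ids mapped to rows; in Pathway the ids are pointers such as ^...
rows = {
    "r1": {"age": 10, "owner": "Alice", "pet": "dog"},
    "r2": {"age": 9, "owner": "Bob", "pet": "dog"},
    "r3": {"age": 8, "owner": "Alice", "pet": "cat"},
    "r4": {"age": 7, "owner": "Bob", "pet": "dog"},
}

# Step 1: reduce the whole table to a single value, the id of the row with minimal age.
ageagg = min(rows, key=lambda row_id: rows[row_id]["age"])

# Step 2: use that id to fetch columns of the original row.
picked = rows[ageagg]
print(picked["age"], picked["pet"])
```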

rename(names_mapping=None, **kwargs)

Rename columns according to either a dictionary or kwargs.

If a mapping is provided using a dictionary, rename_by_dict will be used. Otherwise, rename_columns will be used with kwargs. Columns not mentioned in the mapping or in kwargs are not changed. The new name of a column must not be id.

  • Parameters
    • names_mapping (Optional[dict[str | ColumnReference, str]]) – mapping from old column names to new names.
    • kwargs (ColumnExpression) – mapping from old column names to new names.
  • Returns
    Table – self with columns renamed.
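
The dispatch described above can be sketched on a single row modeled as a dict. This is a rough plain-Python illustration, not the library code: the three functions only borrow the method names, column references are replaced by plain strings, and the direction of each mapping follows the rename_by_dict and rename_columns examples on this page.

```python
def rename_by_dict(row, names_mapping):
    """names_mapping maps old column name -> new column name."""
    if "id" in names_mapping.values():
        raise ValueError("the new name of a column must not be id")
    return {names_mapping.get(name, name): value for name, value in row.items()}


def rename_columns(row, **kwargs):
    """Keyword form, written as new_name=old_name."""
    return rename_by_dict(row, {old: new for new, old in kwargs.items()})


def rename(row, names_mapping=None, **kwargs):
    """Use the dictionary if one is given, otherwise fall back to kwargs."""
    if names_mapping is not None:
        return rename_by_dict(row, names_mapping)
    return rename_columns(row, **kwargs)


row = {"age": 10, "owner": "Alice", "pet": 1}
by_dict = rename(row, {"age": "years_old", "pet": "animal"})
by_kwargs = rename(row, years_old="age", animal="pet")
print(by_dict == by_kwargs, by_dict)
```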

rename_by_dict(names_mapping)

Rename columns according to a dictionary.

Columns not in keys(names_mapping) are not changed. The new name of a column must not be id.

  • Parameters
    names_mapping (dict[str | ColumnReference, str]) – mapping from old column names to new names.
  • Returns
    Table – self with columns renamed.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.rename_by_dict({"age": "years_old", t1.pet: "animal"})
pw.debug.compute_and_print(t2, include_id=False)
owner | years_old | animal
Alice | 8         | 2
Alice | 10        | 1
Bob   | 9         | 1

rename_columns(**kwargs)

Rename columns according to kwargs.

Columns not referenced in kwargs are not changed. The new name of a column must not be id.

  • Parameters
    kwargs (str | ColumnReference) – columns to rename, passed as new_name=old_column.
  • Returns
    Table – self with columns renamed.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.rename_columns(years_old=t1.age, animal=t1.pet)
pw.debug.compute_and_print(t2, include_id=False)
owner | years_old | animal
Alice | 8         | 2
Alice | 10        | 1
Bob   | 9         | 1

restrict(other)

Restrict self universe to keys appearing in other.
  • Parameters
    other (TableLike) – table whose universe is used to restrict the universe of self.
  • Returns
    Table – table with restricted universe, with the same set of columns

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
  | age  | owner  | pet
1 | 10   | Alice  | 1
2 | 9    | Bob    | 1
3 | 8    | Alice  | 2
'''
)
t2 = pw.debug.table_from_markdown(
    '''
  | cost
2 | 100
3 | 200
'''
)
t2.promise_universe_is_subset_of(t1)
<pathway.Table schema={'cost': <class 'int'>}>
t3 = t1.restrict(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
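
In terms of keys, restricting a table amounts to keeping only the rows whose id also occurs in the other table. A minimal plain-Python sketch, with tables modeled as dicts from id to row and `restrict` as a hypothetical helper, could look like this; the explicit subset check stands in for the promise made in the example.

```python
def restrict(table, other):
    """Keep only the rows of `table` whose id also appears in `other`."""
    missing = other.keys() - table.keys()
    if missing:
        raise ValueError(f"ids not present in the restricted table: {sorted(missing)}")
    return {row_id: table[row_id] for row_id in other}


t1 = {1: (10, "Alice", 1), 2: (9, "Bob", 1), 3: (8, "Alice", 2)}
t2 = {2: 100, 3: 200}  # only the ids of t2 matter, not its cost column

t3 = restrict(t1, t2)
print(t3)
```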

property schema(: type[pathway.internals.schema.Schema] )

Get schema of the table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t1.schema
<pathway.Schema types={'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>}>
t1.typehints()['age']
<class 'int'>

select(*args, **kwargs)

Build a new table with columns specified by kwargs.

Output columns’ names are keys(kwargs). values(kwargs) can be raw values, boxed values, columns. Assigning to id reindexes the table.

  • Parameters
    • args (ColumnReference) – Column references.
    • kwargs (Any) – Column expressions with their new assigned names.
  • Returns
    Table – Created table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
pet
Dog
Cat
''')
t2 = t1.select(animal=t1.pet, desc="fluffy")
pw.debug.compute_and_print(t2, include_id=False)
animal | desc
Cat    | fluffy
Dog    | fluffy

show(*, snapshot=True, include_id=True, short_pointers=True, sorters=None)

Displays the table visually, e.g. in a Jupyter notebook. If the table depends only on bounded data sources, the table preview is generated right away. Otherwise (in a streaming scenario), the table updates automatically after pw.run() is called.
  • Parameters
    • self (pw.Table) – a table to be displayed
    • snapshot (bool, optional) – whether only current snapshot or all changes to the table should be displayed. Defaults to True.
    • include_id (bool, optional) – whether to show ids of rows. Defaults to True.
    • short_pointers (bool, optional) – whether to shorten printed ids. Defaults to True.
  • Returns
    pn.Column – visualization which can be displayed immediately or passed as a dashboard widget

Example:

import pathway as pw
import pandas as pd
table_viz = pw.debug.table_from_pandas(pd.DataFrame({"a":[1,2,3],"b":[3,1,2]})).show()
type(table_viz)
<class 'panel.layout.base.Column'>

property slice(: TableSlice )

Creates a collection of references to self columns. Supports basic column manipulation methods.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t1.slice.without("age")
TableSlice({'owner': <table1>.owner, 'pet': <table1>.pet})

sort(key, instance=None)

Sorts a table by the specified keys.
  • Parameters
    • self (pw.Table) – the table to be sorted.
    • key (ColumnExpression) – an expression to sort by.
    • instance (Optional[ColumnExpression]) – an expression defining the instance. Rows are sorted within an instance; the prev and next columns only point to rows that have the same instance.
  • Returns
    pw.Table – The sorted table. Contains two columns: prev and next, containing the pointers to the previous and next rows.

Example:

import pathway as pw
table = pw.debug.table_from_markdown('''
name     | age | score
Alice    | 25  | 80
Bob      | 20  | 90
Charlie  | 30  | 80
''')
table = table.with_id_from(pw.this.name)
table += table.sort(key=pw.this.age)
pw.debug.compute_and_print(table, include_id=True)
            | name    | age | score | prev        | next
^GBSDEEW... | Alice   | 25  | 80    | ^EDPSSB1... | ^DS9AT95...
^EDPSSB1... | Bob     | 20  | 90    |             | ^GBSDEEW...
^DS9AT95... | Charlie | 30  | 80    | ^GBSDEEW... |
table = pw.debug.table_from_markdown('''
name     | age | score
Alice    | 25  | 80
Bob      | 20  | 90
Charlie  | 30  | 80
David    | 35  | 90
Eve      | 15  | 80
''')
table = table.with_id_from(pw.this.name)
table += table.sort(key=pw.this.age, instance=pw.this.score)
pw.debug.compute_and_print(table, include_id=True)
            | name    | age | score | prev        | next
^GBSDEEW... | Alice   | 25  | 80    | ^T0B95XH... | ^DS9AT95...
^EDPSSB1... | Bob     | 20  | 90    |             | ^RT0AZWX...
^DS9AT95... | Charlie | 30  | 80    | ^GBSDEEW... |
^RT0AZWX... | David   | 35  | 90    | ^EDPSSB1... |
^T0B95XH... | Eve     | 15  | 80    |             | ^GBSDEEW...
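
How the prev and next columns relate to key and instance can be reproduced with a short plain-Python sketch. It is not Pathway's algorithm: `sort_pointers` is a hypothetical helper, and the names are used directly as row ids, which loosely mirrors the `with_id_from(pw.this.name)` call in the example.

```python
from collections import defaultdict


def sort_pointers(rows, key, instance=None):
    """Return {row_id: (prev_id, next_id)} for rows ordered by `key` within each instance."""
    groups = defaultdict(list)
    for row_id, row in rows.items():
        groups[row[instance] if instance else None].append(row_id)

    pointers = {}
    for ids in groups.values():
        ids.sort(key=lambda row_id: rows[row_id][key])
        for pos, row_id in enumerate(ids):
            prev_id = ids[pos - 1] if pos > 0 else None
            next_id = ids[pos + 1] if pos + 1 < len(ids) else None
            pointers[row_id] = (prev_id, next_id)
    return pointers


rows = {
    "Alice": {"age": 25, "score": 80},
    "Bob": {"age": 20, "score": 90},
    "Charlie": {"age": 30, "score": 80},
    "David": {"age": 35, "score": 90},
    "Eve": {"age": 15, "score": 80},
}
pointers = sort_pointers(rows, key="age", instance="score")
for name, (prev_id, next_id) in pointers.items():
    print(name, prev_id, next_id)
```

Within the instance `score == 80` the order is Eve, Alice, Charlie, so Alice points back to Eve and forward to Charlie, matching the second table above.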

typehints()

Return the types of the columns as a dictionary.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t1.typehints()
mappingproxy({'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>})

update_cells(other)

Updates cells of self, breaking ties in favor of the values in other.

Semantics:

  • result.columns == self.columns
  • result.id == self.id
  • conflicts are resolved preferring other’s values

Requires:

  • other.columns ⊆ self.columns
  • other.id ⊆ self.id
  • Parameters
    other (Table) – the other table.
  • Returns
    Table – self updated with cells from other.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 30
''')
pw.universes.promise_is_subset_of(t2, t1)
t3 = t1.update_cells(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 30
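
The Semantics and Requires lists can be turned into a small plain-Python model in which a table is a dict from row id to a dict of cells. This is an illustrative sketch with a hypothetical `update_cells` helper, not the library implementation. As a variation on the example, `other` carries only the pet column here, which the Requires list allows.

```python
def update_cells(table, other):
    """Overwrite cells of `table` with the cells present in `other`."""
    if not other.keys() <= table.keys():
        raise ValueError("other.id must be a subset of self.id")
    updated = {}
    for row_id, row in table.items():
        patch = other.get(row_id, {})
        if not patch.keys() <= row.keys():
            raise ValueError("other.columns must be a subset of self.columns")
        updated[row_id] = {**row, **patch}  # on conflict the value from other wins
    return updated


t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {1: {"pet": 30}}

t3 = update_cells(t1, t2)
print(t3[1])
```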

update_rows(other)

Updates rows of self, breaking ties in favor of the rows in other.

Semantics:

  • result.columns == self.columns == other.columns
  • result.id == self.id ∪ other.id

Requires:

  • other.columns == self.columns
  • Parameters
    other (Table) – the other table.
  • Returns
    Table – self updated with rows from other.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
   | age | owner | pet
1  | 10  | Alice | 30
12 | 12  | Tom   | 40
''')
t3 = t1.update_rows(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 30
12  | Tom   | 40
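
In contrast to update_cells, whole rows are replaced and new ids may appear. A minimal plain-Python sketch, assuming tables are dicts from row id to a dict of cells and using a hypothetical `update_rows` helper:

```python
def update_rows(table, other):
    """Take each row from `other` when its id is present there, otherwise from `table`."""
    column_sets = {frozenset(row) for row in [*table.values(), *other.values()]}
    if len(column_sets) > 1:
        raise ValueError("other.columns must equal self.columns")
    return {**table, **other}  # the result ids are the union of both id sets


t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {
    1: {"age": 10, "owner": "Alice", "pet": 30},
    12: {"age": 12, "owner": "Tom", "pet": 40},
}

t3 = update_rows(t1, t2)
print(sorted(t3))
```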

update_types(**kwargs)

Updates types in schema. Has no effect on the runtime.

window_join(other, self_time, other_time, window, *on, how=JoinMode.INNER, left_instance=None, right_instance=None)

Performs a window join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • how (JoinMode) – decides whether to run window_join_inner, window_join_left, window_join_right or window_join_outer. Default is INNER.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
2      | 2
3      | 2
7      | 6
7      | 7
t4 = t1.window_join(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
1      | 2
2      | 2
2      | 2
3      | 2
7      | 6
7      | 7
7      | 7
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
2   | 2      | 2
2   | 2      | 3

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2
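
The multiplicity rule for sliding windows can be checked with a plain-Python model of the first two examples. The sketch rests on assumptions that are not stated on this page: windows are half-open intervals [start, start + duration), their starts are multiples of hop (origin 0), and a tumbling window is treated as a sliding window with hop equal to duration. `window_starts` and `window_join` are hypothetical helpers, not Pathway functions.

```python
def window_starts(t, hop, duration):
    """Starts of all windows [start, start + duration) that contain t."""
    start = (t // hop) * hop  # latest window start that is not after t
    starts = []
    while start > t - duration:
        starts.append(start)
        start -= hop
    return starts


def window_join(left_times, right_times, hop, duration):
    """Inner window join: one output pair per window shared by a left and a right record."""
    pairs = []
    for lt in left_times:
        for rt in right_times:
            shared = set(window_starts(lt, hop, duration)) & set(window_starts(rt, hop, duration))
            pairs.extend([(lt, rt)] * len(shared))
    return sorted(pairs)


left = [1, 2, 3, 7, 13]
right = [2, 5, 6, 7]

tumbling = window_join(left, right, hop=2, duration=2)  # models pw.temporal.tumbling(2)
sliding = window_join(left, right, hop=1, duration=2)   # models pw.temporal.sliding(1, 2)
print(tumbling)
print(sliding)
```

With the sliding window, the pairs (2, 2) and (7, 7) share two windows each and therefore appear twice, as in the output of t4 above.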

window_join_inner(other, self_time, other_time, window, *on, left_instance=None, right_instance=None)

Performs a window join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
2      | 2
3      | 2
7      | 6
7      | 7
t4 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
1      | 2
2      | 2
2      | 2
3      | 2
7      | 6
7      | 7
7      | 7
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
2   | 2      | 2
2   | 2      | 3

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_inner(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_inner(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2
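
The session-window paragraph above can be illustrated with a plain-Python sketch of the third example (no on clause). It assumes that sessions are built by walking the records of both sides in time order and starting a new session whenever the predicate fails for two consecutive records; this reading reproduces the printed output but is not taken from Pathway's source. `session_join` is a hypothetical helper.

```python
def session_join(left_times, right_times, predicate):
    """Pair every left record with every right record of the same session."""
    # Tag each record with its side, then process all records in time order.
    events = sorted([(t, "L") for t in left_times] + [(t, "R") for t in right_times])

    sessions, current = [], []
    for event in events:
        if current and not predicate(current[-1][0], event[0]):
            sessions.append(current)  # gap too large: close the running session
            current = []
        current.append(event)
    if current:
        sessions.append(current)

    pairs = []
    for session in sessions:
        lefts = [t for t, side in session if side == "L"]
        rights = [t for t, side in session if side == "R"]
        pairs.extend((lt, rt) for lt in lefts for rt in rights)
    return sorted(pairs)


pairs = session_join([0, 5, 10, 15, 17], [-3, 2, 3, 6, 16], lambda a, b: abs(a - b) <= 2)
print(pairs)
```

The records 0, 2, 3, 5 and 6 form one session, so both left records (0 and 5) are joined with all three right records (2, 3 and 6); the records -3 and 10 end up alone in their sessions and produce no pairs in an inner join.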

window_join_left(other, self_time, other_time, window, *on, left_instance=None, right_instance=None)

Performs a window left join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

Rows from the left side that did not match any record on the right side in a given window are returned with the missing values on the right side replaced with None. The multiplicity of such rows equals the number of windows they belong to in which they have no match.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
1      |
2      | 2
3      | 2
7      | 6
7      | 7
13     |
t4 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
1      |
1      | 2
2      | 2
2      | 2
3      |
3      | 2
7      | 6
7      | 7
7      | 7
13     |
13     |
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      |
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
1   | 13     |
2   | 1      |
2   | 2      | 2
2   | 2      | 3
3   | 4      |

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_left(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
10     |
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_left(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2
2   | 7      |
3   | 4      |
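
The rule about the multiplicity of unmatched rows can be verified on the first two examples with a plain-Python sketch. As before, this is a model under assumptions rather than Pathway's implementation: windows are half-open intervals [start, start + duration) with starts at multiples of hop, and `window_starts` and `window_join_left` are hypothetical helpers.

```python
def window_starts(t, hop, duration):
    """Starts of all windows [start, start + duration) that contain t."""
    start = (t // hop) * hop
    starts = []
    while start > t - duration:
        starts.append(start)
        start -= hop
    return starts


def window_join_left(left_times, right_times, hop, duration):
    """Left window join: each left record yields at least one row per window it belongs to."""
    rows = []
    for lt in left_times:
        for start in window_starts(lt, hop, duration):
            matches = [rt for rt in right_times if start <= rt < start + duration]
            if matches:
                rows.extend((lt, rt) for rt in matches)
            else:
                rows.append((lt, None))  # no partner in this particular window
    return rows


left = [1, 2, 3, 7, 13]
right = [2, 5, 6, 7]

tumbling = window_join_left(left, right, hop=2, duration=2)
sliding = window_join_left(left, right, hop=1, duration=2)
print(tumbling)
print(sliding)
```

In the sliding case the left record 13 belongs to two windows and has no partner in either, so it appears twice with None, while the record 1 has a partner in one of its two windows and appears once with None and once with 2.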

window_join_outer(other, self_time, other_time, window, *on, left_instance=None, right_instance=None)

Performs a window outer join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

Rows from either side that did not match any record on the other side in a given window are returned with the missing values on the other side replaced with None. The multiplicity of such rows equals the number of windows they belong to in which they have no match.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | 5
1      |
2      | 2
3      | 2
7      | 6
7      | 7
13     |
t4 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
       | 5
       | 5
       | 6
1      |
1      | 2
2      | 2
2      | 2
3      |
3      | 2
7      | 6
7      | 7
7      | 7
13     |
13     |
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   |        | 5
1   | 1      |
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
1   | 13     |
2   | 1      |
2   | 2      | 2
2   | 2      | 3
3   | 4      |
4   |        | 3

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_outer(
    t2, t1.t, t2.t, pw.temporal.session(predicate