Table API

The Pathway programming framework is organized around work with data tables. This page contains reference for the Pathway Table class.

class pw.Table(columns, universe, pk_columns={}, schema=None, id_column=None)

Collection of named columns over identical universes.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
isinstance(t1, pw.Table)
True

asof_join(other, self_time, other_time, *on, how, defaults={}, direction=Direction.BACKWARD)

Perform an ASOF join of two tables.
  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • how (JoinMode) – mode of the join (LEFT, RIGHT, FULL)
    • defaults (Dict[ColumnReference, Any]) – dictionary column-> default value. Entries in the resulting table that not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    how=pw.JoinMode.LEFT,
    defaults={t2.val: -1},
).select(
    pw.this.shard_key,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
shard_key | t  | val_left | val_right | sum
0         | 1  | 1        | -1        | 0
0         | 4  | 2        | 6         | 8
0         | 5  | 3        | 6         | 9
0         | 6  | 4        | 6         | 10
0         | 7  | 5        | 6         | 11
0         | 11 | 6        | 9         | 15
0         | 12 | 7        | 9         | 16
1         | 5  | 8        | 7         | 15
1         | 7  | 9        | 7         | 16

asof_join_left(other, self_time, other_time, *on, defaults={}, direction=Direction.BACKWARD)

Perform a left ASOF join of two tables.
  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • defaults (Dict[ColumnReference, Any]) – dictionary column-> default value. Entries in the resulting table that not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_left(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t2.val: -1},
).select(
    pw.this.shard_key,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
shard_key | t  | val_left | val_right | sum
0         | 1  | 1        | -1        | 0
0         | 4  | 2        | 6         | 8
0         | 5  | 3        | 6         | 9
0         | 6  | 4        | 6         | 10
0         | 7  | 5        | 6         | 11
0         | 11 | 6        | 9         | 15
0         | 12 | 7        | 9         | 16
1         | 5  | 8        | 7         | 15
1         | 7  | 9        | 7         | 16

asof_join_outer(other, self_time, other_time, *on, defaults={}, direction=Direction.BACKWARD)

Perform an outer ASOF join of two tables.
  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • defaults (Dict[ColumnReference, Any]) – dictionary column-> default value. Entries in the resulting table that not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_outer(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t1.val: -1, t2.val: -1},
).select(
    pw.this.shard_key,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
shard_key | t  | val_left | val_right | sum
0         | 1  | 1        | -1        | 0
0         | 2  | 1        | 0         | 1
0         | 3  | 1        | 6         | 7
0         | 4  | 2        | 6         | 8
0         | 5  | 3        | 6         | 9
0         | 6  | 4        | 6         | 10
0         | 7  | 5        | 2         | 7
0         | 7  | 5        | 6         | 11
0         | 8  | 5        | 3         | 8
0         | 9  | 5        | 9         | 14
0         | 11 | 6        | 9         | 15
0         | 12 | 7        | 9         | 16
0         | 13 | 7        | 7         | 14
0         | 14 | 7        | 4         | 11
1         | 2  | -1       | 7         | 6
1         | 5  | 8        | 7         | 15
1         | 7  | 9        | 7         | 16
1         | 8  | 9        | 3         | 12

asof_join_right(other, self_time, other_time, *on, defaults={}, direction=Direction.BACKWARD)

Perform a right ASOF join of two tables.
  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • defaults (Dict[ColumnReference, Any]) – dictionary column-> default value. Entries in the resulting table that not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_right(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t1.val: -1},
).select(
    pw.this.shard_key,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
shard_key | t  | val_left | val_right | sum
0         | 2  | 1        | 0         | 1
0         | 3  | 1        | 6         | 7
0         | 7  | 5        | 2         | 7
0         | 8  | 5        | 3         | 8
0         | 9  | 5        | 9         | 14
0         | 13 | 7        | 7         | 14
0         | 14 | 7        | 4         | 11
1         | 2  | -1       | 7         | 6
1         | 8  | 9        | 3         | 12

cast_to_types(**kwargs)

Casts columns to types.

concat(*others)

Concats self with every other ∊ others.

Semantics:

  • result.columns == self.columns == other.columns
  • result.id == self.id ∪ other.id

if self.id and other.id collide, throws an exception.

Requires:

  • other.columns == self.columns
  • self.id disjoint with other.id
  • Parameters
    other – the other table.
  • Returns
    Table – The concatenated table. Id’s of rows from original tables are preserved.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''')
pw.universes.promise_are_pairwise_disjoint(t1, t2)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 1
11  | Alice | 30
12  | Tom   | 40

concat_reindex(*tables)

Concatenate contents of several tables.

This is similar to PySpark union. All tables must have the same schema. Each row is reindexed.

  • Parameters
    tables (Table) – List of tables to concatenate. All tables must have the same schema.
  • Returns
    Table – The concatenated table. It will have new, synthetic ids.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
  | pet
1 | Manul
8 | Octopus
''')
t3 = t1.concat_reindex(t2)
pw.debug.compute_and_print(t3, include_id=False)
pet
Cat
Dog
Manul
Octopus

copy()

Returns a copy of a table.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.copy()
pw.debug.compute_and_print(t2, include_id=False)
age | owner | pet
7   | Bob   | dog
8   | Alice | cat
9   | Bob   | dog
10  | Alice | dog
t1 is t2
False

diff(timestamp, *values)

Compute the difference between the values in the `values` columns and the previous values according to the order defined by the column `timestamp`.
  • Parameters
    • timestamp (-) – The column reference to the timestamp column on which the order is computed.
    • *values (-) – Variable-length argument representing the column references to the values columns.
  • Returns
    Table – A new table where each column is replaced with a new column containing the difference and whose name is the concatenation of diff_ and the former name.
  • Raises
    ValueError – If the columns are not ColumnReference.

NOTE: * The value of the “first” value (the row with the lower value

    in the `timestamp` column) is `None`.

Example:

import pathway as pw
table = pw.debug.table_from_markdown('''
timestamp | values
1         | 1
2         | 2
3         | 4
4         | 7
5         | 11
6         | 16
''')
table += table.diff(pw.this.timestamp, pw.this.values)
pw.debug.compute_and_print(table, include_id=False)
timestamp | values | diff_values
1         | 1      |
2         | 2      | 1
3         | 4      | 2
4         | 7      | 3
5         | 11     | 4
6         | 16     | 5

difference(other)

Restrict self universe to keys not appearing in the other table.
  • Parameters
    other (Table) – table with ids to remove from self.
  • Returns
    Table – table with restricted universe, with the same set of columns

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age  | owner  | pet
1 | 10   | Alice  | 1
2 | 9    | Bob    | 1
3 | 8    | Alice  | 2
''')
t2 = pw.debug.parse_to_table('''
  | cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.difference(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
10  | Alice | 1

dtypes()

Return the types of the columns as a dictionary.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t1.dtypes()
{'age': INT, 'owner': STR, 'pet': STR}
t1.schema['age']
INT

empty()

Creates an empty table with a schema specified by kwargs.
  • Parameters
    kwargs (DType) – Dict whose keys are column names and values are column types.
  • Returns
    Table – Created empty table.

Example:

import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
pw.debug.compute_and_print(t1, include_id=False)
age | pet

filter(filter_expression)

Filter a table according to filter condition.
  • Parameters
    filter – ColumnExpression that specifies the filtering condition.
  • Returns
    Table – Result has the same schema as self and its ids are subset of self.id.

Example:

import pathway as pw
vertices = pw.debug.parse_to_table('''
label outdegree
    1         3
    7         0
''')
filtered = vertices.filter(vertices.outdegree == 0)
pw.debug.compute_and_print(filtered, include_id=False)
label | outdegree
7     | 0

flatten(*args, **kwargs)

Performs a flatmap operation on a column or expression given as a first argument. Datatype of this column or expression has to be iterable. Other columns specified in the method arguments are duplicated as many times as the length of the iterable.

It is possible to get ids of source rows by using table.id column, e.g. table.flatten(table.column_to_be_flattened, original_id = table.id).

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
  | pet  |  age
1 | Dog  |   2
7 | Cat  |   5
''')
t2 = t1.flatten(t1.pet)
pw.debug.compute_and_print(t2, include_id=False)
pet
C
D
a
g
o
t
t3 = t1.flatten(t1.pet, t1.age)
pw.debug.compute_and_print(t3, include_id=False)
pet | age
C   | 5
D   | 2
a   | 5
g   | 2
o   | 2
t   | 5

from_columns(**kwargs)

Build a table from columns.

All columns must have the same ids. Columns’ names must be pairwise distinct.

Example:

import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
t2 = pw.Table.empty(foo=float, bar=float)
t3 = pw.Table.from_columns(t1.pet, qux=t2.foo)
pw.debug.compute_and_print(t3, include_id=False)
pet | qux

groupby(*args, id=None)

Groups table by columns from args.

NOTE: Usually followed by .reduce() that aggregates the result and returns a table.

  • Parameters
    • args (ColumnReference) – columns to group by.
    • id (Optional[ColumnReference]) – if provided, is the column used to set id’s of the rows of the result
  • Returns
    GroupedTable – Groupby object.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.groupby(t1.pet, t1.owner).reduce(t1.owner, t1.pet, ageagg=pw.reducers.sum(t1.age))
pw.debug.compute_and_print(t2, include_id=False)
owner | pet | ageagg
Alice | cat | 8
Alice | dog | 10
Bob   | dog | 16

having(*indexers)

Removes rows so that indexed.ix(indexer) is possible when some rows are missing, for each indexer in indexers

property id(: ColumnReference )

Get reference to pseudocolumn containing id’s of a table.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.select(ids = t1.id)
pw.debug.compute_and_print(t2.select(test=t2.id == t2.ids), include_id=False)
test
True
True
True
True

interpolate(timestamp, *values, mode=InterpolateMode.LINEAR)

Interpolates missing values in a column using the previous and next values based on a timestamps column.
  • Parameters
    • timestamp (ColumnReference) – Reference to the column containing timestamps.
    • *values (ColumnReference) – References to the columns containing values to be interpolated.
    • mode (InterpolateMode*, *optional) – The interpolation mode. Currently, only InterpolateMode.LINEAR is supported. Default is InterpolateMode.LINEAR.
  • Returns
    Table – A new table with the interpolated values.
  • Raises
    ValueError – If the columns are not ColumnReference or if the interpolation mode is not supported.

NOTE: * The interpolation is performed based on linear interpolation between the previous and next values.

  • If a value is missing at the beginning or end of the column, no interpolation is performed.

Example:

import pathway as pw
table = pw.debug.table_from_markdown('''
timestamp | values_a | values_b
1         | 1        | 10
2         |          |
3         | 3        |
4         |          |
5         |          |
6         | 6        | 60
''')
table = table.interpolate(pw.this.timestamp, pw.this.values_a, pw.this.values_b)
pw.debug.compute_and_print(table, include_id=False)
timestamp | values_a | values_b
1         | 1        | 10
2         | 2.0      | 20.0
3         | 3        | 30.0
4         | 4.0      | 40.0
5         | 5.0      | 50.0
6         | 6        | 60

intersect(*tables)

Restrict self universe to keys appearing in all of the tables.
  • Parameters
    tables (Table) – tables keys of which are used to restrict universe.
  • Returns
    Table – table with restricted universe, with the same set of columns

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age  | owner  | pet
1 | 10   | Alice  | 1
2 | 9    | Bob    | 1
3 | 8    | Alice  | 2
''')
t2 = pw.debug.parse_to_table('''
  | cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.intersect(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1

interval_join(other, self_time, other_time, interval, *on, how=JoinMode.INNER)

Performs an interval join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined.
  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • lower_bound – a lower bound on time difference between other_time and self_time.
    • upper_bound – an upper bound on time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • how (JoinMode) – decides whether to run interval_join_inner, interval_join_left, interval_join_right or interval_join_outer. Default is INNER.
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b, how=pw.JoinMode.INNER
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2

interval_join_inner(other, self_time, other_time, interval, *on)

Performs an interval join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined.
  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • lower_bound – a lower bound on time difference between other_time and self_time.
    • upper_bound – an upper bound on time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_inner(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_inner(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2

interval_join_left(other, self_time, other_time, interval, *on)

Performs an interval left join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined. Rows from the left side that haven’t been matched with the right side are returned with missing values on the right side replaced with None.
  • Parameters
    • other (Table) – the right side of the join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • lower_bound – a lower bound on time difference between other_time and self_time.
    • upper_bound – an upper bound on time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_left(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4
11     |
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_left(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
1 | 11     |
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2
3 | 4      |

interval_join_outer(other, self_time, other_time, interval, *on)

Performs an interval outer join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined. Rows that haven’t been matched with the other side are returned with missing values on the other side replaced with None.
  • Parameters
    • other (Table) – the right side of the join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • lower_bound – a lower bound on time difference between other_time and self_time.
    • upper_bound – an upper bound on time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_outer(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | 0
       | 7
3      | 1
3      | 4
4      | 4
5      | 4
11     |
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_outer(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
  |        | 0
  |        | 2
  |        | 7
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
1 | 11     |
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2
3 | 4      |

interval_join_right(other, self_time, other_time, interval, *on)

Performs an interval right join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined. Rows from the right side that haven’t been matched with the left side are returned with missing values on the left side replaced with None.
  • Parameters
    • other (Table) – the right side of the join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • lower_bound – a lower bound on time difference between other_time and self_time.
    • upper_bound – an upper bound on time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_right(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | 0
       | 7
3      | 1
3      | 4
4      | 4
5      | 4
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_right(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
  |        | 0
  |        | 2
  |        | 7
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2

property ix()

Return an object that indexed by \[column\] returns new table reindexed.
  • Returns
    Indexer – an object that when indexed by some column returns a table with rows specified by that column.

Example:

import pathway as pw
t_animals = pw.debug.parse_to_table('''
  | epithet    | genus
1 | upupa      | epops
2 | acherontia | atropos
3 | bubo       | scandiacus
4 | dynastes   | hercules
''')
t_birds = pw.debug.parse_to_table('''
  | desc
2 | hoopoe
4 | owl
''')
ret = t_birds.select(t_birds.desc, latin=t_animals.ix(t_birds.id).genus)
pw.debug.compute_and_print(ret, include_id=False)
desc   | latin
hoopoe | atropos
owl    | hercules

ix_ref(*args, optional=False)

Returns a row, indexed by its primary keys. Several columns can be used as index.
  • Parameters
    args (Union[ColumnExpression, None, int, float, str, bytes, bool, BasePointer, datetime, timedelta, ndarray, Tuple[Union[None, int, float, str, bytes, bool, BasePointer, datetime, timedelta, ndarray, Tuple[Value, ...]], ...]]) – Column references.
  • Returns
    Row – indexed row.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
name   | pet
Alice  | dog
Bob    | cat
Carole | cat
David  | dog
''')
t2 = t1.with_id_from(pw.this.name)
t2 = t2.select(*pw.this, new_value=pw.this.ix_ref("Alice").pet)
pw.debug.compute_and_print(t2, include_id=False)
name   | pet | new_value
Alice  | dog | dog
Bob    | cat | dog
Carole | cat | dog
David  | dog | dog

Tables obtained by a groupby/reduce scheme always have primary keys:

import pathway as pw
t1 = pw.debug.parse_to_table('''
name   | pet
Alice  | dog
Bob    | cat
Carole | cat
David  | cat
''')
t2 = t1.groupby(pw.this.pet).reduce(pw.this.pet, count=pw.reducers.count())
t3 = t1.select(*pw.this, new_value=t2.ix_ref(pw.this.pet).count)
pw.debug.compute_and_print(t3, include_id=False)
name   | pet | new_value
Alice  | dog | 1
Bob    | cat | 3
Carole | cat | 3
David  | cat | 3

Single-row tables can be accessed via ix_ref():

import pathway as pw
t1 = pw.debug.parse_to_table('''
name   | pet
Alice  | dog
Bob    | cat
Carole | cat
David  | cat
''')
t2 = t1.reduce(count=pw.reducers.count())
t3 = t1.select(*pw.this, new_value=t2.ix_ref().count)
pw.debug.compute_and_print(t3, include_id=False)
name   | pet | new_value
Alice  | dog | 4
Bob    | cat | 4
Carole | cat | 4
David  | cat | 4

join(other, *on, id=None, how=JoinMode.INNER)

Join self with other using the given join expression.
  • Parameters
    • other (Joinable) – the right side of the join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (Optional[ColumnReference]) – optional argument for id of result, can be only self.id or other.id
    • how (JoinMode) – by default, inner join is performed. Possible values are JoinMode.{INNER,LEFT,RIGHT,OUTER} correspond to inner, left, right and outer join respectively.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.parse_to_table('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join(
    t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id = False)
age | owner_name | size
9   | Bob        | L

join_inner(other, *on, id=None)

Inner-joins two tables or join results.
  • Parameters
    • other (Joinable) – the right side of the join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (Optional[ColumnReference]) – optional argument for id of result, can be only self.id or other.id
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.parse_to_table('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join(t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER).select(age=t1.age, owner_name=t2.owner, size=t2.size)  # noqa: E501
pw.debug.compute_and_print(t3, include_id = False)
age | owner_name | size
9   | Bob        | L

join_left(other, *on, id=None)

Left-joins two tables or join results.

Remarks: args cannot contain id column from either of tables, as the result table has id column with auto-generated ids; it can be selected by assigning it to a column with defined name (passed in kwargs)

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • rows from the right side that were not matched with the left side are skipped
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | A  | B
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | C  | D
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(t1.join_left(t2, t1.A == t2.C
).select(t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D,t2.id)),
include_id=False)
A  | t2_C | S
11 | 11   | 322
12 | 12   | 324
13 |      |
13 |      |

join_outer(other, *on, id=None)

Outer-joins two tables or join results.

Remarks: args cannot contain id column from either of tables, as the result table has id column with auto-generated ids; it can be selected by assigning it to a column with defined name (passed in kwargs)

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | A  | B
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | C  | D
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(t1.join_outer(t2, t1.A == t2.C
).select(t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D,t1.id,t2.id)),
include_id=False)
A  | t2_C | S
   | 14   |
   | 14   |
11 | 11   | 322
12 | 12   | 324
13 |      |
13 |      |

join_right(other, *on, id=None)

Outer-joins two tables or join results.

Remarks: args cannot contain id column from either of tables, as the result table has id column with auto-generated ids; it can be selected by assigning it to a column with defined name (passed in kwargs)

Behavior:

  • rows from the left side that were not matched with the right side are skipped
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | A  | B
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | C  | D
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(t1.join_right(t2, t1.A == t2.C
).select(t1.A, t2_C=t2.C, S=pw.require(pw.coalesce(t1.B,0) + t2.D,t1.id)),
include_id=False)
A  | t2_C | S
   | 14   |
   | 14   |
11 | 11   | 322
12 | 12   | 324
  • Returns
    OuterJoinResult object

pointer_from(*args, optional=False)

Pseudo-random hash of its argument. Produces pointer types. Applied column-wise.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
g = t1.groupby(t1.owner).reduce(refcol = t1.pointer_from(t1.owner)) # g.id == g.refcol
pw.debug.compute_and_print(g.select(test = (g.id == g.refcol)), include_id=False)
test
True
True

promise_universe_is_equal_to(other)

Asserts to Pathway that an universe of self is a subset of universe of each of the others.

Semantics: Used in situations where Pathway cannot deduce one universe being a subset of another.

  • Returns
    None

NOTE: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
  | age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10

promise_universe_is_subset_of(other)

Asserts to Pathway that an universe of self is a subset of universe of each of the other.

Semantics: Used in situations where Pathway cannot deduce one universe being a subset of another.

  • Returns
    self

NOTE: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 30

promise_universes_are_disjoint(other)

Asserts to Pathway that an universe of self is disjoint from universe of other.

Semantics: Used in situations where Pathway cannot deduce universes are disjoint.

  • Returns
    self

NOTE: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 1
11  | Alice | 30
12  | Tom   | 40

reduce(*args, **kwargs)

Reduce a table to a single row.

Equivalent to self.groupby().reduce(*args, **kwargs).

  • Parameters
    • args (ColumnReference) – reducer to reduce the table with
    • kwargs (ColumnExpression) – reducer to reduce the table with. Its key is the new name of a column.
  • Returns
    Table – Reduced table.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.reduce(ageagg=pw.reducers.argmin(t1.age))
pw.debug.compute_and_print(t2, include_id=False) 
ageagg
^...
t3 = t2.select(t1.ix(t2.ageagg).age, t1.ix(t2.ageagg).pet)
pw.debug.compute_and_print(t3, include_id=False)
age | pet
7   | dog

rename(names_mapping=None, **kwargs)

Rename columns according either a dictionary or kwargs.

If a mapping is provided using a dictionary, rename_by_dict will be used. Otherwise, rename_columns will be used with kwargs. Columns not in keys(kwargs) are not changed. New name of a column must not be id.

  • Parameters
    • names_mapping (Optional[Dict[Union[str, ColumnReference], str]]) – mapping from old column names to new names.
    • kwargs (ColumnExpression) – mapping from old column names to new names.
  • Returns
    Table – self with columns renamed.

rename_by_dict(names_mapping)

Rename columns according to a dictionary.

Columns not in keys(kwargs) are not changed. New name of a column must not be id.

  • Parameters
    names_mapping (Dict[Union[str, ColumnReference], str]) – mapping from old column names to new names.
  • Returns
    Table – self with columns renamed.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.rename_by_dict({"age": "years_old", t1.pet: "animal"})
pw.debug.compute_and_print(t2, include_id=False)
owner | years_old | animal
Alice | 8         | 2
Alice | 10        | 1
Bob   | 9         | 1

rename_columns(**kwargs)

Rename columns according to kwargs.

Columns not in keys(kwargs) are not changed. New name of a column must not be id.

  • Parameters
    kwargs (Union[str, ColumnReference]) – mapping from old column names to new names.
  • Returns
    Table – self with columns renamed.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.rename_columns(years_old=t1.age, animal=t1.pet)
pw.debug.compute_and_print(t2, include_id=False)
owner | years_old | animal
Alice | 8         | 2
Alice | 10        | 1
Bob   | 9         | 1

property schema(: Type[Schema] )

Get schema of the table.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t1.schema.as_dict()
{'age': INT, 'owner': STR, 'pet': STR}
t1.schema['age']
INT

select(*args, **kwargs)

Build a new table with columns specified by kwargs.

Output columns’ names are keys(kwargs). values(kwargs) can be raw values, boxed values, columns. Assigning to id reindexes the table.

  • Parameters
    • args (ColumnReference) – Column references.
    • kwargs (Any) – Column expressions with their new assigned names.
  • Returns
    Table – Created table.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
pet
Dog
Cat
''')
t2 = t1.select(animal=t1.pet, desc="fluffy")
pw.debug.compute_and_print(t2, include_id=False)
animal | desc
Cat    | fluffy
Dog    | fluffy

property slice(: TableSlice )

Creates a collection of references to self columns. Supports basic column manipulation methods.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t1.slice.without("age")
TableSlice({'owner': <table1>.owner, 'pet': <table1>.pet})

sort(key=None, instance=None)

Sorts a table by the specified keys.
  • Parameters
    • table (Table) – pw.Table The table to be sorted.
    • key (Optional[ColumnReference]) – ColumnReference or None The name of the primary key to sort by. If None, the table is sorted based on the key column as primary key.
    • instance (Optional[ColumnReference]) – ColumnReference or None The name of the secondary key to sort by. If None, the field “instance” is chosen if it exists, otherwise only the primary key is used.
  • Returns
    pw.Table – The sorted table. Contains two columns: prev and next, containing the pointers to the previous and next rows.

Example:

import pathway as pw
table = pw.debug.table_from_markdown('''
name     | age | score
Alice    | 25  | 80
Bob      | 20  | 90
Charlie  | 30  | 80
''')
table = table.with_id_from(pw.this.name)
table += sort(table, key=pw.this.age)
pw.debug.compute_and_print(table, include_id=False)
name    | age | score | next        | prev
Alice   | 25  | 80    | ^DS9AT95... | ^EDPSSB1...
Bob     | 20  | 90    | ^GBSDEEW... |
Charlie | 30  | 80    |             | ^GBSDEEW...

update_cells(other)

Updates cells of self, breaking ties in favor of the values in other.

Semantics:

* result.columns == self.columns

* result.id == self.id

* conflicts are resolved preferring other’s values

Requires:

* other.columns ⊆ self.columns

* other.id ⊆ self.id
  • Parameters
    other (Table) – the other table.
  • Returns
    Table – self updated with cells form other.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
    age | owner | pet
1 | 10  | Alice | 30
''')
pw.universes.promise_is_subset_of(t2, t1)
t3 = t1.update_cells(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 30

update_rows(other)

Updates rows of self, breaking ties in favor for the rows in other.

Semantics:

  • result.columns == self.columns == other.columns
  • result.id == self.id ∪ other.id

Requires:

  • other.columns == self.columns
  • Parameters
    other (Table) – the other table.
  • Returns
    Table – self updated with rows form other.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
   | age | owner | pet
1  | 10  | Alice | 30
12 | 12  | Tom   | 40
''')
t3 = t1.update_rows(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 30
12  | Tom   | 40

update_types(**kwargs)

Updates types in schema. Has no effect on the runtime.

window_join(other, self_time, other_time, window, *on, how=JoinMode.INNER)

Performs a window join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == on the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • how (JoinMode) – decides whether to run window_join_inner, window_join_left, window_join_right or window_join_outer. Default is INNER.
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
2      | 2
3      | 2
7      | 6
7      | 7
t4 = t1.window_join(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
1      | 2
2      | 2
2      | 2
3      | 2
7      | 6
7      | 7
7      | 7
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
2   | 2      | 2
2   | 2      | 3

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2

window_join_inner(other, self_time, other_time, window, *on)

Performs a window join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == on the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
2      | 2
3      | 2
7      | 6
7      | 7
t4 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
1      | 2
2      | 2
2      | 2
3      | 2
7      | 6
7      | 7
7      | 7
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
2   | 2      | 2
2   | 2      | 3

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_inner(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_inner(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2

window_join_left(other, self_time, other_time, window, *on)

Performs a window left join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

Rows from the left side that didn’t match with any record on the right side in a given window, are returned with missing values on the right side replaced with None. The multiplicity of such rows equals the number of windows they belong to and don’t have a match in them.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == on the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
1      |
2      | 2
3      | 2
7      | 6
7      | 7
13     |
t4 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
1      |
1      | 2
2      | 2
2      | 2
3      |
3      | 2
7      | 6
7      | 7
7      | 7
13     |
13     |
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      |
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
1   | 13     |
2   | 1      |
2   | 2      | 2
2   | 2      | 3
3   | 4      |

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_left(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
10     |
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_left(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2
2   | 7      |
3   | 4      |

window_join_outer(other, self_time, other_time, window, *on)

Performs a window outer join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

Rows from both sides that didn’t match with any record on the other side in a given window, are returned with missing values on the other side replaced with None. The multiplicity of such rows equals the number of windows they belong to and don’t have a match in them.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == on the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | 5
1      |
2      | 2
3      | 2
7      | 6
7      | 7
13     |
t4 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
       | 5
       | 5
       | 6
1      |
1      | 2
2      | 2
2      | 2
3      |
3      | 2
7      | 6
7      | 7
7      | 7
13     |
13     |
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   |        | 5
1   | 1      |
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
1   | 13     |
2   | 1      |
2   | 2      | 2
2   | 2      | 3
3   | 4      |
4   |        | 3

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_outer(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | -3
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
10     |
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_outer(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   |        | 10
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2
2   | 7      |
3   | 4      |
4   |        | 3

window_join_right(other, self_time, other_time, window, *on)

Performs a window right join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

Rows from the right side that didn’t match with any record on the left side in a given window, are returned with missing values on the left side replaced with None. The multiplicity of such rows equals the number of windows they belong to and don’t have a match in them.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == on the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_right(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | 5
2      | 2
3      | 2
7      | 6
7      | 7
t4 = t1.window_join_right(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
       | 5
       | 5
       | 6
1      | 2
2      | 2
2      | 2
3      | 2
7      | 6
7      | 7
7      | 7
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_right(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t2.b, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   |        | 5
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
2   | 2      | 2
2   | 2      | 3
4   |        | 3

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_right(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | -3
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_right(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t2.b, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   |        | 10
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2
4   |        | 3

windowby(time_expr, *, window, shard=None)

Create a GroupedTable by windowing the table (based on expr and window), optionally sharded with shard
  • Parameters
    • time_expr (ColumnExpression) – Column expression used for windowing
    • window (Window) – type window to use
    • shard (Optional[ColumnExpression]) – optional column expression to act as a shard key

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
    | shard |  t |  v
1   | 0     |  1 |  10
2   | 0     |  2 |  1
3   | 0     |  4 |  3
4   | 0     |  8 |  2
5   | 0     |  9 |  4
6   | 0     |  10|  8
7   | 1     |  1 |  9
8   | 1     |  2 |  16
''')
result = t.windowby(
    t.t, window=pw.temporal.session(predicate=lambda a, b: abs(a-b) <= 1), shard=t.shard
).reduce(
pw.this.shard,
min_t=pw.reducers.min(pw.this.t),
max_v=pw.reducers.max(pw.this.v),
count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)
shard | min_t | max_v | count
0     | 1     | 10    | 2
0     | 4     | 3     | 1
0     | 8     | 8     | 3
1     | 1     | 16    | 2

with_columns(*args, **kwargs)

Updates columns of self, according to args and kwargs. See table.select specification for evaluation of args and kwargs.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
  | owner | pet | size
1 | Tom   | 1   | 10
2 | Bob   | 1   | 9
3 | Tom   | 2   | 8
''').with_universe_of(t1)
t3 = t1.with_columns(*t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet | size
8   | Tom   | 2   | 8
9   | Bob   | 1   | 9
10  | Tom   | 1   | 10

with_id(new_index)

Set new ids based on another column containing id-typed values.

To generate ids based on arbitrary valued columns, use with_id_from.

Values assigned must be row-wise unique.

  • Parameters
    new_id – column to be used as the new index.
  • Returns
    Table with updated ids.

Example:

import pytest; pytest.xfail("with_id is hard to test")
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
  | new_id
1 | 2
2 | 3
3 | 4
''')
t3 = t1.promise_universe_is_subset_of(t2).with_id(t2.new_id)
pw.debug.compute_and_print(t3)
    age  owner  pet
^2   10  Alice    1
^3    9    Bob    1
^4    8  Alice    2

with_id_from(*args)

Compute new ids based on values in columns. Ids computed from columns must be row-wise unique.
  • Parameters
    columns – columns to be used as primary keys.
  • Returns
    Table – self updated with recomputed ids.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
   | age | owner  | pet
 1 | 10  | Alice  | 1
 2 | 9   | Bob    | 1
 3 | 8   | Alice  | 2
''')
t2 = t1 + t1.select(old_id=t1.id)
t3 = t2.with_id_from(t2.age)
pw.debug.compute_and_print(t3) 
     | age | owner | pet | old_id
^... | 8   | Alice | 2   | ^...
^... | 9   | Bob   | 1   | ^...
^... | 10  | Alice | 1   | ^...
t4 = t3.select(t3.age, t3.owner, t3.pet, same_as_old=(t3.id == t3.old_id),
    same_as_new=(t3.id == t3.pointer_from(t3.age)))
pw.debug.compute_and_print(t4) 
     | age | owner | pet | same_as_old | same_as_new
^... | 8   | Alice | 2   | False       | True
^... | 9   | Bob   | 1   | False       | True
^... | 10  | Alice | 1   | False       | True

with_prefix(prefix)

Rename columns by adding prefix to each name of column.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.with_prefix("u_")
pw.debug.compute_and_print(t2, include_id=False)
u_age | u_owner | u_pet
8     | Alice   | 2
9     | Bob     | 1
10    | Alice   | 1

with_suffix(suffix)

Rename columns by adding suffix to each name of column.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.with_suffix("_current")
pw.debug.compute_and_print(t2, include_id=False)
age_current | owner_current | pet_current
8           | Alice         | 2
9           | Bob           | 1
10          | Alice         | 1

with_universe_of(other)

Returns a copy of self with exactly the same universe as others.

Semantics: Required precondition self.universe == other.universe Used in situations where Pathway cannot deduce equality of universes, but those are equal as verified during runtime.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
  | age
1 | 10
7 | 3
''').with_universe_of(t1)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10

without(*columns)

Selects all columns without named column references.
  • Parameters
    columns (Union[str, ColumnReference]) – columns to be dropped provided by table.column_name notation.
  • Returns
    Table – self without specified columns.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = t1.without(t1.age, pw.this.pet)
pw.debug.compute_and_print(t2, include_id=False)
owner
Alice
Alice
Bob

class pw.TableLike(universe)

Interface class for table-likes: Table, GroupedTable and JoinResult. All of those contain universe info, and thus support universe-related asserts.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
g1 = t1.groupby(t1.owner)
t2 = t1.filter(t1.age >= 9)
pw.debug.compute_and_print(t2, include_id=False)
age | owner | pet
9   | Bob   | dog
10  | Alice | dog
g2 = t2.groupby(t2.owner)
pw.universes.promise_is_subset_of(g2, g1) # t2 is a subset of t1, so this is safe

promise_universe_is_equal_to(other)

Asserts to Pathway that an universe of self is a subset of universe of each of the others.

Semantics: Used in situations where Pathway cannot deduce one universe being a subset of another.

  • Returns
    None

NOTE: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
  | age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10

promise_universe_is_subset_of(other)

Asserts to Pathway that an universe of self is a subset of universe of each of the other.

Semantics: Used in situations where Pathway cannot deduce one universe being a subset of another.

  • Returns
    self

NOTE: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 30

promise_universes_are_disjoint(other)

Asserts to Pathway that an universe of self is disjoint from universe of other.

Semantics: Used in situations where Pathway cannot deduce universes are disjoint.

  • Returns
    self

NOTE: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 1
11  | Alice | 30
12  | Tom   | 40