Table API
The Pathway programming framework is organized around working with data tables. This page is the reference for the Pathway Table class.
class pw.Table(columns, universe, pk_columns={}, schema=None, id_column=None)
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
isinstance(t1, pw.Table)
asof_join(other, self_time, other_time, *on, how, defaults={}, direction=Direction.BACKWARD)
Perform an ASOF join of two tables.
- Parameters
- other (Table) – Table to join with self; both must contain a column val.
- self_time (ColumnExpression) – time-like column expression to do the join against.
- other_time (ColumnExpression) – time-like column expression to do the join against.
- on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- how (JoinMode) – mode of the join (LEFT, RIGHT, FULL).
- defaults (Dict[ColumnReference, Any]) – dictionary column -> default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
- direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| K | val | t
1 | 0 | 1 | 1
2 | 0 | 2 | 4
3 | 0 | 3 | 5
4 | 0 | 4 | 6
5 | 0 | 5 | 7
6 | 0 | 6 | 11
7 | 0 | 7 | 12
8 | 1 | 8 | 5
9 | 1 | 9 | 7
'''
)
t2 = pw.debug.table_from_markdown(
'''
| K | val | t
21 | 1 | 7 | 2
22 | 1 | 3 | 8
23 | 0 | 0 | 2
24 | 0 | 6 | 3
25 | 0 | 2 | 7
26 | 0 | 3 | 8
27 | 0 | 9 | 9
28 | 0 | 7 | 13
29 | 0 | 4 | 14
'''
)
res = t1.asof_join(
t2,
t1.t,
t2.t,
t1.K == t2.K,
how=pw.JoinMode.LEFT,
defaults={t2.val: -1},
).select(
pw.this.shard_key,
pw.this.t,
val_left=t1.val,
val_right=t2.val,
sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
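The matching rule of a backward ASOF join can be sketched in plain Python. This is only an illustration of the semantics described above, not Pathway's implementation: the helper name `asof_left_backward` and the tuple layout `(key, time, value)` are assumptions made for this example.

```python
from bisect import bisect_right


def asof_left_backward(left, right, default=None):
    """For each left row, pick the right row with the same key and the
    largest time that is <= the left time; use `default` if there is none."""
    # Group right rows by key and sort each group by time.
    by_key = {}
    for key, t, val in right:
        by_key.setdefault(key, []).append((t, val))
    for rows in by_key.values():
        rows.sort(key=lambda r: r[0])

    out = []
    for key, t, val in left:
        rows = by_key.get(key, [])
        times = [r[0] for r in rows]
        i = bisect_right(times, t)  # number of right rows with time <= t
        matched = rows[i - 1][1] if i > 0 else default
        out.append((key, t, val, matched))
    return out


left = [(0, 1, 1), (0, 4, 2), (1, 5, 8)]
right = [(0, 2, 0), (0, 3, 6), (1, 8, 3)]
result = asof_left_backward(left, right, default=-1)
print(result)
```

Rows without a predecessor on the right side receive the default value, mirroring the role of the defaults argument.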
asof_join_left(other, self_time, other_time, *on, defaults={}, direction=Direction.BACKWARD)
Perform a left ASOF join of two tables.
- Parameters
- other (Table) – Table to join with self; both must contain a column val.
- self_time (ColumnExpression) – time-like column expression to do the join against.
- other_time (ColumnExpression) – time-like column expression to do the join against.
- on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- defaults (Dict[ColumnReference, Any]) – dictionary column -> default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
- direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| K | val | t
1 | 0 | 1 | 1
2 | 0 | 2 | 4
3 | 0 | 3 | 5
4 | 0 | 4 | 6
5 | 0 | 5 | 7
6 | 0 | 6 | 11
7 | 0 | 7 | 12
8 | 1 | 8 | 5
9 | 1 | 9 | 7
'''
)
t2 = pw.debug.table_from_markdown(
'''
| K | val | t
21 | 1 | 7 | 2
22 | 1 | 3 | 8
23 | 0 | 0 | 2
24 | 0 | 6 | 3
25 | 0 | 2 | 7
26 | 0 | 3 | 8
27 | 0 | 9 | 9
28 | 0 | 7 | 13
29 | 0 | 4 | 14
'''
)
res = t1.asof_join_left(
t2,
t1.t,
t2.t,
t1.K == t2.K,
defaults={t2.val: -1},
).select(
pw.this.shard_key,
pw.this.t,
val_left=t1.val,
val_right=t2.val,
sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
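The three direction values can be read as three ways of choosing a matching timestamp on the other side. The plain-Python sketch below is a hedged illustration only: the function name `pick_match` is hypothetical, and the treatment of equal timestamps and of ties in the nearest case is an assumption, not something this page specifies.

```python
def pick_match(t, other_times, direction="backward"):
    """Return the timestamp from other_times matched to t, or None."""
    if direction == "backward":
        # Latest timestamp that is not after t (assumption: equality counts).
        candidates = [u for u in other_times if u <= t]
        return max(candidates, default=None)
    if direction == "forward":
        # Earliest timestamp that is not before t (assumption: equality counts).
        candidates = [u for u in other_times if u >= t]
        return min(candidates, default=None)
    if direction == "nearest":
        # Smallest absolute distance; ties resolve to the first candidate here.
        return min(other_times, key=lambda u: abs(u - t), default=None)
    raise ValueError(f"unknown direction: {direction}")


times = [2, 3, 7, 8]
print(pick_match(6, times, "backward"))
print(pick_match(6, times, "forward"))
print(pick_match(6, times, "nearest"))
```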
asof_join_outer(other, self_time, other_time, *on, defaults={}, direction=Direction.BACKWARD)
Perform an outer ASOF join of two tables.
- Parameters
- other (Table) – Table to join with self; both must contain a column val.
- self_time (ColumnExpression) – time-like column expression to do the join against.
- other_time (ColumnExpression) – time-like column expression to do the join against.
- on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- defaults (Dict[ColumnReference, Any]) – dictionary column -> default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
- direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| K | val | t
1 | 0 | 1 | 1
2 | 0 | 2 | 4
3 | 0 | 3 | 5
4 | 0 | 4 | 6
5 | 0 | 5 | 7
6 | 0 | 6 | 11
7 | 0 | 7 | 12
8 | 1 | 8 | 5
9 | 1 | 9 | 7
'''
)
t2 = pw.debug.table_from_markdown(
'''
| K | val | t
21 | 1 | 7 | 2
22 | 1 | 3 | 8
23 | 0 | 0 | 2
24 | 0 | 6 | 3
25 | 0 | 2 | 7
26 | 0 | 3 | 8
27 | 0 | 9 | 9
28 | 0 | 7 | 13
29 | 0 | 4 | 14
'''
)
res = t1.asof_join_outer(
t2,
t1.t,
t2.t,
t1.K == t2.K,
defaults={t1.val: -1, t2.val: -1},
).select(
pw.this.shard_key,
pw.this.t,
val_left=t1.val,
val_right=t2.val,
sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
asof_join_right(other, self_time, other_time, *on, defaults={}, direction=Direction.BACKWARD)
Perform a right ASOF join of two tables.
- Parameters
- other (Table) – Table to join with self; both must contain a column val.
- self_time (ColumnExpression) – time-like column expression to do the join against.
- other_time (ColumnExpression) – time-like column expression to do the join against.
- on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- defaults (Dict[ColumnReference, Any]) – dictionary column -> default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
- direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| K | val | t
1 | 0 | 1 | 1
2 | 0 | 2 | 4
3 | 0 | 3 | 5
4 | 0 | 4 | 6
5 | 0 | 5 | 7
6 | 0 | 6 | 11
7 | 0 | 7 | 12
8 | 1 | 8 | 5
9 | 1 | 9 | 7
'''
)
t2 = pw.debug.table_from_markdown(
'''
| K | val | t
21 | 1 | 7 | 2
22 | 1 | 3 | 8
23 | 0 | 0 | 2
24 | 0 | 6 | 3
25 | 0 | 2 | 7
26 | 0 | 3 | 8
27 | 0 | 9 | 9
28 | 0 | 7 | 13
29 | 0 | 4 | 14
'''
)
res = t1.asof_join_right(
t2,
t1.t,
t2.t,
t1.K == t2.K,
defaults={t1.val: -1},
).select(
pw.this.shard_key,
pw.this.t,
val_left=t1.val,
val_right=t2.val,
sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
cast_to_types(**kwargs)
Casts columns to types.
concat(*others)
Concats self with every other ∊ others.
Semantics:
- result.columns == self.columns == other.columns
- result.id == self.id ∪ other.id
If self.id and other.id collide, an exception is thrown.
Requires:
- other.columns == self.columns
- self.id disjoint with other.id
- Parameters
- others – the other tables.
- Returns
Table – The concatenated table. Ids of rows from the original tables are preserved.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
11 | 11 | Alice | 30
12 | 12 | Tom | 40
''')
pw.universes.promise_are_pairwise_disjoint(t1, t2)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
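The semantics listed for concat (ids are preserved, colliding ids raise) can be sketched with plain dictionaries that map a row id to a row. This is an illustrative model only; the function below is a hypothetical stand-in, and the exception type Pathway raises is not stated on this page, so ValueError is an assumption.

```python
def concat(*tables):
    """Union of id -> row mappings that refuses colliding ids."""
    result = {}
    for table in tables:
        clash = result.keys() & table.keys()
        if clash:
            # Assumption: the real exception type may differ.
            raise ValueError(f"colliding ids: {sorted(clash)}")
        result.update(table)
    return result


t1 = {1: (10, "Alice", 1), 2: (9, "Bob", 1), 3: (8, "Alice", 2)}
t2 = {11: (11, "Alice", 30), 12: (12, "Tom", 40)}
t3 = concat(t1, t2)
print(sorted(t3))
```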
concat_reindex(*tables)
Concatenate contents of several tables.
This is similar to PySpark union. All tables must have the same schema. Each row is reindexed.
- Parameters
- tables (Table) – List of tables to concatenate. All tables must have the same schema.
- Returns
Table – The concatenated table. It will have new, synthetic ids.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
| pet
1 | Manul
8 | Octopus
''')
t3 = t1.concat_reindex(t2)
pw.debug.compute_and_print(t3, include_id=False)
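In contrast to concat, overlapping ids are not a problem here because every row receives a new id. A minimal plain-Python model follows, with small integers standing in for the synthetic ids (the real ids are generated by Pathway and will look different); the function is a hypothetical stand-in.

```python
from itertools import count


def concat_reindex(*tables):
    """Concatenate id -> row mappings, assigning a fresh id to every row."""
    fresh = count()
    return {next(fresh): row for table in tables for row in table.values()}


t1 = {1: "Dog", 7: "Cat"}
t2 = {1: "Manul", 8: "Octopus"}  # id 1 also appears in t1
t3 = concat_reindex(t1, t2)
print(t3)
```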
copy()
Returns a copy of a table.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t2 = t1.copy()
pw.debug.compute_and_print(t2, include_id=False)
t1 is t2
diff(timestamp, *values)
Compute the difference between the values in the `values` columns and the previous values according to the order defined by the column `timestamp`.
- Parameters
- timestamp – The column reference to the timestamp column on which the order is computed.
- *values – Variable-length argument representing the column references to the values columns.
- Returns
Table – A new table where each column is replaced with a new column containing the difference and whose name is the concatenation of diff_ and the former name.
- Raises
ValueError – If the columns are not ColumnReference.
NOTE: The difference for the "first" row (the row with the lowest value in the `timestamp` column) is `None`.
Example:
import pathway as pw
table = pw.debug.table_from_markdown('''
timestamp | values
1 | 1
2 | 2
3 | 4
4 | 7
5 | 11
6 | 16
''')
table += table.diff(pw.this.timestamp, pw.this.values)
pw.debug.compute_and_print(table, include_id=False)
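The computation can be modelled in plain Python: sort by timestamp, then subtract the previous value from each value. This sketch only illustrates the semantics described above; the helper name `diff_by_timestamp` is hypothetical.

```python
def diff_by_timestamp(rows):
    """rows: iterable of (timestamp, value).
    Returns (timestamp, value, diff) tuples ordered by timestamp."""
    out = []
    previous = None
    for timestamp, value in sorted(rows, key=lambda r: r[0]):
        # The first row has no predecessor, so its difference is None.
        delta = None if previous is None else value - previous
        out.append((timestamp, value, delta))
        previous = value
    return out


rows = [(1, 1), (2, 2), (3, 4), (4, 7), (5, 11), (6, 16)]
diffs = [d for _, _, d in diff_by_timestamp(rows)]
print(diffs)
```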
difference(other)
Restrict self universe to keys not appearing in the other table.
- Parameters
- other (Table) – table with ids to remove from self.
- Returns
Table – table with restricted universe, with the same set of columns.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.difference(t2)
pw.debug.compute_and_print(t3, include_id=False)
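Viewing a table as a mapping from row id to row, difference keeps the rows whose id is absent from the other table. A plain-Python model of the example data follows (an illustration of the semantics, with a hypothetical helper function):

```python
def difference(table, other):
    """Keep rows of `table` whose id does not appear in `other`."""
    return {key: row for key, row in table.items() if key not in other}


t1 = {1: (10, "Alice", 1), 2: (9, "Bob", 1), 3: (8, "Alice", 2)}
t2 = {2: 100, 3: 200, 4: 300}
t3 = difference(t1, t2)
print(t3)
```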
dtypes()
Return the types of the columns as a dictionary.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t1.dtypes()
t1.schema['age']
empty()
Creates an empty table with a schema specified by kwargs.
- Parameters
- kwargs (DType) – Dict whose keys are column names and values are column types.
- Returns
Table – Created empty table.
Example:
import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
pw.debug.compute_and_print(t1, include_id=False)
filter(filter_expression)
Filter a table according to a filter condition.
- Parameters
- filter_expression – ColumnExpression that specifies the filtering condition.
- Returns
Table – Result has the same schema as self and its ids are a subset of self.id.
Example:
import pathway as pw
vertices = pw.debug.parse_to_table('''
label outdegree
1 3
7 0
''')
filtered = vertices.filter(vertices.outdegree == 0)
pw.debug.compute_and_print(filtered, include_id=False)
flatten(*args, **kwargs)
Performs a flatmap operation on a column or expression given as a first argument. Datatype of this column or expression has to be iterable. Other columns specified in the method arguments are duplicated as many times as the length of the iterable.
It is possible to get ids of source rows by using table.id column, e.g. table.flatten(table.column_to_be_flattened, original_id = table.id).
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| pet | age
1 | Dog | 2
7 | Cat | 5
''')
t2 = t1.flatten(t1.pet)
pw.debug.compute_and_print(t2, include_id=False)
t3 = t1.flatten(t1.pet, t1.age)
pw.debug.compute_and_print(t3, include_id=False)
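The flatmap behaviour can be sketched on a list of dictionaries: the flattened column yields one output row per element, and the kept columns are repeated alongside. This is a plain-Python illustration with a hypothetical helper, not Pathway code; as in the example above, a string is iterated character by character.

```python
def flatten(rows, column, keep=()):
    """One output row per element of row[column]; `keep` columns are repeated."""
    out = []
    for row in rows:
        for item in row[column]:
            new_row = {column: item}
            for name in keep:
                new_row[name] = row[name]
            out.append(new_row)
    return out


rows = [{"pet": "Dog", "age": 2}, {"pet": "Cat", "age": 5}]
flat = flatten(rows, "pet", keep=("age",))
print(flat)
```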
from_columns(*args, **kwargs)
Build a table from columns.
All columns must have the same ids. Columns' names must be pairwise distinct.
- Parameters
- args (ColumnReference) – List of columns.
- kwargs (ColumnReference) – Columns with their new names.
- Returns
Table – Created table.
Example:
import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
t2 = pw.Table.empty(foo=float, bar=float)
t3 = pw.Table.from_columns(t1.pet, qux=t2.foo)
pw.debug.compute_and_print(t3, include_id=False)
groupby(*args, id=None)
Groups table by columns from args.
NOTE: Usually followed by .reduce() that aggregates the result and returns a table.
- Parameters
- args (ColumnReference) – columns to group by.
- id (Optional[ColumnReference]) – if provided, is the column used to set ids of the rows of the result.
- Returns
GroupedTable – Groupby object.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t2 = t1.groupby(t1.pet, t1.owner).reduce(t1.owner, t1.pet, ageagg=pw.reducers.sum(t1.age))
pw.debug.compute_and_print(t2, include_id=False)
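The groupby followed by reduce with a sum reducer corresponds to the following plain-Python aggregation over the same data. It is a sketch of the semantics only; row order in the Pathway output is not modelled.

```python
from collections import defaultdict

rows = [
    (10, "Alice", "dog"),
    (9, "Bob", "dog"),
    (8, "Alice", "cat"),
    (7, "Bob", "dog"),
]

# Group by (pet, owner) and sum the ages within each group.
ageagg = defaultdict(int)
for age, owner, pet in rows:
    ageagg[(pet, owner)] += age

print(dict(ageagg))
```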
having(*indexers)
Removes rows so that indexed.ix(indexer) is possible when some rows are missing, for each indexer in indexers.
property id: ColumnReference
Get reference to pseudocolumn containing ids of a table.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t2 = t1.select(ids = t1.id)
pw.debug.compute_and_print(t2.select(test=t2.id == t2.ids), include_id=False)
interpolate(timestamp, *values, mode=InterpolateMode.LINEAR)
Interpolates missing values in a column using the previous and next values based on a timestamps column.
- Parameters
- timestamp (ColumnReference) – Reference to the column containing timestamps.
- *values (ColumnReference) – References to the columns containing values to be interpolated.
- mode (InterpolateMode, optional) – The interpolation mode. Currently, only InterpolateMode.LINEAR is supported. Default is InterpolateMode.LINEAR.
- Returns
Table – A new table with the interpolated values.
- Raises
ValueError – If the columns are not ColumnReference or if the interpolation mode is not supported.
NOTE:
- The interpolation is performed based on linear interpolation between the previous and next values.
- If a value is missing at the beginning or end of the column, no interpolation is performed.
Example:
import pathway as pw
table = pw.debug.table_from_markdown('''
timestamp | values_a | values_b
1 | 1 | 10
2 | |
3 | 3 |
4 | |
5 | |
6 | 6 | 60
''')
table = table.interpolate(pw.this.timestamp, pw.this.values_a, pw.this.values_b)
pw.debug.compute_and_print(table, include_id=False)
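Linear interpolation between the previous and next known values can be written out in plain Python as below. This is a sketch under stated assumptions: timestamps are sorted in ascending order, missing values are represented by None, and, following the note above, gaps at the beginning or end are left untouched. The helper name is hypothetical.

```python
def interpolate_linear(timestamps, values):
    """Fill None entries that lie between two known values."""
    known = [i for i, v in enumerate(values) if v is not None]
    out = list(values)
    # Walk over consecutive pairs of known positions and fill the gap between them.
    for a, b in zip(known, known[1:]):
        t0, t1 = timestamps[a], timestamps[b]
        v0, v1 = values[a], values[b]
        for i in range(a + 1, b):
            out[i] = v0 + (v1 - v0) * (timestamps[i] - t0) / (t1 - t0)
    return out


timestamps = [1, 2, 3, 4, 5, 6]
values_a = interpolate_linear(timestamps, [1, None, 3, None, None, 6])
values_b = interpolate_linear(timestamps, [10, None, None, None, None, 60])
print(values_a)
print(values_b)
```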
intersect(*tables)
Restrict self universe to keys appearing in all of the tables.
- Parameters
- tables (Table) – tables whose keys are used to restrict the universe.
- Returns
Table – table with restricted universe, with the same set of columns.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.intersect(t2)
pw.debug.compute_and_print(t3, include_id=False)
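With tables modelled as mappings from row id to row, intersect keeps the rows whose id appears in every other table. A plain-Python illustration of the example data follows (the helper is a hypothetical stand-in):

```python
def intersect(table, *others):
    """Keep rows of `table` whose id appears in all of `others`."""
    return {
        key: row
        for key, row in table.items()
        if all(key in other for other in others)
    }


t1 = {1: (10, "Alice", 1), 2: (9, "Bob", 1), 3: (8, "Alice", 2)}
t2 = {2: 100, 3: 200, 4: 300}
t3 = intersect(t1, t2)
print(t3)
```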
interval_join(other, self_time, other_time, interval, *on, how=JoinMode.INNER)
Performs an interval join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined.
- Parameters
- other (Table) – the right side of the join.
- self_time (ColumnExpression) – time expression in self.
- other_time (ColumnExpression) – time expression in other.
- interval – the time interval, built as pw.temporal.interval(lower_bound, upper_bound), where lower_bound and upper_bound are the lower and upper bounds on the time difference between other_time and self_time.
- on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- how (JoinMode) – decides whether to run interval_join_inner, interval_join_left, interval_join_right or interval_join_outer. Default is INNER.
- Returns
IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 3
2 | 4
3 | 5
4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 0
2 | 1
3 | 4
4 | 7
'''
)
t3 = t1.interval_join(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 3
2 | 1 | 4
3 | 1 | 5
4 | 1 | 11
5 | 2 | 2
6 | 2 | 3
7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 0
2 | 1 | 1
3 | 1 | 4
4 | 1 | 7
5 | 2 | 0
6 | 2 | 2
7 | 4 | 2
'''
)
t3 = t1.interval_join(
t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b, how=pw.JoinMode.INNER
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
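The join condition self_time + lower_bound <= other_time <= self_time + upper_bound can be checked by hand with a plain-Python sketch. It reuses the timestamps of the first example and the bounds -2 and 1; the function is a hypothetical model of the semantics, not Pathway's implementation, and it ignores the optional on conditions.

```python
def interval_join_inner(left_times, right_times, lower_bound, upper_bound):
    """All (left, right) pairs whose time difference falls within the bounds."""
    return [
        (lt, rt)
        for lt in left_times
        for rt in right_times
        if lt + lower_bound <= rt <= lt + upper_bound
    ]


pairs = interval_join_inner([3, 4, 5, 11], [0, 1, 4, 7], -2, 1)
print(pairs)
```

The left time 11 has no partner because no right time lies in the range 9 to 12.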
interval_join_inner(other, self_time, other_time, interval, *on)
Performs an interval join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined.
- Parameters
- other (Table) – the right side of the join.
- self_time (ColumnExpression) – time expression in self.
- other_time (ColumnExpression) – time expression in other.
- interval – the time interval, built as pw.temporal.interval(lower_bound, upper_bound), where lower_bound and upper_bound are the lower and upper bounds on the time difference between other_time and self_time.
- on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- Returns
IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 3
2 | 4
3 | 5
4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 0
2 | 1
3 | 4
4 | 7
'''
)
t3 = t1.interval_join_inner(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 3
2 | 1 | 4
3 | 1 | 5
4 | 1 | 11
5 | 2 | 2
6 | 2 | 3
7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 0
2 | 1 | 1
3 | 1 | 4
4 | 1 | 7
5 | 2 | 0
6 | 2 | 2
7 | 4 | 2
'''
)
t3 = t1.interval_join_inner(
t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
interval_join_left(other, self_time, other_time, interval, *on)
Performs an interval left join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined. Rows from the left side that haven't been matched with the right side are returned with missing values on the right side replaced with None.
- Parameters
- other (Table) – the right side of the join.
- self_time (ColumnExpression) – time expression in self.
- other_time (ColumnExpression) – time expression in other.
- interval – the time interval, built as pw.temporal.interval(lower_bound, upper_bound), where lower_bound and upper_bound are the lower and upper bounds on the time difference between other_time and self_time.
- on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- Returns
IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 3
2 | 4
3 | 5
4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 0
2 | 1
3 | 4
4 | 7
'''
)
t3 = t1.interval_join_left(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 3
2 | 1 | 4
3 | 1 | 5
4 | 1 | 11
5 | 2 | 2
6 | 2 | 3
7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 0
2 | 1 | 1
3 | 1 | 4
4 | 1 | 7
5 | 2 | 0
6 | 2 | 2
7 | 4 | 2
'''
)
t3 = t1.interval_join_left(
t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
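The difference from the inner variant is only in how unmatched left rows are treated: they are kept, with None on the right side. A plain-Python model using the timestamps of the first example (a hypothetical helper that ignores the on conditions):

```python
def interval_join_left(left_times, right_times, lower_bound, upper_bound):
    """Like an inner interval join, but unmatched left rows are kept with None."""
    out = []
    for lt in left_times:
        matches = [
            rt for rt in right_times
            if lt + lower_bound <= rt <= lt + upper_bound
        ]
        if matches:
            out.extend((lt, rt) for rt in matches)
        else:
            out.append((lt, None))
    return out


pairs = interval_join_left([3, 4, 5, 11], [0, 1, 4, 7], -2, 1)
print(pairs)
```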
interval_join_outer(other, self_time, other_time, interval, *on)
Performs an interval outer join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined. Rows that haven't been matched with the other side are returned with missing values on the other side replaced with None.
- Parameters
- other (Table) – the right side of the join.
- self_time (ColumnExpression) – time expression in self.
- other_time (ColumnExpression) – time expression in other.
- interval – the time interval, built as pw.temporal.interval(lower_bound, upper_bound), where lower_bound and upper_bound are the lower and upper bounds on the time difference between other_time and self_time.
- on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- Returns
IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 3
2 | 4
3 | 5
4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 0
2 | 1
3 | 4
4 | 7
'''
)
t3 = t1.interval_join_outer(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 3
2 | 1 | 4
3 | 1 | 5
4 | 1 | 11
5 | 2 | 2
6 | 2 | 3
7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 0
2 | 1 | 1
3 | 1 | 4
4 | 1 | 7
5 | 2 | 0
6 | 2 | 2
7 | 4 | 2
'''
)
t3 = t1.interval_join_outer(
t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
interval_join_right(other, self_time, other_time, interval, *on)
Performs an interval right join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined. Rows from the right side that haven't been matched with the left side are returned with missing values on the left side replaced with None.
- Parameters
- other (Table) – the right side of the join.
- self_time (ColumnExpression) – time expression in self.
- other_time (ColumnExpression) – time expression in other.
- interval – the time interval, built as pw.temporal.interval(lower_bound, upper_bound), where lower_bound and upper_bound are the lower and upper bounds on the time difference between other_time and self_time.
- on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- Returns
IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 3
2 | 4
3 | 5
4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 0
2 | 1
3 | 4
4 | 7
'''
)
t3 = t1.interval_join_right(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 3
2 | 1 | 4
3 | 1 | 5
4 | 1 | 11
5 | 2 | 2
6 | 2 | 3
7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 0
2 | 1 | 1
3 | 1 | 4
4 | 1 | 7
5 | 2 | 0
6 | 2 | 2
7 | 4 | 2
'''
)
t3 = t1.interval_join_right(
t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
property ix()
Return an object that, when indexed by [column], returns a new, reindexed table.
- Returns
Indexer – an object that when indexed by some column returns a table with rows specified by that column.
Example:
import pathway as pw
t_animals = pw.debug.parse_to_table('''
| epithet | genus
1 | upupa | epops
2 | acherontia | atropos
3 | bubo | scandiacus
4 | dynastes | hercules
''')
t_birds = pw.debug.parse_to_table('''
| desc
2 | hoopoe
4 | owl
''')
ret = t_birds.select(t_birds.desc, latin=t_animals.ix(t_birds.id).genus)
pw.debug.compute_and_print(ret, include_id=False)
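Indexing with ix amounts to a lookup by row id. With tables modelled as dictionaries from id to row, the select above corresponds to the following plain-Python sketch (an illustration of the semantics only, using the same data):

```python
t_animals = {
    1: {"epithet": "upupa", "genus": "epops"},
    2: {"epithet": "acherontia", "genus": "atropos"},
    3: {"epithet": "bubo", "genus": "scandiacus"},
    4: {"epithet": "dynastes", "genus": "hercules"},
}
t_birds = {2: {"desc": "hoopoe"}, 4: {"desc": "owl"}}

# For every row of t_birds, fetch the t_animals row that has the same id.
ret = {
    row_id: {"desc": row["desc"], "latin": t_animals[row_id]["genus"]}
    for row_id, row in t_birds.items()
}
print(ret)
```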
ix_ref(*args, optional=False)
Returns a row, indexed by its primary keys. Several columns can be used as index.
- Parameters
- args (Union[ColumnExpression, None, int, float, str, bytes, bool, BasePointer, datetime, timedelta, ndarray, Tuple[Union[None, int, float, str, bytes, bool, BasePointer, datetime, timedelta, ndarray, Tuple[Value, ...]], ...]]) – Column references.
- Returns
Row – indexed row.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
name | pet
Alice | dog
Bob | cat
Carole | cat
David | dog
''')
t2 = t1.with_id_from(pw.this.name)
t2 = t2.select(*pw.this, new_value=pw.this.ix_ref("Alice").pet)
pw.debug.compute_and_print(t2, include_id=False)
Tables obtained by a groupby/reduce scheme always have primary keys:
import pathway as pw
t1 = pw.debug.parse_to_table('''
name | pet
Alice | dog
Bob | cat
Carole | cat
David | cat
''')
t2 = t1.groupby(pw.this.pet).reduce(pw.this.pet, count=pw.reducers.count())
t3 = t1.select(*pw.this, new_value=t2.ix_ref(pw.this.pet).count)
pw.debug.compute_and_print(t3, include_id=False)
Single-row tables can be accessed via ix_ref():
import pathway as pw
t1 = pw.debug.parse_to_table('''
name | pet
Alice | dog
Bob | cat
Carole | cat
David | cat
''')
t2 = t1.reduce(count=pw.reducers.count())
t3 = t1.select(*pw.this, new_value=t2.ix_ref().count)
pw.debug.compute_and_print(t3, include_id=False)
join(other, *on, id=None, how=JoinMode.INNER)
Join self with other using the given join expression.
- Parameters
- other (Joinable) – the right side of the join.
- on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- id (Optional[ColumnReference]) – optional argument for id of result, can be only self.id or other.id.
- how (JoinMode) – by default, inner join is performed. Possible values are JoinMode.{INNER,LEFT,RIGHT,OUTER}, corresponding to inner, left, right and outer join respectively.
- Returns
JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join(
t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id = False)
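An inner join with two equality conditions keeps only the pairs of rows that satisfy both. The plain-Python sketch below applies that rule to the data of the example; it illustrates the semantics and is not Pathway code.

```python
t1 = [
    {"age": 10, "owner": "Alice", "pet": 1},
    {"age": 9, "owner": "Bob", "pet": 1},
    {"age": 8, "owner": "Alice", "pet": 2},
]
t2 = [
    {"age": 10, "owner": "Alice", "pet": 3, "size": "M"},
    {"age": 9, "owner": "Bob", "pet": 1, "size": "L"},
    {"age": 8, "owner": "Tom", "pet": 1, "size": "XL"},
]

# Keep pairs where both join conditions hold, then select the output columns.
t3 = [
    {"age": a["age"], "owner_name": b["owner"], "size": b["size"]}
    for a in t1
    for b in t2
    if a["pet"] == b["pet"] and a["owner"] == b["owner"]
]
print(t3)
```

Only Bob's row matches on both pet and owner, so the result has a single row.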
join_inner(other, *on, id=None)
Inner-joins two tables or join results.
- Parameters
- other (Joinable) – the right side of the join.
- on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- id (Optional[ColumnReference]) – optional argument for id of result, can be only self.id or other.id.
- Returns
JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join_inner(
t2, t1.pet == t2.pet, t1.owner == t2.owner
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id = False)
join_left(other, *on, id=None)
Left-joins two tables or join results.
- Parameters
- other (Joinable) – Table or join result.
- *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2.
- id (Optional[ColumnReference]) – optional id column of the result.
Remarks: args cannot contain id column from either of tables, as the result table has id column with auto-generated ids; it can be selected by assigning it to a column with defined name (passed in kwargs)
Behavior:
- for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
- rows from the right side that were not matched with the left side are skipped
- for rows that were matched the behavior is the same as that of an inner join.
- Returns
JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
pw.debug.compute_and_print(t1.join_left(t2, t1.A == t2.C
).select(t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D,t2.id)),
include_id=False)
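The behavior rules listed above can be traced in plain Python on the same data. In this sketch (an illustration only, with a hypothetical helper), left rows without a match are kept with None on the right side, and the sum is None whenever the right row is missing, which is the role pw.require plays in the example.

```python
def join_left(left, right):
    """left: (A, B) rows; right: (C, D) rows; join condition A == C.
    Returns (A, C, B + D) rows, with None where the right side is missing."""
    out = []
    for a, b in left:
        matches = [(c, d) for c, d in right if a == c]
        if not matches:
            out.append((a, None, None))
        for c, d in matches:
            out.append((a, c, b + d))
    return out


t1 = [(11, 111), (12, 112), (13, 113), (13, 114)]
t2 = [(11, 211), (12, 212), (14, 213), (14, 214)]
result = join_left(t1, t2)
print(result)
```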
join_outer(other, *on, id=None)
Outer-joins two tables or join results.
- Parameters
- other (Joinable) – Table or join result.
- *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2.
- id (Optional[ColumnReference]) – optional id column of the result.
Remarks: args cannot contain id column from either of tables, as the result table has id column with auto-generated ids; it can be selected by assigning it to a column with defined name (passed in kwargs)
Behavior:
- for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
- for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
- for rows that were matched the behavior is the same as that of an inner join.
- Returns
JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
pw.debug.compute_and_print(t1.join_outer(t2, t1.A == t2.C
).select(t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D,t1.id,t2.id)),
include_id=False)
join_right(other, *on, id=None)
Right-joins two tables or join results.
- Parameters
- other (Joinable) – Table or join result.
- *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2.
- id (Optional[ColumnReference]) – optional id column of the result.
Remarks: args cannot contain id column from either of tables, as the result table has id column with auto-generated ids; it can be selected by assigning it to a column with defined name (passed in kwargs)
Behavior:
- rows from the left side that were not matched with the right side are skipped
- for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
- for rows that were matched the behavior is the same as that of an inner join.
- Returns
JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
pw.debug.compute_and_print(t1.join_right(t2, t1.A == t2.C
).select(t1.A, t2_C=t2.C, S=pw.require(pw.coalesce(t1.B,0) + t2.D,t1.id)),
include_id=False)
- Returns
OuterJoinResult object
pointer_from(*args, optional=False)
Pseudo-random hash of its argument. Produces pointer types. Applied column-wise.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age owner pet
1 10 Alice dog
2 9 Bob dog
3 8 Alice cat
4 7 Bob dog''')
g = t1.groupby(t1.owner).reduce(refcol = t1.pointer_from(t1.owner)) # g.id == g.refcol
pw.debug.compute_and_print(g.select(test = (g.id == g.refcol)), include_id=False)
promise_universe_is_equal_to(other)
Asserts to Pathway that the universe of self is equal to the universe of other.
Semantics: Used in situations where Pathway cannot deduce that two universes are equal.
- Returns
self
NOTE: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
| age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
promise_universe_is_subset_of(other)
Asserts to Pathway that the universe of self is a subset of the universe of other.
Semantics: Used in situations where Pathway cannot deduce one universe being a subset of another.
- Returns
self
NOTE: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
promise_universes_are_disjoint(other)
Asserts to Pathway that the universe of self is disjoint from the universe of other.
Semantics: Used in situations where Pathway cannot deduce that universes are disjoint.
- Returns
self
NOTE: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
11 | 11 | Alice | 30
12 | 12 | Tom | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
reduce(*args, **kwargs)
Reduce a table to a single row.
Equivalent to self.groupby().reduce(*args, **kwargs).
- Parameters
  - args (ColumnReference) – reducer to reduce the table with
  - kwargs (ColumnExpression) – reducer to reduce the table with. Its key is the new name of a column.
- Returns
Table – Reduced table.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t2 = t1.reduce(ageagg=pw.reducers.argmin(t1.age))
pw.debug.compute_and_print(t2, include_id=False)
t3 = t2.select(t1.ix(t2.ageagg).age, t1.ix(t2.ageagg).pet)
pw.debug.compute_and_print(t3, include_id=False)
rename(names_mapping=None, **kwargs)
Rename columns according to either a dictionary or kwargs.
If a mapping is provided as a dictionary, rename_by_dict will be used. Otherwise, rename_columns will be used with kwargs.
Columns not in keys(kwargs) are not changed. The new name of a column must not be id.
- Parameters
  - names_mapping (Optional[Dict[Union[str, ColumnReference], str]]) – mapping from old column names to new names.
  - kwargs (ColumnExpression) – mapping from old column names to new names.
- Returns
Table – self with columns renamed.
rename_by_dict(names_mapping)
Rename columns according to a dictionary.
Columns not in keys(names_mapping) are not changed. The new name of a column must not be id.
- Parameters
names_mapping (Dict[Union[str, ColumnReference], str]) – mapping from old column names to new names.
- Returns
Table – self with columns renamed.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = t1.rename_by_dict({"age": "years_old", t1.pet: "animal"})
pw.debug.compute_and_print(t2, include_id=False)
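The renaming rules can be illustrated on a plain list of column names. This is a minimal sketch, not Pathway code: rename_model is a hypothetical helper that works on names only and assumes the mapping goes from old name to new name, as described for rename_by_dict.

```python
def rename_model(columns, names_mapping):
    """Hypothetical helper: apply an old-name -> new-name mapping to column names."""
    if "id" in names_mapping.values():
        # Mirrors the documented rule that a new name must not be id.
        raise ValueError("a column cannot be renamed to 'id'")
    # Columns that are not keys of the mapping keep their name.
    return [names_mapping.get(name, name) for name in columns]


renamed = rename_model(["age", "owner", "pet"], {"age": "years_old", "pet": "animal"})
print(renamed)
```

The owner column is not a key of the mapping, so it is left unchanged.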
rename_columns(**kwargs)
Rename columns according to kwargs.
Columns not referenced in kwargs are not changed. The new name of a column must not be id.
- Parameters
kwargs (Union[str, ColumnReference]) – mapping whose keys are the new column names and whose values are the columns to rename (names or references).
- Returns
Table – self with columns renamed.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = t1.rename_columns(years_old=t1.age, animal=t1.pet)
pw.debug.compute_and_print(t2, include_id=False)
property schema(: Type[Schema] )
Get schema of the table.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t1.schema.as_dict()
t1.schema['age']
select(*args, **kwargs)
Build a new table with columns specified by kwargs.
Output columns’ names are keys(kwargs). values(kwargs) can be raw values, boxed values, columns. Assigning to id reindexes the table.
- Parameters
  - args (ColumnReference) – Column references.
  - kwargs (Any) – Column expressions with their new assigned names.
- Returns
Table – Created table.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
pet
Dog
Cat
''')
t2 = t1.select(animal=t1.pet, desc="fluffy")
pw.debug.compute_and_print(t2, include_id=False)
property slice(: TableSlice )
Creates a collection of references to self columns. Supports basic column manipulation methods.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t1.slice.without("age")
sort(key=None, instance=None)
Sorts a table by the specified keys.
- Parameters
  - table (Table) – The table to be sorted.
  - key (Optional[ColumnReference]) – ColumnReference or None. The name of the primary key to sort by. If None, the table is sorted based on the key column as primary key.
  - instance (Optional[ColumnReference]) – ColumnReference or None. The name of the secondary key to sort by. If None, the field “instance” is chosen if it exists, otherwise only the primary key is used.
- Returns
pw.Table – The sorted table. Contains two columns: prev and next, containing the pointers to the previous and next rows.
Example:
import pathway as pw
table = pw.debug.table_from_markdown('''
name | age | score
Alice | 25 | 80
Bob | 20 | 90
Charlie | 30 | 80
''')
table = table.with_id_from(pw.this.name)
table += table.sort(key=pw.this.age)
pw.debug.compute_and_print(table, include_id=False)
update_cells(other)
Updates cells of self, breaking ties in favor of the values in other.
Semantics:
* result.columns == self.columns
* result.id == self.id
* conflicts are resolved preferring other’s values
Requires:
* other.columns ⊆ self.columns
* other.id ⊆ self.id
- Parameters
other (Table) – the other table.
- Returns
Table – self updated with cells from other.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 30
''')
pw.universes.promise_is_subset_of(t2, t1)
t3 = t1.update_cells(t2)
pw.debug.compute_and_print(t3, include_id=False)
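The Semantics and Requires bullets for update_cells can be read as a dictionary update per row. The following is a plain-Python sketch under that reading, not Pathway code: tables are modeled as dicts from row id to a dict of cells, and update_cells_model is a hypothetical helper.

```python
def update_cells_model(self_rows, other_rows):
    """Hypothetical helper: overwrite cells of self with the cells present in other."""
    result = {row_id: dict(cells) for row_id, cells in self_rows.items()}
    for row_id, cells in other_rows.items():
        # Requires: other.id is a subset of self.id.
        if row_id not in result:
            raise KeyError(f"row {row_id} is not in self")
        # Requires: other.columns is a subset of self.columns.
        unknown = set(cells) - set(result[row_id])
        if unknown:
            raise KeyError(f"columns {sorted(unknown)} are not in self")
        result[row_id].update(cells)  # conflicts are resolved in favor of other
    return result


t1_rows = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2_rows = {1: {"age": 10, "owner": "Alice", "pet": 30}}
updated = update_cells_model(t1_rows, t2_rows)
print(updated)
```

The result keeps the ids and columns of self; only row 1 changes, and only in the pet cell.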
update_rows(other)
Updates rows of self, breaking ties in favor of the rows in other.
Semantics:
- result.columns == self.columns == other.columns
- result.id == self.id ∪ other.id
Requires:
- other.columns == self.columns
- Parameters
other (Table) – the other table.
- Returns
Table – self updated with rows from other.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 30
12 | 12 | Tom | 40
''')
t3 = t1.update_rows(t2)
pw.debug.compute_and_print(t3, include_id=False)
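The update_rows semantics (ids are the union of both tables, rows of other win on conflict) can be sketched with plain dictionaries. This is an illustrative model and not Pathway code: update_rows_model is a hypothetical helper, and tables are modeled as dicts from row id to a dict of cells.

```python
def update_rows_model(self_rows, other_rows):
    """Hypothetical helper: union of rows, preferring rows from other."""
    self_columns = {col for cells in self_rows.values() for col in cells}
    other_columns = {col for cells in other_rows.values() for col in cells}
    # Requires: other.columns == self.columns.
    if self_columns != other_columns:
        raise ValueError("both tables must have the same columns")
    result = {row_id: dict(cells) for row_id, cells in self_rows.items()}
    for row_id, cells in other_rows.items():
        result[row_id] = dict(cells)  # whole rows are replaced or added
    return result


t1_rows = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2_rows = {
    1: {"age": 10, "owner": "Alice", "pet": 30},
    12: {"age": 12, "owner": "Tom", "pet": 40},
}
merged = update_rows_model(t1_rows, t2_rows)
print(sorted(merged))
```

Row 1 is taken from other, rows 2 and 3 come from self, and row 12 is new.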
update_types(**kwargs)
Updates types in schema. Has no effect on the runtime.
window_join(other, self_time, other_time, window, *on, how=JoinMode.INNER)
Performs a window join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).
When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.
- Parameters
  - other (Table) – the right side of a join.
  - self_time (ColumnExpression) – time expression in self.
  - other_time (ColumnExpression) – time expression in other.
  - window (Window) – a window to use.
  - on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  - how (JoinMode) – decides whether to run window_join_inner, window_join_left, window_join_right or window_join_outer. Default is INNER.
- Returns
WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 1
2 | 2
3 | 3
4 | 7
5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 2
2 | 5
3 | 6
4 | 7
'''
)
t3 = t1.window_join(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t4 = t1.window_join(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 2
3 | 1 | 3
4 | 1 | 7
5 | 1 | 13
6 | 2 | 1
7 | 2 | 2
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 2
2 | 1 | 5
3 | 1 | 6
4 | 1 | 7
5 | 2 | 2
6 | 2 | 3
7 | 4 | 3
'''
)
t3 = t1.window_join(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| t
0 | 0
1 | 5
2 | 10
3 | 15
4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
0 | -3
1 | 2
2 | 3
3 | 6
4 | 16
'''
)
t3 = t1.window_join(
t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 4
3 | 1 | 7
4 | 2 | 0
5 | 2 | 3
6 | 2 | 4
7 | 2 | 7
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | -1
2 | 1 | 6
3 | 2 | 2
4 | 2 | 10
5 | 4 | 3
'''
)
t3 = t1.window_join(
t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
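The rule "two records are joined if they belong to the same window" can be illustrated for a tumbling window in plain Python. This sketch is not Pathway code: tumbling_inner_join is a hypothetical helper, and it assumes windows of the form [k * duration, (k + 1) * duration) aligned at 0, with no extra on conditions.

```python
def tumbling_window_start(t, duration):
    """Start of the assumed window [start, start + duration) that contains t."""
    return (t // duration) * duration


def tumbling_inner_join(left_times, right_times, duration):
    """Hypothetical helper: pair up records that fall into the same tumbling window."""
    pairs = []
    for lt in left_times:
        for rt in right_times:
            if tumbling_window_start(lt, duration) == tumbling_window_start(rt, duration):
                pairs.append((lt, rt))
    return pairs


# Time columns of the first t1 and t2 tables in the examples.
pairs = tumbling_inner_join([1, 2, 3, 7, 13], [2, 5, 6, 7], duration=2)
print(pairs)
```

Under these assumptions, left times 1 and 13 and right time 5 share a window with no record from the other side, so they produce no pair in an inner join.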
window_join_inner(other, self_time, other_time, window, *on)
Performs a window join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).
When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.
- Parameters
  - other (Table) – the right side of a join.
  - self_time (ColumnExpression) – time expression in self.
  - other_time (ColumnExpression) – time expression in other.
  - window (Window) – a window to use.
  - on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- Returns
WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 1
2 | 2
3 | 3
4 | 7
5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 2
2 | 5
3 | 6
4 | 7
'''
)
t3 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t4 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 2
3 | 1 | 3
4 | 1 | 7
5 | 1 | 13
6 | 2 | 1
7 | 2 | 2
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 2
2 | 1 | 5
3 | 1 | 6
4 | 1 | 7
5 | 2 | 2
6 | 2 | 3
7 | 4 | 3
'''
)
t3 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| t
0 | 0
1 | 5
2 | 10
3 | 15
4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
0 | -3
1 | 2
2 | 3
3 | 6
4 | 16
'''
)
t3 = t1.window_join_inner(
t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 4
3 | 1 | 7
4 | 2 | 0
5 | 2 | 3
6 | 2 | 4
7 | 2 | 7
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | -1
2 | 1 | 6
3 | 2 | 2
4 | 2 | 10
5 | 4 | 3
'''
)
t3 = t1.window_join_inner(
t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
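The note about sliding windows (a matching pair is emitted once per window that contains both records) can be checked with a small plain-Python model. This is a sketch and not Pathway code: the helpers are hypothetical, windows are assumed to be [k * hop, k * hop + duration) for integer k, and the arguments of pw.temporal.sliding(1, 2) are assumed to mean hop=1, duration=2.

```python
import math
from collections import Counter


def sliding_window_starts(t, hop, duration):
    """Starts of all assumed windows [s, s + duration) with s = k * hop that contain t."""
    first = math.floor((t - duration) / hop) + 1
    last = math.floor(t / hop)
    return {k * hop for k in range(first, last + 1)}


def sliding_inner_join(left_times, right_times, hop, duration):
    """Hypothetical helper: count how many windows each (left, right) pair shares."""
    counts = Counter()
    for lt in left_times:
        for rt in right_times:
            shared = sliding_window_starts(lt, hop, duration) & sliding_window_starts(
                rt, hop, duration
            )
            if shared:
                counts[(lt, rt)] = len(shared)
    return counts


counts = sliding_inner_join([1, 2, 3, 7, 13], [2, 5, 6, 7], hop=1, duration=2)
for pair, multiplicity in sorted(counts.items()):
    print(pair, multiplicity)
```

Pairs with equal times, such as (2, 2) and (7, 7), share both of their windows and therefore get multiplicity 2 in this model.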
window_join_left(other, self_time, other_time, window, *on)
Performs a window left join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).
When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.
Rows from the left side that didn’t match any record on the right side in a given window are returned with missing values on the right side replaced with None. The multiplicity of such rows equals the number of windows they belong to and don’t have a match in.
- Parameters
  - other (Table) – the right side of a join.
  - self_time (ColumnExpression) – time expression in self.
  - other_time (ColumnExpression) – time expression in other.
  - window (Window) – a window to use.
  - on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- Returns
WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 1
2 | 2
3 | 3
4 | 7
5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 2
2 | 5
3 | 6
4 | 7
'''
)
t3 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t4 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 2
3 | 1 | 3
4 | 1 | 7
5 | 1 | 13
6 | 2 | 1
7 | 2 | 2
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 2
2 | 1 | 5
3 | 1 | 6
4 | 1 | 7
5 | 2 | 2
6 | 2 | 3
7 | 4 | 3
'''
)
t3 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| t
0 | 0
1 | 5
2 | 10
3 | 15
4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
0 | -3
1 | 2
2 | 3
3 | 6
4 | 16
'''
)
t3 = t1.window_join_left(
t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 4
3 | 1 | 7
4 | 2 | 0
5 | 2 | 3
6 | 2 | 4
7 | 2 | 7
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | -1
2 | 1 | 6
3 | 2 | 2
4 | 2 | 10
5 | 4 | 3
'''
)
t3 = t1.window_join_left(
t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
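The handling of unmatched left rows can be illustrated with a plain-Python model of a tumbling-window left join. This sketch is not Pathway code: tumbling_left_join is a hypothetical helper that assumes windows [k * duration, (k + 1) * duration) aligned at 0 and no extra on conditions.

```python
def tumbling_left_join(left_times, right_times, duration):
    """Hypothetical helper: keep every left record, padding with None when unmatched."""
    rows = []
    for lt in left_times:
        window = (lt // duration) * duration
        matches = [rt for rt in right_times if (rt // duration) * duration == window]
        if matches:
            rows.extend((lt, rt) for rt in matches)
        else:
            # No right record in this window: the right side becomes None.
            rows.append((lt, None))
    return rows


# Time columns of the first t1 and t2 tables in the examples.
rows = tumbling_left_join([1, 2, 3, 7, 13], [2, 5, 6, 7], duration=2)
print(rows)
```

Because each record belongs to exactly one tumbling window, an unmatched left row appears once here; with a sliding window it would appear once per window in which it has no match.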
window_join_outer(other, self_time, other_time, window, *on)
Performs a window outer join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).
When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.
Rows from both sides that didn’t match any record on the other side in a given window are returned with missing values on the other side replaced with None. The multiplicity of such rows equals the number of windows they belong to and don’t have a match in.
- Parameters
  - other (Table) – the right side of a join.
  - self_time (ColumnExpression) – time expression in self.
  - other_time (ColumnExpression) – time expression in other.
  - window (Window) – a window to use.
  - on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- Returns
WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 1
2 | 2
3 | 3
4 | 7
5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 2
2 | 5
3 | 6
4 | 7
'''
)
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t4 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 2
3 | 1 | 3
4 | 1 | 7
5 | 1 | 13
6 | 2 | 1
7 | 2 | 2
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 2
2 | 1 | 5
3 | 1 | 6
4 | 1 | 7
5 | 2 | 2
6 | 2 | 3
7 | 4 | 3
'''
)
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| t
0 | 0
1 | 5
2 | 10
3 | 15
4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
0 | -3
1 | 2
2 | 3
3 | 6
4 | 16
'''
)
t3 = t1.window_join_outer(
t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 4
3 | 1 | 7
4 | 2 | 0
5 | 2 | 3
6 | 2 | 4
7 | 2 | 7
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | -1
2 | 1 | 6
3 | 2 | 2
4 | 2 | 10
5 | 4 | 3
'''
)
t3 = t1.window_join_outer(
t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
window_join_right(other, self_time, other_time, window, *on)
Performs a window right join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).
When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.
Rows from the right side that didn’t match any record on the left side in a given window are returned with missing values on the left side replaced with None. The multiplicity of such rows equals the number of windows they belong to and don’t have a match in.
- Parameters
  - other (Table) – the right side of a join.
  - self_time (ColumnExpression) – time expression in self.
  - other_time (ColumnExpression) – time expression in other.
  - window (Window) – a window to use.
  - on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- Returns
WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 1
2 | 2
3 | 3
4 | 7
5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 2
2 | 5
3 | 6
4 | 7
'''
)
t3 = t1.window_join_right(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t4 = t1.window_join_right(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 2
3 | 1 | 3
4 | 1 | 7
5 | 1 | 13
6 | 2 | 1
7 | 2 | 2
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 2
2 | 1 | 5
3 | 1 | 6
4 | 1 | 7
5 | 2 | 2
6 | 2 | 3
7 | 4 | 3
'''
)
t3 = t1.window_join_right(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
key=t2.b, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| t
0 | 0
1 | 5
2 | 10
3 | 15
4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
0 | -3
1 | 2
2 | 3
3 | 6
4 | 16
'''
)
t3 = t1.window_join_right(
t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 4
3 | 1 | 7
4 | 2 | 0
5 | 2 | 3
6 | 2 | 4
7 | 2 | 7
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | -1
2 | 1 | 6
3 | 2 | 2
4 | 2 | 10
5 | 4 | 3
'''
)
t3 = t1.window_join_right(
t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t2.b, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
windowby(time_expr, *, window, shard=None)
Create a GroupedTable by windowing the table (based on time_expr and window), optionally sharded with shard.
- Parameters
  - time_expr (ColumnExpression) – Column expression used for windowing
  - window (Window) – type of window to use
  - shard (Optional[ColumnExpression]) – optional column expression to act as a shard key
Examples:
import pathway as pw
t = pw.debug.table_from_markdown(
'''
| shard | t | v
1 | 0 | 1 | 10
2 | 0 | 2 | 1
3 | 0 | 4 | 3
4 | 0 | 8 | 2
5 | 0 | 9 | 4
6 | 0 | 10| 8
7 | 1 | 1 | 9
8 | 1 | 2 | 16
''')
result = t.windowby(
t.t, window=pw.temporal.session(predicate=lambda a, b: abs(a-b) <= 1), shard=t.shard
).reduce(
pw.this.shard,
min_t=pw.reducers.min(pw.this.t),
max_v=pw.reducers.max(pw.this.v),
count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)
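The session-window example can be traced by hand with a plain-Python model. This is a sketch and not Pathway code: session_reduce and summarize are hypothetical helpers, and the model assumes that a session is a maximal run of time-ordered records within one shard in which each pair of neighbours satisfies the predicate.

```python
from itertools import groupby


def summarize(shard, session):
    """Reduce one session of (shard, t, v) rows to (shard, min_t, max_v, count)."""
    return (
        shard,
        min(t for _, t, _ in session),
        max(v for _, _, v in session),
        len(session),
    )


def session_reduce(rows, predicate):
    """Hypothetical helper: split each shard into sessions, then reduce each session."""
    result = []
    ordered = sorted(rows, key=lambda row: (row[0], row[1]))
    for shard, group in groupby(ordered, key=lambda row: row[0]):
        session = []
        for row in group:
            # Start a new session when the neighbouring times fail the predicate.
            if session and not predicate(session[-1][1], row[1]):
                result.append(summarize(shard, session))
                session = []
            session.append(row)
        result.append(summarize(shard, session))
    return result


# (shard, t, v) rows of the example table.
rows = [
    (0, 1, 10), (0, 2, 1), (0, 4, 3), (0, 8, 2),
    (0, 9, 4), (0, 10, 8), (1, 1, 9), (1, 2, 16),
]
sessions = session_reduce(rows, predicate=lambda a, b: abs(a - b) <= 1)
for row in sessions:
    print(row)
```

In this model shard 0 splits into three sessions (times 1 to 2, time 4 alone, times 8 to 10) and shard 1 forms a single session.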
with_columns(*args, **kwargs)
Updates columns of self, according to args and kwargs. See table.select specification for evaluation of args and kwargs.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| owner | pet | size
1 | Tom | 1 | 10
2 | Bob | 1 | 9
3 | Tom | 2 | 8
''').with_universe_of(t1)
t3 = t1.with_columns(*t2)
pw.debug.compute_and_print(t3, include_id=False)
with_id(new_index)
Set new ids based on another column containing id-typed values.
To generate ids based on arbitrary valued columns, use with_id_from.
Values assigned must be row-wise unique.
- Parameters
new_index – column to be used as the new index.
- Returns
Table with updated ids.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| new_id
1 | 2
2 | 3
3 | 4
''')
t3 = t1.promise_universe_is_subset_of(t2).with_id(t2.new_id)
pw.debug.compute_and_print(t3)
with_id_from(*args)
Compute new ids based on values in columns. Ids computed from columns must be row-wise unique.
- Parameters
*args – columns to be used as primary keys.
- Returns
Table – self updated with recomputed ids.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = t1 + t1.select(old_id=t1.id)
t3 = t2.with_id_from(t2.age)
pw.debug.compute_and_print(t3)
t4 = t3.select(t3.age, t3.owner, t3.pet, same_as_old=(t3.id == t3.old_id),
same_as_new=(t3.id == t3.pointer_from(t3.age)))
pw.debug.compute_and_print(t4)
with_prefix(prefix)
Rename columns by adding prefix to each name of column.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = t1.with_prefix("u_")
pw.debug.compute_and_print(t2, include_id=False)
with_suffix(suffix)
Rename columns by adding suffix to each name of column.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = t1.with_suffix("_current")
pw.debug.compute_and_print(t2, include_id=False)
with_universe_of(other)
Returns a copy of self with exactly the same universe as other.
Semantics: Required precondition self.universe == other.universe. Used in situations where Pathway cannot deduce equality of universes, but those are equal as verified during runtime.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
| age
1 | 10
7 | 3
''').with_universe_of(t1)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
without(*columns)
Selects all columns without named column references.
- Parameters
columns (Union[str, ColumnReference]) – columns to be dropped, provided by table.column_name notation.
- Returns
Table – self without specified columns.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = t1.without(t1.age, pw.this.pet)
pw.debug.compute_and_print(t2, include_id=False)
class pw.TableLike(universe)
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
g1 = t1.groupby(t1.owner)
t2 = t1.filter(t1.age >= 9)
pw.debug.compute_and_print(t2, include_id=False)
g2 = t2.groupby(t2.owner)
pw.universes.promise_is_subset_of(g2, g1) # t2 is a subset of t1, so this is safe
promise_universe_is_equal_to(other)
Asserts to Pathway that the universe of self is equal to the universe of other.
Semantics: Used in situations where Pathway cannot deduce that two universes are equal.
- Returns
self
NOTE: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
| age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
promise_universe_is_subset_of(other)
Asserts to Pathway that the universe of self is a subset of the universe of other.
Semantics: Used in situations where Pathway cannot deduce one universe being a subset of another.
- Returns
self
NOTE: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
promise_universes_are_disjoint(other)
Asserts to Pathway that the universe of self is disjoint from the universe of other.
Semantics: Used in situations where Pathway cannot deduce that universes are disjoint.
- Returns
self
NOTE: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
11 | 11 | Alice | 30
12 | 12 | Tom | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)