pathway package
Pathway main module
Typical use:
import pathway as pw
class pathway.AsofJoinResult(side_data, mode, defaults)
Result of an ASOF join of two tables
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
      | K | val | t
    1 | 0 | 1   | 1
    2 | 0 | 2   | 4
    3 | 0 | 3   | 5
    4 | 0 | 4   | 6
    5 | 0 | 5   | 7
    6 | 0 | 6   | 11
    7 | 0 | 7   | 12
    8 | 1 | 8   | 5
    9 | 1 | 9   | 7
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
       | K | val | t
    21 | 1 | 7   | 2
    22 | 1 | 3   | 8
    23 | 0 | 0   | 2
    24 | 0 | 6   | 3
    25 | 0 | 2   | 7
    26 | 0 | 3   | 8
    27 | 0 | 9   | 9
    28 | 0 | 7   | 13
    29 | 0 | 4   | 14
    '''
)
res = t1.asof_join(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    mode=pw.JoinMode.LEFT,
    defaults={t2.val: -1},
).select(
    pw.this.shard_key,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
shard_key | t  | val_left | val_right | sum
0         | 1  | 1        | -1        | 0
0         | 4  | 2        | 6         | 8
0         | 5  | 3        | 6         | 9
0         | 6  | 4        | 6         | 10
0         | 7  | 5        | 6         | 11
0         | 11 | 6        | 9         | 15
0         | 12 | 7        | 9         | 16
1         | 5  | 8        | 7         | 15
1         | 7  | 9        | 7         | 16
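The ASOF semantics in this example can be checked with a minimal pure-Python sketch (an illustration, not Pathway's implementation). For every left row it takes the right row with the same K whose t is the latest one before the left t, and falls back to the default -1 when there is none. The output above implies that a right row with exactly the same t is not matched: the left row with t = 7 picks the value from t = 3, not from t = 7. The sketch therefore uses a strict comparison. Treat that tie rule as inferred from the example rather than documented. The function name `asof_left_join` is hypothetical.

```python
from bisect import bisect_left

# Rows copied from the example above, as (K, val, t) tuples.
LEFT = [(0, 1, 1), (0, 2, 4), (0, 3, 5), (0, 4, 6), (0, 5, 7),
        (0, 6, 11), (0, 7, 12), (1, 8, 5), (1, 9, 7)]
RIGHT = [(1, 7, 2), (1, 3, 8), (0, 0, 2), (0, 6, 3), (0, 2, 7),
         (0, 3, 8), (0, 9, 9), (0, 7, 13), (0, 4, 14)]


def asof_left_join(left, right, default_val=-1):
    """Hypothetical model: match each left row with the latest earlier right row of the same K."""
    # Index the right side per key as parallel lists sorted by time.
    by_key = {}
    for k, val, t in sorted(right, key=lambda r: r[2]):
        times, vals = by_key.setdefault(k, ([], []))
        times.append(t)
        vals.append(val)
    out = []
    for k, val_left, t in left:
        times, vals = by_key.get(k, ([], []))
        # Position of the last right time strictly below t (tie rule inferred from the output).
        i = bisect_left(times, t) - 1
        val_right = vals[i] if i >= 0 else default_val
        out.append((k, t, val_left, val_right, val_left + val_right))
    return out


for row in asof_left_join(LEFT, RIGHT):
    print(row)  # (shard_key, t, val_left, val_right, sum)
```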
class pathway.AsyncTransformer(input_table)
Allows performing asynchronous transformations on a table.
invoke() is called asynchronously for each row of input_table.
The output table can be accessed via result.
Example:
import asyncio
from typing import Any, Dict
import pathway as pw
class OutputSchema(pw.Schema):
    ret: int
class AsyncIncrementTransformer(pw.AsyncTransformer, output_schema=OutputSchema):
    async def invoke(self, value) -> Dict[str, Any]:
        # Simulate a slow asynchronous call, then emit value + 1.
        await asyncio.sleep(0.1)
        return {"ret": value + 1}
input = pw.debug.parse_to_table('''
  | value
1 | 42
2 | 44
''')
result = AsyncIncrementTransformer(input_table=input).result
pw.debug.compute_and_print(result, include_id=False)
ret
43
45
close()
Called once at the end. Proper place for cleanup.
- Return type
None
abstract async invoke(*args, **kwargs)
Called for every row of input_table. The arguments correspond to the columns of the input table.
Should return a dict of values matching output_schema.
- Return type
Dict[str, Any]
open()
Called before actual work. Suitable for one time setup.
- Return type
None
property result: Table
Resulting table.
with_options(capacity=None, retry_strategy=None)
Sets async options.
- Parameters
- capacity (Optional[int]) – maximum number of concurrent operations.
- retry_strategy (Optional[AsyncRetryStrategy]) – defines how failures will be handled.
- Return type
AsyncTransformer
- Returns
self
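The lifecycle described above (open() once, invoke() per row, close() once) and the capacity option can be modeled in plain asyncio. This sketch assumes that an asyncio.Semaphore is one reasonable way to enforce "maximum number of concurrent operations". It does not use or reproduce Pathway's internals, and `IncrementTransformer` and `run_rows` are hypothetical names.

```python
import asyncio
from typing import Any, Dict, List


class IncrementTransformer:
    """Hypothetical stand-in mirroring the open/invoke/close hooks documented above."""

    def __init__(self) -> None:
        self.active = 0  # invocations currently in flight
        self.peak = 0  # highest concurrency observed
        self.log: List[str] = []

    def open(self) -> None:
        self.log.append("open")  # one-time setup before any row is processed

    async def invoke(self, value: int) -> Dict[str, Any]:
        self.active += 1
        self.peak = max(self.peak, self.active)
        await asyncio.sleep(0.01)  # stands in for slow I/O such as a remote call
        self.active -= 1
        return {"ret": value + 1}

    def close(self) -> None:
        self.log.append("close")  # cleanup after all rows are done


async def run_rows(transformer: IncrementTransformer, rows: List[int], capacity: int):
    """Call invoke() once per row, keeping at most `capacity` calls in flight."""
    semaphore = asyncio.Semaphore(capacity)

    async def guarded(value: int) -> Dict[str, Any]:
        async with semaphore:
            return await transformer.invoke(value)

    transformer.open()
    try:
        # gather() returns results in the same order as the input rows.
        return await asyncio.gather(*(guarded(v) for v in rows))
    finally:
        transformer.close()


transformer = IncrementTransformer()
results = asyncio.run(run_rows(transformer, [42, 44, 46, 48], capacity=2))
print(results, transformer.peak, transformer.log)
```

Raising capacity lets more invocations overlap. The semaphore is what keeps the observed peak at or below the configured limit.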
class pathway.ClassArg(ref: RowReference, ptr: Pointer)
Base class to inherit from when writing inner classes for class transformers.
Example:
import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()
        @pw.output_attribute
        def ret(self) -> int:
            return self.arg + 1
t1 = pw.debug.parse_to_table('''
age
10
9
8
7
''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7   | 8
8   | 9
9   | 10
10  | 11
pointer_from(*args, optional=False)
Pseudo-random hash of its argument. Produces pointer types. Applied value-wise.
- Return type
BasePointer
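As a rough model of "a pseudo-random hash of its argument, applied value-wise", the sketch below derives a stable 128-bit integer from the given values with hashlib. It only illustrates the stated properties: equal inputs give equal pointers, different inputs almost certainly give different ones, and one pointer is produced per value. Pathway's actual hash function and pointer type are not reproduced here, and `pointer_from_sketch` is a hypothetical name.

```python
import hashlib


def pointer_from_sketch(*args) -> int:
    """Hypothetical helper: map the given values to a stable 128-bit integer."""
    # repr() is a stable encoding for simple values such as ints, strings and tuples.
    payload = repr(args).encode("utf-8")
    digest = hashlib.blake2b(payload, digest_size=16).digest()
    return int.from_bytes(digest, "big")


# Value-wise application: one pointer per row value.
owners = ["Alice", "Bob", "Alice"]
pointers = [pointer_from_sketch(owner) for owner in owners]
print(pointers[0] == pointers[2], pointers[0] == pointers[1])
```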
class pathway.ColumnReference(*, column, table, name)
Reference to the column.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
isinstance(t1.age, pw.ColumnReference)
True
isinstance(t1["owner"], pw.ColumnReference)
True
property name()
Name of the referred column.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
t1.age.name
'age'
property table()
Table to which the referred column belongs.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
t1.age.table is t1
True
class pathway.FilteredJoinResult(join_result, filtering)
filter(filter_expression)
Filters rows, keeping the ones satisfying the predicate.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
   age  owner  pet
1   10  Alice    1
2    9    Bob    1
3    8  Alice    2
''')
t2 = pw.debug.parse_to_table('''
    age  owner  pet size
11   10  Alice    3    M
12    9    Bob    1    L
13    8    Tom    1   XL
''')
result = t1.join(t2).filter(t1.owner == t2.owner).select(t1.age, t2.size)
pw.debug.compute_and_print(result, include_id=False)
age | size
8   | M
9   | L
10  | M
- Return type
FilteredJoinResult
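The example implies that t1.join(t2) without any join conditions pairs every row of t1 with every row of t2, and that filter() then keeps only the pairs for which the predicate holds. The plain-Python sketch below models that reading with itertools.product. It is an assumption-based illustration of the semantics, not Pathway's execution strategy.

```python
from itertools import product

# (age, owner, pet) and (age, owner, pet, size) rows from the example.
t1 = [(10, "Alice", 1), (9, "Bob", 1), (8, "Alice", 2)]
t2 = [(10, "Alice", 3, "M"), (9, "Bob", 1, "L"), (8, "Tom", 1, "XL")]

# Join without conditions: every left row paired with every right row (3 x 3 pairs).
pairs = list(product(t1, t2))

# filter(t1.owner == t2.owner), then select(t1.age, t2.size).
result = sorted((left[0], right[3]) for left, right in pairs if left[1] == right[1])
print(len(pairs), result)
```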
groupby(*args, id=None)
Groups join result by columns from args.
NOTE: Usually followed by .reduce() that aggregates the result and returns a table.
- Parameters
- args (ColumnReference) – columns to group by.
- id (Optional[ColumnReference]) – if provided, the column used to set the ids of the rows of the result.
- Returns
Groupby object.
- Return type
GroupedJoinResult
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
   cost  owner  pet
1   100  Alice    1
2    90    Bob    1
3    80  Alice    2
''')
t2 = pw.debug.parse_to_table('''
    cost  owner  pet size
11   100  Alice    3    M
12    90    Bob    1    L
13    80    Tom    1   XL
''')
result = (t1.join(t2, t1.owner == t2.owner).groupby(pw.this.owner)
    .reduce(pw.this.owner, pairs=pw.reducers.count()))
pw.debug.compute_and_print(result, include_id=False)
owner | pairs
Alice | 2
Bob   | 1
join(other, *on, id=None, how=JoinMode.INNER)
Join self with other using the given join expression.
- Parameters
- other (Joinable) – the right side of the join.
- on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- id (Optional[ColumnReference]) – optional argument for the id of the result; can only be self.id or other.id.
- how (JoinMode) – by default, an inner join is performed. Possible values are JoinMode.{INNER,LEFT,RIGHT,OUTER}, corresponding to inner, left, right and outer join respectively.
- Returns
an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
JoinResult
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10  | Alice | 3   | M
9   | Bob   | 1   | L
8   | Tom   | 1   | XL
''')
t3 = t1.join(
    t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L
join_inner(other, *on, id=None)
Inner-joins two tables or join results.
- Parameters
- other (Joinable) – the right side of the join.
- on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- id (Optional[ColumnReference]) – optional argument for the id of the result; can only be self.id or other.id.
- Returns
an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
JoinResult
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10  | Alice | 3   | M
9   | Bob   | 1   | L
8   | Tom   | 1   | XL
''')
t3 = t1.join_inner(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(
    age=t1.age, owner_name=t2.owner, size=t2.size
)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L
join_left(other, *on, id=None)
Left-joins two tables or join results.
- Parameters
- other (Joinable) – Table or join result.
- *on – Columns to join, syntax self.col1 == other.col2
- id (Optional[ColumnReference]) – optional id column of the result
Remarks: args cannot contain the id column from either table, as the result table has an id column with auto-generated ids; it can be selected by assigning it to a column with a defined name (passed in kwargs).
Behavior:
- for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
- rows from the right side that were not matched with the left side are skipped
- for rows that were matched the behavior is the same as that of an inner join.
- Returns
an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
JoinResult
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
      | A  | B
    1 | 11 | 111
    2 | 12 | 112
    3 | 13 | 113
    4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | C  | D
    1 | 11 | 211
    2 | 12 | 212
    3 | 14 | 213
    4 | 14 | 214
    '''
)
pw.debug.compute_and_print(
    t1.join_left(t2, t1.A == t2.C).select(
        t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D, t2.id)
    ),
    include_id=False,
)
A  | t2_C | S
11 | 11   | 322
12 | 12   | 324
13 |      |
13 |      |
join_outer(other, *on, id=None)
Outer-joins two tables or join results.
- Parameters
- other (Joinable) – Table or join result.
- *on – Columns to join, syntax self.col1 == other.col2
- id (Optional[ColumnReference]) – optional id column of the result
Remarks: args cannot contain the id column from either table, as the result table has an id column with auto-generated ids; it can be selected by assigning it to a column with a defined name (passed in kwargs).
Behavior:
- for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
- for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
- for rows that were matched the behavior is the same as that of an inner join.
- Returns
an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
JoinResult
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
      | A  | B
    1 | 11 | 111
    2 | 12 | 112
    3 | 13 | 113
    4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | C  | D
    1 | 11 | 211
    2 | 12 | 212
    3 | 14 | 213
    4 | 14 | 214
    '''
)
pw.debug.compute_and_print(
    t1.join_outer(t2, t1.A == t2.C).select(
        t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D, t1.id, t2.id)
    ),
    include_id=False,
)
A  | t2_C | S
   | 14   |
   | 14   |
11 | 11   | 322
12 | 12   | 324
13 |      |
13 |      |
join_right(other, *on, id=None)
Right-joins two tables or join results.
- Parameters
- other (Joinable) – Table or join result.
- *on – Columns to join, syntax self.col1 == other.col2
- id (Optional[ColumnReference]) – optional id column of the result
Remarks: args cannot contain the id column from either table, as the result table has an id column with auto-generated ids; it can be selected by assigning it to a column with a defined name (passed in kwargs).
Behavior:
- rows from the left side that were not matched with the right side are skipped
- for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
- for rows that were matched the behavior is the same as that of an inner join.
- Returns
an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
JoinResult
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
      | A  | B
    1 | 11 | 111
    2 | 12 | 112
    3 | 13 | 113
    4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | C  | D
    1 | 11 | 211
    2 | 12 | 212
    3 | 14 | 213
    4 | 14 | 214
    '''
)
pw.debug.compute_and_print(
    t1.join_right(t2, t1.A == t2.C).select(
        t1.A, t2_C=t2.C, S=pw.require(pw.coalesce(t1.B, 0) + t2.D, t1.id)
    ),
    include_id=False,
)
A  | t2_C | S
   | 14   |
   | 14   |
11 | 11   | 322
12 | 12   | 324
- Returns
OuterJoinResult object
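The Behavior lists for join_left, join_outer and join_right differ only in which unmatched rows survive and which side is padded with None. A minimal pure-Python model of a single-key equi-join makes the difference visible on the A/B and C/D tables from these examples. In this model S is None whenever either side is missing, which is what the pw.require calls in the examples produce. `join_rows` and `project` are hypothetical names, and rows are plain tuples rather than Pathway tables.

```python
from typing import List, Optional, Tuple

Row = Tuple[int, int]


def join_rows(left: List[Row], right: List[Row], how: str):
    """Equi-join on the first field; unmatched rows are kept and padded with None per `how`."""
    out: List[Tuple[Optional[Row], Optional[Row]]] = []
    matched_right = set()
    for l in left:
        hits = [i for i, r in enumerate(right) if r[0] == l[0]]
        for i in hits:
            matched_right.add(i)
            out.append((l, right[i]))
        if not hits and how in ("left", "outer"):
            out.append((l, None))  # unmatched left row, right side padded with None
    if how in ("right", "outer"):
        for i, r in enumerate(right):
            if i not in matched_right:
                out.append((None, r))  # unmatched right row, left side padded with None
    return out


def project(pairs):
    """Mimic select(t1.A, t2_C=t2.C, S=t1.B + t2.D), with S = None if a side is missing."""
    rows = []
    for l, r in pairs:
        a = l[0] if l else None
        c = r[0] if r else None
        s = l[1] + r[1] if l and r else None
        rows.append((a, c, s))
    return rows


t1 = [(11, 111), (12, 112), (13, 113), (13, 114)]
t2 = [(11, 211), (12, 212), (14, 213), (14, 214)]
for how in ("inner", "left", "right", "outer"):
    print(how, project(join_rows(t1, t2, how)))
```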
promise_universe_is_equal_to(other)
Asserts to Pathway that the universe of self is equal to the universe of other.
Semantics: Used in situations where Pathway cannot deduce that one universe is equal to another.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
  | age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10
- Return type
TypeVar(SelfTableLike, bound=TableLike)
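Operations such as t1 + t2 combine rows that share an id, so both tables must have the same universe (set of row ids). The sketch below models tables as plain dictionaries keyed by row id and checks the key sets eagerly. That check is an assumption made for illustration and says nothing about how Pathway uses the promise internally. `add_columns` is a hypothetical name.

```python
# Tables modeled as {row_id: {column: value}}, with ids taken from the example.
t1 = {1: {"pet": "Dog"}, 7: {"pet": "Cat"}}
t2 = {1: {"age": 10}, 7: {"age": 3}}


def add_columns(a, b):
    """Hypothetical model of `a + b`: merge columns row by row; both sides need the same ids."""
    if a.keys() != b.keys():
        raise ValueError("universes differ")
    return {row_id: {**a[row_id], **b[row_id]} for row_id in a}


t3 = add_columns(t1, t2)
print(sorted((row["pet"], row["age"]) for row in t3.values()))
```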
promise_universe_is_subset_of(other)
Asserts to Pathway that the universe of self is a subset of the universe of each of the others.
Semantics: Used in situations where Pathway cannot deduce that one universe is a subset of another.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 30
- Return type
TypeVar(SelfTableLike, bound=TableLike)
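As the example output shows, t1 << t2 keeps every row of t1 and, for each id that also appears in t2, takes the values from t2. This only makes sense when the ids of t2 are a subset of the ids of t1. Below is a dictionary-based sketch of that behavior. The helper `update_cells_sketch` is hypothetical, and its eager subset check is an assumption made for illustration.

```python
t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {1: {"age": 10, "owner": "Alice", "pet": 30}}


def update_cells_sketch(base, updates):
    """Hypothetical model of `base << updates`: overwrite the cells of rows whose ids appear in updates."""
    if not updates.keys() <= base.keys():
        raise ValueError("ids of updates must be a subset of the ids of base")
    return {row_id: {**row, **updates.get(row_id, {})} for row_id, row in base.items()}


t3 = update_cells_sketch(t1, t2)
print(sorted((row["age"], row["owner"], row["pet"]) for row in t3.values()))
```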
promise_universes_are_disjoint(other)
Asserts to Pathway that the universe of self is disjoint from the universe of other.
Semantics: Used in situations where Pathway cannot deduce that the universes are disjoint.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 1
11  | Alice | 30
12  | Tom   | 40
- Return type
TypeVar(SelfTableLike, bound=TableLike)
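Concatenation keeps the rows of both tables under their original ids, so an id present in both would be ambiguous. That is why concat needs the universes to be disjoint. Below is a dictionary-based sketch. The helper `concat_sketch` is hypothetical, and its eager overlap check is an assumption made for illustration.

```python
# Rows keyed by id, as (age, owner, pet) tuples from the example.
t1 = {1: (10, "Alice", 1), 2: (9, "Bob", 1), 3: (8, "Alice", 2)}
t2 = {11: (11, "Alice", 30), 12: (12, "Tom", 40)}


def concat_sketch(a, b):
    """Hypothetical model of a.concat(b): keep all rows; ids must not collide."""
    if a.keys() & b.keys():
        raise ValueError("universes overlap")
    return {**a, **b}


t3 = concat_sketch(t1, t2)
print(sorted(t3.values()))
```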
reduce(*args, **kwargs)
Reduce a join result to a single row.
Equivalent to self.groupby().reduce(*args, **kwargs).
- Parameters
- args (ColumnReference) – reducer to reduce the table with.
- kwargs (ColumnExpression) – reducer to reduce the table with. Its key is the new name of a column.
- Returns
Reduced table.
- Return type
Table
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
   cost  owner  pet
1   100  Alice    1
2    90    Bob    1
3    80  Alice    2
''')
t2 = pw.debug.parse_to_table('''
    cost  owner  pet size
11   100  Alice    3    M
12    90    Bob    1    L
13    80    Tom    1   XL
''')
result = t1.join(t2, t1.owner == t2.owner).reduce(total_pairs=pw.reducers.count())
pw.debug.compute_and_print(result, include_id=False)
total_pairs
3
select(*args, **kwargs)
Computes the result of a join.
- Parameters
- args (ColumnReference) – Column references.
- kwargs (Any) – Column expressions with their new assigned names.
- Returns
Created table.
- Return type
Table
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10  | Alice | 3   | M
9   | Bob   | 1   | L
8   | Tom   | 1   | XL
''')
t3 = t1.join(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(
    age=t1.age, owner_name=t2.owner, size=t2.size
)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L
class pathway.GroupedJoinResult(*, join_result, args, id)
promise_universe_is_equal_to(other)
Asserts to Pathway that the universe of self is equal to the universe of other.
Semantics: Used in situations where Pathway cannot deduce that one universe is equal to another.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
  | age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10
- Return type
TypeVar(SelfTableLike, bound=TableLike)
promise_universe_is_subset_of(other)
Asserts to Pathway that the universe of self is a subset of the universe of each of the others.
Semantics: Used in situations where Pathway cannot deduce that one universe is a subset of another.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 30
- Return type
TypeVar(SelfTableLike, bound=TableLike)
promise_universes_are_disjoint(other)
Asserts to Pathway that the universe of self is disjoint from the universe of other.
Semantics: Used in situations where Pathway cannot deduce that the universes are disjoint.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 1
11  | Alice | 30
12  | Tom   | 40
- Return type
TypeVar(SelfTableLike, bound=TableLike)
reduce(*args, **kwargs)
Reduces a grouped join result to a table.
- Returns
Created table.
- Return type
Table
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
   cost  owner  pet
1   100  Alice    1
2    90    Bob    1
3    80  Alice    2
''')
t2 = pw.debug.parse_to_table('''
    cost  owner  pet size
11   100  Alice    3    M
12    90    Bob    1    L
13    80    Tom    1   XL
''')
result = (t1.join(t2, t1.owner == t2.owner).groupby(pw.this.owner)
    .reduce(pw.this.owner, pairs=pw.reducers.count()))
pw.debug.compute_and_print(result, include_id=False)
owner | pairs
Alice | 2
Bob   | 1
class pathway.GroupedTable(table, grouping_columns, set_id=False)
Result of a groupby operation on a Table.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.groupby(t1.pet, t1.owner)
isinstance(t2, pw.GroupedTable)
True
promise_universe_is_equal_to(other)
Asserts to Pathway that the universe of self is equal to the universe of other.
Semantics: Used in situations where Pathway cannot deduce that one universe is equal to another.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
  | age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10
- Return type
TypeVar(SelfTableLike, bound=TableLike)
promise_universe_is_subset_of(other)
Asserts to Pathway that the universe of self is a subset of the universe of each of the others.
Semantics: Used in situations where Pathway cannot deduce that one universe is a subset of another.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 30
- Return type
TypeVar(SelfTableLike, bound=TableLike)
promise_universes_are_disjoint(other)
Asserts to Pathway that the universe of self is disjoint from the universe of other.
Semantics: Used in situations where Pathway cannot deduce that the universes are disjoint.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 1
11  | Alice | 30
12  | Tom   | 40
- Return type
TypeVar(SelfTableLike, bound=TableLike)
reduce(*args, **kwargs)
Reduces a grouped table to a table.
- Parameters
- args (ColumnReference) – Column references.
- kwargs (ColumnExpression) – Column expressions with their new assigned names.
- Returns
Created table.
- Return type
Table
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.groupby(t1.pet, t1.owner).reduce(
    t1.owner, t1.pet, ageagg=pw.reducers.sum(t1.age)
)
pw.debug.compute_and_print(t2, include_id=False)
owner | pet | ageagg
Alice | cat | 8
Alice | dog | 10
Bob   | dog | 16
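The groupby/reduce pair above can be mirrored in plain Python to show what each reducer sees: one bucket per distinct (owner, pet) combination, with reducers.sum adding up the ages inside each bucket. This is a sketch of the semantics only. It uses collections.defaultdict in place of Pathway's engine.

```python
from collections import defaultdict

# (age, owner, pet) rows from the example.
rows = [(10, "Alice", "dog"), (9, "Bob", "dog"), (8, "Alice", "cat"), (7, "Bob", "dog")]

# groupby(t1.pet, t1.owner): one bucket per distinct (owner, pet) combination.
ageagg = defaultdict(int)
for age, owner, pet in rows:
    ageagg[(owner, pet)] += age  # plays the role of pw.reducers.sum(t1.age)

for (owner, pet), total in sorted(ageagg.items()):
    print(owner, pet, total)
```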
class pathway.IntervalJoinResult(left_bucketed, right_bucketed, earlier_part_filtered, later_part_filtered, table_substitution, mode)
Result of an interval join between tables.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
      | t
    1 | 3
    2 | 4
    3 | 5
    4 | 11
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    1 | 0
    2 | 1
    3 | 4
    4 | 7
    '''
)
join_result = t1.interval_join(t2, t1.t, t2.t, -2, 1)
isinstance(join_result, pw.IntervalJoinResult)
True
pw.debug.compute_and_print(
    join_result.select(left_t=t1.t, right_t=t2.t), include_id=False
)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4
select(*args, **kwargs)
Computes the result of an interval join.
- Parameters
- args (ColumnReference) – Column references.
- kwargs (Any) – Column expressions with their new assigned names.
- Returns
Created table.
- Return type
Table
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
      | a | t
    1 | 1 | 3
    2 | 1 | 4
    3 | 1 | 5
    4 | 1 | 11
    5 | 2 | 2
    6 | 2 | 3
    7 | 3 | 4
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | b | t
    1 | 1 | 0
    2 | 1 | 1
    3 | 1 | 4
    4 | 1 | 7
    5 | 2 | 0
    6 | 2 | 2
    7 | 4 | 2
    '''
)
t3 = t1.interval_join(t2, t1.t, t2.t, -2, 1, t1.a == t2.b).select(
    t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2
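Both interval join examples are consistent with the rule lower <= t2.t - t1.t <= upper with inclusive bounds. For instance, t1.t = 3 matches t2.t = 1 at the lower bound -2 and t2.t = 4 at the upper bound 1. The second example adds an extra equality condition on top of that rule. The nested-loop sketch below encodes that inferred rule. `interval_join_sketch` is a hypothetical name, and a real engine would avoid the quadratic scan.

```python
def interval_join_sketch(left, right, lower, upper, same_key=False):
    """Pair (key, t) rows whose times satisfy lower <= right_t - left_t <= upper."""
    out = []
    for l_key, l_t in left:
        for r_key, r_t in right:
            if same_key and l_key != r_key:
                continue  # models the extra condition t1.a == t2.b
            if lower <= r_t - l_t <= upper:
                out.append((l_key, l_t, r_t))
    return out


# First example: no key, so every row uses the placeholder key None.
left_1 = [(None, t) for t in (3, 4, 5, 11)]
right_1 = [(None, t) for t in (0, 1, 4, 7)]
print([(lt, rt) for _, lt, rt in interval_join_sketch(left_1, right_1, -2, 1)])

# Second example: (a, t) and (b, t) rows, additionally joined on a == b.
left_2 = [(1, 3), (1, 4), (1, 5), (1, 11), (2, 2), (2, 3), (3, 4)]
right_2 = [(1, 0), (1, 1), (1, 4), (1, 7), (2, 0), (2, 2), (4, 2)]
print(interval_join_sketch(left_2, right_2, -2, 1, same_key=True))
```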
class pathway.JoinMode(value)
An enumeration of join modes (JoinMode.INNER, JoinMode.LEFT, JoinMode.RIGHT, JoinMode.OUTER).
class pathway.JoinResult(_universe, _context, _left_table, _right_table, _original_left, _original_right, _substitution, _chained_join_desugaring, _joined_on_names, _join_mode)
Result of a join between tables.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
   age  owner  pet
1   10  Alice    1
2    9    Bob    1
3    8  Alice    2
''')
t2 = pw.debug.parse_to_table('''
    age  owner  pet size
11   10  Alice    3    M
12    9    Bob    1    L
13    8    Tom    1   XL
''')
joinresult = t1.join(t2, t1.pet == t2.pet, t1.owner == t2.owner)
isinstance(joinresult, pw.JoinResult)
True
pw.debug.compute_and_print(joinresult.select(t1.age, t2.size), include_id=False)
age | size
9   | L
filter(filter_expression)
Filters rows, keeping the ones satisfying the predicate.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
   age  owner  pet
1   10  Alice    1
2    9    Bob    1
3    8  Alice    2
''')
t2 = pw.debug.parse_to_table('''
    age  owner  pet size
11   10  Alice    3    M
12    9    Bob    1    L
13    8    Tom    1   XL
''')
result = t1.join(t2).filter(t1.owner == t2.owner).select(t1.age, t2.size)
pw.debug.compute_and_print(result, include_id=False)
age | size
8   | M
9   | L
10  | M
- Return type
FilteredJoinResult
groupby(*args, id=None)
Groups join result by columns from args.
NOTE: Usually followed by .reduce() that aggregates the result and returns a table.
- Parameters
- args (ColumnReference) – columns to group by.
- id (Optional[ColumnReference]) – if provided, the column used to set the ids of the rows of the result.
- Returns
Groupby object.
- Return type
GroupedJoinResult
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
   cost  owner  pet
1   100  Alice    1
2    90    Bob    1
3    80  Alice    2
''')
t2 = pw.debug.parse_to_table('''
    cost  owner  pet size
11   100  Alice    3    M
12    90    Bob    1    L
13    80    Tom    1   XL
''')
result = (t1.join(t2, t1.owner == t2.owner).groupby(pw.this.owner)
    .reduce(pw.this.owner, pairs=pw.reducers.count()))
pw.debug.compute_and_print(result, include_id=False)
owner | pairs
Alice | 2
Bob   | 1
join(other, *on, id=None, how=JoinMode.INNER)
Join self with other using the given join expression.
- Parameters
- other (Joinable) – the right side of the join.
- on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- id (Optional[ColumnReference]) – optional argument for the id of the result; can only be self.id or other.id.
- how (JoinMode) – by default, an inner join is performed. Possible values are JoinMode.{INNER,LEFT,RIGHT,OUTER}, corresponding to inner, left, right and outer join respectively.
- Returns
an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
JoinResult
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10  | Alice | 3   | M
9   | Bob   | 1   | L
8   | Tom   | 1   | XL
''')
t3 = t1.join(
    t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L
join_inner(other, *on, id=None)
Inner-joins two tables or join results.
- Parameters
- other (Joinable) – the right side of the join.
- on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- id (Optional[ColumnReference]) – optional argument for the id of the result; can only be self.id or other.id.
- Returns
an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
JoinResult
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10  | Alice | 3   | M
9   | Bob   | 1   | L
8   | Tom   | 1   | XL
''')
t3 = t1.join_inner(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(
    age=t1.age, owner_name=t2.owner, size=t2.size
)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L
join_left(other, *on, id=None)
Left-joins two tables or join results.
- Parameters
- other (Joinable) – Table or join result.
- *on – Columns to join, syntax self.col1 == other.col2
- id (Optional[ColumnReference]) – optional id column of the result
Remarks: args cannot contain the id column from either table, as the result table has an id column with auto-generated ids; it can be selected by assigning it to a column with a defined name (passed in kwargs).
Behavior:
- for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
- rows from the right side that were not matched with the left side are skipped
- for rows that were matched the behavior is the same as that of an inner join.
- Returns
an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
JoinResult
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
      | A  | B
    1 | 11 | 111
    2 | 12 | 112
    3 | 13 | 113
    4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | C  | D
    1 | 11 | 211
    2 | 12 | 212
    3 | 14 | 213
    4 | 14 | 214
    '''
)
pw.debug.compute_and_print(
    t1.join_left(t2, t1.A == t2.C).select(
        t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D, t2.id)
    ),
    include_id=False,
)
A  | t2_C | S
11 | 11   | 322
12 | 12   | 324
13 |      |
13 |      |
join_outer(other, *on, id=None)
Outer-joins two tables or join results.
- Parameters
- other (Joinable) – Table or join result.
- *on – Columns to join, syntax self.col1 == other.col2
- id (Optional[ColumnReference]) – optional id column of the result
Remarks: args cannot contain the id column from either table, as the result table has an id column with auto-generated ids; it can be selected by assigning it to a column with a defined name (passed in kwargs).
Behavior:
- for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
- for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
- for rows that were matched the behavior is the same as that of an inner join.
- Returns
an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
JoinResult
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
      | A  | B
    1 | 11 | 111
    2 | 12 | 112
    3 | 13 | 113
    4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | C  | D
    1 | 11 | 211
    2 | 12 | 212
    3 | 14 | 213
    4 | 14 | 214
    '''
)
pw.debug.compute_and_print(
    t1.join_outer(t2, t1.A == t2.C).select(
        t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D, t1.id, t2.id)
    ),
    include_id=False,
)
A  | t2_C | S
   | 14   |
   | 14   |
11 | 11   | 322
12 | 12   | 324
13 |      |
13 |      |
join_right(other, *on, id=None)
Right-joins two tables or join results.
- Parameters
- other (Joinable) – Table or join result.
- *on – Columns to join, syntax self.col1 == other.col2
- id (Optional[ColumnReference]) – optional id column of the result
Remarks: args cannot contain the id column from either table, as the result table has an id column with auto-generated ids; it can be selected by assigning it to a column with a defined name (passed in kwargs).
Behavior:
- rows from the left side that were not matched with the right side are skipped
- for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
- for rows that were matched the behavior is the same as that of an inner join.
- Returns
an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
JoinResult
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
      | A  | B
    1 | 11 | 111
    2 | 12 | 112
    3 | 13 | 113
    4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | C  | D
    1 | 11 | 211
    2 | 12 | 212
    3 | 14 | 213
    4 | 14 | 214
    '''
)
pw.debug.compute_and_print(
    t1.join_right(t2, t1.A == t2.C).select(
        t1.A, t2_C=t2.C, S=pw.require(pw.coalesce(t1.B, 0) + t2.D, t1.id)
    ),
    include_id=False,
)
A  | t2_C | S
   | 14   |
   | 14   |
11 | 11   | 322
12 | 12   | 324
- Returns
OuterJoinResult object
promise_universe_is_equal_to(other)
Asserts to Pathway that the universe of self is equal to the universe of other.
Semantics: Used in situations where Pathway cannot deduce that one universe is equal to another.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
  | age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10
- Return type
TypeVar(SelfTableLike, bound=TableLike)
promise_universe_is_subset_of(other)
Asserts to Pathway that the universe of self is a subset of the universe of each of the others.
Semantics: Used in situations where Pathway cannot deduce that one universe is a subset of another.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 30
- Return type
TypeVar(SelfTableLike, bound=TableLike)
promise_universes_are_disjoint(other)
Asserts to Pathway that the universe of self is disjoint from the universe of other.
Semantics: Used in situations where Pathway cannot deduce that the universes are disjoint.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet8 | Alice | 29 | Bob | 110 | Alice | 111 | Alice | 3012 | Tom | 40
- Return type
TypeVar(SelfTableLike, bound=TableLike)
reduce(*args, **kwargs)
Reduce a join result to a single row.
Equivalent to self.groupby().reduce(*args, **kwargs).
- Parameters
  - args (ColumnReference) – reducer to reduce the table with.
  - kwargs (ColumnExpression) – reducer to reduce the table with. Its key is the new name of a column.
- Returns
  Reduced table.
- Return type
  Table
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  cost owner pet
1 100  Alice 1
2 90   Bob   1
3 80   Alice 2
''')
t2 = pw.debug.parse_to_table('''
   cost owner pet size
11 100  Alice 3   M
12 90   Bob   1   L
13 80   Tom   1   XL
''')
result = t1.join(t2, t1.owner == t2.owner).reduce(total_pairs=pw.reducers.count())
pw.debug.compute_and_print(result, include_id=False)
total_pairs
3
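Because reduce() on a join result aggregates all joined rows into a single group, total_pairs is simply the number of matching row pairs. A minimal plain-Python sketch of that count over the same data, assuming lists of tuples as a stand-in for Pathway tables (this models the semantics only; it is not Pathway code):

```python
# Rows from the example above: t1 as (id, cost, owner, pet),
# t2 as (id, cost, owner, pet, size).
t1 = [(1, 100, "Alice", 1), (2, 90, "Bob", 1), (3, 80, "Alice", 2)]
t2 = [
    (11, 100, "Alice", 3, "M"),
    (12, 90, "Bob", 1, "L"),
    (13, 80, "Tom", 1, "XL"),
]

# Inner join on owner: every pair of rows whose owners are equal.
pairs = [(left, right) for left in t1 for right in t2 if left[2] == right[2]]

# reduce() without groupby(): one group containing all joined rows.
total_pairs = len(pairs)
print(total_pairs)  # 3
```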
select(*args, **kwargs)
Computes the result of a join.
- Parameters
  - args (ColumnReference) – Column references.
  - kwargs (Any) – Column expressions with their new assigned names.
- Returns
  Created table.
- Return type
  Table
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10  | Alice | 3   | M
9   | Bob   | 1   | L
8   | Tom   | 1   | XL
''')
t3 = t1.join(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(
    age=t1.age, owner_name=t2.owner, size=t2.size
)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L
class pathway.Joinable(universe)
join(other, *on, id=None, how=JoinMode.INNER)
Join self with other using the given join expression.
- Parameters
  - other (Joinable) – the right side of the join.
  - on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  - id (Optional[ColumnReference]) – optional argument for the id of the result; can only be self.id or other.id.
  - how (JoinMode) – by default, an inner join is performed. Possible values are JoinMode.{INNER,LEFT,RIGHT,OUTER}, corresponding to inner, left, right and outer join respectively.
- Returns
  an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
  JoinResult
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10  | Alice | 3   | M
9   | Bob   | 1   | L
8   | Tom   | 1   | XL
''')
t3 = t1.join(
    t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L
join_inner(other, *on, id=None)
Inner-joins two tables or join results.
- Parameters
  - other (Joinable) – the right side of the join.
  - on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  - id (Optional[ColumnReference]) – optional argument for the id of the result; can only be self.id or other.id.
- Returns
  an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
  JoinResult
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10  | Alice | 3   | M
9   | Bob   | 1   | L
8   | Tom   | 1   | XL
''')
t3 = t1.join_inner(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(
    age=t1.age, owner_name=t2.owner, size=t2.size
)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L
join_left(other, *on, id=None)
Left-joins two tables or join results.
- Parameters
  - other (Joinable) – Table or join result.
  - *on – Columns to join, syntax self.col1 == other.col2.
  - id (Optional[ColumnReference]) – optional id column of the result.
Remarks: args cannot contain the id column of either table, because the result table has an id column with auto-generated ids; an original id can still be selected by assigning it to a named column (passed in kwargs).
Behavior:
- for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
- rows from the right side that were not matched with the left side are skipped
- for rows that were matched the behavior is the same as that of an inner join.
- Returns
  an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
  JoinResult
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
      | A  | B
    1 | 11 | 111
    2 | 12 | 112
    3 | 13 | 113
    4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | C  | D
    1 | 11 | 211
    2 | 12 | 212
    3 | 14 | 213
    4 | 14 | 214
    '''
)
pw.debug.compute_and_print(
    t1.join_left(t2, t1.A == t2.C).select(
        t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D, t2.id)
    ),
    include_id=False,
)
A  | t2_C | S
11 | 11   | 322
12 | 12   | 324
13 |      |
13 |      |
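The left-join behavior listed above can be modeled in plain Python. This is a sketch assuming rows are (id, key, value) tuples and that S is computed as B + D only for matched rows, mirroring the pw.require call in the example; the helper left_join is hypothetical and says nothing about how Pathway evaluates joins internally.

```python
# (id, A, B) rows of t1 and (id, C, D) rows of t2 from the example above.
t1 = [(1, 11, 111), (2, 12, 112), (3, 13, 113), (4, 13, 114)]
t2 = [(1, 11, 211), (2, 12, 212), (3, 14, 213), (4, 14, 214)]


def left_join(left, right):
    """Model of join_left on A == C, selecting (A, t2_C, S)."""
    out = []
    for _, a, b in left:
        matches = [(c, d) for _, c, d in right if c == a]
        if matches:
            # Matched rows behave as in an inner join.
            out.extend((a, c, b + d) for c, d in matches)
        else:
            # Unmatched left rows are kept, with None for right-side values.
            out.append((a, None, None))
    # Right rows without a match are never emitted.
    return out


print(left_join(t1, t2))
# [(11, 11, 322), (12, 12, 324), (13, None, None), (13, None, None)]
```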
join_outer(other, *on, id=None)
Outer-joins two tables or join results.
- Parameters
  - other (Joinable) – Table or join result.
  - *on – Columns to join, syntax self.col1 == other.col2.
  - id (Optional[ColumnReference]) – optional id column of the result.
Remarks: args cannot contain the id column of either table, because the result table has an id column with auto-generated ids; an original id can still be selected by assigning it to a named column (passed in kwargs).
Behavior:
- for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
- for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
- for rows that were matched the behavior is the same as that of an inner join.
- Returns
  an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
  JoinResult
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
      | A  | B
    1 | 11 | 111
    2 | 12 | 112
    3 | 13 | 113
    4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | C  | D
    1 | 11 | 211
    2 | 12 | 212
    3 | 14 | 213
    4 | 14 | 214
    '''
)
pw.debug.compute_and_print(
    t1.join_outer(t2, t1.A == t2.C).select(
        t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D, t1.id, t2.id)
    ),
    include_id=False,
)
A  | t2_C | S
   | 14   |
   | 14   |
11 | 11   | 322
12 | 12   | 324
13 |      |
13 |      |
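An outer join keeps unmatched rows from both sides. A minimal plain-Python model, assuming (key, value) tuples and a hypothetical helper outer_join that is not part of Pathway, reproduces the rows printed above (in a different order, since compute_and_print sorts its output):

```python
# (A, B) rows of t1 and (C, D) rows of t2 from the example above.
t1 = [(11, 111), (12, 112), (13, 113), (13, 114)]
t2 = [(11, 211), (12, 212), (14, 213), (14, 214)]


def outer_join(left, right):
    """Model of join_outer on A == C, selecting (A, t2_C, S)."""
    out = []
    matched_right = set()
    for a, b in left:
        hits = [(j, c, d) for j, (c, d) in enumerate(right) if c == a]
        for j, c, d in hits:
            matched_right.add(j)
            out.append((a, c, b + d))
        if not hits:
            out.append((a, None, None))  # unmatched left row, right side None
    for j, (c, _) in enumerate(right):
        if j not in matched_right:
            out.append((None, c, None))  # unmatched right row, left side None
    return out


result = outer_join(t1, t2)
print(result)
```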
join_right(other, *on, id=None)
Right-joins two tables or join results.
- Parameters
  - other (Joinable) – Table or join result.
  - *on – Columns to join, syntax self.col1 == other.col2.
  - id (Optional[ColumnReference]) – optional id column of the result.
Remarks: args cannot contain the id column of either table, because the result table has an id column with auto-generated ids; an original id can still be selected by assigning it to a named column (passed in kwargs).
Behavior:
- rows from the left side that were not matched with the right side are skipped
- for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
- for rows that were matched the behavior is the same as that of an inner join.
- Returns
  an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
  JoinResult
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
      | A  | B
    1 | 11 | 111
    2 | 12 | 112
    3 | 13 | 113
    4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | C  | D
    1 | 11 | 211
    2 | 12 | 212
    3 | 14 | 213
    4 | 14 | 214
    '''
)
pw.debug.compute_and_print(
    t1.join_right(t2, t1.A == t2.C).select(
        t1.A, t2_C=t2.C, S=pw.require(pw.coalesce(t1.B, 0) + t2.D, t1.id)
    ),
    include_id=False,
)
A  | t2_C | S
   | 14   |
   | 14   |
11 | 11   | 322
12 | 12   | 324
- Returns
OuterJoinResult object
promise_universe_is_equal_to(other)
Asserts to Pathway that the universe of self is equal to the universe of other.
Semantics: Used in situations where Pathway cannot deduce that two universes are equal.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
  | age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10
- Return type
TypeVar(SelfTableLike, bound=TableLike)
promise_universe_is_subset_of(other)
Asserts to Pathway that the universe of self is a subset of the universe of other.
Semantics: Used in situations where Pathway cannot deduce that one universe is a subset of another.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 30
- Return type
TypeVar(SelfTableLike, bound=TableLike)
promise_universes_are_disjoint(other)
Asserts to Pathway that the universe of self is disjoint from the universe of other.
Semantics: Used in situations where Pathway cannot deduce that the universes are disjoint.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
   | age | owner | pet
1  | 10  | Alice | 1
2  | 9   | Bob   | 1
3  | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 1
11  | Alice | 30
12  | Tom   | 40
- Return type
TypeVar(SelfTableLike, bound=TableLike)
class pathway.Pointer()
Pointer to row type.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.select(col=t1.id)
t2.schema['col'] is pw.Pointer
True
class pathway.Schema()
Base class to inherit from when creating schemas. All schemas should be subclasses of this one.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
   age owner pet
1  10  Alice dog
2  9   Bob   dog
3  8   Alice cat
4  7   Bob   dog''')
t1.schema.as_dict()
{'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>}
issubclass(t1.schema, pw.Schema)
True
class NewSchema(pw.Schema):
    foo: int
SchemaSum = NewSchema | t1.schema
SchemaSum.as_dict()
{'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>, 'foo': <class 'int'>}
class pathway.StatsMonitoringLevel()
Specifies the verbosity of the computation graph monitoring mechanism.
NONE: No monitoring.
IN_OUT: Monitor input and output latency. The latency is measured as the difference between the time when the operator processed the data and the time when Pathway acquired the data.
ALL: Monitor latency for each operator in the execution graph. The latency is measured as the difference between the time when the operator processed the data and the time when Pathway acquired the data.
class pathway.Table(columns, universe, pk_columns={}, schema=None, id_column=None)
Collection of named columns over identical universes.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
isinstance(t1, pw.Table)
True
asof_join(other, t_left, t_right, *on, mode, defaults={})
Performs an ASOF join of two tables.
- Parameters
  - other (Table) – Table to join with self; both must contain a column val.
  - t_left (ColumnExpression) – time-like column expression to do the join against.
  - t_right (ColumnExpression) – time-like column expression to do the join against.
  - on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  - mode (JoinMode) – mode of the join (LEFT, RIGHT, FULL).
  - defaults (Dict[ColumnReference, Any]) – dictionary mapping a column to its default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
      | K | val | t
    1 | 0 | 1   | 1
    2 | 0 | 2   | 4
    3 | 0 | 3   | 5
    4 | 0 | 4   | 6
    5 | 0 | 5   | 7
    6 | 0 | 6   | 11
    7 | 0 | 7   | 12
    8 | 1 | 8   | 5
    9 | 1 | 9   | 7
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
       | K | val | t
    21 | 1 | 7   | 2
    22 | 1 | 3   | 8
    23 | 0 | 0   | 2
    24 | 0 | 6   | 3
    25 | 0 | 2   | 7
    26 | 0 | 3   | 8
    27 | 0 | 9   | 9
    28 | 0 | 7   | 13
    29 | 0 | 4   | 14
    '''
)
res = t1.asof_join(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    mode=pw.JoinMode.LEFT,
    defaults={t2.val: -1},
).select(
    pw.this.shard_key,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
shard_key | t  | val_left | val_right | sum
0         | 1  | 1        | -1        | 0
0         | 4  | 2        | 6         | 8
0         | 5  | 3        | 6         | 9
0         | 6  | 4        | 6         | 10
0         | 7  | 5        | 6         | 11
0         | 11 | 6        | 9         | 15
0         | 12 | 7        | 9         | 16
1         | 5  | 8        | 7         | 15
1         | 7  | 9        | 7         | 16
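A simplified plain-Python model of LEFT mode for the example above. It assumes each left row is paired with the latest right row that has the same K and a strictly earlier t. The strict comparison is an assumption inferred from the printed output (the K = 0 left row at t = 7 gets 6, not the value 2 stored at t2.t == 7); it is not a statement about how Pathway breaks ties in general. The helper asof_left is hypothetical.

```python
# (K, val, t) rows of t1 and t2 from the example above.
t1 = [(0, 1, 1), (0, 2, 4), (0, 3, 5), (0, 4, 6), (0, 5, 7),
      (0, 6, 11), (0, 7, 12), (1, 8, 5), (1, 9, 7)]
t2 = [(1, 7, 2), (1, 3, 8), (0, 0, 2), (0, 6, 3), (0, 2, 7),
      (0, 3, 8), (0, 9, 9), (0, 7, 13), (0, 4, 14)]


def asof_left(left, right, default):
    """Pair each left row with the latest right row that has the same K and
    a strictly earlier t (assumed rule); use default when none exists."""
    out = []
    for k, val, t in left:
        earlier = [(rt, rval) for rk, rval, rt in right if rk == k and rt < t]
        val_right = max(earlier, key=lambda p: p[0])[1] if earlier else default
        out.append((k, t, val, val_right, val + val_right))
    return out


# Rows are (shard_key, t, val_left, val_right, sum).
for row in asof_left(t1, t2, default=-1):
    print(row)
```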
concat(*others)
Concatenates self with every table other in others.
Semantics:
- result.columns == self.columns == other.columns
- result.id == self.id ∪ other.id
if self.id and other.id collide, throws an exception.
Requires:
- other.columns == self.columns
- self.id disjoint with other.id
- Parameters
  other – the other table.
- Returns
  The concatenated table. IDs of rows from the original tables are preserved.
- Return type
  Table
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
   | age | owner | pet
1  | 10  | Alice | 1
2  | 9   | Bob   | 1
3  | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''')
pw.universes.promise_are_pairwise_disjoint(t1, t2)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 1
11  | Alice | 30
12  | Tom   | 40
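The concat semantics listed above (ids preserved, colliding ids rejected) can be sketched with dictionaries keyed by row id. The helper concat below is hypothetical and only models the documented contract; raising ValueError is an assumption, since the text above does not name the exception type.

```python
def concat(*tables):
    """Model of concat: each table is a dict mapping row id -> row values.
    Ids are preserved, so they must be pairwise disjoint."""
    result = {}
    for table in tables:
        clash = result.keys() & table.keys()
        if clash:
            raise ValueError(f"colliding ids: {sorted(clash)}")
        result.update(table)
    return result


t1 = {1: (10, "Alice", 1), 2: (9, "Bob", 1), 3: (8, "Alice", 2)}
t2 = {11: (11, "Alice", 30), 12: (12, "Tom", 40)}
t3 = concat(t1, t2)
print(sorted(t3))  # [1, 2, 3, 11, 12]
```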
concat_reindex(*tables)
Concatenate contents of several tables.
This is similar to PySpark union. All tables must have the same schema. Each row is reindexed.
- Parameters
  tables (Table) – List of tables to concatenate. All tables must have the same schema.
- Returns
  The concatenated table. It will have new, synthetic ids.
- Return type
  Table
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
  | pet
1 | Manul
8 | Octopus
''')
t3 = t1.concat_reindex(t2)
pw.debug.compute_and_print(t3, include_id=False)
pet
Cat
Dog
Manul
Octopus
copy()
Returns a copy of a table.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.copy()
pw.debug.compute_and_print(t2, include_id=False)
age | owner | pet
7   | Bob   | dog
8   | Alice | cat
9   | Bob   | dog
10  | Alice | dog
t1 is t2
False
- Return type
Table
difference(other)
Restricts the universe of self to keys not appearing in the other table.
- Parameters
  other (Table) – table with ids to remove from self.
- Returns
  table with restricted universe, with the same set of columns.
- Return type
  Table
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
  | cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.difference(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
10  | Alice | 1
dtypes()
Return the types of the columns as a dictionary.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t1.dtypes()
{'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>}
t1.schema['age']
<class 'int'>
static empty(**kwargs)
Creates an empty table with a schema specified by kwargs.
- Parameters
  kwargs (NewType(DType, type)) – Dict whose keys are column names and values are column types.
- Returns
  Created empty table.
- Return type
  Table
Example:
import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
pw.debug.compute_and_print(t1, include_id=False)
age | pet
filter(filter_expression)
Filters a table according to a filter condition.
- Parameters
  filter_expression (ColumnExpression) – the expression that specifies the filtering condition.
- Returns
  A table with the same schema as self, whose ids are a subset of self.id.
- Return type
  Table
Example:
import pathway as pw
vertices = pw.debug.parse_to_table('''
label outdegree
1     3
7     0
''')
filtered = vertices.filter(vertices.outdegree == 0)
pw.debug.compute_and_print(filtered, include_id=False)
label | outdegree
7     | 0
flatten(*args, **kwargs)
Performs a flatmap operation on the column or expression given as the first argument. The data type of this column or expression must be iterable. Other columns specified in the method arguments are duplicated once for each element of the iterable.
The ids of the source rows can be obtained via the table.id column, e.g. table.flatten(table.column_to_be_flattened, original_id=table.id).
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | pet | age
1 | Dog | 2
7 | Cat | 5
''')
t2 = t1.flatten(t1.pet)
pw.debug.compute_and_print(t2, include_id=False)
pet
C
D
a
g
o
t
t3 = t1.flatten(t1.pet, t1.age)
pw.debug.compute_and_print(t3, include_id=False)
pet | age
C   | 5
D   | 2
a   | 5
g   | 2
o   | 2
t   | 5
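A plain-Python sketch of the flatmap semantics, assuming rows are dictionaries and that only the flattened column plus the explicitly listed columns appear in the output, as in the two calls above. The helper flatten is hypothetical; row order here follows the input, whereas the printed output above is sorted.

```python
def flatten(rows, column, *keep):
    """Model of flatten: one output row per element of row[column];
    columns listed in keep are copied into every resulting row."""
    out = []
    for row in rows:
        for item in row[column]:
            new_row = {column: item}
            new_row.update({name: row[name] for name in keep})
            out.append(new_row)
    return out


t1 = [{"pet": "Dog", "age": 2}, {"pet": "Cat", "age": 5}]
print(flatten(t1, "pet"))
print(flatten(t1, "pet", "age"))
```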
- Return type
Table
static from_columns(*args, **kwargs)
Build a table from columns.
All columns must have the same ids. Columns’ names must be pairwise distinct.
- Parameters
  - args (ColumnReference) – List of columns.
  - kwargs (ColumnReference) – Columns with their new names.
- Returns
  Created table.
- Return type
  Table
Example:
import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
t2 = pw.Table.empty(foo=float, bar=float)
t3 = pw.Table.from_columns(t1.pet, qux=t2.foo)
pw.debug.compute_and_print(t3, include_id=False)
pet | qux
groupby(*args, id=None)
Groups table by columns from args.
NOTE: Usually followed by .reduce() that aggregates the result and returns a table.
- Parameters
  - args (ColumnReference) – columns to group by.
  - id (Optional[ColumnReference]) – if provided, the column used to set the ids of the rows of the result.
- Returns
  Groupby object.
- Return type
  GroupedTable
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.groupby(t1.pet, t1.owner).reduce(
    t1.owner, t1.pet, ageagg=pw.reducers.sum(t1.age)
)
pw.debug.compute_and_print(t2, include_id=False)
owner | pet | ageagg
Alice | cat | 8
Alice | dog | 10
Bob   | dog | 16
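The groupby/reduce pair above can be modeled with a dictionary of groups: one entry per distinct (pet, owner) key, each reduced to a single row. A minimal sketch over the same four rows (plain Python, not Pathway's implementation):

```python
from collections import defaultdict

# (age, owner, pet) rows from the example above.
rows = [(10, "Alice", "dog"), (9, "Bob", "dog"), (8, "Alice", "cat"), (7, "Bob", "dog")]

# groupby(pet, owner): collect the ages of each distinct (pet, owner) key.
groups = defaultdict(list)
for age, owner, pet in rows:
    groups[(pet, owner)].append(age)

# reduce(owner, pet, ageagg=sum(age)): one output row per group.
result = sorted((owner, pet, sum(ages)) for (pet, owner), ages in groups.items())
print(result)
# [('Alice', 'cat', 8), ('Alice', 'dog', 10), ('Bob', 'dog', 16)]
```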
having(*indexers)
Removes rows so that indexed.ix(indexer) is possible even when some rows are missing, for each indexer in indexers.
- Return type
Table
property id: ColumnReference
Get a reference to the pseudocolumn containing the ids of a table.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.select(ids=t1.id)
pw.debug.compute_and_print(t2.select(test=t2.id == t2.ids), include_id=False)
test
True
True
True
True
- Return type
ColumnReference
interpolate(timestamp, *values, mode=InterpolateMode.LINEAR)
Interpolates missing values in a column using the previous and next values based on a timestamps column.
- Parameters
- timestamp (ColumnReference) – Reference to the column containing timestamps.
- *values (ColumnReference) – References to the columns containing values to be interpolated.
- mode (InterpolateMode, optional) – The interpolation mode. Currently, only InterpolateMode.LINEAR is supported. Default is InterpolateMode.LINEAR.
- Returns
  A new table with the interpolated values.
- Return type
  Table
- Raises
  ValueError – If the columns are not ColumnReference or if the interpolation mode is not supported.
NOTE:
- The interpolation is performed based on linear interpolation between the previous and next values.
- If a value is missing at the beginning or end of the column, no interpolation is performed.
Example:
import pathway as pw
table = pw.debug.table_from_markdown('''
timestamp | values_a | values_b
1         | 1        | 10
2         |          |
3         | 3        |
4         |          |
5         |          |
6         | 6        | 60
''')
table = table.interpolate(pw.this.timestamp, pw.this.values_a, pw.this.values_b)
pw.debug.compute_and_print(table, include_id=False)
timestamp | values_a | values_b
1         | 1        | 10
2         | 2.0      | 20.0
3         | 3        | 30.0
4         | 4.0      | 40.0
5         | 5.0      | 50.0
6         | 6        | 60
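A sketch of linear interpolation as described in the note above, assuming unique timestamps and None for missing values. The helper interpolate_linear is hypothetical; for each gap it uses the nearest known values before and after the timestamp and leaves leading or trailing gaps as None. It reproduces both columns of the example.

```python
def interpolate_linear(timestamps, values):
    """Fill None entries by linear interpolation between the nearest known
    values before and after each timestamp; edge gaps remain None."""
    known = [(t, v) for t, v in zip(timestamps, values) if v is not None]
    out = []
    for t, v in zip(timestamps, values):
        if v is not None:
            out.append(v)
            continue
        before = [(kt, kv) for kt, kv in known if kt < t]
        after = [(kt, kv) for kt, kv in known if kt > t]
        if not before or not after:
            out.append(None)  # missing at the start or end: not interpolated
            continue
        t0, v0 = max(before)  # latest known point before t
        t1, v1 = min(after)   # earliest known point after t
        out.append(v0 + (v1 - v0) * (t - t0) / (t1 - t0))
    return out


ts = [1, 2, 3, 4, 5, 6]
print(interpolate_linear(ts, [1, None, 3, None, None, 6]))      # values_a
print(interpolate_linear(ts, [10, None, None, None, None, 60]))  # values_b
```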
intersect(*tables)
Restricts the universe of self to keys appearing in all of the tables.
- Parameters
  tables (Table) – tables whose keys are used to restrict the universe.
- Returns
  table with restricted universe, with the same set of columns.
- Return type
  Table
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
  | cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.intersect(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
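Both intersect() and difference() only restrict the set of row ids; the columns are untouched. A minimal model with dictionaries keyed by row id (hypothetical helpers, plain Python) reproduces both examples:

```python
# Tables as dicts mapping row id -> row values (from the examples above).
t1 = {1: (10, "Alice", 1), 2: (9, "Bob", 1), 3: (8, "Alice", 2)}
t2 = {2: (100,), 3: (200,), 4: (300,)}


def intersect(table, *others):
    """Keep rows of table whose id appears in every other table."""
    keep = set(table).intersection(*others)
    return {k: v for k, v in table.items() if k in keep}


def difference(table, other):
    """Keep rows of table whose id does not appear in other."""
    return {k: v for k, v in table.items() if k not in other}


print(intersect(t1, t2))   # keeps ids 2 and 3
print(difference(t1, t2))  # keeps id 1
```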
interval_join(other, self_time_expression, other_time_expression, lower_bound, upper_bound, *on)
Performs an interval join of self with other using a time difference and join expressions. If self_time_expression + lower_bound <= other_time_expression <= self_time_expression + upper_bound and conditions in on are satisfied, the rows are joined.
- Parameters
  - other (Table) – the right side of a join.
  - self_time_expression (ColumnExpression) – time expression in self.
  - other_time_expression (ColumnExpression) – time expression in other.
  - lower_bound (int | float | timedelta) – a lower bound on time difference between other_time_expression and self_time_expression.
  - upper_bound (int | float | timedelta) – an upper bound on time difference between other_time_expression and self_time_expression.
  - on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- Returns
  a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
- Return type
  IntervalJoinResult
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
      | t
    1 | 3
    2 | 4
    3 | 5
    4 | 11
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    1 | 0
    2 | 1
    3 | 4
    4 | 7
    '''
)
t3 = t1.interval_join(t2, t1.t, t2.t, -2, 1).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4
t1 = pw.debug.table_from_markdown(
    '''
      | a | t
    1 | 1 | 3
    2 | 1 | 4
    3 | 1 | 5
    4 | 1 | 11
    5 | 2 | 2
    6 | 2 | 3
    7 | 3 | 4
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | b | t
    1 | 1 | 0
    2 | 1 | 1
    3 | 1 | 4
    4 | 1 | 7
    5 | 2 | 0
    6 | 2 | 2
    7 | 4 | 2
    '''
)
t3 = t1.interval_join(t2, t1.t, t2.t, -2, 1, t1.a == t2.b).select(
    t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2
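The join condition self_time_expression + lower_bound <= other_time_expression <= self_time_expression + upper_bound can be checked directly in plain Python. This sketch uses a hypothetical interval_join helper over (key, t) tuples and reproduces the second example; the quadratic nested loop is chosen for clarity, not efficiency, and is not how Pathway evaluates the join.

```python
def interval_join(left, right, lower_bound, upper_bound, on=None):
    """Pair (key, t) rows when left_t + lower_bound <= right_t <= left_t + upper_bound
    and the optional key predicate `on` holds. Returns (left_key, left_t, right_t)."""
    return [
        (lk, lt, rt)
        for lk, lt in left
        for rk, rt in right
        if lt + lower_bound <= rt <= lt + upper_bound and (on is None or on(lk, rk))
    ]


# (a, t) rows of t1 and (b, t) rows of t2 from the second example above.
t1 = [(1, 3), (1, 4), (1, 5), (1, 11), (2, 2), (2, 3), (3, 4)]
t2 = [(1, 0), (1, 1), (1, 4), (1, 7), (2, 0), (2, 2), (4, 2)]

for row in interval_join(t1, t2, -2, 1, on=lambda a, b: a == b):
    print(row)
```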
interval_join_left(other, self_time_expression, other_time_expression, lower_bound, upper_bound, *on)
Performs an interval left join of self with other using a time difference and join expressions. If self_time_expression + lower_bound <= other_time_expression <= self_time_expression + upper_bound and conditions in on are satisfied, the rows are joined. Rows from the left side that haven’t been matched with the right side are returned with missing values on the right side replaced with None.
- Parameters
  - other (Table) – the right side of the join.
  - self_time_expression (ColumnExpression) – time expression in self.
  - other_time_expression (ColumnExpression) – time expression in other.
  - lower_bound (int | float | timedelta) – a lower bound on time difference between other_time_expression and self_time_expression.
  - upper_bound (int | float | timedelta) – an upper bound on time difference between other_time_expression and self_time_expression.
  - on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- Returns
  a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
- Return type
  IntervalJoinResult
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
      | t
    1 | 3
    2 | 4
    3 | 5
    4 | 11
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    1 | 0
    2 | 1
    3 | 4
    4 | 7
    '''
)
t3 = t1.interval_join_left(t2, t1.t, t2.t, -2, 1).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4
11     |
t1 = pw.debug.table_from_markdown(
    '''
      | a | t
    1 | 1 | 3
    2 | 1 | 4
    3 | 1 | 5
    4 | 1 | 11
    5 | 2 | 2
    6 | 2 | 3
    7 | 3 | 4
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | b | t
    1 | 1 | 0
    2 | 1 | 1
    3 | 1 | 4
    4 | 1 | 7
    5 | 2 | 0
    6 | 2 | 2
    7 | 4 | 2
    '''
)
t3 = t1.interval_join_left(t2, t1.t, t2.t, -2, 1, t1.a == t2.b).select(
    t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
1 | 11     |
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2
3 | 4      |
interval_join_outer(other, self_time_expression, other_time_expression, lower_bound, upper_bound, *on)
Performs an interval outer join of self with other using a time difference and join expressions. If self_time_expression + lower_bound <= other_time_expression <= self_time_expression + upper_bound and conditions in on are satisfied, the rows are joined. Rows that haven’t been matched with the other side are returned with missing values on the other side replaced with None.
- Parameters
  - other (Table) – the right side of the join.
  - self_time_expression (ColumnExpression) – time expression in self.
  - other_time_expression (ColumnExpression) – time expression in other.
  - lower_bound (int | float | timedelta) – a lower bound on time difference between other_time_expression and self_time_expression.
  - upper_bound (int | float | timedelta) – an upper bound on time difference between other_time_expression and self_time_expression.
  - on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- Returns
  a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
- Return type
  IntervalJoinResult
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
      | t
    1 | 3
    2 | 4
    3 | 5
    4 | 11
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    1 | 0
    2 | 1
    3 | 4
    4 | 7
    '''
)
t3 = t1.interval_join_outer(t2, t1.t, t2.t, -2, 1).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | 0
       | 7
3      | 1
3      | 4
4      | 4
5      | 4
11     |
t1 = pw.debug.table_from_markdown(
    '''
      | a | t
    1 | 1 | 3
    2 | 1 | 4
    3 | 1 | 5
    4 | 1 | 11
    5 | 2 | 2
    6 | 2 | 3
    7 | 3 | 4
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | b | t
    1 | 1 | 0
    2 | 1 | 1
    3 | 1 | 4
    4 | 1 | 7
    5 | 2 | 0
    6 | 2 | 2
    7 | 4 | 2
    '''
)
t3 = t1.interval_join_outer(t2, t1.t, t2.t, -2, 1, t1.a == t2.b).select(
    t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
  |        | 0
  |        | 2
  |        | 7
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
1 | 11     |
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2
3 | 4      |
interval_join_right(other, self_time_expression, other_time_expression, lower_bound, upper_bound, *on)
Performs an interval right join of self with other using a time difference and join expressions. If self_time_expression + lower_bound <= other_time_expression <= self_time_expression + upper_bound and conditions in on are satisfied, the rows are joined. Rows from the right side that haven’t been matched with the left side are returned with missing values on the left side replaced with None.
- Parameters
  - other (Table) – the right side of the join.
  - self_time_expression (ColumnExpression) – time expression in self.
  - other_time_expression (ColumnExpression) – time expression in other.
  - lower_bound (int | float | timedelta) – a lower bound on time difference between other_time_expression and self_time_expression.
  - upper_bound (int | float | timedelta) – an upper bound on time difference between other_time_expression and self_time_expression.
  - on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- Returns
  a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
- Return type
  IntervalJoinResult
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
      | t
    1 | 3
    2 | 4
    3 | 5
    4 | 11
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    1 | 0
    2 | 1
    3 | 4
    4 | 7
    '''
)
t3 = t1.interval_join_right(t2, t1.t, t2.t, -2, 1).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | 0
       | 7
3      | 1
3      | 4
4      | 4
5      | 4
t1 = pw.debug.table_from_markdown(
    '''
      | a | t
    1 | 1 | 3
    2 | 1 | 4
    3 | 1 | 5
    4 | 1 | 11
    5 | 2 | 2
    6 | 2 | 3
    7 | 3 | 4
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
      | b | t
    1 | 1 | 0
    2 | 1 | 1
    3 | 1 | 4
    4 | 1 | 7
    5 | 2 | 0
    6 | 2 | 2
    7 | 4 | 2
    '''
)
t3 = t1.interval_join_right(t2, t1.t, t2.t, -2, 1, t1.a == t2.b).select(
    t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
  |        | 0
  |        | 2
  |        | 7
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2
property ix()
Returns an object that, when indexed by a column, returns a new table reindexed by that column.
- Returns
  an object that, when indexed by some column, returns a table with the rows specified by that column.
- Return type
  Indexer
Example:
import pathway as pw
t_animals = pw.debug.parse_to_table('''
  | epithet    | genus
1 | upupa      | epops
2 | acherontia | atropos
3 | bubo       | scandiacus
4 | dynastes   | hercules
''')
t_birds = pw.debug.parse_to_table('''
  | desc
2 | hoopoe
4 | owl
''')
ret = t_birds.select(t_birds.desc, latin=t_animals.ix(t_birds.id).genus)
pw.debug.compute_and_print(ret, include_id=False)
desc   | latin
hoopoe | atropos
owl    | hercules
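Conceptually, t_animals.ix(t_birds.id) looks up, for each row of t_birds, the row of t_animals whose id equals the given pointer. A plain-Python sketch with dictionaries keyed by id; this illustrates the lookup only and is not Pathway's Indexer:

```python
# Tables as dicts mapping row id -> row (from the example above).
t_animals = {
    1: {"epithet": "upupa", "genus": "epops"},
    2: {"epithet": "acherontia", "genus": "atropos"},
    3: {"epithet": "bubo", "genus": "scandiacus"},
    4: {"epithet": "dynastes", "genus": "hercules"},
}
t_birds = {2: {"desc": "hoopoe"}, 4: {"desc": "owl"}}

# t_animals.ix(t_birds.id).genus: for each bird row, use its id as a pointer
# into t_animals and read the genus column of the row it points to.
ret = {
    row_id: {"desc": row["desc"], "latin": t_animals[row_id]["genus"]}
    for row_id, row in t_birds.items()
}
print(ret)
```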
ix_ref(*args, optional=False)
Returns a row, indexed by its primary keys. Several columns can be used as index.
- Parameters
  args (ColumnExpression) – Column references.
- Returns
  indexed row.
- Return type
  Row
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
name   | pet
Alice  | dog
Bob    | cat
Carole | cat
David  | dog
''')
t2 = t1.with_id_from(pw.this.name)
t2 = t2.select(*pw.this, new_value=pw.this.ix_ref("Alice").pet)
pw.debug.compute_and_print(t2, include_id=False)
name   | pet | new_value
Alice  | dog | dog
Bob    | cat | dog
Carole | cat | dog
David  | dog | dog
Tables obtained by a groupby/reduce scheme always have primary keys:
import pathway as pw
t1 = pw.debug.parse_to_table('''
name   | pet
Alice  | dog
Bob    | cat
Carole | cat
David  | cat
''')
t2 = t1.groupby(pw.this.pet).reduce(pw.this.pet, count=pw.reducers.count(pw.this.name))
t3 = t1.select(*pw.this, new_value=t2.ix_ref(pw.this.pet).count)
pw.debug.compute_and_print(t3, include_id=False)
name   | pet | new_value
Alice  | dog | 1
Bob    | cat | 3
Carole | cat | 3
David  | cat | 3
Single-row tables can be accessed via ix_ref():
import pathway as pw
t1 = pw.debug.parse_to_table('''
name   | pet
Alice  | dog
Bob    | cat
Carole | cat
David  | cat
''')
t2 = t1.reduce(count=pw.reducers.count(pw.this.name))
t3 = t1.select(*pw.this, new_value=t2.ix_ref().count)
pw.debug.compute_and_print(t3, include_id=False)
name   | pet | new_value
Alice  | dog | 4
Bob    | cat | 4
Carole | cat | 4
David  | cat | 4
join(other, *on, id=None, how=JoinMode.INNER)
Join self with other using the given join expression.
- Parameters
  - other (Joinable) – the right side of the join.
  - on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  - id (Optional[ColumnReference]) – optional argument for the id of the result; can only be self.id or other.id.
  - how (JoinMode) – by default, an inner join is performed. Possible values are JoinMode.{INNER,LEFT,RIGHT,OUTER}, corresponding to inner, left, right and outer join respectively.
- Returns
  an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
  JoinResult
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join(
    t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9 | Bob | L
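The four JoinMode values differ only in what happens to rows without a match. A minimal pure-Python hash-join sketch makes this concrete on the A/B and C/D tables used in the join_left example; join_rows and its string modes are hypothetical stand-ins for JoinMode, not Pathway's implementation.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]
Pair = Tuple[Optional[Row], Optional[Row]]


def join_rows(left: List[Row], right: List[Row], lkey: str, rkey: str, how: str = "inner") -> List[Pair]:
    """Hypothetical equality join on left[lkey] == right[rkey]; how is inner/left/right/outer."""
    index: Dict[Any, List[int]] = {}
    for j, rrow in enumerate(right):
        index.setdefault(rrow[rkey], []).append(j)
    pairs: List[Pair] = []
    matched_right = set()
    for lrow in left:
        hits = index.get(lrow[lkey], [])
        for j in hits:
            pairs.append((lrow, right[j]))
            matched_right.add(j)
        if not hits and how in ("left", "outer"):
            pairs.append((lrow, None))  # unmatched left row: right side missing
    if how in ("right", "outer"):
        for j, rrow in enumerate(right):
            if j not in matched_right:
                pairs.append((None, rrow))  # unmatched right row: left side missing
    return pairs


t1 = [{"A": 11, "B": 111}, {"A": 12, "B": 112}, {"A": 13, "B": 113}, {"A": 13, "B": 114}]
t2 = [{"C": 11, "D": 211}, {"C": 12, "D": 212}, {"C": 14, "D": 213}, {"C": 14, "D": 214}]
for how in ("inner", "left", "right", "outer"):
    print(how, len(join_rows(t1, t2, "A", "C", how)))
```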
join_inner(other, *on, id=None)
Inner-joins two tables or join results.
- Parameters
  - other (Joinable) – the right side of the join.
  - on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  - id (Optional[ColumnReference]) – optional argument for the id of the result; can only be self.id or other.id.
- Returns
an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
JoinResult
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join_inner(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(
    age=t1.age, owner_name=t2.owner, size=t2.size
)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9 | Bob | L
join_left(other, *on, id=None)
Left-joins two tables or join results.
- Parameters
  - other (Joinable) – Table or join result.
  - *on – Columns to join, syntax self.col1 == other.col2.
  - id (Optional[ColumnReference]) – optional id column of the result.
Remarks: args cannot contain the id column of either table, as the result table has an id column with auto-generated ids; an input id can be selected by assigning it to a column with a defined name (passed in kwargs).
Behavior:
- for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
- rows from the right side that were not matched with the left side are skipped
- for rows that were matched the behavior is the same as that of an inner join.
- Returns
an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
JoinResult
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
''')
t2 = pw.debug.table_from_markdown('''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
''')
pw.debug.compute_and_print(
    t1.join_left(t2, t1.A == t2.C).select(
        t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D, t2.id)
    ),
    include_id=False,
)
A | t2_C | S
11 | 11 | 322
12 | 12 | 324
13 | |
13 | |
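pw.require(value, *deps) yields None whenever any dependency is None, which is how the example leaves S empty for the unmatched rows. In the plain-Python sketch below the guarded expression is passed as a zero-argument callable, because Python would evaluate 113 + None eagerly and raise; that callable is an assumption of the sketch, not Pathway's signature.

```python
from typing import Any, Callable, Optional


def require(value: Callable[[], Any], *deps: Any) -> Optional[Any]:
    """Sketch of pw.require semantics: None if any dependency is None, else the value."""
    if any(dep is None for dep in deps):
        return None
    return value()  # evaluated only when every dependency is present


# Hypothetical (B, D, right_id) triples from the left join; no right row for A == 13.
joined = [(111, 211, "id1"), (112, 212, "id2"), (113, None, None), (114, None, None)]
S = [require(lambda b=b, d=d: b + d, rid) for b, d, rid in joined]
print(S)  # [322, 324, None, None]
```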
join_outer(other, *on, id=None)
Outer-joins two tables or join results.
- Parameters
  - other (Joinable) – Table or join result.
  - *on – Columns to join, syntax self.col1 == other.col2.
  - id (Optional[ColumnReference]) – optional id column of the result.
Remarks: args cannot contain the id column of either table, as the result table has an id column with auto-generated ids; an input id can be selected by assigning it to a column with a defined name (passed in kwargs).
Behavior:
- for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
- for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
- for rows that were matched the behavior is the same as that of an inner join.
- Returns
an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
JoinResult
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
''')
t2 = pw.debug.table_from_markdown('''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
''')
pw.debug.compute_and_print(
    t1.join_outer(t2, t1.A == t2.C).select(
        t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D, t1.id, t2.id)
    ),
    include_id=False,
)
A | t2_C | S
  | 14 |
  | 14 |
11 | 11 | 322
12 | 12 | 324
13 | |
13 | |
join_right(other, *on, id=None)
Right-joins two tables or join results.
- Parameters
  - other (Joinable) – Table or join result.
  - *on – Columns to join, syntax self.col1 == other.col2.
  - id (Optional[ColumnReference]) – optional id column of the result.
Remarks: args cannot contain the id column of either table, as the result table has an id column with auto-generated ids; an input id can be selected by assigning it to a column with a defined name (passed in kwargs).
Behavior:
- rows from the left side that were not matched with the right side are skipped
- for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
- for rows that were matched the behavior is the same as that of an inner join.
- Returns
an object on which .select() may be called to extract relevant columns from the result of the join.
- Return type
JoinResult
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
''')
t2 = pw.debug.table_from_markdown('''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
''')
pw.debug.compute_and_print(
    t1.join_right(t2, t1.A == t2.C).select(
        t1.A, t2_C=t2.C, S=pw.require(pw.coalesce(t1.B, 0) + t2.D, t1.id)
    ),
    include_id=False,
)
A | t2_C | S
  | 14 |
  | 14 |
11 | 11 | 322
12 | 12 | 324
- Returns
OuterJoinResult object
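pw.coalesce returns its first argument that is not None. The plain-Python sketch below applies that rule to the right-join rows (the coalesce function here is a hypothetical re-implementation, not the Pathway one). It also shows why the example wraps the sum in pw.require(..., t1.id): without that guard, the unmatched rows would get 0 + D instead of an empty S.

```python
from typing import Any, Optional


def coalesce(*args: Any) -> Optional[Any]:
    """Return the first argument that is not None, or None if all of them are None."""
    for arg in args:
        if arg is not None:
            return arg
    return None


# Hypothetical (B, D) pairs from the right join; B is None where no left row matched.
rows = [(111, 211), (112, 212), (None, 213), (None, 214)]
sums = [coalesce(b, 0) + d for b, d in rows]
print(sums)  # [322, 324, 213, 214]
```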
pointer_from(*args, optional=False)
Pseudo-random hash of its argument. Produces pointer types. Applied column-wise.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age owner pet
1 10 Alice dog
2 9 Bob dog
3 8 Alice cat
4 7 Bob dog''')
g = t1.groupby(t1.owner).reduce(refcol=t1.pointer_from(t1.owner))  # g.id == g.refcol
pw.debug.compute_and_print(g.select(test=(g.id == g.refcol)), include_id=False)
test
True
True
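The groupby example works because the id of each group is derived deterministically from the grouping value, so g.id equals pointer_from(owner). The idea can be sketched with a stable hash; the blake2b-over-repr scheme below is an assumption for illustration only and does not reproduce Pathway's actual pointer values.

```python
import hashlib
from typing import Any


def pointer_from(*values: Any) -> str:
    """Hypothetical deterministic, pseudo-random pointer for a tuple of values."""
    digest = hashlib.blake2b(repr(values).encode("utf-8"), digest_size=8).hexdigest()
    return "^" + digest.upper()


owners = ["Alice", "Bob", "Alice", "Bob"]
# Each group's id is derived from its key, so recomputing pointer_from(owner) gives the same id.
group_ids = {owner: pointer_from(owner) for owner in owners}
print(all(group_ids[owner] == pointer_from(owner) for owner in owners))  # True
```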
promise_universe_is_equal_to(other)
Asserts to Pathway that the universe of self is equal to the universe of other.
Semantics: Used in situations where Pathway cannot deduce that the two universes are equal.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
| age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10
- Return type
TypeVar(SelfTableLike, bound=TableLike)
promise_universe_is_subset_of(other)
Asserts to Pathway that the universe of self is a subset of the universe of other.
Semantics: Used in situations where Pathway cannot deduce that one universe is a subset of another.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 30
- Return type
TypeVar(SelfTableLike, bound=TableLike)
promise_universes_are_disjoint(other)
Asserts to Pathway that the universe of self is disjoint from the universe of other.
Semantics: Used in situations where Pathway cannot deduce that the universes are disjoint.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
11 | 11 | Alice | 30
12 | 12 | Tom | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 1
11 | Alice | 30
12 | Tom | 40
- Return type
TypeVar(SelfTableLike, bound=TableLike)
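promise_universes_are_disjoint moves no data; it states that no row id occurs in both tables, the precondition a concatenation relies on. That precondition can be sketched with dictionaries keyed by row id; the concat helper and the small integer ids are illustrative assumptions (Pathway ids are pointers), not Pathway's implementation.

```python
from typing import Any, Dict

Table = Dict[int, Dict[str, Any]]  # hypothetical: row id -> row


def concat(a: Table, b: Table) -> Table:
    """Union of two tables; their id sets must be disjoint."""
    overlap = a.keys() & b.keys()
    if overlap:
        raise ValueError(f"universes are not disjoint: {sorted(overlap)}")
    return {**a, **b}


t1 = {1: {"age": 10, "owner": "Alice", "pet": 1},
      2: {"age": 9, "owner": "Bob", "pet": 1},
      3: {"age": 8, "owner": "Alice", "pet": 2}}
t2 = {11: {"age": 11, "owner": "Alice", "pet": 30},
      12: {"age": 12, "owner": "Tom", "pet": 40}}
t3 = concat(t1, t2)
print(sorted(row["age"] for row in t3.values()))  # [8, 9, 10, 11, 12]
```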
reduce(*args, **kwargs)
Reduce a table to a single row.
Equivalent to self.groupby().reduce(*args, **kwargs).
- Parameters
  - args (ColumnReference) – reducer to reduce the table with.
  - kwargs (ColumnExpression) – reducer to reduce the table with. Its key is the new name of a column.
- Returns
Reduced table.
- Return type
Table
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t2 = t1.reduce(ageagg=pw.reducers.argmin(t1.age))
pw.debug.compute_and_print(t2, include_id=False)
ageagg
^...
t3 = t2.select(t1.ix(t2.ageagg).age, t1.ix(t2.ageagg).pet)
pw.debug.compute_and_print(t3, include_id=False)
age | pet
7 | dog
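pw.reducers.argmin returns a row id (printed as a pointer, ^...) rather than the minimal value, which is why the second step uses t1.ix to read age and pet. A plain-Python analogue, with hypothetical string ids standing in for Pathway pointers:

```python
# Hypothetical row ids ("r1".."r4") standing in for Pathway pointers.
rows = {
    "r1": {"age": 10, "owner": "Alice", "pet": "dog"},
    "r2": {"age": 9, "owner": "Bob", "pet": "dog"},
    "r3": {"age": 8, "owner": "Alice", "pet": "cat"},
    "r4": {"age": 7, "owner": "Bob", "pet": "dog"},
}

# argmin analogue: the id of the row with the smallest age, not the age itself.
ageagg = min(rows, key=lambda rid: rows[rid]["age"])
# Following the id back into the table plays the role of t1.ix(t2.ageagg).
print(rows[ageagg]["age"], rows[ageagg]["pet"])  # 7 dog
```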
rename_columns(**kwargs)
Rename columns according to kwargs.
Columns not referenced in kwargs are left unchanged. The new name of a column must not be id.
- Parameters
kwargs (ColumnReference) – mapping from new column names to the columns being renamed.
- Returns
self with columns renamed.
- Return type
Table
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = t1.rename_columns(years_old=t1.age, animal=t1.pet)
pw.debug.compute_and_print(t2, include_id=False)
owner | years_old | animal
Alice | 8 | 2
Alice | 10 | 1
Bob | 9 | 1
property schema: Type[Schema]
Gets the schema of the table.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t1.schema.as_dict()
{'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>}
t1.schema['age']
<class 'int'>
- Return type
Type[Schema]
select(*args, **kwargs)
Build a new table with columns specified by kwargs.
The names of the output columns are keys(kwargs); values(kwargs) can be raw values, boxed values, or columns. Assigning to id reindexes the table.
- Parameters
  - args (ColumnReference) – column references.
  - kwargs (Any) – column expressions with their new assigned names.
- Returns
Created table.
- Return type
Table
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
pet
Dog
Cat
''')
t2 = t1.select(animal=t1.pet, desc="fluffy")
pw.debug.compute_and_print(t2, include_id=False)
animal | desc
Cat | fluffy
Dog | fluffy
sort(key=None, instance=None)
Sorts the table by the specified keys.
- Parameters
  - key (Optional[ColumnReference]) – the name of the primary key to sort by. If None, the table is sorted based on the key column as the primary key.
  - instance (Optional[ColumnReference]) – the name of the secondary key to sort by. If None, the field “instance” is chosen if it exists; otherwise only the primary key is used.
- Returns
The sorted table. Contains two columns, prev and next, containing the pointers to the previous and next rows.
- Return type
pw.Table
Example:
import pathway as pw
table = pw.debug.table_from_markdown('''
name | age | score
Alice | 25 | 80
Bob | 20 | 90
Charlie | 30 | 80
''')
table = table.with_id_from(pw.this.name)
table += table.sort(key=pw.this.age)
pw.debug.compute_and_print(table)
| name | age | score | next | prev
^GBSDEEW... | Alice | 25 | 80 | ^DS9AT95... | ^EDPSSB1...
^EDPSSB1... | Bob | 20 | 90 | ^GBSDEEW... |
^DS9AT95... | Charlie | 30 | 80 | | ^GBSDEEW...
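The prev and next columns form a doubly linked list over the rows in key order. A minimal sketch of that bookkeeping, with names as stand-ins for row ids; it ignores the instance argument and ties, and prev_next is a hypothetical helper rather than Pathway's algorithm.

```python
from typing import Dict, List, Optional, Tuple

Links = Dict[str, Tuple[Optional[str], Optional[str]]]


def prev_next(keys: Dict[str, int]) -> Links:
    """Hypothetical helper: map each row id to (prev id, next id) in ascending key order."""
    order: List[str] = sorted(keys, key=lambda rid: keys[rid])
    links: Links = {}
    for i, rid in enumerate(order):
        prev_id = order[i - 1] if i > 0 else None
        next_id = order[i + 1] if i + 1 < len(order) else None
        links[rid] = (prev_id, next_id)
    return links


links = prev_next({"Alice": 25, "Bob": 20, "Charlie": 30})
for rid, (prev_id, next_id) in links.items():
    print(rid, "prev:", prev_id, "next:", next_id)
```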
update_cells(other)
Updates cells of self, breaking ties in favor of the values in other.
Semantics:
* result.columns == self.columns
* result.id == self.id
* conflicts are resolved preferring other’s values
Requires:
* other.columns ⊆ self.columns
* other.id ⊆ self.id
- Parameters
other (Table) – the other table.
- Returns
self updated with cells from other.
- Return type
Table
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 30
''')
pw.universes.promise_is_subset_of(t2, t1)
t3 = t1.update_cells(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 30
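update_cells keeps every id and column of self and overwrites only the cells that other provides. A dictionary-based sketch of those semantics, with integer row ids and a hypothetical update_cells helper (not Pathway's implementation); here other carries only the pet column, which the other.columns ⊆ self.columns requirement allows.

```python
from typing import Any, Dict

Table = Dict[int, Dict[str, Any]]  # hypothetical: row id -> row


def update_cells(base: Table, other: Table) -> Table:
    """Keep base's ids and columns; for ids present in other, other's cells win."""
    if not other.keys() <= base.keys():
        raise ValueError("other.id must be a subset of self.id")
    columns = set().union(*(row.keys() for row in base.values()))
    if any(not row.keys() <= columns for row in other.values()):
        raise ValueError("other.columns must be a subset of self.columns")
    return {rid: {**row, **other.get(rid, {})} for rid, row in base.items()}


t1 = {1: {"age": 10, "owner": "Alice", "pet": 1},
      2: {"age": 9, "owner": "Bob", "pet": 1},
      3: {"age": 8, "owner": "Alice", "pet": 2}}
t2 = {1: {"pet": 30}}
t3 = update_cells(t1, t2)
print(t3[1])  # {'age': 10, 'owner': 'Alice', 'pet': 30}
```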
update_columns(*args, **kwargs)
Updates columns of self according to args and kwargs. See the select specification for how args and kwargs are evaluated.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| owner | pet | size
1 | Tom | 1 | 10
2 | Bob | 1 | 9
3 | Tom | 2 | 8
''').with_universe_of(t1)
t3 = t1.update_columns(*t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet | size
8 | Tom | 2 | 8
9 | Bob | 1 | 9
10 | Tom | 1 | 10
- Return type
Table
update_rows(other)
Updates rows of self, breaking ties in favor of the rows in other.
Semantics:
- result.columns == self.columns == other.columns
- result.id == self.id ∪ other.id
Requires:
- other.columns == self.columns
- Parameters
other (Table) – the other table.
- Returns
self updated with rows from other.
- Return type
Table
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 30
12 | 12 | Tom | 40
''')
t3 = t1.update_rows(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 30
12 | Tom | 40
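Unlike update_cells, update_rows takes the union of ids and replaces whole rows. A dictionary-based sketch under the same conventions (hypothetical columns_of and update_rows helpers, integer ids standing in for Pathway pointers):

```python
from typing import Any, Dict, Set

Table = Dict[int, Dict[str, Any]]  # hypothetical: row id -> row


def columns_of(table: Table) -> Set[str]:
    """Column names appearing in any row of the table."""
    return set().union(*(row.keys() for row in table.values()))


def update_rows(base: Table, other: Table) -> Table:
    """Union of ids; where an id appears in both tables, the whole row from other wins."""
    if other and columns_of(other) != columns_of(base):
        raise ValueError("other.columns must equal self.columns")
    return {**base, **other}


t1 = {1: {"age": 10, "owner": "Alice", "pet": 1},
      2: {"age": 9, "owner": "Bob", "pet": 1},
      3: {"age": 8, "owner": "Alice", "pet": 2}}
t2 = {1: {"age": 10, "owner": "Alice", "pet": 30},
      12: {"age": 12, "owner": "Tom", "pet": 40}}
t3 = update_rows(t1, t2)
print(sorted(row["age"] for row in t3.values()))  # [8, 9, 10, 12]
```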
update_types(**kwargs)
Updates types in schema. Has no effect on the runtime.
- Return type
Table
window_join(other, left_time_expression, right_time_expression, window, *on)
Performs a window join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).
When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.
- Parameters
- other (
Table
) – the right side of a join. - self_time_expression – time expression in self.
- other_time_expression – time expression in other.
- window (
Window
) – a window to use. - on (
ColumnBinaryOpExpression
) – a list of column expressions. Each must have == on the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- other (
- Returns
a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join. - Return type
WindowJoinResult
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| t
1 | 1
2 | 2
3 | 3
4 | 7
5 | 13
''')
t2 = pw.debug.table_from_markdown('''
| t
1 | 2
2 | 5
3 | 6
4 | 7
''')
t3 = t1.window_join(t2, t1.t, t2.t, pw.window.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
2 | 2
3 | 2
7 | 6
7 | 7
t4 = t1.window_join(t2, t1.t, t2.t, pw.window.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
1 | 2
2 | 2
2 | 2
3 | 2
7 | 6
7 | 7
7 | 7
t1 = pw.debug.table_from_markdown('''
| a | t
1 | 1 | 1
2 | 1 | 2
3 | 1 | 3
4 | 1 | 7
5 | 1 | 13
6 | 2 | 1
7 | 2 | 2
8 | 3 | 4
''')
t2 = pw.debug.table_from_markdown('''
| b | t
1 | 1 | 2
2 | 1 | 5
3 | 1 | 6
4 | 1 | 7
5 | 2 | 2
6 | 2 | 3
7 | 4 | 3
''')
t3 = t1.window_join(t2, t1.t, t2.t, pw.window.tumbling(2), t1.a == t2.b).select(
    key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1 | 2 | 2
1 | 3 | 2
1 | 7 | 6
1 | 7 | 7
2 | 2 | 2
2 | 2 | 3
t1 = pw.debug.table_from_markdown('''
| t
0 | 0
1 | 5
2 | 10
3 | 15
4 | 17
''')
t2 = pw.debug.table_from_markdown('''
| t
0 | -3
1 | 2
2 | 3
3 | 6
4 | 16
''')
t3 = t1.window_join(
    t2, t1.t, t2.t, pw.window.session(lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
0 | 2
0 | 3
0 | 6
5 | 2
5 | 3
5 | 6
15 | 16
17 | 16
t1 = pw.debug.table_from_markdown('''
| a | t
1 | 1 | 1
2 | 1 | 4
3 | 1 | 7
4 | 2 | 0
5 | 2 | 3
6 | 2 | 4
7 | 2 | 7
8 | 3 | 4
''')
t2 = pw.debug.table_from_markdown('''
| b | t
1 | 1 | -1
2 | 1 | 6
3 | 2 | 2
4 | 2 | 10
5 | 4 | 3
''')
t3 = t1.window_join(
    t2, t1.t, t2.t, pw.window.session(lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1 | 1 | -1
1 | 4 | 6
1 | 7 | 6
2 | 0 | 2
2 | 3 | 2
2 | 4 | 2
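For tumbling and sliding windows every time value belongs to a fixed set of windows, and a matching pair is emitted once per window the two records share. The pure-Python sketch below reproduces the first two outputs of window_join, assuming windows of the form [s, s + duration) with s a multiple of hop (an assumption consistent with those outputs; window_starts and window_join here are hypothetical helpers, not Pathway calls). tumbling(2) is modelled as hop = duration = 2 and sliding(1, 2) as hop 1, duration 2.

```python
from typing import List, Tuple


def window_starts(t: int, hop: int, duration: int) -> List[int]:
    """Assumed alignment: starts s (multiples of hop) of all windows [s, s + duration) containing t."""
    first = (t - duration) // hop + 1
    last = t // hop
    return [k * hop for k in range(first, last + 1)]


def window_join(left: List[int], right: List[int], hop: int, duration: int) -> List[Tuple[int, int]]:
    """Hypothetical inner window join: one (left_t, right_t) per window containing both times."""
    pairs: List[Tuple[int, int]] = []
    for lt in left:
        for s in window_starts(lt, hop, duration):
            pairs.extend((lt, rt) for rt in right if s <= rt < s + duration)
    return pairs


left, right = [1, 2, 3, 7, 13], [2, 5, 6, 7]
print(window_join(left, right, hop=2, duration=2))  # tumbling(2)
print(window_join(left, right, hop=1, duration=2))  # sliding(1, 2)
```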
window_join_left(other, left_time_expression, right_time_expression, window, *on)
Performs a window left join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).
When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.
Rows from the left side that didn’t match any record on the right side in a given window are returned with missing values on the right side replaced with None. The multiplicity of such rows equals the number of windows they belong to and don’t have a match in.
- Parameters
  - other (Table) – the right side of a join.
  - left_time_expression – time expression in self.
  - right_time_expression – time expression in other.
  - window (Window) – a window to use.
  - on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- Returns
a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.
- Return type
WindowJoinResult
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| t
1 | 1
2 | 2
3 | 3
4 | 7
5 | 13
''')
t2 = pw.debug.table_from_markdown('''
| t
1 | 2
2 | 5
3 | 6
4 | 7
''')
t3 = t1.window_join_left(t2, t1.t, t2.t, pw.window.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
1 |
2 | 2
3 | 2
7 | 6
7 | 7
13 |
t4 = t1.window_join_left(t2, t1.t, t2.t, pw.window.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
1 |
1 | 2
2 | 2
2 | 2
3 |
3 | 2
7 | 6
7 | 7
7 | 7
13 |
13 |
t1 = pw.debug.table_from_markdown('''
| a | t
1 | 1 | 1
2 | 1 | 2
3 | 1 | 3
4 | 1 | 7
5 | 1 | 13
6 | 2 | 1
7 | 2 | 2
8 | 3 | 4
''')
t2 = pw.debug.table_from_markdown('''
| b | t
1 | 1 | 2
2 | 1 | 5
3 | 1 | 6
4 | 1 | 7
5 | 2 | 2
6 | 2 | 3
7 | 4 | 3
''')
t3 = t1.window_join_left(t2, t1.t, t2.t, pw.window.tumbling(2), t1.a == t2.b).select(
    key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1 | 1 |
1 | 2 | 2
1 | 3 | 2
1 | 7 | 6
1 | 7 | 7
1 | 13 |
2 | 1 |
2 | 2 | 2
2 | 2 | 3
3 | 4 |
t1 = pw.debug.table_from_markdown('''
| t
0 | 0
1 | 5
2 | 10
3 | 15
4 | 17
''')
t2 = pw.debug.table_from_markdown('''
| t
0 | -3
1 | 2
2 | 3
3 | 6
4 | 16
''')
t3 = t1.window_join_left(
    t2, t1.t, t2.t, pw.window.session(lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
0 | 2
0 | 3
0 | 6
5 | 2
5 | 3
5 | 6
10 |
15 | 16
17 | 16
t1 = pw.debug.table_from_markdown('''
| a | t
1 | 1 | 1
2 | 1 | 4
3 | 1 | 7
4 | 2 | 0
5 | 2 | 3
6 | 2 | 4
7 | 2 | 7
8 | 3 | 4
''')
t2 = pw.debug.table_from_markdown('''
| b | t
1 | 1 | -1
2 | 1 | 6
3 | 2 | 2
4 | 2 | 10
5 | 4 | 3
''')
t3 = t1.window_join_left(
    t2, t1.t, t2.t, pw.window.session(lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1 | 1 | -1
1 | 4 | 6
1 | 7 | 6
2 | 0 | 2
2 | 3 | 2
2 | 4 | 2
2 | 7 |
3 | 4 |
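The left variant adds one (left_t, None) row per window in which a left record has no partner, which is why 13 appears twice with the sliding window. A pure-Python sketch, assuming windows [s, s + duration) with s a multiple of hop (window_starts and window_join_left are hypothetical helpers, not Pathway calls):

```python
from typing import List, Optional, Tuple


def window_starts(t: int, hop: int, duration: int) -> List[int]:
    """Assumed alignment: starts s (multiples of hop) of all windows [s, s + duration) containing t."""
    first = (t - duration) // hop + 1
    last = t // hop
    return [k * hop for k in range(first, last + 1)]


def window_join_left(left: List[int], right: List[int], hop: int, duration: int) -> List[Tuple[int, Optional[int]]]:
    """Hypothetical left window join: matches per window, else (left_t, None) for that window."""
    pairs: List[Tuple[int, Optional[int]]] = []
    for lt in left:
        for s in window_starts(lt, hop, duration):
            matches = [rt for rt in right if s <= rt < s + duration]
            if matches:
                pairs.extend((lt, rt) for rt in matches)
            else:
                pairs.append((lt, None))
    return pairs


left, right = [1, 2, 3, 7, 13], [2, 5, 6, 7]
print(window_join_left(left, right, hop=2, duration=2))  # tumbling(2)
print(window_join_left(left, right, hop=1, duration=2))  # sliding(1, 2)
```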
window_join_outer(other, left_time_expression, right_time_expression, window, *on)
Performs a window outer join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).
When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.
Rows from both sides that didn’t match any record on the other side in a given window are returned with missing values on the other side replaced with None. The multiplicity of such rows equals the number of windows they belong to and don’t have a match in.
- Parameters
  - other (Table) – the right side of a join.
  - left_time_expression – time expression in self.
  - right_time_expression – time expression in other.
  - window (Window) – a window to use.
  - on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- Returns
a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.
- Return type
WindowJoinResult
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| t
1 | 1
2 | 2
3 | 3
4 | 7
5 | 13
''')
t2 = pw.debug.table_from_markdown('''
| t
1 | 2
2 | 5
3 | 6
4 | 7
''')
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.window.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
  | 5
1 |
2 | 2
3 | 2
7 | 6
7 | 7
13 |
t4 = t1.window_join_outer(t2, t1.t, t2.t, pw.window.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
  | 5
  | 5
  | 6
1 |
1 | 2
2 | 2
2 | 2
3 |
3 | 2
7 | 6
7 | 7
7 | 7
13 |
13 |
t1 = pw.debug.table_from_markdown('''
| a | t
1 | 1 | 1
2 | 1 | 2
3 | 1 | 3
4 | 1 | 7
5 | 1 | 13
6 | 2 | 1
7 | 2 | 2
8 | 3 | 4
''')
t2 = pw.debug.table_from_markdown('''
| b | t
1 | 1 | 2
2 | 1 | 5
3 | 1 | 6
4 | 1 | 7
5 | 2 | 2
6 | 2 | 3
7 | 4 | 3
''')
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.window.tumbling(2), t1.a == t2.b).select(
    key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1 | | 5
1 | 1 |
1 | 2 | 2
1 | 3 | 2
1 | 7 | 6
1 | 7 | 7
1 | 13 |
2 | 1 |
2 | 2 | 2
2 | 2 | 3
3 | 4 |
4 | | 3
t1 = pw.debug.table_from_markdown('''
| t
0 | 0
1 | 5
2 | 10
3 | 15
4 | 17
''')
t2 = pw.debug.table_from_markdown('''
| t
0 | -3
1 | 2
2 | 3
3 | 6
4 | 16
''')
t3 = t1.window_join_outer(
    t2, t1.t, t2.t, pw.window.session(lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
  | -3
0 | 2
0 | 3
0 | 6
5 | 2
5 | 3
5 | 6
10 |
15 | 16
17 | 16
t1 = pw.debug.table_from_markdown('''
| a | t
1 | 1 | 1
2 | 1 | 4
3 | 1 | 7
4 | 2 | 0
5 | 2 | 3
6 | 2 | 4
7 | 2 | 7
8 | 3 | 4
''')
t2 = pw.debug.table_from_markdown('''
| b | t
1 | 1 | -1
2 | 1 | 6
3 | 2 | 2
4 | 2 | 10
5 | 4 | 3
''')
t3 = t1.window_join_outer(
    t2, t1.t, t2.t, pw.window.session(lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1 | 1 | -1
1 | 4 | 6
1 | 7 | 6
2 | | 10
2 | 0 | 2
2 | 3 | 2
2 | 4 | 2
2 | 7 |
3 | 4 |
4 | | 3
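For session windows the grouping is data-driven: records from both sides are merged in time order, and a session continues while the predicate holds for neighbouring times. The sketch below reproduces the outer-join output for the example without an on clause. It assumes the predicate is checked between consecutive records; sessions and session_join_outer are hypothetical helpers, not Pathway functions.

```python
from typing import Callable, List, Optional, Tuple

Event = Tuple[int, str]  # (time, "L" or "R")
Pair = Tuple[Optional[int], Optional[int]]


def sessions(events: List[Event], close: Callable[[int, int], bool]) -> List[List[Event]]:
    """Assumption: consecutive time-sorted events share a session while close(prev, cur) holds."""
    groups: List[List[Event]] = []
    for t, side in sorted(events):
        if groups and close(groups[-1][-1][0], t):
            groups[-1].append((t, side))
        else:
            groups.append([(t, side)])
    return groups


def session_join_outer(left: List[int], right: List[int], close: Callable[[int, int], bool]) -> List[Pair]:
    events = [(t, "L") for t in left] + [(t, "R") for t in right]
    pairs: List[Pair] = []
    for group in sessions(events, close):
        lts = [t for t, side in group if side == "L"]
        rts = [t for t, side in group if side == "R"]
        if lts and rts:
            pairs.extend((lt, rt) for lt in lts for rt in rts)  # all pairs inside one session
        elif lts:
            pairs.extend((lt, None) for lt in lts)  # left-only session
        else:
            pairs.extend((None, rt) for rt in rts)  # right-only session
    return pairs


result = session_join_outer([0, 5, 10, 15, 17], [-3, 2, 3, 6, 16], lambda a, b: abs(a - b) <= 2)
print(result)
```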
window_join_right(other, left_time_expression, right_time_expression, window, *on)
Performs a window right join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).
When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.
Rows from the right side that didn’t match any record on the left side in a given window are returned with missing values on the left side replaced with None. The multiplicity of such rows equals the number of windows they belong to and don’t have a match in.
- Parameters
  - other (Table) – the right side of a join.
  - left_time_expression – time expression in self.
  - right_time_expression – time expression in other.
  - window (Window) – a window to use.
  - on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
- Returns
a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.
- Return type
WindowJoinResult
Examples:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
| t
1 | 1
2 | 2
3 | 3
4 | 7
5 | 13
''')
t2 = pw.debug.table_from_markdown('''
| t
1 | 2
2 | 5
3 | 6
4 | 7
''')
t3 = t1.window_join_right(t2, t1.t, t2.t, pw.window.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
  | 5
2 | 2
3 | 2
7 | 6
7 | 7
t4 = t1.window_join_right(t2, t1.t, t2.t, pw.window.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
  | 5
  | 5
  | 6
1 | 2
2 | 2
2 | 2
3 | 2
7 | 6
7 | 7
7 | 7
t1 = pw.debug.table_from_markdown('''
| a | t
1 | 1 | 1
2 | 1 | 2
3 | 1 | 3
4 | 1 | 7
5 | 1 | 13
6 | 2 | 1
7 | 2 | 2
8 | 3 | 4
''')
t2 = pw.debug.table_from_markdown('''
| b | t
1 | 1 | 2
2 | 1 | 5
3 | 1 | 6
4 | 1 | 7
5 | 2 | 2
6 | 2 | 3
7 | 4 | 3
''')
t3 = t1.window_join_right(t2, t1.t, t2.t, pw.window.tumbling(2), t1.a == t2.b).select(
    key=t2.b, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1 | | 5
1 | 2 | 2
1 | 3 | 2
1 | 7 | 6
1 | 7 | 7
2 | 2 | 2
2 | 2 | 3
4 | | 3
t1 = pw.debug.table_from_markdown('''
| t
0 | 0
1 | 5
2 | 10
3 | 15
4 | 17
''')
t2 = pw.debug.table_from_markdown('''
| t
0 | -3
1 | 2
2 | 3
3 | 6
4 | 16
''')
t3 = t1.window_join_right(
    t2, t1.t, t2.t, pw.window.session(lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
  | -3
0 | 2
0 | 3
0 | 6
5 | 2
5 | 3
5 | 6
15 | 16
17 | 16
t1 = pw.debug.table_from_markdown('''
| a | t
1 | 1 | 1
2 | 1 | 4
3 | 1 | 7
4 | 2 | 0
5 | 2 | 3
6 | 2 | 4
7 | 2 | 7
8 | 3 | 4
''')
t2 = pw.debug.table_from_markdown('''
| b | t
1 | 1 | -1
2 | 1 | 6
3 | 2 | 2
4 | 2 | 10
5 | 4 | 3
''')
t3 = t1.window_join_right(
    t2, t1.t, t2.t, pw.window.session(lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t2.b, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1 | 1 | -1
1 | 4 | 6
1 | 7 | 6
2 | | 10
2 | 0 | 2
2 | 3 | 2
2 | 4 | 2
4 | | 3
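A right window join mirrors the left one: iterate over the right side and emit (None, right_t) for each window in which a right record finds no partner. A sketch for tumbling and sliding windows, assuming windows [s, s + duration) with s a multiple of hop (window_starts and window_join_right are hypothetical helpers, not Pathway calls):

```python
from typing import List, Optional, Tuple


def window_starts(t: int, hop: int, duration: int) -> List[int]:
    """Assumed alignment: starts s (multiples of hop) of all windows [s, s + duration) containing t."""
    first = (t - duration) // hop + 1
    last = t // hop
    return [k * hop for k in range(first, last + 1)]


def window_join_right(left: List[int], right: List[int], hop: int, duration: int) -> List[Tuple[Optional[int], int]]:
    """Hypothetical right window join: matches per window, else (None, right_t) for that window."""
    pairs: List[Tuple[Optional[int], int]] = []
    for rt in right:
        for s in window_starts(rt, hop, duration):
            matches = [lt for lt in left if s <= lt < s + duration]
            if matches:
                pairs.extend((lt, rt) for lt in matches)
            else:
                pairs.append((None, rt))
    return pairs


left, right = [1, 2, 3, 7, 13], [2, 5, 6, 7]
print(window_join_right(left, right, hop=2, duration=2))  # tumbling(2)
print(window_join_right(left, right, hop=1, duration=2))  # sliding(1, 2)
```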
windowby(time_expr, *, window, shard=None)
Creates a GroupedTable by windowing the table (based on time_expr and window), optionally sharded by shard.
- Parameters
  - time_expr (ColumnExpression) – column expression used for windowing.
  - window (Window) – type of window to use.
  - shard (Optional[ColumnExpression]) – optional column expression to act as a shard key.
Examples:
import pathway as pw
t = pw.debug.table_from_markdown('''
| shard | t | v
1 | 0 | 1 | 10
2 | 0 | 2 | 1
3 | 0 | 4 | 3
4 | 0 | 8 | 2
5 | 0 | 9 | 4
6 | 0 | 10 | 8
7 | 1 | 1 | 9
8 | 1 | 2 | 16
''')
result = t.windowby(
    t.t,
    window=pw.window.session(predicate=lambda a, b: abs(a - b) <= 1),
    shard=t.shard,
).reduce(
    shard=pw.reducers.min(pw.this.shard),
    min_t=pw.reducers.min(pw.this.t),
    max_v=pw.reducers.max(pw.this.v),
    count=pw.reducers.count(pw.this.t),
)
pw.debug.compute_and_print(result, include_id=False)
shard | min_t | max_v | count
0 | 1 | 10 | 2
0 | 4 | 3 | 1
0 | 8 | 8 | 3
1 | 1 | 16 | 2
- Return type
GroupedTable
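A session window in windowby groups consecutive (time-sorted) rows of the same shard as long as the predicate holds between neighbours. The pure-Python sketch below reproduces the reduce step of the example under that assumption; windowby_session and reduce_session are hypothetical helpers, not Pathway APIs.

```python
from itertools import groupby
from typing import Callable, List, Tuple

Row = Tuple[int, int, int]  # (shard, t, v)
Summary = Tuple[int, int, int, int]  # (shard, min_t, max_v, count)


def reduce_session(shard: int, session: List[Row]) -> Summary:
    """Hypothetical reducer mirroring min(t), max(v) and count in the example."""
    return (shard, min(r[1] for r in session), max(r[2] for r in session), len(session))


def windowby_session(rows: List[Row], close: Callable[[int, int], bool]) -> List[Summary]:
    """Assumption: within a shard, a new session starts when close(prev_t, t) is False."""
    out: List[Summary] = []
    for shard, shard_rows in groupby(sorted(rows), key=lambda r: r[0]):
        session: List[Row] = []
        for row in shard_rows:
            if session and not close(session[-1][1], row[1]):
                out.append(reduce_session(shard, session))
                session = []
            session.append(row)
        if session:
            out.append(reduce_session(shard, session))
    return out


rows = [(0, 1, 10), (0, 2, 1), (0, 4, 3), (0, 8, 2), (0, 9, 4), (0, 10, 8), (1, 1, 9), (1, 2, 16)]
for summary in windowby_session(rows, lambda a, b: abs(a - b) <= 1):
    print(summary)
```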
with_id(new_index)
Set new ids based on another column containing id-typed values.
To generate ids based on arbitrary valued columns, use with_id_from.
Values assigned must be row-wise unique.
- Parameters
new_index – column to be used as the new index.
- Return type
Table
- Returns
Table with updated ids.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| new_id
1 | 2
2 | 3
3 | 4
''')
t3 = t1.promise_universe_is_subset_of(t2).with_id(t2.new_id)
pw.debug.compute_and_print(t3)
age owner pet
^2 10 Alice 1
^3 9 Bob 1
^4 8 Alice 2
with_id_from(*args)
Compute new ids based on values in columns. Ids computed from columns must be row-wise unique.
- Parameters
args – columns to be used as primary keys.
- Returns
self updated with recomputed ids.
- Return type
Table
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = t1 + t1.select(old_id=t1.id)
t3 = t2.with_id_from(t2.age)
pw.debug.compute_and_print(t3)
| age | owner | pet | old_id
^... | 8 | Alice | 2 | ^...
^... | 9 | Bob | 1 | ^...
^... | 10 | Alice | 1 | ^...
t4 = t3.select(
    t3.age,
    t3.owner,
    t3.pet,
    same_as_old=(t3.id == t3.old_id),
    same_as_new=(t3.id == t3.pointer_from(t3.age)),
)
pw.debug.compute_and_print(t4)
| age | owner | pet | same_as_old | same_as_new
^... | 8 | Alice | 2 | False | True
^... | 9 | Bob | 1 | False | True
^... | 10 | Alice | 1 | False | True
with_universe_of(other)
Returns a copy of self with exactly the same universe as other.
Semantics: Required precondition self.universe == other.universe. Used in situations where Pathway cannot deduce equality of universes, but those are equal as verified during runtime.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
| pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
| age
1 | 10
7 | 3
''').with_universe_of(t1)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10
- Return type
Table
without(*columns)
Selects all columns except the given column references.
- Parameters
columns (ColumnReference) – columns to be dropped, given in table.column_name notation.
- Returns
self without the specified columns.
- Return type
Table
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.without(t1.age, pw.this.pet)
pw.debug.compute_and_print(t2, include_id=False)
owner
Alice
Alice
Bob
class pathway.TableLike(universe)
Interface class for table-likes: Table, GroupedTable and JoinResult. All of those contain universe info, and thus support universe-related asserts.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
g1 = t1.groupby(t1.owner)
t2 = t1.filter(t1.age >= 9)
pw.debug.compute_and_print(t2, include_id=False)
age | owner | pet
9   | Bob   | dog
10  | Alice | dog
g2 = t2.groupby(t2.owner)
pw.universes.promise_is_subset_of(g2, g1)  # t2 is a subset of t1, so this is safe
promise_universe_is_equal_to(other)
Asserts to Pathway that the universe of self is equal to the universe of other.
Semantics: Used in situations where Pathway cannot deduce that two universes are equal.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
  | age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10
- Return type
TypeVar(SelfTableLike, bound=TableLike)
promise_universe_is_subset_of(other)
Asserts to Pathway that the universe of self is a subset of the universe of other.
Semantics: Used in situations where Pathway cannot deduce one universe being a subset of another.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 30
- Return type
TypeVar(SelfTableLike, bound=TableLike)
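In the example above, t1 << t2 keeps all of t1's rows and overwrites the cells of the rows whose ids also appear in t2. This is only well defined because t2's universe is a subset of t1's. The dict-based sketch below models that behaviour, assuming a table can be viewed as a mapping from row id to row. `update_rows` is a hypothetical helper, not a Pathway function.

```python
from typing import Any, Dict

Row = Dict[str, Any]

def update_rows(base: Dict[int, Row], updates: Dict[int, Row]) -> Dict[int, Row]:
    # The subset promise: every id in `updates` must already exist in `base`.
    missing = updates.keys() - base.keys()
    if missing:
        raise KeyError(f"ids {sorted(missing)} are not in the base universe")
    # Copy base, then overwrite the cells supplied by `updates`.
    result = {row_id: dict(row) for row_id, row in base.items()}
    for row_id, row in updates.items():
        result[row_id].update(row)
    return result

t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {1: {"age": 10, "owner": "Alice", "pet": 30}}
t3 = update_rows(t1, t2)
print(sorted((r["age"], r["owner"], r["pet"]) for r in t3.values()))
# [(8, 'Alice', 2), (9, 'Bob', 1), (10, 'Alice', 30)]
```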
promise_universes_are_disjoint(other)
Asserts to Pathway that the universe of self is disjoint from the universe of other.
Semantics: Used in situations where Pathway cannot deduce that the universes are disjoint.
Returns: self
Note: The assertion works in place.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 1
11  | Alice | 30
12  | Tom   | 40
- Return type
TypeVar(SelfTableLike, bound=TableLike)
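The disjointness promise matters for concat. Assuming, as the requirement suggests, that concat keeps the original row ids, two rows sharing an id would not form a valid table. The sketch below models tables as dicts keyed by row id. `concat_disjoint` is a hypothetical helper that performs the check which the promise lets Pathway skip.

```python
from typing import Any, Dict

Row = Dict[str, Any]

def concat_disjoint(*tables: Dict[int, Row]) -> Dict[int, Row]:
    # Union of tables whose id sets (universes) must not overlap.
    result: Dict[int, Row] = {}
    for table in tables:
        overlap = result.keys() & table.keys()
        if overlap:
            raise ValueError(f"universes are not disjoint: ids {sorted(overlap)} repeat")
        result.update(table)
    return result

t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {
    11: {"age": 11, "owner": "Alice", "pet": 30},
    12: {"age": 12, "owner": "Tom", "pet": 40},
}
t3 = concat_disjoint(t1, t2)
print(sorted(t3))  # [1, 2, 3, 11, 12]
```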
class pathway.WindowJoinResult(join_result, left_original, right_original, left_new, right_new)
Result of a window join between tables.
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | t
1 | 1
2 | 2
3 | 3
4 | 7
5 | 13
''')
t2 = pw.debug.table_from_markdown('''
  | t
1 | 2
2 | 5
3 | 6
4 | 7
''')
join_result = t1.window_join_outer(t2, t1.t, t2.t, pw.window.tumbling(2))
isinstance(join_result, pw.WindowJoinResult)
True
pw.debug.compute_and_print(
    join_result.select(left_t=t1.t, right_t=t2.t), include_id=False
)
left_t | right_t
       | 5
1      |
2      | 2
3      | 2
7      | 6
7      | 7
13     |
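A tumbling window of length 2 places each time t in the window [2k, 2k + 2) that contains it. The outer window join then pairs every left and right row in the same window and keeps unmatched rows from either side. The plain-Python sketch below reproduces the output above under that reading. `tumbling_window_start` and `window_join_outer` are hypothetical helpers, and a window origin of 0 is assumed.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

Pair = Tuple[Optional[int], Optional[int]]

def tumbling_window_start(t: int, duration: int) -> int:
    # Start of the window [start, start + duration) containing t (origin 0).
    return t - t % duration

def window_join_outer(left: List[int], right: List[int], duration: int) -> List[Pair]:
    # Bucket both sides by the start of their tumbling window.
    left_windows: Dict[int, List[int]] = defaultdict(list)
    right_windows: Dict[int, List[int]] = defaultdict(list)
    for t in left:
        left_windows[tumbling_window_start(t, duration)].append(t)
    for t in right:
        right_windows[tumbling_window_start(t, duration)].append(t)
    pairs: List[Pair] = []
    for start in sorted(left_windows.keys() | right_windows.keys()):
        lefts = left_windows.get(start, [])
        rights = right_windows.get(start, [])
        if lefts and rights:
            pairs.extend((lt, rt) for lt in lefts for rt in rights)  # matches
        elif lefts:
            pairs.extend((lt, None) for lt in lefts)  # unmatched left rows
        else:
            pairs.extend((None, rt) for rt in rights)  # unmatched right rows
    return pairs

print(window_join_outer([1, 2, 3, 7, 13], [2, 5, 6, 7], duration=2))
# [(1, None), (2, 2), (3, 2), (None, 5), (7, 6), (7, 7), (13, None)]
```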
select(*args, **kwargs)
Computes a result of a window join.
- Parameters
args (ColumnReference) – Column references.
kwargs (Any) – Column expressions with their new assigned names.
- Returns
Created table.
- Return type
Table
Example:
import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | a | t
1 | 1 | 1
2 | 1 | 2
3 | 1 | 3
4 | 1 | 7
5 | 1 | 13
6 | 2 | 1
7 | 2 | 2
8 | 3 | 4
''')
t2 = pw.debug.table_from_markdown('''
  | b | t
1 | 1 | 2
2 | 1 | 5
3 | 1 | 6
4 | 1 | 7
5 | 2 | 2
6 | 2 | 3
7 | 4 | 3
''')
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.window.tumbling(2), t1.a == t2.b).select(
    key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   |        | 5
1   | 1      |
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
1   | 13     |
2   | 1      |
2   | 2      | 2
2   | 2      | 3
3   | 4      |
4   |        | 3
pathway.apply(fun, *args, **kwargs)
Applies a function to column expressions, column-wise. The output column type is deduced from the function's type annotations.
Example:
import pathway as pw
def concat(left: str, right: str) -> str:
    return left + right
t1 = pw.debug.parse_to_table('''
age owner pet
10  Alice dog
9   Bob   dog
8   Alice cat
7   Bob   dog''')
t2 = t1.select(col=pw.apply(concat, t1.owner, t1.pet))
pw.debug.compute_and_print(t2, include_id=False)
col
Alicecat
Alicedog
Bobdog
Bobdog
pathway.apply_async(fun, *args, **kwargs)
Applies a function asynchronously to column expressions, column-wise. The output column type is deduced from the function's type annotations. Either a regular or an async function can be passed.
Example:
import pathway as pw
import asyncio
async def concat(left: str, right: str) -> str:
    await asyncio.sleep(0.1)
    return left + right
t1 = pw.debug.parse_to_table('''
age owner pet
10  Alice dog
9   Bob   dog
8   Alice cat
7   Bob   dog''')
t2 = t1.select(col=pw.apply_async(concat, t1.owner, t1.pet))
pw.debug.compute_and_print(t2, include_id=False)
col
Alicecat
Alicedog
Bobdog
Bobdog
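An async function is useful because slow calls for different rows can overlap instead of running one after another. The sketch below shows that idea outside Pathway: it awaits the same concat coroutine for four rows concurrently with asyncio.gather. This reference does not describe how Pathway schedules apply_async calls internally, so treat this only as an illustration of the general asyncio pattern. `concat_all` is a hypothetical helper.

```python
import asyncio
import time
from typing import List, Tuple

async def concat(left: str, right: str) -> str:
    await asyncio.sleep(0.1)  # simulate a slow, I/O-bound call
    return left + right

async def concat_all(rows: List[Tuple[str, str]]) -> List[str]:
    # One coroutine per row; gather waits for all of them and keeps input order.
    return await asyncio.gather(*(concat(owner, pet) for owner, pet in rows))

rows = [("Alice", "dog"), ("Bob", "dog"), ("Alice", "cat"), ("Bob", "dog")]
start = time.perf_counter()
results = asyncio.run(concat_all(rows))
elapsed = time.perf_counter() - start
print(results)  # ['Alicedog', 'Bobdog', 'Alicecat', 'Bobdog']
print(f"{elapsed:.2f}s")  # about 0.1s rather than 0.4s, because the sleeps overlap
```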
pathway.apply_with_type(fun, ret_type, *args, **kwargs)
Applies function to column expressions, column-wise. Output column type is provided explicitly.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  age owner pet
1 10  Alice dog
2 9   Bob   dog
3 8   Alice cat
4 7   Bob   dog''')
t2 = t1.select(col=pw.apply_with_type(lambda left, right: left + right, str, t1.owner, t1.pet))
pw.debug.compute_and_print(t2, include_id=False)
col
Alicecat
Alicedog
Bobdog
Bobdog
pathway.attribute(func, **kwargs)
Decorator for creating attributes.
Example:
import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()
        @pw.attribute
        def attr(self) -> float:
            return self.arg * 2
        @pw.output_attribute
        def ret(self) -> float:
            return self.attr + 1
t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7   | 15
8   | 17
9   | 19
10  | 21
pathway.cast(target_type, col)
Changes the type of the column to target_type and converts the data of this column.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  val
1 10
2 9
3 8
4 7''')
t1.schema.as_dict()
{'val': <class 'int'>}
pw.debug.compute_and_print(t1, include_id=False)
val
7
8
9
10
t2 = t1.select(val=pw.cast(float, t1.val))
t2.schema.as_dict()
{'val': <class 'float'>}
pw.debug.compute_and_print(t2, include_id=False)
val
7.0
8.0
9.0
10.0
- Return type
CastExpression
pathway.coalesce(*args)
For an argument list arg_1, arg_2, …, arg_n, returns the first non-None value.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
colA | colB
     | 10
2    |
     |
4    | 7''')
t2 = t1.select(t1.colA, t1.colB, col=pw.coalesce(t1.colA, t1.colB))
pw.debug.compute_and_print(t2, include_id=False)
colA | colB | col
     |      |
     | 10   | 10
2    |      | 2
4    | 7    | 4
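For each row, coalesce behaves like the plain-Python function below. It is shown only as a model of the semantics, not as Pathway's implementation. The four rows mirror colA and colB from the example. Following the "first non-None value" definition, the sketch skips only None, so falsy values such as 0 are returned unchanged.

```python
from typing import Any, Optional

def coalesce(*args: Any) -> Optional[Any]:
    # Return the first argument that is not None, or None if all are None.
    for value in args:
        if value is not None:
            return value
    return None

rows = [(None, None), (None, 10), (2, None), (4, 7)]
print([coalesce(a, b) for a, b in rows])  # [None, 10, 2, 4]
print(coalesce(0, 5))  # 0
```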
pathway.declare_type(target_type, col)
Changes the type of a column to target_type. Disclaimer: it only changes the type in the schema; it does not affect the stored values.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  val
1 10
2 9
3 8
4 7''')
t1.schema.as_dict()
{'val': <class 'int'>}
t2 = t1.select(val=pw.declare_type(int | float, t1.val))
t2.schema.as_dict()
{'val': int | float}
- Return type
DeclareTypeExpression
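The difference between cast and declare_type can be modelled in plain Python. A cast converts every stored value, while a type declaration changes only the recorded type and leaves the values untouched. The hypothetical `Column` dataclass and the two helper functions below illustrate that distinction. They are not Pathway classes or functions.

```python
from dataclasses import dataclass, replace
from typing import Any, List, Union

@dataclass(frozen=True)
class Column:
    dtype: Any          # type recorded in the schema
    values: List[Any]   # stored values

def cast(target_type: type, col: Column) -> Column:
    # Converts every value and records the new type.
    return Column(dtype=target_type, values=[target_type(v) for v in col.values])

def declare_type(target_type: Any, col: Column) -> Column:
    # Only the recorded type changes; the stored values are reused as-is.
    return replace(col, dtype=target_type)

val = Column(dtype=int, values=[10, 9, 8, 7])
converted = cast(float, val)
declared = declare_type(Union[int, float], val)
print(converted.values)  # [10.0, 9.0, 8.0, 7.0]
print(declared.values)   # [10, 9, 8, 7]
```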
pathway.if_else(if_clause, then_clause, else_clause)
Equivalent to:
if (if_clause):
    return (then_clause)
else:
    return (else_clause)
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
colA | colB
1    | 0
2    | 2
6    | 3''')
t2 = t1.select(res=pw.if_else(t1.colB != 0, t1.colA // t1.colB, 0))
pw.debug.compute_and_print(t2, include_id=False)
res
0
1
2
- Return type
IfElseExpression
pathway.input_attribute(type=<class 'float'>)
Returns a new input_attribute. To be used inside class transformers.
Example:
import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()
        @pw.output_attribute
        def ret(self) -> float:
            return self.arg + 1
t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7   | 8
8   | 9
9   | 10
10  | 11
pathway.input_method(type=<class 'float'>)
Decorator for defining input methods in class transformers.
Example:
import pathway as pw
@pw.transformer
class first_transformer:
    class table(pw.ClassArg):
        a: float = pw.input_attribute()
        @pw.method
        def fun(self, arg) -> int:
            return self.a * arg
@pw.transformer
class second_transformer:
    class table(pw.ClassArg):
        m = pw.input_method(int)
        @pw.output_attribute
        def val(self):
            return self.m(2)
t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = first_transformer(table=t1.select(a=t1.age)).table
t2.schema.as_dict()
{'fun': typing.Callable[..., int]}
t3 = second_transformer(table=t2.select(m=t2.fun)).table
pw.debug.compute_and_print(t1 + t3, include_id=False)
age | val
7   | 14
8   | 16
9   | 18
10  | 20
pathway.iterate(func, iteration_limit=None, **kwargs)
Iterates a function until a fixed point is reached. The function must take only named arguments (Tables) and return a dict of Tables. The initial arguments to the function are passed through kwargs.
Example:
import pathway as pw
def collatz_transformer(iterated):
    def collatz_step(x: int) -> int:
        if x == 1:
            return 1
        elif x % 2 == 0:
            return x // 2  # integer division keeps the declared int return type
        else:
            return 3 * x + 1
    new_iterated = iterated.select(val=pw.apply(collatz_step, iterated.val))
    return dict(iterated=new_iterated)
tab = pw.debug.parse_to_table('''
val
1
2
3
4
5
6
7
8''')
ret = pw.iterate(collatz_transformer, iterated=tab).iterated
pw.debug.compute_and_print(ret, include_id=False)
val
1
1
1
1
1
1
1
1
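pw.iterate repeatedly feeds the transformer's output back in until nothing changes. The plain-Python loop below sketches that fixed-point idea on a list of values with the same Collatz step. `iterate_to_fixed_point` is a hypothetical helper. Its optional `iteration_limit` only mirrors the parameter name in the signature above and is an assumption about what such a limit would mean.

```python
from typing import Callable, List, Optional

def collatz_step(x: int) -> int:
    if x == 1:
        return 1
    elif x % 2 == 0:
        return x // 2
    else:
        return 3 * x + 1

def iterate_to_fixed_point(
    step: Callable[[List[int]], List[int]],
    values: List[int],
    iteration_limit: Optional[int] = None,
) -> List[int]:
    # Apply `step` until its output equals its input, or until the
    # (hypothetical) iteration_limit is used up.
    iterations = 0
    while iteration_limit is None or iterations < iteration_limit:
        new_values = step(values)
        iterations += 1
        if new_values == values:
            break  # fixed point reached
        values = new_values
    return values

result = iterate_to_fixed_point(lambda vals: [collatz_step(v) for v in vals], list(range(1, 9)))
print(result)  # [1, 1, 1, 1, 1, 1, 1, 1]
```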
class pathway.iterate_universe(table)
class pathway.left(*args, **kwargs)
Object for generating column references without holding the actual table in hand. It needs to be evaluated in the proper context. For Table.join() and JoinResult.select(), it refers to the left input table. In all other situations, use the pw.this object.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10  | Alice | 3   | M
9   | Bob   | 1   | L
8   | Tom   | 1   | XL
''')
t3 = t1.join(t2, pw.left.pet == pw.right.pet, pw.left.owner == pw.right.owner).select(
    age=pw.left.age, owner_name=pw.right.owner, size=pw.this.size
)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L
pathway.make_tuple(*args)
Creates a tuple from the provided expressions.
- Parameters
args (Union[ColumnExpression, None, int, str, bool, BasePointer, Tuple[Any, ...]]) – a list of expressions to be put in a tuple
- Return type
MakeTupleExpression
- Returns
tuple
NOTE:
- Each cell in the output column will be a tuple containing the corresponding values from the input columns.
- The order of values in each tuple will match the order of the input columns.
- If any of the input columns have missing values, the resulting tuples will contain None for those positions.
Example:
import pathway as pw
table = pw.debug.table_from_markdown('''
A | B  | C
1 | 10 | a
2 | 20 |
3 | 30 | c
''')
table_with_tuple = table.select(res=pw.make_tuple(pw.this.A, pw.this.B, pw.this.C))
pw.debug.compute_and_print(table_with_tuple, include_id=False)
res
(1, 10, 'a')
(2, 20, None)
(3, 30, 'c')
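The row-by-row behaviour described in the note above corresponds to zipping the input columns together. The sketch below models columns as plain Python lists, which is an assumption made for illustration, with None marking the missing value in column C. `make_tuples` is a hypothetical helper, not a Pathway function.

```python
from typing import Any, List, Optional, Tuple

def make_tuples(*columns: List[Optional[Any]]) -> List[Tuple[Optional[Any], ...]]:
    # Row i of the result holds the i-th value of every column, in column order.
    return [tuple(row) for row in zip(*columns)]

a = [1, 2, 3]
b = [10, 20, 30]
c = ["a", None, "c"]
print(make_tuples(a, b, c))  # [(1, 10, 'a'), (2, 20, None), (3, 30, 'c')]
```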
pathway.method(func, **kwargs)
Decorator for creating methods in class transformers.
Example:
import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        a: float = pw.input_attribute()
        @pw.output_attribute
        def b(self) -> float:
            return self.fun(self.a)
        @pw.method
        def fun(self, arg) -> float:
            return self.a * arg
t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(a=t1.age)).table
t2.schema.as_dict()
{'b': <class 'float'>, 'fun': typing.Callable[..., float]}
pw.debug.compute_and_print(t1 + t2.select(t2.b), include_id=False)
age | b
7   | 49
8   | 64
9   | 81
10  | 100
pw.debug.compute_and_print(t1 + t2.select(out=t2.fun(t2.b)), include_id=False)
age | out
7   | 343
8   | 512
9   | 729
10  | 1000
pathway.numba_apply(fun, numba_signature, *args, **kwargs)
Applies function to column expressions, column-wise. Function has to be numba compilable.
Currently only a few signatures are supported:
- function has to be unary or binary
- arguments and return type have to be either int64 or float64
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
  val
1 1
2 3
3 5
4 7''')
t2 = t1.select(col=pw.numba_apply(lambda x: x*x - 2*x + 1, "int64(int64,)", t1.val))
pw.debug.compute_and_print(t2, include_id=False)
col
0
4
16
36
pathway.output_attribute(func, **kwargs)
Decorator for creating output attributes.
Example:
import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()
        @pw.output_attribute
        def ret(self) -> float:
            return self.arg + 1
t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7   | 8
8   | 9
9   | 10
10  | 11
pathway.pandas_transformer(output_schema, output_universe=None)
Decorator that turns a Python function operating on pandas.DataFrame into a Pathway transformer.
Input universes are converted into input DataFrame indexes. The resulting index is treated as the output universe, so it must maintain uniqueness and be of integer type.
- Parameters
- output_schema (Type[Schema]) – Schema of the resulting table.
- output_universe (Union[str, int, None]) – Index or name of an argument whose universe will be used in the resulting table. Defaults to None.
- Returns
Transformer that can be applied on Pathway tables.
pathway.require(val, *deps)
Returns val iff every dep in deps is non-None. Returns None otherwise.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
colA | colB
     | 10
2    |
     |
4    | 7''')
t2 = t1.select(t1.colA, t1.colB, col=pw.require(t1.colA + t1.colB, t1.colA, t1.colB))
pw.debug.compute_and_print(t2, include_id=False)
colA | colB | col
     |      |
     | 10   |
2    |      |
4    | 7    | 11
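A plain-Python model of require has to deal with eager evaluation: writing a + b with a None operand would raise before require could return None. The sketch below therefore takes val as a zero-argument callable, so the sum is computed only when both inputs are present. This reference does not say whether Pathway itself defers evaluation this way, so treat that part as an assumption of the model. This `require` is a hypothetical stand-in, not pw.require.

```python
from typing import Any, Callable, Optional

def require(compute_val: Callable[[], Any], *deps: Any) -> Optional[Any]:
    # Return None as soon as any dependency is None; otherwise compute val.
    if any(dep is None for dep in deps):
        return None
    return compute_val()

rows = [(None, None), (None, 10), (2, None), (4, 7)]
results = [require(lambda: a + b, a, b) for a, b in rows]
print(results)  # [None, None, None, 11]
```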
class pathway.right(*args, **kwargs)
Object for generating column references without holding the actual table in hand. It needs to be evaluated in the proper context. For Table.join() and JoinResult.select(), it refers to the right input table. In all other situations, use the pw.this object.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10  | Alice | 3   | M
9   | Bob   | 1   | L
8   | Tom   | 1   | XL
''')
t3 = t1.join(t2, pw.left.pet == pw.right.pet, pw.left.owner == pw.right.owner).select(
    age=pw.left.age, owner_name=pw.right.owner, size=pw.this.size
)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L
pathway.schema_from_types(_name=None, **kwargs)
Constructs schema from kwargs: field=type.
Example:
import pathway as pw
s = pw.schema_from_types(foo=int, bar=str)
s.as_dict()
{'foo': <class 'int'>, 'bar': <class 'str'>}
issubclass(s, pw.Schema)
True
- Return type
Type[Schema]
pathway.sql(query, **kwargs)
Run a SQL query on Pathway tables.
- Parameters
- query (str) – the SQL query to execute.
- kwargs (Table) – the association name: table used for the execution of the SQL query. Each name:table pair links a Pathway table to a table name used in the SQL query.
Example:
import pathway as pw
t = pw.debug.table_from_markdown("""
A | B
1 | 2
4 | 3
4 | 7
""")
ret = pw.sql("SELECT * FROM tab WHERE A<B", tab=t)
pw.debug.compute_and_print(ret, include_id=False)
A | B
1 | 2
4 | 7
Supported SQL keywords and operations: SELECT, WHERE, boolean expressions, arithmetic operations, GROUP BY, HAVING, AS (alias), UNION, INTERSECT, JOIN, and WITH.
Table and column names are case-sensitive.
Specificities of Pathway:
- id is a reserved keyword for columns: every Pathway table has a special column id. This column is not captured by * expressions in SQL.
- Order of columns might not be preserved with respect to SELECT query.
- Pathway reducers (pw.count, pw.sum, etc.) aggregate over None values, while SQL aggregation functions (COUNT, SUM, etc.) skip NULL values.
- UNION requires matching column names.
- INTERSECT requires matching column names.
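Python's built-in sqlite3 module shows the standard SQL side of the None/NULL difference: COUNT(column) and SUM(column) skip NULLs, whereas counting the rows directly includes them. This example only illustrates ordinary SQL behaviour. It does not run Pathway, and the plain len() call merely stands in for a row count that includes None.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tab (A INTEGER)")
conn.executemany("INSERT INTO tab (A) VALUES (?)", [(1,), (None,), (4,)])

# Standard SQL aggregate functions ignore NULL values.
sql_count, sql_sum = conn.execute("SELECT COUNT(A), SUM(A) FROM tab").fetchone()
conn.close()

values = [1, None, 4]
row_count = len(values)  # counting rows directly includes the None entry
print(sql_count, sql_sum, row_count)  # 2 5 3
```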
Limited support:
- Subqueries are supported but fragile – they depend on a set of query rewriting routines from the sqlglot library.
- Additionally, using the id column in subqueries is fragile.
- LIKE, ANY, ALL, EXISTS are not supported, or only supported in a very weak state.
Unsupported operations:
- ordering operations: ORDER BY, LIMIT, SELECT TOP
- INSERT INTO (Pathway tables are immutable)
- Pathway does not support anonymous columns: they might work but we do not guarantee their behavior.
- INTERSECT does not support INTERSECT ALL.
- COALESCE, IFNULL are not supported.
- FULL JOIN and NATURAL JOIN are not supported.
- CAST is not supported
- Return type
Table
class pathway.this(*args, **kwargs)
Object for generating column references without holding the actual table in hand. Needs to be evaluated in the proper context. For most of the Table methods, it refers to self. For JoinResult, it refers to the left input table.
Example:
import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.select(pw.this.owner, pw.this.age)
pw.debug.compute_and_print(t2, include_id=False)
owner | age
Alice | 8
Alice | 10
Bob   | 9
pathway.transformer(cls)
Decorator that wraps the outer class when defining class transformers.
Example:
import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()
        @pw.output_attribute
        def ret(self) -> float:
            return self.arg + 1
t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7   | 8
8   | 9
9   | 10
10  | 11
Subpackages
- pathway.debug package
- pathway.io package
- Subpackages
- pathway.io.csv package
- pathway.io.debezium package
- pathway.io.elasticsearch package
- pathway.io.fs package
- pathway.io.http package
- pathway.io.jsonlines package
- pathway.io.kafka package
- pathway.io.logstash package
- pathway.io.null package
- pathway.io.plaintext package
- pathway.io.postgres package
- pathway.io.python package
- pathway.io.redpanda package
- pathway.io.s3_csv package
- Subpackages
- pathway.stdlib package