pathway package

Pathway main module

Typical use:

import pathway as pw

class pathway.AsofJoinResult(side_data, mode, defaults)

Result of an ASOF join of two tables

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| K | val | t
1 | 0 | 1 | 1
2 | 0 | 2 | 4
3 | 0 | 3 | 5
4 | 0 | 4 | 6
5 | 0 | 5 | 7
6 | 0 | 6 | 11
7 | 0 | 7 | 12
8 | 1 | 8 | 5
9 | 1 | 9 | 7
'''
)
t2 = pw.debug.table_from_markdown(
'''
| K | val | t
21 | 1 | 7 | 2
22 | 1 | 3 | 8
23 | 0 | 0 | 2
24 | 0 | 6 | 3
25 | 0 | 2 | 7
26 | 0 | 3 | 8
27 | 0 | 9 | 9
28 | 0 | 7 | 13
29 | 0 | 4 | 14
'''
)
res = t1.asof_join(
t2,
t1.t,
t2.t,
t1.K == t2.K,
mode=pw.JoinMode.LEFT,
defaults={t2.val: -1},
).select(
pw.this.shard_key,
pw.this.t,
val_left=t1.val,
val_right=t2.val,
sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
shard_key | t | val_left | val_right | sum
0 | 1 | 1 | -1 | 0
0 | 4 | 2 | 6 | 8
0 | 5 | 3 | 6 | 9
0 | 6 | 4 | 6 | 10
0 | 7 | 5 | 6 | 11
0 | 11 | 6 | 9 | 15
0 | 12 | 7 | 9 | 16
1 | 5 | 8 | 7 | 15
1 | 7 | 9 | 7 | 16
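The matching rule behind the table above can be checked with a plain-Python sketch. It works on lists of (K, val, t) tuples instead of Pathway tables and is not Pathway's implementation. The function name asof_join_left is hypothetical.

The tie rule is an assumption inferred from the printed output. The K=0 left row at t=7 is paired with the right row at t=3 (val 6), not with the right row at t=7 (val 2). For that reason, the sketch only considers right rows with a strictly earlier t.

```python
from bisect import bisect_left


def asof_join_left(left, right, default=-1):
    """Left as-of join sketch over (K, val, t) tuples.

    For every left row, take the val of the latest right row with the same K
    and a strictly smaller t; if there is none, use `default` (mirroring
    defaults={t2.val: -1}). Returns (K, t, val_left, val_right, sum) tuples.
    """
    # Group right rows by key and sort each group by time.
    by_key = {}
    for k, val, t in right:
        by_key.setdefault(k, []).append((t, val))
    for rows in by_key.values():
        rows.sort()

    out = []
    for k, val, t in left:
        rows = by_key.get(k, [])
        times = [rt for rt, _ in rows]
        # Number of right rows with a time strictly before t (ties excluded).
        i = bisect_left(times, t)
        val_right = rows[i - 1][1] if i > 0 else default
        out.append((k, t, val, val_right, val + val_right))
    return out


# The example tables, as (K, val, t) tuples.
t1 = [(0, 1, 1), (0, 2, 4), (0, 3, 5), (0, 4, 6), (0, 5, 7),
      (0, 6, 11), (0, 7, 12), (1, 8, 5), (1, 9, 7)]
t2 = [(1, 7, 2), (1, 3, 8), (0, 0, 2), (0, 6, 3), (0, 2, 7),
      (0, 3, 8), (0, 9, 9), (0, 7, 13), (0, 4, 14)]

for row in asof_join_left(t1, t2):
    print(row)
```

Each printed tuple corresponds to one row of the table above, in the order (shard_key, t, val_left, val_right, sum).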

class pathway.AsyncTransformer(input_table)

Performs asynchronous transformations on a table.

invoke() is called asynchronously for each row of input_table.

The output table can be accessed via result.

Example:

import pathway as pw
import asyncio
from typing import Any, Dict
class OutputSchema(pw.Schema):
    ret: int
class AsyncIncrementTransformer(pw.AsyncTransformer, output_schema=OutputSchema):
    # Called once per input row; the "value" column arrives as an argument.
    async def invoke(self, value) -> Dict[str, Any]:
        await asyncio.sleep(0.1)
        return {"ret": value + 1 }
input = pw.debug.parse_to_table('''
| value
1 | 42
2 | 44
''')
result = AsyncIncrementTransformer(input_table=input).result
pw.debug.compute_and_print(result, include_id=False)
ret
43
45

close()

Called once at the end. Proper place for cleanup.

  • Return type
    None

abstract async invoke(*args, **kwargs)

Called for every row of input_table. The arguments correspond to the columns of the input table.

Should return a dict of values matching output_schema.

  • Return type
    Dict[str, Any]

open()

Called before the actual work starts. Suitable for one-time setup.

  • Return type
    None

property result: Table

Resulting table.


with_options(capacity=None, retry_strategy=None)

Sets async options.

  • Parameters
    • capacity (Optional[int]) – maximum number of concurrent operations.
    • retry_strategy (Optional[AsyncRetryStrategy]) – defines how failures are handled.
  • Return type
    AsyncTransformer
  • Returns
    self
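The effect of capacity can be pictured with plain asyncio. A semaphore caps how many invocations are in flight, and a failed call is retried a fixed number of times.

This is a sketch under assumptions, not Pathway's implementation. The names run_invocations and max_retries are hypothetical. A real AsyncRetryStrategy may add delays or other behavior that is not modeled here.

```python
import asyncio


async def run_invocations(rows, invoke, capacity, max_retries=0):
    """Call invoke(**row) for every row, with at most `capacity` calls in flight.

    A failing call is retried up to `max_retries` extra times before the
    exception propagates. Results come back in input order.
    """
    semaphore = asyncio.Semaphore(capacity)

    async def run_one(row):
        async with semaphore:  # waits while `capacity` calls are already running
            for attempt in range(max_retries + 1):
                try:
                    return await invoke(**row)
                except Exception:
                    if attempt == max_retries:
                        raise

    return await asyncio.gather(*(run_one(row) for row in rows))


# An instrumented version of the increment transformer's invoke().
in_flight = 0
peak = 0


async def increment(value):
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    return {"ret": value + 1}


rows = [{"value": v} for v in (42, 44, 46, 48)]
results = asyncio.run(run_invocations(rows, increment, capacity=2))
print(results)
print("peak concurrency:", peak)
```

With capacity=2, at most two increment calls overlap even though four rows are submitted at once.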

class pathway.ClassArg(ref: RowReference, ptr: Pointer)

Base class to inherit from when writing inner classes for class transformers.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()
        @pw.output_attribute
        def ret(self) -> int:
            return self.arg + 1
t1 = pw.debug.parse_to_table('''
age
10
9
8
7
''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7 | 8
8 | 9
9 | 10
10 | 11

pointer_from(*args, optional=False)

Pseudo-random hash of its argument. Produces pointer types. Applied value-wise.

  • Return type
    BasePointer

class pathway.ColumnReference(*, column, table, name)

Reference to a column.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age owner pet
1 10 Alice dog
2 9 Bob dog
3 8 Alice cat
4 7 Bob dog''')
isinstance(t1.age, pw.ColumnReference)
True
isinstance(t1["owner"], pw.ColumnReference)
True

property name

Name of the referred column.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age owner pet
1 10 Alice dog
2 9 Bob dog
3 8 Alice cat
4 7 Bob dog''')
t1.age.name
'age'

property table

Table to which the referred column belongs.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age owner pet
1 10 Alice dog
2 9 Bob dog
3 8 Alice cat
4 7 Bob dog''')
t1.age.table is t1
True

class pathway.FilteredJoinResult(join_result, filtering)

Result of filtering a join result with filter().

filter(filter_expression)

Filters rows, keeping the ones satisfying the predicate.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age owner pet
1 10 Alice 1
2 9 Bob 1
3 8 Alice 2
''')
t2 = pw.debug.parse_to_table('''
age owner pet size
11 10 Alice 3 M
12 9 Bob 1 L
13 8 Tom 1 XL
''')
result = t1.join(t2).filter(t1.owner == t2.owner).select(t1.age, t2.size) # noqa: E501
pw.debug.compute_and_print(result, include_id=False)
age | size
8 | M
9 | L
10 | M
  • Return type
    FilteredJoinResult
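Semantically, a join without conditions followed by filter() behaves like a cross product whose pairs are kept or dropped by the predicate. The plain-Python sketch below reproduces the example output using lists of tuples instead of Pathway tables. It illustrates the semantics only and says nothing about how Pathway plans or executes the query.

```python
# Rows of the example tables: t1 is (age, owner, pet), t2 is (age, owner, pet, size).
t1 = [(10, "Alice", 1), (9, "Bob", 1), (8, "Alice", 2)]
t2 = [(10, "Alice", 3, "M"), (9, "Bob", 1, "L"), (8, "Tom", 1, "XL")]

# t1.join(t2) with no conditions: every left row paired with every right row.
cross = [(left, right) for left in t1 for right in t2]

# .filter(t1.owner == t2.owner): keep only the pairs satisfying the predicate.
kept = [(left, right) for left, right in cross if left[1] == right[1]]

# .select(t1.age, t2.size), sorted like the printed output.
result = sorted((left[0], right[3]) for left, right in kept)
print(result)  # [(8, 'M'), (9, 'L'), (10, 'M')]
```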

groupby(*args, id=None)

Groups the join result by the columns in args.

NOTE: Usually followed by .reduce(), which aggregates the result and returns a table.

  • Parameters
    • args (ColumnReference) – columns to group by.
    • id (Optional[ColumnReference]) – if provided, the column used to set the ids of the result rows.
  • Returns
    Groupby object.
  • Return type
    GroupedJoinResult

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
cost owner pet
1 100 Alice 1
2 90 Bob 1
3 80 Alice 2
''')
t2 = pw.debug.parse_to_table('''
cost owner pet size
11 100 Alice 3 M
12 90 Bob 1 L
13 80 Tom 1 XL
''')
result = (t1.join(t2, t1.owner==t2.owner).groupby(pw.this.owner)
.reduce(pw.this.owner, pairs = pw.reducers.count()))
pw.debug.compute_and_print(result, include_id=False)
owner | pairs
Alice | 2
Bob | 1
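The counts in the example follow from simple arithmetic. An equi-join on owner produces, for each owner, (number of left rows with that owner) times (number of right rows with that owner) pairs. So Alice gets 2 times 1 = 2 and Bob 1 times 1 = 1, while Tom has no left partner.

The sketch below checks this with collections.Counter on the owner columns. It illustrates the counting only; it is not how Pathway evaluates groupby().reduce().

```python
from collections import Counter

# owner columns of the example tables
t1_owners = ["Alice", "Bob", "Alice"]
t2_owners = ["Alice", "Bob", "Tom"]

left_counts = Counter(t1_owners)
right_counts = Counter(t2_owners)

# Pairs produced per owner by t1.join(t2, t1.owner == t2.owner);
# owners missing from either side produce no pairs and no group.
pairs = {
    owner: left_counts[owner] * right_counts[owner]
    for owner in left_counts
    if owner in right_counts
}
print(pairs)                # {'Alice': 2, 'Bob': 1}
print(sum(pairs.values()))  # 3, the total_pairs value of an ungrouped reduce
```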

join(other, *on, id=None, how=JoinMode.INNER)

Join self with other using the given join expression.

  • Parameters
    • other (Joinable) – the right side of the join.
    • on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (Optional[ColumnReference]) – optional argument for the id of the result; can only be self.id or other.id
    • how (JoinMode) – by default, an inner join is performed. Possible values are JoinMode.{INNER,LEFT,RIGHT,OUTER}, corresponding to inner, left, right and outer join respectively.
  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join(t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER).select(age=t1.age, owner_name=t2.owner, size=t2.size) # noqa: E501
pw.debug.compute_and_print(t3, include_id = False)
age | owner_name | size
9 | Bob | L
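With several conditions in on, a pair of rows matches only if all equalities hold. In the example, only Bob's rows agree on both pet and owner.

A plain-Python hash join over lists of dicts reproduces that output. It is a sketch of the matching semantics only. inner_join is a hypothetical helper, not part of Pathway, and nothing here describes how Pathway's engine performs the join.

```python
from collections import defaultdict

t1 = [{"age": 10, "owner": "Alice", "pet": 1},
      {"age": 9, "owner": "Bob", "pet": 1},
      {"age": 8, "owner": "Alice", "pet": 2}]
t2 = [{"age": 10, "owner": "Alice", "pet": 3, "size": "M"},
      {"age": 9, "owner": "Bob", "pet": 1, "size": "L"},
      {"age": 8, "owner": "Tom", "pet": 1, "size": "XL"}]


def inner_join(left, right, keys):
    """Inner equi-join: a pair matches only if every key column is equal."""
    index = defaultdict(list)
    for rrow in right:  # build phase: hash the right side on the key tuple
        index[tuple(rrow[k] for k in keys)].append(rrow)
    for lrow in left:   # probe phase: look up each left row's key tuple
        for rrow in index.get(tuple(lrow[k] for k in keys), []):
            yield lrow, rrow


t3 = [{"age": l["age"], "owner_name": r["owner"], "size": r["size"]}
      for l, r in inner_join(t1, t2, ["pet", "owner"])]
print(t3)  # [{'age': 9, 'owner_name': 'Bob', 'size': 'L'}]
```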

join_inner(other, *on, id=None)

Inner-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join.
    • on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (Optional[ColumnReference]) – optional argument for the id of the result; can only be self.id or other.id
  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join_inner(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(age=t1.age, owner_name=t2.owner, size=t2.size) # noqa: E501
pw.debug.compute_and_print(t3, include_id = False)
age | owner_name | size
9 | Bob | L

join_left(other, *on, id=None)

Left-joins two tables or join results.

  • Parameters
    • other (Joinable) – Table or join result.
    • *on – Columns to join, syntax self.col1 == other.col2
    • id (Optional[ColumnReference]) – optional id column of the result

Remarks: args cannot contain the id column of either table, because the result table has an id column with auto-generated ids; an original id can still be selected by assigning it to a named column (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • rows from the right side that were not matched with the left side are skipped
  • for rows that were matched, the behavior is the same as that of an inner join.

  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
pw.debug.compute_and_print(t1.join_left(t2, t1.A == t2.C
).select(t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D,t2.id)),
include_id=False)
A | t2_C | S
11 | 11 | 322
12 | 12 | 324
13 | |
13 | |

join_outer(other, *on, id=None)

Outer-joins two tables or join results.

  • Parameters
    • other (Joinable) – Table or join result.
    • *on – Columns to join, syntax self.col1 == other.col2
    • id (Optional[ColumnReference]) – optional id column of the result

Remarks: args cannot contain the id column of either table, because the result table has an id column with auto-generated ids; an original id can still be selected by assigning it to a named column (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched, the behavior is the same as that of an inner join.

  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
pw.debug.compute_and_print(t1.join_outer(t2, t1.A == t2.C
).select(t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D,t1.id,t2.id)),
include_id=False)
A | t2_C | S
| 14 |
| 14 |
11 | 11 | 322
12 | 12 | 324
13 | |
13 | |
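The None-filling rules of the left, right and outer joins can be compared side by side with a small plain-Python model. In this model, an unmatched row is paired with None.

This is only a sketch of the semantics on lists of dicts:
- equi_join and project are hypothetical helpers.
- The nested loop is chosen for readability.
- project mimics the S=pw.require(...) column of the examples: it yields None whenever either side is missing.

```python
def equi_join(left, right, lkey, rkey, how="inner"):
    """Join lists of dicts on left[lkey] == right[rkey].

    how is "inner", "left", "right" or "outer"; an unmatched row is paired
    with None, which stands for the all-None side in the join result.
    """
    out = []
    matched_right = set()
    for lrow in left:
        hits = [i for i, rrow in enumerate(right) if rrow[rkey] == lrow[lkey]]
        for i in hits:
            matched_right.add(i)
            out.append((lrow, right[i]))
        if not hits and how in ("left", "outer"):
            out.append((lrow, None))  # keep an unmatched left row
    if how in ("right", "outer"):
        # keep unmatched right rows
        out += [(None, rrow) for i, rrow in enumerate(right) if i not in matched_right]
    return out


def project(pairs):
    """Return (A, t2_C, S) tuples; S is None unless both sides are present."""
    rows = []
    for lrow, rrow in pairs:
        a = None if lrow is None else lrow["A"]
        c = None if rrow is None else rrow["C"]
        s = None if lrow is None or rrow is None else lrow["B"] + rrow["D"]
        rows.append((a, c, s))
    return rows


t1 = [{"A": 11, "B": 111}, {"A": 12, "B": 112}, {"A": 13, "B": 113}, {"A": 13, "B": 114}]
t2 = [{"C": 11, "D": 211}, {"C": 12, "D": 212}, {"C": 14, "D": 213}, {"C": 14, "D": 214}]

for how in ("inner", "left", "right", "outer"):
    print(how, project(equi_join(t1, t2, "A", "C", how)))
```

The "left" and "outer" lines contain the same rows as the join_left and join_outer tables above, up to ordering.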

join_right(other, *on, id=None)

Right-joins two tables or join results.

  • Parameters
    • other (Joinable) – Table or join result.
    • *on – Columns to join, syntax self.col1 == other.col2
    • id (Optional[ColumnReference]) – optional id column of the result

Remarks: args cannot contain the id column of either table, because the result table has an id column with auto-generated ids; an original id can still be selected by assigning it to a named column (passed in kwargs).

Behavior:

  • rows from the left side that were not matched with the right side are skipped
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched, the behavior is the same as that of an inner join.

  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
pw.debug.compute_and_print(t1.join_right(t2, t1.A == t2.C
).select(t1.A, t2_C=t2.C, S=pw.require(pw.coalesce(t1.B,0) + t2.D,t1.id)),
include_id=False)
A | t2_C | S
| 14 |
| 14 |
11 | 11 | 322
12 | 12 | 324

promise_universe_is_equal_to(other)

Asserts to Pathway that the universe of self is equal to the universe of other.

Semantics: Used in situations where Pathway cannot deduce that two universes are equal.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
| age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

promise_universe_is_subset_of(other)

Asserts to Pathway that the universe of self is a subset of the universe of other.

Semantics: Used in situations where Pathway cannot deduce that one universe is a subset of another.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 30
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

promise_universes_are_disjoint(other)

Asserts to Pathway that the universe of self is disjoint from the universe of other.

Semantics: Used in situations where Pathway cannot deduce that the universes are disjoint.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
11 | 11 | Alice | 30
12 | 12 | Tom | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 1
11 | Alice | 30
12 | Tom | 40
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)
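All three promises are statements about universes, that is, the sets of row ids. In the examples, each promise is paired with an operation whose requirement it states:
- t1 + t2 needs equal id sets.
- t1 << t2 needs the ids of t2 to be a subset of those of t1.
- concat needs disjoint id sets.

A plain-Python model in which a table is a dict from id to row makes these requirements visible. The helpers below are hypothetical, and they check the conditions explicitly only to illustrate them. Whether and how Pathway validates a promise is not covered here.

```python
# A table is modeled as a dict mapping row id -> dict of column values,
# so its universe is simply the set of its keys.


def add_columns(a, b):
    """Model of a + b: rows are combined by id, so the universes must be equal."""
    if a.keys() != b.keys():
        raise ValueError("universes must be equal")
    return {rid: {**a[rid], **b[rid]} for rid in a}


def update_cells(base, update):
    """Model of base << update: overwrite cells of rows whose ids appear in update."""
    if not update.keys() <= base.keys():
        raise ValueError("universe of update must be a subset of the base universe")
    return {rid: {**row, **update.get(rid, {})} for rid, row in base.items()}


def concat(a, b):
    """Model of a.concat(b): union of the rows, so ids must not collide."""
    if not a.keys().isdisjoint(b.keys()):
        raise ValueError("universes must be disjoint")
    return {**a, **b}


pets = {1: {"pet": "Dog"}, 7: {"pet": "Cat"}}
ages = {1: {"age": 10}, 7: {"age": 3}}
people = {1: {"age": 10, "owner": "Alice", "pet": 1},
          2: {"age": 9, "owner": "Bob", "pet": 1},
          3: {"age": 8, "owner": "Alice", "pet": 2}}
patch = {1: {"age": 10, "owner": "Alice", "pet": 30}}
newcomers = {11: {"age": 11, "owner": "Alice", "pet": 30},
             12: {"age": 12, "owner": "Tom", "pet": 40}}

print(add_columns(pets, ages))
print(update_cells(people, patch))
print(concat(people, newcomers))
```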

reduce(*args, **kwargs)

Reduces a join result to a single row.

Equivalent to self.groupby().reduce(*args, **kwargs).

  • Parameters
    • args (ColumnReference) – reducers to reduce the table with.
    • kwargs (ColumnExpression) – reducers to reduce the table with; each keyword is the name of the resulting column.
  • Returns
    Reduced table.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
cost owner pet
1 100 Alice 1
2 90 Bob 1
3 80 Alice 2
''')
t2 = pw.debug.parse_to_table('''
cost owner pet size
11 100 Alice 3 M
12 90 Bob 1 L
13 80 Tom 1 XL
''')
result = t1.join(t2, t1.owner==t2.owner).reduce(total_pairs = pw.reducers.count())
pw.debug.compute_and_print(result, include_id=False)
total_pairs
3

select(*args, **kwargs)

Computes the result of a join.

  • Parameters
    • args (ColumnReference) – Column references.
    • kwargs (Any) – Column expressions with their new assigned names.
  • Returns
    Created table.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(age=t1.age, owner_name=t2.owner, size=t2.size) # noqa: E501
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9 | Bob | L

class pathway.GroupedJoinResult(*, join_result, args, id)

Result of grouping a join result with groupby(); usually followed by reduce().

promise_universe_is_equal_to(other)

Asserts to Pathway that the universe of self is equal to the universe of other.

Semantics: Used in situations where Pathway cannot deduce that two universes are equal.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
| age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

promise_universe_is_subset_of(other)

Asserts to Pathway that the universe of self is a subset of the universe of other.

Semantics: Used in situations where Pathway cannot deduce that one universe is a subset of another.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 30
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

promise_universes_are_disjoint(other)

Asserts to Pathway that the universe of self is disjoint from the universe of other.

Semantics: Used in situations where Pathway cannot deduce that the universes are disjoint.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
11 | 11 | Alice | 30
12 | 12 | Tom | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 1
11 | Alice | 30
12 | Tom | 40
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

reduce(*args, **kwargs)

Reduces the grouped join result to a table.

  • Returns
    Created table.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
cost owner pet
1 100 Alice 1
2 90 Bob 1
3 80 Alice 2
''')
t2 = pw.debug.parse_to_table('''
cost owner pet size
11 100 Alice 3 M
12 90 Bob 1 L
13 80 Tom 1 XL
''')
result = (t1.join(t2, t1.owner==t2.owner).groupby(pw.this.owner)
.reduce(pw.this.owner, pairs = pw.reducers.count()))
pw.debug.compute_and_print(result, include_id=False)
owner | pairs
Alice | 2
Bob | 1

class pathway.GroupedTable(table, grouping_columns, set_id=False)

Result of a groupby operation on a Table.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t2 = t1.groupby(t1.pet, t1.owner)
isinstance(t2, pw.GroupedTable)
True

promise_universe_is_equal_to(other)

Asserts to Pathway that the universe of self is equal to the universe of other.

Semantics: Used in situations where Pathway cannot deduce that two universes are equal.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
| age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

promise_universe_is_subset_of(other)

Asserts to Pathway that the universe of self is a subset of the universe of other.

Semantics: Used in situations where Pathway cannot deduce that one universe is a subset of another.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 30
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

promise_universes_are_disjoint(other)

Asserts to Pathway that the universe of self is disjoint from the universe of other.

Semantics: Used in situations where Pathway cannot deduce that the universes are disjoint.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
11 | 11 | Alice | 30
12 | 12 | Tom | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 1
11 | Alice | 30
12 | Tom | 40
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

reduce(*args, **kwargs)

Reduces the grouped table to a table.

  • Parameters
    • args (ColumnReference) – Column references.
    • kwargs (ColumnExpression) – Column expressions with their new assigned names.
  • Returns
    Created table.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t2 = t1.groupby(t1.pet, t1.owner).reduce(t1.owner, t1.pet, ageagg=pw.reducers.sum(t1.age))
pw.debug.compute_and_print(t2, include_id=False)
owner | pet | ageagg
Alice | cat | 8
Alice | dog | 10
Bob | dog | 16
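A groupby followed by reduce corresponds to partitioning the rows by the grouping columns and folding each partition with the reducer. The plain-Python sketch below reproduces the ageagg column with a defaultdict keyed by (owner, pet). It illustrates the semantics only and is not Pathway's implementation.

```python
from collections import defaultdict

# Rows of the example table: (age, owner, pet).
rows = [(10, "Alice", "dog"), (9, "Bob", "dog"), (8, "Alice", "cat"), (7, "Bob", "dog")]

# groupby(t1.pet, t1.owner): one group per distinct (owner, pet) combination.
ageagg = defaultdict(int)
for age, owner, pet in rows:
    ageagg[(owner, pet)] += age  # pw.reducers.sum(t1.age) within the group

for (owner, pet), total in sorted(ageagg.items()):
    print(owner, pet, total)
```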

class pathway.IntervalJoinResult(left_bucketed, right_bucketed, earlier_part_filtered, later_part_filtered, table_substitution, mode)

Result of an interval join between tables.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 3
2 | 4
3 | 5
4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 0
2 | 1
3 | 4
4 | 7
'''
)
join_result = t1.interval_join(t2, t1.t, t2.t, -2, 1)
isinstance(join_result, pw.IntervalJoinResult)
True
pw.debug.compute_and_print(
join_result.select(left_t=t1.t, right_t=t2.t), include_id=False
)
left_t | right_t
3 | 1
3 | 4
4 | 4
5 | 4
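Reading the output above, the bounds -2 and 1 are applied relative to the left time and are inclusive at both ends. For example, the left row at t=3 matches right rows at t=1 (3 - 2) and t=4 (3 + 1).

A nested-loop sketch in plain Python encodes this reading. It also supports an optional key equality, like the t1.a == t2.b condition in the select() example below, and reproduces both outputs. interval_join here is a hypothetical helper over lists of dicts, not Pathway's algorithm.

```python
def interval_join(left, right, lower, upper, left_key=None, right_key=None):
    """Return (left_row, right_row) pairs with left.t + lower <= right.t <= left.t + upper.

    If left_key/right_key are given, the rows must also agree on those columns.
    """
    pairs = []
    for lrow in left:
        for rrow in right:
            if left_key is not None and lrow[left_key] != rrow[right_key]:
                continue
            if lrow["t"] + lower <= rrow["t"] <= lrow["t"] + upper:
                pairs.append((lrow, rrow))
    return pairs


t1 = [{"t": t} for t in (3, 4, 5, 11)]
t2 = [{"t": t} for t in (0, 1, 4, 7)]
result = sorted((l["t"], r["t"]) for l, r in interval_join(t1, t2, -2, 1))
print(result)  # [(3, 1), (3, 4), (4, 4), (5, 4)]

# Same bounds plus a key condition, using the tables of the select() example.
t1k = [{"a": a, "t": t} for a, t in [(1, 3), (1, 4), (1, 5), (1, 11), (2, 2), (2, 3), (3, 4)]]
t2k = [{"b": b, "t": t} for b, t in [(1, 0), (1, 1), (1, 4), (1, 7), (2, 0), (2, 2), (4, 2)]]
keyed = sorted((l["a"], l["t"], r["t"]) for l, r in interval_join(t1k, t2k, -2, 1, "a", "b"))
print(keyed)
```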

select(*args, **kwargs)

Computes the result of an interval join.

  • Parameters
    • args (ColumnReference) – Column references.
    • kwargs (Any) – Column expressions with their new assigned names.
  • Returns
    Created table.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 3
2 | 1 | 4
3 | 1 | 5
4 | 1 | 11
5 | 2 | 2
6 | 2 | 3
7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 0
2 | 1 | 1
3 | 1 | 4
4 | 1 | 7
5 | 2 | 0
6 | 2 | 2
7 | 4 | 2
'''
)
t3 = t1.interval_join(t2, t1.t, t2.t, -2, 1, t1.a == t2.b).select(
t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3 | 1
1 | 3 | 4
1 | 4 | 4
1 | 5 | 4
2 | 2 | 0
2 | 2 | 2
2 | 3 | 2

class pathway.JoinMode(value)

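Enumeration of join modes; join() accepts JoinMode.{INNER,LEFT,RIGHT,OUTER}, corresponding to inner, left, right and outer join respectively.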


class pathway.JoinResult(_universe, _context, _left_table, _right_table, _original_left, _original_right, _substitution, _chained_join_desugaring, _joined_on_names, _join_mode)

Result of a join between tables.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age owner pet
1 10 Alice 1
2 9 Bob 1
3 8 Alice 2
''')
t2 = pw.debug.parse_to_table('''
age owner pet size
11 10 Alice 3 M
12 9 Bob 1 L
13 8 Tom 1 XL
''')
joinresult= t1.join(t2, t1.pet == t2.pet, t1.owner == t2.owner) # noqa: E501
isinstance(joinresult, pw.JoinResult)
True
pw.debug.compute_and_print(joinresult.select(t1.age, t2.size), include_id=False)
age | size
9 | L

filter(filter_expression)

Filters rows, keeping the ones satisfying the predicate.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age owner pet
1 10 Alice 1
2 9 Bob 1
3 8 Alice 2
''')
t2 = pw.debug.parse_to_table('''
age owner pet size
11 10 Alice 3 M
12 9 Bob 1 L
13 8 Tom 1 XL
''')
result = t1.join(t2).filter(t1.owner == t2.owner).select(t1.age, t2.size) # noqa: E501
pw.debug.compute_and_print(result, include_id=False)
age | size
8 | M
9 | L
10 | M
  • Return type
    FilteredJoinResult

groupby(*args, id=None)

Groups the join result by the columns in args.

NOTE: Usually followed by .reduce(), which aggregates the result and returns a table.

  • Parameters
    • args (ColumnReference) – columns to group by.
    • id (Optional[ColumnReference]) – if provided, the column used to set the ids of the result rows.
  • Returns
    Groupby object.
  • Return type
    GroupedJoinResult

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
cost owner pet
1 100 Alice 1
2 90 Bob 1
3 80 Alice 2
''')
t2 = pw.debug.parse_to_table('''
cost owner pet size
11 100 Alice 3 M
12 90 Bob 1 L
13 80 Tom 1 XL
''')
result = (t1.join(t2, t1.owner==t2.owner).groupby(pw.this.owner)
.reduce(pw.this.owner, pairs = pw.reducers.count()))
pw.debug.compute_and_print(result, include_id=False)
owner | pairs
Alice | 2
Bob | 1

join(other, *on, id=None, how=JoinMode.INNER)

Join self with other using the given join expression.

  • Parameters
    • other (Joinable) – the right side of the join.
    • on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (Optional[ColumnReference]) – optional argument for the id of the result; can only be self.id or other.id
    • how (JoinMode) – by default, an inner join is performed. Possible values are JoinMode.{INNER,LEFT,RIGHT,OUTER}, corresponding to inner, left, right and outer join respectively.
  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join(t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER).select(age=t1.age, owner_name=t2.owner, size=t2.size) # noqa: E501
pw.debug.compute_and_print(t3, include_id = False)
age | owner_name | size
9 | Bob | L

join_inner(other, *on, id=None)

Inner-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join.
    • on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (Optional[ColumnReference]) – optional argument for the id of the result; can only be self.id or other.id
  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join_inner(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(age=t1.age, owner_name=t2.owner, size=t2.size) # noqa: E501
pw.debug.compute_and_print(t3, include_id = False)
age | owner_name | size
9 | Bob | L

join_left(other, *on, id=None)

Left-joins two tables or join results.

  • Parameters
    • other (Joinable) – Table or join result.
    • *on – Columns to join, syntax self.col1 == other.col2
    • id (Optional[ColumnReference]) – optional id column of the result

Remarks: args cannot contain the id column of either table, because the result table has an id column with auto-generated ids; an original id can still be selected by assigning it to a named column (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • rows from the right side that were not matched with the left side are skipped
  • for rows that were matched, the behavior is the same as that of an inner join.

  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
pw.debug.compute_and_print(t1.join_left(t2, t1.A == t2.C
).select(t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D,t2.id)),
include_id=False)
A | t2_C | S
11 | 11 | 322
12 | 12 | 324
13 | |
13 | |

join_outer(other, *on, id=None)

Outer-joins two tables or join results.

  • Parameters
    • other (Joinable) – Table or join result.
    • *on – Columns to join, syntax self.col1 == other.col2
    • id (Optional[ColumnReference]) – optional id column of the result

Remarks: args cannot contain the id column of either table, because the result table has an id column with auto-generated ids; an original id can still be selected by assigning it to a named column (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched, the behavior is the same as that of an inner join.

  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
pw.debug.compute_and_print(t1.join_outer(t2, t1.A == t2.C
).select(t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D,t1.id,t2.id)),
include_id=False)
A | t2_C | S
| 14 |
| 14 |
11 | 11 | 322
12 | 12 | 324
13 | |
13 | |

join_right(other, *on, id=None)

Right-joins two tables or join results.

  • Parameters
    • other (Joinable) – Table or join result.
    • *on – Columns to join, syntax self.col1 == other.col2
    • id (Optional[ColumnReference]) – optional id column of the result

Remarks: args cannot contain the id column of either table, because the result table has an id column with auto-generated ids; an original id can still be selected by assigning it to a named column (passed in kwargs).

Behavior:

  • rows from the left side that were not matched with the right side are skipped
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched, the behavior is the same as that of an inner join.

  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
pw.debug.compute_and_print(t1.join_right(t2, t1.A == t2.C
).select(t1.A, t2_C=t2.C, S=pw.require(pw.coalesce(t1.B,0) + t2.D,t1.id)),
include_id=False)
A | t2_C | S
| 14 |
| 14 |
11 | 11 | 322
12 | 12 | 324

promise_universe_is_equal_to(other)

Asserts to Pathway that the universe of self is equal to the universe of other.

Semantics: Used in situations where Pathway cannot deduce that two universes are equal.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
| age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

promise_universe_is_subset_of(other)

Asserts to Pathway that the universe of self is a subset of the universe of other.

Semantics: Used in situations where Pathway cannot deduce that one universe is a subset of another.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 30
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

promise_universes_are_disjoint(other)

Asserts to Pathway that the universe of self is disjoint from the universe of other.

Semantics: Used in situations where Pathway cannot deduce that the universes are disjoint.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
11 | 11 | Alice | 30
12 | 12 | Tom | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 1
11 | Alice | 30
12 | Tom | 40
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

reduce(*args, **kwargs)

Reduce a join result to a single row.

Equivalent to self.groupby().reduce(*args, **kwargs).

  • Parameters
    • args (ColumnReference) – reducers to reduce the table with.
    • kwargs (ColumnExpression) – reducers to reduce the table with. Each key is the name of the new column.
  • Returns
    Reduced table.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
cost owner pet
1 100 Alice 1
2 90 Bob 1
3 80 Alice 2
''')
t2 = pw.debug.parse_to_table('''
cost owner pet size
11 100 Alice 3 M
12 90 Bob 1 L
13 80 Tom 1 XL
''')
result = t1.join(t2, t1.owner==t2.owner).reduce(total_pairs = pw.reducers.count())
pw.debug.compute_and_print(result, include_id=False)
total_pairs
3
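The count of 3 can be checked by hand: Alice appears twice in t1 and once in t2 (2 pairs), Bob once in each (1 pair), and Tom only in t2 (no pair). The plain-Python sketch below reproduces that arithmetic; it is an illustrative model, not Pathway code, and `count_join_pairs` is a hypothetical helper.

```python
# Illustrative model only (not Pathway API): the `owner` values of t1 and t2
# from the example above, the only column used in the join condition.
t1_owners = ["Alice", "Bob", "Alice"]
t2_owners = ["Alice", "Bob", "Tom"]


def count_join_pairs(left: list[str], right: list[str]) -> int:
    """Count the (left, right) pairs an inner equi-join would produce."""
    return sum(1 for lv in left for rv in right if lv == rv)


total_pairs = count_join_pairs(t1_owners, t2_owners)
print(total_pairs)  # 3
```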

select(*args, **kwargs)

Computes the result of a join.

  • Parameters
    • args (ColumnReference) – Column references.
    • kwargs (Any) – Column expressions with their new assigned names.
  • Returns
    Created table.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9 | Bob | L

class pathway.Joinable(universe)


join(other, *on, id=None, how=JoinMode.INNER)

Join self with other using the given join expression.

  • Parameters
    • other (Joinable) – the right side of the join.
    • on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (OptionalColumnReference) – optional argument for id of result, can be only self.id or other.id
    • how (JoinMode) – by default, an inner join is performed. Possible values are JoinMode.{INNER,LEFT,RIGHT,OUTER}, which correspond to inner, left, right and outer joins, respectively.
  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join(t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9 | Bob | L

join_inner(other, *on, id=None)

Inner-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join.
    • on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (OptionalColumnReference) – optional argument for id of result, can be only self.id or other.id
  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join_inner(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9 | Bob | L

join_left(other, *on, id=None)

Left-joins two tables or join results.

  • Parameters
    • other (Joinable) – Table or join result.
    • *on – Columns to join, syntax self.col1 == other.col2
    • id (OptionalColumnReference) – optional id column of the result

Remarks: args cannot contain the id column of either table, because the result table has its own id column with auto-generated ids; an input table's id can be selected by assigning it to a named column (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • rows from the right side that were not matched with the left side are skipped
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
pw.debug.compute_and_print(t1.join_left(t2, t1.A == t2.C
).select(t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D,t2.id)),
include_id=False)
A | t2_C | S
11 | 11 | 322
12 | 12 | 324
13 | |
13 | |

join_outer(other, *on, id=None)

Outer-joins two tables or join results.

  • Parameters
    • other (Joinable) – Table or join result.
    • *on – Columns to join, syntax self.col1 == other.col2
    • id (OptionalColumnReference) – optional id column of the result

Remarks: args cannot contain the id column of either table, because the result table has its own id column with auto-generated ids; an input table's id can be selected by assigning it to a named column (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
pw.debug.compute_and_print(t1.join_outer(t2, t1.A == t2.C
).select(t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D,t1.id,t2.id)),
include_id=False)
A | t2_C | S
| 14 |
| 14 |
11 | 11 | 322
12 | 12 | 324
13 | |
13 | |
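The left, right and outer behavior lists differ only in which unmatched rows are kept. As a rough mental model, the hypothetical plain-Python `equi_join` below (not part of Pathway) pairs rows on an equality key and pads unmatched rows with None; projected onto A, t2_C and S, it reproduces the join_left, join_outer and join_right outputs of this class, up to row order.

```python
from typing import Any, Optional

Row = dict[str, Any]


def equi_join(
    left: list[Row], right: list[Row], lkey: str, rkey: str, how: str = "inner"
) -> list[tuple[Optional[Row], Optional[Row]]]:
    """Hypothetical model of inner/left/right/outer equi-join semantics."""
    pairs: list[tuple[Optional[Row], Optional[Row]]] = []
    matched_right: set[int] = set()
    for lrow in left:
        hits = [i for i, rrow in enumerate(right) if lrow[lkey] == rrow[rkey]]
        for i in hits:
            pairs.append((lrow, right[i]))
            matched_right.add(i)
        if not hits and how in ("left", "outer"):
            pairs.append((lrow, None))  # unmatched left row: right side is None
    if how in ("right", "outer"):
        for i, rrow in enumerate(right):
            if i not in matched_right:
                pairs.append((None, rrow))  # unmatched right row: left side is None
    return pairs


def project(pairs):
    """Mimic .select(t1.A, t2_C=t2.C, S=B + D), with S = None if a side is missing."""
    return [
        (
            lrow["A"] if lrow else None,
            rrow["C"] if rrow else None,
            lrow["B"] + rrow["D"] if lrow and rrow else None,
        )
        for lrow, rrow in pairs
    ]


t1 = [{"A": 11, "B": 111}, {"A": 12, "B": 112}, {"A": 13, "B": 113}, {"A": 13, "B": 114}]
t2 = [{"C": 11, "D": 211}, {"C": 12, "D": 212}, {"C": 14, "D": 213}, {"C": 14, "D": 214}]

outer = project(equi_join(t1, t2, "A", "C", how="outer"))
```

In this model an inner join is simply the case where neither padding branch runs.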

join_right(other, *on, id=None)

Right-joins two tables or join results.

  • Parameters
    • other (Joinable) – Table or join result.
    • *on – Columns to join, syntax self.col1 == other.col2
    • id (OptionalColumnReference) – optional id column of the result

Remarks: args cannot contain the id column of either table, because the result table has its own id column with auto-generated ids; an input table's id can be selected by assigning it to a named column (passed in kwargs).

Behavior:

  • rows from the left side that were not matched with the right side are skipped
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
pw.debug.compute_and_print(t1.join_right(t2, t1.A == t2.C
).select(t1.A, t2_C=t2.C, S=pw.require(pw.coalesce(t1.B,0) + t2.D,t1.id)),
include_id=False)
A | t2_C | S
| 14 |
| 14 |
11 | 11 | 322
12 | 12 | 324
  • Returns
    OuterJoinResult object

promise_universe_is_equal_to(other)

Asserts to Pathway that the universe of self is equal to the universe of other.

Semantics: Used in situations where Pathway cannot deduce that the two universes are equal.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
| age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

promise_universe_is_subset_of(other)

Asserts to Pathway that the universe of self is a subset of the universe of other.

Semantics: Used in situations where Pathway cannot deduce that one universe is a subset of another.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 30
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

promise_universes_are_disjoint(other)

Asserts to Pathway that the universe of self is disjoint from the universe of other.

Semantics: Used in situations where Pathway cannot deduce that the universes are disjoint.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
11 | 11 | Alice | 30
12 | 12 | Tom | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 1
11 | Alice | 30
12 | Tom | 40
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

class pathway.Pointer()

Pointer to row type.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t2 = t1.select(col=t1.id)
t2.schema['col'] is pw.Pointer
True

class pathway.Schema()

Base class to inherit from when creating schemas. All schemas should be subclasses of this one.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age owner pet
1 10 Alice dog
2 9 Bob dog
3 8 Alice cat
4 7 Bob dog''')
t1.schema.as_dict()
{'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>}
issubclass(t1.schema, pw.Schema)
True
class NewSchema(pw.Schema):
foo: int
SchemaSum = NewSchema | t1.schema
SchemaSum.as_dict()
{'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>, 'foo': <class 'int'>}

class pathway.StatsMonitoringLevel()

Specifies the verbosity of the computation graph monitoring mechanism.

NONE: No monitoring.

IN_OUT: Monitor input and output latency. The latency is measured as the difference between the time when the operator processed the data and the time when Pathway acquired the data.

ALL: Monitor the latency of each operator in the execution graph. The latency is measured as the difference between the time when the operator processed the data and the time when Pathway acquired the data.


class pathway.Table(columns, universe, pk_columns={}, schema=None, id_column=None)

Collection of named columns over identical universes.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
isinstance(t1, pw.Table)
True

asof_join(other, t_left, t_right, *on, mode, defaults={})

Perform an ASOF join of two tables

  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • t_left (ColumnExpression) – time-like column expression to do the join against
    • t_right (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • mode (JoinMode) – mode of the join (JoinMode.LEFT, JoinMode.RIGHT or JoinMode.OUTER)
    • defaults (Dict[ColumnReference, Any]) – dictionary mapping columns to default values. Entries in the resulting table that do not have a predecessor in the join are set to this default value. If no default is provided, None is used.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| K | val | t
1 | 0 | 1 | 1
2 | 0 | 2 | 4
3 | 0 | 3 | 5
4 | 0 | 4 | 6
5 | 0 | 5 | 7
6 | 0 | 6 | 11
7 | 0 | 7 | 12
8 | 1 | 8 | 5
9 | 1 | 9 | 7
'''
)
t2 = pw.debug.table_from_markdown(
'''
| K | val | t
21 | 1 | 7 | 2
22 | 1 | 3 | 8
23 | 0 | 0 | 2
24 | 0 | 6 | 3
25 | 0 | 2 | 7
26 | 0 | 3 | 8
27 | 0 | 9 | 9
28 | 0 | 7 | 13
29 | 0 | 4 | 14
'''
)
res = t1.asof_join(
t2,
t1.t,
t2.t,
t1.K == t2.K,
mode=pw.JoinMode.LEFT,
defaults={t2.val: -1},
).select(
pw.this.shard_key,
pw.this.t,
val_left=t1.val,
val_right=t2.val,
sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
shard_key | t | val_left | val_right | sum
0 | 1 | 1 | -1 | 0
0 | 4 | 2 | 6 | 8
0 | 5 | 3 | 6 | 9
0 | 6 | 4 | 6 | 10
0 | 7 | 5 | 6 | 11
0 | 11 | 6 | 9 | 15
0 | 12 | 7 | 9 | 16
1 | 5 | 8 | 7 | 15
1 | 7 | 9 | 7 | 16
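For intuition, the sketch below models the lookup in plain Python (it is not Pathway code, and `asof_lookup` is a hypothetical name): for each left row it takes the value of the most recent right row with the same key, falling back to a default. The example output above pairs the left row at t=7 with the right row at t=3 rather than the one at t=7, so this model uses a strictly-earlier comparison; that tie handling is inferred from the example, not from a documented rule.

```python
import bisect
from typing import Any


def asof_lookup(left, right, default: Any = None) -> list[Any]:
    """left/right: lists of (key, val, t) tuples.

    For each left row, return `val` of the latest right row with the same key
    and a strictly smaller t, or `default` if no such row exists.
    """
    times: dict[Any, list] = {}
    vals: dict[Any, list] = {}
    for key, val, t in sorted(right, key=lambda row: row[2]):
        times.setdefault(key, []).append(t)
        vals.setdefault(key, []).append(val)
    result = []
    for key, _val, t in left:
        ts = times.get(key, [])
        i = bisect.bisect_left(ts, t)  # number of right times strictly below t
        result.append(vals[key][i - 1] if i > 0 else default)
    return result


# (K, val, t) rows of t1 and t2 from the example above.
t1 = [(0, 1, 1), (0, 2, 4), (0, 3, 5), (0, 4, 6), (0, 5, 7), (0, 6, 11), (0, 7, 12), (1, 8, 5), (1, 9, 7)]
t2 = [(1, 7, 2), (1, 3, 8), (0, 0, 2), (0, 6, 3), (0, 2, 7), (0, 3, 8), (0, 9, 9), (0, 7, 13), (0, 4, 14)]

val_right = asof_lookup(t1, t2, default=-1)
sums = [lv + rv for (_k, lv, _t), rv in zip(t1, val_right)]
```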

concat(*others)

Concatenates self with every other ∊ others.

Semantics:

  • result.columns == self.columns == other.columns
  • result.id == self.id ∪ other.id

If self.id and other.id collide, an exception is raised.

Requires:

  • other.columns == self.columns
  • self.id disjoint with other.id
  • Parameters
    others – the other tables.
  • Returns
    The concatenated table. IDs of rows from the original tables are preserved.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
11 | 11 | Alice | 30
12 | 12 | Tom | 40
''')
pw.universes.promise_are_pairwise_disjoint(t1, t2)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 1
11 | Alice | 30
12 | Tom | 40

concat_reindex(*tables)

Concatenate contents of several tables.

This is similar to PySpark union. All tables must have the same schema. Each row is reindexed.

  • Parameters
    tables (Table) – List of tables to concatenate. All tables must have the same schema.
  • Returns
    The concatenated table. It will have new, synthetic ids.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
| pet
1 | Manul
8 | Octopus
''')
t3 = t1.concat_reindex(t2)
pw.debug.compute_and_print(t3, include_id=False)
pet
Cat
Dog
Manul
Octopus
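The difference between concat and concat_reindex comes down to what happens to row ids. In the hypothetical plain-Python model below (tables as dicts from id to row; not Pathway code), concat keeps ids and therefore rejects collisions, while concat_reindex assigns fresh ids, so the two pet tables, which both use id 1, can only be combined with the latter.

```python
def concat(*tables: dict) -> dict:
    """Model of concat: ids are preserved, so they must be pairwise disjoint."""
    result: dict = {}
    for table in tables:
        for row_id, row in table.items():
            if row_id in result:
                raise ValueError(f"id collision: {row_id!r}")
            result[row_id] = row
    return result


def concat_reindex(*tables: dict) -> dict:
    """Model of concat_reindex: every row gets a fresh synthetic id (here 0, 1, 2, ...)."""
    rows = [row for table in tables for row in table.values()]
    return dict(enumerate(rows))


owners_a = {1: (10, "Alice", 1), 2: (9, "Bob", 1), 3: (8, "Alice", 2)}
owners_b = {11: (11, "Alice", 30), 12: (12, "Tom", 40)}
pets_a = {1: "Dog", 7: "Cat"}
pets_b = {1: "Manul", 8: "Octopus"}

print(sorted(concat(owners_a, owners_b)))  # [1, 2, 3, 11, 12]
print(sorted(concat_reindex(pets_a, pets_b).values()))  # ['Cat', 'Dog', 'Manul', 'Octopus']
```

Calling concat(pets_a, pets_b) in this model raises ValueError because id 1 appears in both tables.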

copy()

Returns a copy of a table.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t2 = t1.copy()
pw.debug.compute_and_print(t2, include_id=False)
age | owner | pet
7 | Bob | dog
8 | Alice | cat
9 | Bob | dog
10 | Alice | dog
t1 is t2
False
  • Return type
    Table

difference(other)

Restrict self universe to keys not appearing in the other table.

  • Parameters
    other (Table) – table with ids to remove from self.
  • Returns
    table with restricted universe, with the same set of columns
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.difference(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
10 | Alice | 1

dtypes()

Return the types of the columns as a dictionary.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t1.dtypes()
{'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>}
t1.schema['age']
<class 'int'>

static empty(**kwargs)

Creates an empty table with a schema specified by kwargs.

  • Parameters
    kwargs (NewType(DType, type)) – Dict whose keys are column names and values are column types.
  • Returns
    Created empty table.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
pw.debug.compute_and_print(t1, include_id=False)
age | pet

filter(filter_expression)

Filter a table according to filter condition.

  • Parameters
    filter_expression – ColumnExpression that specifies the filtering condition.
  • Returns
    Result has the same schema as self, and its ids are a subset of self.id.
  • Return type
    Table

Example:

import pathway as pw
vertices = pw.debug.parse_to_table('''
label outdegree
1 3
7 0
''')
filtered = vertices.filter(vertices.outdegree == 0)
pw.debug.compute_and_print(filtered, include_id=False)
label | outdegree
7 | 0

flatten(*args, **kwargs)

Performs a flatmap operation on a column or expression given as the first argument. The datatype of this column or expression has to be iterable. Other columns specified in the method arguments are duplicated as many times as the length of the iterable.

It is possible to get the ids of source rows by using the table.id column, e.g. table.flatten(table.column_to_be_flattened, original_id = table.id).

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| pet | age
1 | Dog | 2
7 | Cat | 5
''')
t2 = t1.flatten(t1.pet)
pw.debug.compute_and_print(t2, include_id=False)
pet
C
D
a
g
o
t
t3 = t1.flatten(t1.pet, t1.age)
pw.debug.compute_and_print(t3, include_id=False)
pet | age
C | 5
D | 2
a | 5
g | 2
o | 2
t | 5
  • Return type
    Table
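In plain Python, the flatmap described above amounts to a nested loop. The hypothetical `flatten` function below (a model, not Pathway's implementation) emits one row per element of the flattened column and copies the other requested columns; sorting its output reproduces both example results, since uppercase letters sort before lowercase ones.

```python
from typing import Any, Iterable


def flatten(rows: list[dict[str, Any]], column: str, keep: Iterable[str] = ()) -> list[dict[str, Any]]:
    """Emit one row per element of row[column], duplicating the columns in `keep`."""
    keep = list(keep)
    out = []
    for row in rows:
        for element in row[column]:  # the column's value must be iterable
            new_row = {column: element}
            new_row.update({name: row[name] for name in keep})
            out.append(new_row)
    return out


t1 = [{"pet": "Dog", "age": 2}, {"pet": "Cat", "age": 5}]
t2 = flatten(t1, "pet")
t3 = flatten(t1, "pet", keep=["age"])
print(sorted(r["pet"] for r in t2))  # ['C', 'D', 'a', 'g', 'o', 't']
```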

static from_columns(*args, **kwargs)

Build a table from columns.

All columns must have the same ids. Columns’ names must be pairwise distinct.

  • Parameters
    • args (ColumnReference) – List of columns.
    • kwargs (ColumnReference) – Columns with their new names.
  • Returns
    Created table.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
t2 = pw.Table.empty(foo=float, bar=float)
t3 = pw.Table.from_columns(t1.pet, qux=t2.foo)
pw.debug.compute_and_print(t3, include_id=False)
pet | qux

groupby(*args, id=None)

Groups table by columns from args.

NOTE: Usually followed by .reduce() that aggregates the result and returns a table.

  • Parameters
    • args (ColumnReference) – columns to group by.
    • id (OptionalColumnReference) – if provided, the column used to set the ids of the rows of the result
  • Returns
    Groupby object.
  • Return type
    GroupedTable

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t2 = t1.groupby(t1.pet, t1.owner).reduce(t1.owner, t1.pet, ageagg=pw.reducers.sum(t1.age))
pw.debug.compute_and_print(t2, include_id=False)
owner | pet | ageagg
Alice | cat | 8
Alice | dog | 10
Bob | dog | 16
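The groupby/reduce pair behaves like a dictionary keyed by the grouping columns. The sketch below is a plain-Python model of the example (not Pathway code; `groupby_sum` is a hypothetical helper) and yields the same three groups.

```python
from collections import defaultdict
from typing import Any

rows = [
    {"age": 10, "owner": "Alice", "pet": "dog"},
    {"age": 9, "owner": "Bob", "pet": "dog"},
    {"age": 8, "owner": "Alice", "pet": "cat"},
    {"age": 7, "owner": "Bob", "pet": "dog"},
]


def groupby_sum(rows: list[dict[str, Any]], keys: list[str], value: str) -> dict[tuple, int]:
    """Group rows by the `keys` columns and sum the `value` column per group."""
    sums: dict[tuple, int] = defaultdict(int)
    for row in rows:
        sums[tuple(row[k] for k in keys)] += row[value]
    return dict(sums)


ageagg = groupby_sum(rows, ["owner", "pet"], "age")
```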

having(*indexers)

Removes rows so that indexed.ix(indexer) is possible when some rows are missing, for each indexer in indexers.

  • Return type
    Table

property id: ColumnReference

Get a reference to the pseudocolumn containing the ids of a table.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t2 = t1.select(ids = t1.id)
pw.debug.compute_and_print(t2.select(test=t2.id == t2.ids), include_id=False)
test
True
True
True
True
  • Return type
    ColumnReference

interpolate(timestamp, *values, mode=InterpolateMode.LINEAR)

Interpolates missing values in a column using the previous and next values based on a timestamp column.

  • Parameters
    • timestamp (ColumnReference) – Reference to the column containing timestamps.
    • *values (ColumnReference) – References to the columns containing values to be interpolated.
    • mode (InterpolateMode, optional) – The interpolation mode. Currently, only InterpolateMode.LINEAR is supported. Default is InterpolateMode.LINEAR.
  • Returns
    A new table with the interpolated values.
  • Return type
    Table
  • Raises
    ValueError – If the columns are not ColumnReference or if the interpolation mode is not supported.

NOTE:

  • Missing values are filled by linear interpolation between the previous and next non-missing values.
  • If a value is missing at the beginning or end of the column, no interpolation is performed.

Example:

import pathway as pw
table = pw.debug.table_from_markdown('''
timestamp | values_a | values_b
1 | 1 | 10
2 | |
3 | 3 |
4 | |
5 | |
6 | 6 | 60
''')
table = table.interpolate(pw.this.timestamp, pw.this.values_a, pw.this.values_b)
pw.debug.compute_and_print(table, include_id=False)
timestamp | values_a | values_b
1 | 1 | 10
2 | 2.0 | 20.0
3 | 3 | 30.0
4 | 4.0 | 40.0
5 | 5.0 | 50.0
6 | 6 | 60
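Linear interpolation fills a gap at time t between known points (t0, v0) and (t1, v1) with v0 + (v1 - v0) * (t - t0) / (t1 - t0). The plain-Python model below (a sketch, not Pathway's implementation; `interpolate_linear` is a hypothetical name) applies that formula per column and leaves leading and trailing gaps as None, matching the note above; on the example data it gives the same values.

```python
from typing import Optional


def interpolate_linear(timestamps: list[float], values: list[Optional[float]]) -> list[Optional[float]]:
    """Fill None entries by linear interpolation between the nearest known
    neighbours in timestamp order; gaps at either end stay None."""
    order = sorted(range(len(timestamps)), key=lambda i: timestamps[i])
    known = [i for i in order if values[i] is not None]
    result = list(values)
    for prev, nxt in zip(known, known[1:]):
        t0, t1 = timestamps[prev], timestamps[nxt]
        v0, v1 = values[prev], values[nxt]
        for i in order:
            if values[i] is None and t0 < timestamps[i] < t1:
                result[i] = v0 + (v1 - v0) * (timestamps[i] - t0) / (t1 - t0)
    return result


timestamps = [1, 2, 3, 4, 5, 6]
values_a = interpolate_linear(timestamps, [1, None, 3, None, None, 6])
values_b = interpolate_linear(timestamps, [10, None, None, None, None, 60])
```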

intersect(*tables)

Restrict self universe to keys appearing in all of the tables.

  • Parameters
    tables (Table) – tables keys of which are used to restrict universe.
  • Returns
    table with restricted universe, with the same set of columns
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.intersect(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
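Both intersect and difference act on ids only; the columns of the other tables are ignored. Modelling tables as plain-Python dicts from id to row (a hypothetical representation, not Pathway code), the two operations reduce to key filters and reproduce the difference and intersect examples above.

```python
def intersect(table: dict, *others: dict) -> dict:
    """Keep rows of `table` whose id appears in every table in `others`."""
    return {k: v for k, v in table.items() if all(k in other for other in others)}


def difference(table: dict, other: dict) -> dict:
    """Keep rows of `table` whose id does not appear in `other`."""
    return {k: v for k, v in table.items() if k not in other}


t1 = {1: (10, "Alice", 1), 2: (9, "Bob", 1), 3: (8, "Alice", 2)}
t2 = {2: 100, 3: 200, 4: 300}  # only the ids matter; the `cost` values are ignored
```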

interval_join(other, self_time_expression, other_time_expression, lower_bound, upper_bound, *on)

Performs an interval join of self with other using a time difference and join expressions. If self_time_expression + lower_bound <= other_time_expression <= self_time_expression + upper_bound and conditions in on are satisfied, the rows are joined.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time_expression (ColumnExpression) – time expression in self.
    • other_time_expression (ColumnExpression) – time expression in other.
    • lower_bound (int | float | timedelta) – a lower bound on time difference between other_time_expression and self_time_expression.
    • upper_bound (int | float | timedelta) – an upper bound on time difference between other_time_expression and self_time_expression.
    • on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
  • Return type
    IntervalJoinResult

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 3
2 | 4
3 | 5
4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 0
2 | 1
3 | 4
4 | 7
'''
)
t3 = t1.interval_join(t2, t1.t, t2.t, -2, 1).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3 | 1
3 | 4
4 | 4
5 | 4
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 3
2 | 1 | 4
3 | 1 | 5
4 | 1 | 11
5 | 2 | 2
6 | 2 | 3
7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 0
2 | 1 | 1
3 | 1 | 4
4 | 1 | 7
5 | 2 | 0
6 | 2 | 2
7 | 4 | 2
'''
)
t3 = t1.interval_join(t2, t1.t, t2.t, -2, 1, t1.a == t2.b).select(
t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3 | 1
1 | 3 | 4
1 | 4 | 4
1 | 5 | 4
2 | 2 | 0
2 | 2 | 2
2 | 3 | 2
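The join condition can be restated as: a pair matches when the other row's time lies in the closed window [self_time + lower_bound, self_time + upper_bound] and the `on` equalities hold. The brute-force plain-Python model below is meant only to make that condition concrete; it is not Pathway's implementation, and `interval_join` here is a hypothetical function over (key, t) tuples. It reproduces the second example.

```python
def interval_join(left, right, lower_bound, upper_bound):
    """left/right: lists of (key, t) tuples; equal keys play the role of `on`.

    Returns (key, left_t, right_t) for every pair satisfying
    left_t + lower_bound <= right_t <= left_t + upper_bound.
    """
    return [
        (lkey, lt, rt)
        for lkey, lt in left
        for rkey, rt in right
        if lkey == rkey and lt + lower_bound <= rt <= lt + upper_bound
    ]


left = [(1, 3), (1, 4), (1, 5), (1, 11), (2, 2), (2, 3), (3, 4)]
right = [(1, 0), (1, 1), (1, 4), (1, 7), (2, 0), (2, 2), (4, 2)]
result = interval_join(left, right, -2, 1)
```

Using the same key for every row recovers the first example, which has no `on` condition.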

interval_join_left(other, self_time_expression, other_time_expression, lower_bound, upper_bound, *on)

Performs an interval left join of self with other using a time difference and join expressions. If self_time_expression + lower_bound <= other_time_expression <= self_time_expression + upper_bound and conditions in on are satisfied, the rows are joined. Rows from the left side that haven’t been matched with the right side are returned with missing values on the right side replaced with None.

  • Parameters
    • other (Table) – the right side of the join.
    • self_time_expression (ColumnExpression) – time expression in self.
    • other_time_expression (ColumnExpression) – time expression in other.
    • lower_bound (int | float | timedelta) – a lower bound on time difference between other_time_expression and self_time_expression.
    • upper_bound (int | float | timedelta) – an upper bound on time difference between other_time_expression and self_time_expression.
    • on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
  • Return type
    IntervalJoinResult

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 3
2 | 4
3 | 5
4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 0
2 | 1
3 | 4
4 | 7
'''
)
t3 = t1.interval_join_left(t2, t1.t, t2.t, -2, 1).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3 | 1
3 | 4
4 | 4
5 | 4
11 |
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 3
2 | 1 | 4
3 | 1 | 5
4 | 1 | 11
5 | 2 | 2
6 | 2 | 3
7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 0
2 | 1 | 1
3 | 1 | 4
4 | 1 | 7
5 | 2 | 0
6 | 2 | 2
7 | 4 | 2
'''
)
t3 = t1.interval_join_left(t2, t1.t, t2.t, -2, 1, t1.a == t2.b).select(
t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3 | 1
1 | 3 | 4
1 | 4 | 4
1 | 5 | 4
1 | 11 |
2 | 2 | 0
2 | 2 | 2
2 | 3 | 2
3 | 4 |

interval_join_outer(other, self_time_expression, other_time_expression, lower_bound, upper_bound, *on)

Performs an interval outer join of self with other using a time difference and join expressions. If self_time_expression + lower_bound <= other_time_expression <= self_time_expression + upper_bound and conditions in on are satisfied, the rows are joined. Rows that haven’t been matched with the other side are returned with missing values on the other side replaced with None.

  • Parameters
    • other (Table) – the right side of the join.
    • self_time_expression (ColumnExpression) – time expression in self.
    • other_time_expression (ColumnExpression) – time expression in other.
    • lower_bound (int | float | timedelta) – a lower bound on time difference between other_time_expression and self_time_expression.
    • upper_bound (int | float | timedelta) – an upper bound on time difference between other_time_expression and self_time_expression.
    • on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
  • Return type
    IntervalJoinResult

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 3
2 | 4
3 | 5
4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 0
2 | 1
3 | 4
4 | 7
'''
)
t3 = t1.interval_join_outer(t2, t1.t, t2.t, -2, 1).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
| 0
| 7
3 | 1
3 | 4
4 | 4
5 | 4
11 |
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 3
2 | 1 | 4
3 | 1 | 5
4 | 1 | 11
5 | 2 | 2
6 | 2 | 3
7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 0
2 | 1 | 1
3 | 1 | 4
4 | 1 | 7
5 | 2 | 0
6 | 2 | 2
7 | 4 | 2
'''
)
t3 = t1.interval_join_outer(t2, t1.t, t2.t, -2, 1, t1.a == t2.b).select(
t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
| | 0
| | 2
| | 7
1 | 3 | 1
1 | 3 | 4
1 | 4 | 4
1 | 5 | 4
1 | 11 |
2 | 2 | 0
2 | 2 | 2
2 | 3 | 2
3 | 4 |
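The outer variant keeps unmatched rows from both sides. Using the same kind of brute-force plain-Python model (hypothetical, not Pathway code), unmatched left rows get None for the right time and unmatched right rows get None for the key and left time, which yields the twelve rows of the second example above. Removing the final loop gives the left-join behavior, and removing the `if not matched` branch gives the right-join behavior.

```python
def interval_join_outer(left, right, lower_bound, upper_bound):
    """left/right: lists of (key, t). Returns (left_key, left_t, right_t) rows,
    padding the missing side of unmatched rows with None."""
    out = []
    matched_right = set()
    for lkey, lt in left:
        matched = False
        for j, (rkey, rt) in enumerate(right):
            if lkey == rkey and lt + lower_bound <= rt <= lt + upper_bound:
                out.append((lkey, lt, rt))
                matched_right.add(j)
                matched = True
        if not matched:
            out.append((lkey, lt, None))  # unmatched left row
    for j, (_rkey, rt) in enumerate(right):
        if j not in matched_right:
            out.append((None, None, rt))  # unmatched right row
    return out


left = [(1, 3), (1, 4), (1, 5), (1, 11), (2, 2), (2, 3), (3, 4)]
right = [(1, 0), (1, 1), (1, 4), (1, 7), (2, 0), (2, 2), (4, 2)]
result = interval_join_outer(left, right, -2, 1)
```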

interval_join_right(other, self_time_expression, other_time_expression, lower_bound, upper_bound, *on)

Performs an interval right join of self with other using a time difference and join expressions. If self_time_expression + lower_bound <= other_time_expression <= self_time_expression + upper_bound and conditions in on are satisfied, the rows are joined. Rows from the right side that haven’t been matched with the left side are returned with missing values on the left side replaced with None.

  • Parameters
    • other (Table) – the right side of the join.
    • self_time_expression (ColumnExpression) – time expression in self.
    • other_time_expression (ColumnExpression) – time expression in other.
    • lower_bound (int | float | timedelta) – a lower bound on time difference between other_time_expression and self_time_expression.
    • upper_bound (int | float | timedelta) – an upper bound on time difference between other_time_expression and self_time_expression.
    • on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
  • Return type
    IntervalJoinResult

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 3
2 | 4
3 | 5
4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 0
2 | 1
3 | 4
4 | 7
'''
)
t3 = t1.interval_join_right(t2, t1.t, t2.t, -2, 1).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
| 0
| 7
3 | 1
3 | 4
4 | 4
5 | 4
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 3
2 | 1 | 4
3 | 1 | 5
4 | 1 | 11
5 | 2 | 2
6 | 2 | 3
7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 0
2 | 1 | 1
3 | 1 | 4
4 | 1 | 7
5 | 2 | 0
6 | 2 | 2
7 | 4 | 2
'''
)
t3 = t1.interval_join_right(t2, t1.t, t2.t, -2, 1, t1.a == t2.b).select(
t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
| | 0
| | 2
| | 7
1 | 3 | 1
1 | 3 | 4
1 | 4 | 4
1 | 5 | 4
2 | 2 | 0
2 | 2 | 2
2 | 3 | 2

property ix()

Returns an object that, when indexed by a column, returns a new, reindexed table.

  • Returns
    an object that when indexed by some column returns a table with rows specified by that column.
  • Return type
    Indexer

Example:

import pathway as pw
t_animals = pw.debug.parse_to_table('''
| epithet | genus
1 | upupa | epops
2 | acherontia | atropos
3 | bubo | scandiacus
4 | dynastes | hercules
''')
t_birds = pw.debug.parse_to_table('''
| desc
2 | hoopoe
4 | owl
''')
ret = t_birds.select(t_birds.desc, latin=t_animals.ix(t_birds.id).genus)
pw.debug.compute_and_print(ret, include_id=False)
desc | latin
hoopoe | atropos
owl | hercules
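Conceptually, t_animals.ix(t_birds.id) looks up, for each row of t_birds, the row of t_animals that has the same id. A dictionary lookup is a reasonable plain-Python analogy; the sketch below is a hypothetical model, not Pathway code.

```python
t_animals = {
    1: {"epithet": "upupa", "genus": "epops"},
    2: {"epithet": "acherontia", "genus": "atropos"},
    3: {"epithet": "bubo", "genus": "scandiacus"},
    4: {"epithet": "dynastes", "genus": "hercules"},
}
t_birds = {2: {"desc": "hoopoe"}, 4: {"desc": "owl"}}

# For every row of t_birds, fetch the t_animals row with the same id.
ret = {
    row_id: {"desc": row["desc"], "latin": t_animals[row_id]["genus"]}
    for row_id, row in t_birds.items()
}
```

In this model, an id missing from t_animals would raise KeyError.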

ix_ref(*args, optional=False)

Returns a row, indexed by its primary keys. Several columns can be used as index.

  • Parameters
    args (ColumnExpression) – Column references.
  • Returns
    indexed row.
  • Return type
    Row

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
name | pet
Alice | dog
Bob | cat
Carole | cat
David | dog
''')
t2 = t1.with_id_from(pw.this.name)
t2 = t2.select(*pw.this, new_value=pw.this.ix_ref("Alice").pet)
pw.debug.compute_and_print(t2, include_id=False)
name | pet | new_value
Alice | dog | dog
Bob | cat | dog
Carole | cat | dog
David | dog | dog

Tables obtained by a groupby/reduce scheme always have primary keys:

import pathway as pw
t1 = pw.debug.parse_to_table('''
name | pet
Alice | dog
Bob | cat
Carole | cat
David | cat
''')
t2 = t1.groupby(pw.this.pet).reduce(pw.this.pet, count=pw.reducers.count(pw.this.name))
t3 = t1.select(*pw.this, new_value=t2.ix_ref(pw.this.pet).count)
pw.debug.compute_and_print(t3, include_id=False)
name | pet | new_value
Alice | dog | 1
Bob | cat | 3
Carole | cat | 3
David | cat | 3

Single-row tables can be accessed via ix_ref():

import pathway as pw
t1 = pw.debug.parse_to_table('''
name | pet
Alice | dog
Bob | cat
Carole | cat
David | cat
''')
t2 = t1.reduce(count=pw.reducers.count(pw.this.name))
t3 = t1.select(*pw.this, new_value=t2.ix_ref().count)
pw.debug.compute_and_print(t3, include_id=False)
name | pet | new_value
Alice | dog | 4
Bob | cat | 4
Carole | cat | 4
David | cat | 4
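You can sketch in plain Python why `ix_ref` works on grouped tables. The grouped rows get ids derived from the grouping values, so a lookup only needs to recompute the id from the value.

The `pointer_from` function below is a hypothetical stand-in for Pathway's hashing. The `pointer_from` example later in this reference shows that `groupby` ids coincide with `pointer_from` of the grouping column. This is a model of the idea, not the library's code.

```python
from collections import Counter


def pointer_from(*values):
    # Hypothetical stand-in for Pathway's pointer hash: it only needs to map
    # equal key values to equal ids.
    return ("ptr",) + values


people = [("Alice", "dog"), ("Bob", "cat"), ("Carole", "cat"), ("David", "cat")]

# groupby(pet).reduce(pet, count=...): one row per pet, whose id is derived
# from the grouping value.
grouped = {
    pointer_from(pet): {"pet": pet, "count": n}
    for pet, n in Counter(pet for _, pet in people).items()
}

# t2.ix_ref(pw.this.pet): recompute the id from the value and look the row up.
new_value = [(name, pet, grouped[pointer_from(pet)]["count"]) for name, pet in people]
```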

join(other, *on, id=None, how=JoinMode.INNER)

Join self with other using the given join expression.

  • Parameters
    • other (Joinable) – the right side of the join.
    • on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (OptionalColumnReference) – optional id column for the result; can only be self.id or other.id.
    • how (JoinMode) – by default, an inner join is performed. Possible values are JoinMode.{INNER,LEFT,RIGHT,OUTER}, which correspond to inner, left, right and outer join respectively.
  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join(
t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9 | Bob | L
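Here is a pure-Python model of the inner join above. It assumes rows are plain dicts and that `on` is a list of column-name pairs. `inner_join_model` is a hypothetical helper, not part of Pathway.

A pair of rows survives only if every equality condition holds. That is why only Bob's row matches.

```python
def inner_join_model(left, right, on):
    """Hypothetical helper: inner join of two lists of dicts.

    `on` is a list of (left_column, right_column) pairs; a pair of rows is
    kept only if every listed column pair is equal.
    """
    return [
        (l, r)
        for l in left
        for r in right
        if all(l[lc] == r[rc] for lc, rc in on)
    ]


t1 = [
    {"age": 10, "owner": "Alice", "pet": 1},
    {"age": 9, "owner": "Bob", "pet": 1},
    {"age": 8, "owner": "Alice", "pet": 2},
]
t2 = [
    {"age": 10, "owner": "Alice", "pet": 3, "size": "M"},
    {"age": 9, "owner": "Bob", "pet": 1, "size": "L"},
    {"age": 8, "owner": "Tom", "pet": 1, "size": "XL"},
]
pairs = inner_join_model(t1, t2, [("pet", "pet"), ("owner", "owner")])
# Mirrors .select(age=t1.age, owner_name=t2.owner, size=t2.size)
result = [(l["age"], r["owner"], r["size"]) for l, r in pairs]
```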

join_inner(other, *on, id=None)

Inner-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join.
    • on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (OptionalColumnReference) – optional id column for the result; can only be self.id or other.id.
  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join_inner(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(
age=t1.age, owner_name=t2.owner, size=t2.size
)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9 | Bob | L

join_left(other, *on, id=None)

Left-joins two tables or join results.

  • Parameters
    • other (Joinable) – Table or join result.
    • *on – Columns to join, syntax self.col1 == other.col2
    • id (OptionalColumnReference) – optional id column of the result

Remarks: args cannot contain the id column of either table, because the result table has its own id column with auto-generated ids; an input id can still be selected by assigning it to a column with a defined name (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • rows from the right side that were not matched with the left side are skipped
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
res = t1.join_left(t2, t1.A == t2.C).select(
t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D, t2.id)
)
pw.debug.compute_and_print(res, include_id=False)
A | t2_C | S
11 | 11 | 322
12 | 12 | 324
13 | |
13 | |

join_outer(other, *on, id=None)

Outer-joins two tables or join results.

  • Parameters
    • other (Joinable) – Table or join result.
    • *on – Columns to join, syntax self.col1 == other.col2
    • id (OptionalColumnReference) – optional id column of the result

Remarks: args cannot contain the id column of either table, because the result table has its own id column with auto-generated ids; an input id can still be selected by assigning it to a column with a defined name (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
res = t1.join_outer(t2, t1.A == t2.C).select(
t1.A, t2_C=t2.C, S=pw.require(t1.B + t2.D, t1.id, t2.id)
)
pw.debug.compute_and_print(res, include_id=False)
A | t2_C | S
| 14 |
| 14 |
11 | 11 | 322
12 | 12 | 324
13 | |
13 | |

join_right(other, *on, id=None)

Right-joins two tables or join results.

  • Parameters
    • other (Joinable) – Table or join result.
    • *on – Columns to join, syntax self.col1 == other.col2
    • id (OptionalColumnReference) – optional id column of the result

Remarks: args cannot contain the id column of either table, because the result table has its own id column with auto-generated ids; an input id can still be selected by assigning it to a column with a defined name (passed in kwargs).

Behavior:

  • rows from the left side that were not matched with the right side are skipped
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    an object on which .select() may be called to extract relevant columns from the result of the join.
  • Return type
    JoinResult

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| A | B
1 | 11 | 111
2 | 12 | 112
3 | 13 | 113
4 | 13 | 114
'''
)
t2 = pw.debug.table_from_markdown(
'''
| C | D
1 | 11 | 211
2 | 12 | 212
3 | 14 | 213
4 | 14 | 214
'''
)
res = t1.join_right(t2, t1.A == t2.C).select(
t1.A, t2_C=t2.C, S=pw.require(pw.coalesce(t1.B, 0) + t2.D, t1.id)
)
pw.debug.compute_and_print(res, include_id=False)
A | t2_C | S
| 14 |
| 14 |
11 | 11 | 322
12 | 12 | 324
  • Returns
    OuterJoinResult object
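The left, right and outer behaviours listed for `join_left`, `join_outer` and `join_right` differ only in which unmatched rows they keep. The sketch below models all three on plain dicts. `join_model` and `project` are hypothetical helpers.

`project` imitates the examples' `pw.require` by yielding None for S whenever either side is missing. The sketch reproduces the three outputs above as multisets, ignoring row order.

```python
from collections import Counter


def join_model(left, right, on, how):
    """Hypothetical model of left/right/outer joins on lists of dicts.

    `on` is a list of (left_column, right_column) equality pairs. Matched pairs
    behave like an inner join; unmatched rows from the preserved side(s) are
    paired with None in place of the missing row.
    """
    result = []
    matched_right = set()
    for l in left:
        hit = False
        for j, r in enumerate(right):
            if all(l[lc] == r[rc] for lc, rc in on):
                result.append((l, r))
                matched_right.add(j)
                hit = True
        if not hit and how in ("left", "outer"):
            result.append((l, None))
    if how in ("right", "outer"):
        result.extend((None, r) for j, r in enumerate(right) if j not in matched_right)
    return result


def project(pairs):
    # Mirrors .select(t1.A, t2_C=t2.C, S=...): S is None unless both sides exist.
    return Counter(
        (
            l["A"] if l is not None else None,
            r["C"] if r is not None else None,
            l["B"] + r["D"] if l is not None and r is not None else None,
        )
        for l, r in pairs
    )


t1 = [{"A": 11, "B": 111}, {"A": 12, "B": 112}, {"A": 13, "B": 113}, {"A": 13, "B": 114}]
t2 = [{"C": 11, "D": 211}, {"C": 12, "D": 212}, {"C": 14, "D": 213}, {"C": 14, "D": 214}]
on = [("A", "C")]
left = project(join_model(t1, t2, on, "left"))
outer = project(join_model(t1, t2, on, "outer"))
right = project(join_model(t1, t2, on, "right"))
```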

pointer_from(*args, optional=False)

Pseudo-random hash of its argument. Produces pointer types. Applied column-wise.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age owner pet
1 10 Alice dog
2 9 Bob dog
3 8 Alice cat
4 7 Bob dog''')
g = t1.groupby(t1.owner).reduce(refcol = t1.pointer_from(t1.owner)) # g.id == g.refcol
pw.debug.compute_and_print(g.select(test = (g.id == g.refcol)), include_id=False)
test
True
True

promise_universe_is_equal_to(other)

Asserts to Pathway that the universe of self is equal to the universe of other.

Semantics: Used in situations where Pathway cannot deduce that two universes are equal.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
| age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

promise_universe_is_subset_of(other)

Asserts to Pathway that the universe of self is a subset of the universe of other.

Semantics: Used in situations where Pathway cannot deduce one universe being a subset of another.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 30
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

promise_universes_are_disjoint(other)

Asserts to Pathway that the universe of self is disjoint from the universe of other.

Semantics: Used in situations where Pathway cannot deduce that the universes are disjoint.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
11 | 11 | Alice | 30
12 | 12 | Tom | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 1
11 | Alice | 30
12 | Tom | 40
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

reduce(*args, **kwargs)

Reduce a table to a single row.

Equivalent to self.groupby().reduce(*args, **kwargs).

  • Parameters
    • args (ColumnReference) – reducers to reduce the table with.
    • kwargs (ColumnExpression) – reducers to reduce the table with; each key is the name of the resulting column.
  • Returns
    Reduced table.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t2 = t1.reduce(ageagg=pw.reducers.argmin(t1.age))
pw.debug.compute_and_print(t2, include_id=False)
ageagg
^...
t3 = t2.select(t1.ix(t2.ageagg).age, t1.ix(t2.ageagg).pet)
pw.debug.compute_and_print(t3, include_id=False)
age | pet
7 | dog
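You can model what `argmin` returns, and how `ix` then follows it back, with a dict from ids to rows. The ids 1 to 4 below are hypothetical, since the example lets Pathway assign them. Tie-breaking between equal minima is not modelled.

```python
# Rows of t1 keyed by hypothetical ids 1..4.
t1 = {
    1: {"age": 10, "owner": "Alice", "pet": "dog"},
    2: {"age": 9, "owner": "Bob", "pet": "dog"},
    3: {"age": 8, "owner": "Alice", "pet": "cat"},
    4: {"age": 7, "owner": "Bob", "pet": "dog"},
}

# t1.reduce(ageagg=pw.reducers.argmin(t1.age)): a single row holding the id
# of the row with the smallest age.
ageagg = min(t1, key=lambda row_id: t1[row_id]["age"])

# t1.ix(t2.ageagg): follow that id back into t1 and read its columns.
youngest = (t1[ageagg]["age"], t1[ageagg]["pet"])
```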

rename_columns(**kwargs)

Rename columns according to kwargs.

Columns not in keys(kwargs) are not changed. New name of a column must not be id.

  • Parameters
    kwargs (ColumnReference) – mapping from new column names to the existing columns they rename (e.g. years_old=t1.age).
  • Returns
    self with columns renamed.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = t1.rename_columns(years_old=t1.age, animal=t1.pet)
pw.debug.compute_and_print(t2, include_id=False)
owner | years_old | animal
Alice | 8 | 2
Alice | 10 | 1
Bob | 9 | 1

property schema: Type[Schema]

Get schema of the table.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
t1.schema.as_dict()
{'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>}
t1.schema['age']
<class 'int'>
  • Return type
    Type[Schema]

select(*args, **kwargs)

Build a new table with columns specified by kwargs.

Output columns’ names are keys(kwargs). values(kwargs) can be raw values, boxed values, columns. Assigning to id reindexes the table.

  • Parameters
    • args (ColumnReference) – Column references.
    • kwargs (Any) – Column expressions with their new assigned names.
  • Returns
    Created table.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
pet
Dog
Cat
''')
t2 = t1.select(animal=t1.pet, desc="fluffy")
pw.debug.compute_and_print(t2, include_id=False)
animal | desc
Cat | fluffy
Dog | fluffy

sort(key=None, instance=None)

Sorts the table by the specified keys.

  • Parameters
    • key (OptionalColumnReference) – ColumnReference or None. The column to use as the primary sort key. If None, the table is sorted using the key column as the primary key.
    • instance (OptionalColumnReference) – ColumnReference or None. The column to use as the secondary sort key. If None, the column “instance” is used if it exists; otherwise only the primary key is used.
  • Returns
    The sorted table. Contains two columns: prev and next, containing the pointers to the previous and next rows.
  • Return type
    pw.Table

Example:

import pathway as pw
table = pw.debug.table_from_markdown('''
name | age | score
Alice | 25 | 80
Bob | 20 | 90
Charlie | 30 | 80
''')
table = table.with_id_from(pw.this.name)
table += table.sort(key=pw.this.age)
pw.debug.compute_and_print(table)
| name | age | score | next | prev
^GBSDEEW... | Alice | 25 | 80 | ^DS9AT95... | ^EDPSSB1...
^EDPSSB1... | Bob | 20 | 90 | ^GBSDEEW... |
^DS9AT95... | Charlie | 30 | 80 | | ^GBSDEEW...
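The prev/next pointers returned by `sort` form a doubly linked list in key order. Below is a minimal pure-Python sketch. It makes these assumptions:

- Ids are the names, as after `with_id_from(pw.this.name)`.
- `instance` is ignored.
- `sort_model` is a hypothetical helper.

```python
def sort_model(rows, key):
    """Hypothetical helper: map each id to (prev_id, next_id) in `key` order."""
    order = sorted(rows, key=lambda row_id: rows[row_id][key])
    links = {}
    for i, row_id in enumerate(order):
        prev_id = order[i - 1] if i > 0 else None
        next_id = order[i + 1] if i + 1 < len(order) else None
        links[row_id] = (prev_id, next_id)
    return links


table = {
    "Alice": {"age": 25, "score": 80},
    "Bob": {"age": 20, "score": 90},
    "Charlie": {"age": 30, "score": 80},
}
links = sort_model(table, "age")
```

As in the printed table, Bob (the youngest) has no prev and Charlie (the oldest) has no next.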

update_cells(other)

Updates cells of self, breaking ties in favor of the values in other.

Semantics:

  • result.columns == self.columns
  • result.id == self.id
  • conflicts are resolved preferring other’s values

Requires:

  • other.columns ⊆ self.columns
  • other.id ⊆ self.id
  • Parameters
    other (Table) – the other table.
  • Returns
    self updated with cells from other.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 30
''')
pw.universes.promise_is_subset_of(t2, t1)
t3 = t1.update_cells(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 30
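The semantics listed for `update_cells` amount to a per-row dict update restricted to existing ids. Below is a sketch over `{id: {column: value}}` dicts. The helper name `update_cells_model` is hypothetical. The sketch mirrors the example above and checks both requirements.

```python
def update_cells_model(self_rows, other_rows):
    """Hypothetical model of update_cells on {id: {column: value}} dicts."""
    for row_id, row in other_rows.items():
        # Requires: other.id ⊆ self.id and other.columns ⊆ self.columns.
        if row_id not in self_rows or not set(row) <= set(self_rows[row_id]):
            raise ValueError("other must be a subset of self (ids and columns)")
    result = {}
    for row_id, row in self_rows.items():
        updated = dict(row)
        updated.update(other_rows.get(row_id, {}))  # other's cells win conflicts
        result[row_id] = updated
    return result  # result.id == self.id


t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {1: {"age": 10, "owner": "Alice", "pet": 30}}
t3 = update_cells_model(t1, t2)
```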

update_columns(*args, **kwargs)

Updates columns of self, according to args and kwargs. See table.select specification for evaluation of args and kwargs.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| owner | pet | size
1 | Tom | 1 | 10
2 | Bob | 1 | 9
3 | Tom | 2 | 8
''').with_universe_of(t1)
t3 = t1.update_columns(*t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet | size
8 | Tom | 2 | 8
9 | Bob | 1 | 9
10 | Tom | 1 | 10
  • Return type
    Table

update_rows(other)

Updates rows of self, breaking ties in favor of the rows in other.

Semantics:

  • result.columns == self.columns == other.columns
  • result.id == self.id ∪ other.id

Requires:

  • other.columns == self.columns
  • Parameters
    other (Table) – the other table.
  • Returns
    self updated with rows from other.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 30
12 | 12 | Tom | 40
''')
t3 = t1.update_rows(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 30
12 | Tom | 40
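Unlike a cell-level update, `update_rows` replaces whole rows and may add new ids (result.id == self.id ∪ other.id). Below is a sketch over `{id: {column: value}}` dicts, using the hypothetical helper `update_rows_model`. It mirrors the example above.

```python
def update_rows_model(self_rows, other_rows):
    """Hypothetical model of update_rows on {id: {column: value}} dicts."""
    column_sets = {frozenset(row) for row in (*self_rows.values(), *other_rows.values())}
    if len(column_sets) > 1:
        raise ValueError("other.columns must equal self.columns")
    result = dict(self_rows)
    result.update(other_rows)  # other's rows replace same-id rows; new ids are added
    return result


t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {
    1: {"age": 10, "owner": "Alice", "pet": 30},
    12: {"age": 12, "owner": "Tom", "pet": 40},
}
t3 = update_rows_model(t1, t2)
```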

update_types(**kwargs)

Updates types in the schema; kwargs map column names to their new types. Has no effect on the runtime.

  • Return type
    Table

window_join(other, left_time_expression, right_time_expression, window, *on)

Performs a window join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

  • Parameters
    • other (Table) – the right side of a join.
    • left_time_expression – time expression in self.
    • right_time_expression – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.
  • Return type
    WindowJoinResult

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 1
2 | 2
3 | 3
4 | 7
5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 2
2 | 5
3 | 6
4 | 7
'''
)
t3 = t1.window_join(t2, t1.t, t2.t, pw.window.tumbling(2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
2 | 2
3 | 2
7 | 6
7 | 7
t4 = t1.window_join(t2, t1.t, t2.t, pw.window.sliding(1, 2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
1 | 2
2 | 2
2 | 2
3 | 2
7 | 6
7 | 7
7 | 7
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 2
3 | 1 | 3
4 | 1 | 7
5 | 1 | 13
6 | 2 | 1
7 | 2 | 2
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 2
2 | 1 | 5
3 | 1 | 6
4 | 1 | 7
5 | 2 | 2
6 | 2 | 3
7 | 4 | 3
'''
)
t3 = t1.window_join(t2, t1.t, t2.t, pw.window.tumbling(2), t1.a == t2.b).select(
key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1 | 2 | 2
1 | 3 | 2
1 | 7 | 6
1 | 7 | 7
2 | 2 | 2
2 | 2 | 3
t1 = pw.debug.table_from_markdown(
'''
| t
0 | 0
1 | 5
2 | 10
3 | 15
4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
0 | -3
1 | 2
2 | 3
3 | 6
4 | 16
'''
)
t3 = t1.window_join(
t2, t1.t, t2.t, pw.window.session(lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
0 | 2
0 | 3
0 | 6
5 | 2
5 | 3
5 | 6
15 | 16
17 | 16
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 4
3 | 1 | 7
4 | 2 | 0
5 | 2 | 3
6 | 2 | 4
7 | 2 | 7
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | -1
2 | 1 | 6
3 | 2 | 2
4 | 2 | 10
5 | 4 | 3
'''
)
t3 = t1.window_join(
t2, t1.t, t2.t, pw.window.session(lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1 | 1 | -1
1 | 4 | 6
1 | 7 | 6
2 | 0 | 2
2 | 3 | 2
2 | 4 | 2
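You can reproduce window membership for the tumbling and sliding examples with a little arithmetic. The sketch makes these assumptions:

- Windows are `[start, start + duration)`, with starts at `origin + k * hop` and `origin = 0`.
- `pw.window.sliding(1, 2)` means hop 1, duration 2.
- `window_starts` and `window_join_model` are hypothetical helpers.

These assumptions match the unkeyed outputs above but are not taken from Pathway's code.

```python
import math
from collections import Counter


def window_starts(t, hop, duration, origin=0):
    """Start points of every window [start, start + duration) that contains t.

    Assumes windows start at origin + k * hop for integer k; a tumbling window
    is the special case hop == duration.
    """
    k_min = math.floor((t - origin - duration) / hop) + 1
    k_max = math.floor((t - origin) / hop)
    return [origin + k * hop for k in range(k_min, k_max + 1)]


def window_join_model(left_times, right_times, hop, duration):
    """Inner window join: a matching pair is emitted once per window it shares."""
    pairs = Counter()
    for lt in left_times:
        for rt in right_times:
            shared = set(window_starts(lt, hop, duration)) & set(window_starts(rt, hop, duration))
            if shared:
                pairs[(lt, rt)] += len(shared)
    return pairs


left_times = [1, 2, 3, 7, 13]
right_times = [2, 5, 6, 7]
tumbling = window_join_model(left_times, right_times, hop=2, duration=2)
sliding = window_join_model(left_times, right_times, hop=1, duration=2)
```

In the sliding case, (2, 2) and (7, 7) appear twice because each of those pairs shares two windows.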

window_join_left(other, left_time_expression, right_time_expression, window, *on)

Performs a window left join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

Rows from the left side that didn’t match any record on the right side in a given window are returned with missing values on the right side replaced with None. The multiplicity of such rows equals the number of windows they belong to in which they have no match.

  • Parameters
    • other (Table) – the right side of a join.
    • left_time_expression – time expression in self.
    • right_time_expression – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.
  • Return type
    WindowJoinResult

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 1
2 | 2
3 | 3
4 | 7
5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 2
2 | 5
3 | 6
4 | 7
'''
)
t3 = t1.window_join_left(t2, t1.t, t2.t, pw.window.tumbling(2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
1 |
2 | 2
3 | 2
7 | 6
7 | 7
13 |
t4 = t1.window_join_left(t2, t1.t, t2.t, pw.window.sliding(1, 2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
1 |
1 | 2
2 | 2
2 | 2
3 |
3 | 2
7 | 6
7 | 7
7 | 7
13 |
13 |
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 2
3 | 1 | 3
4 | 1 | 7
5 | 1 | 13
6 | 2 | 1
7 | 2 | 2
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 2
2 | 1 | 5
3 | 1 | 6
4 | 1 | 7
5 | 2 | 2
6 | 2 | 3
7 | 4 | 3
'''
)
t3 = t1.window_join_left(t2, t1.t, t2.t, pw.window.tumbling(2), t1.a == t2.b).select(
key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1 | 1 |
1 | 2 | 2
1 | 3 | 2
1 | 7 | 6
1 | 7 | 7
1 | 13 |
2 | 1 |
2 | 2 | 2
2 | 2 | 3
3 | 4 |
t1 = pw.debug.table_from_markdown(
'''
| t
0 | 0
1 | 5
2 | 10
3 | 15
4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
0 | -3
1 | 2
2 | 3
3 | 6
4 | 16
'''
)
t3 = t1.window_join_left(
t2, t1.t, t2.t, pw.window.session(lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
0 | 2
0 | 3
0 | 6
5 | 2
5 | 3
5 | 6
10 |
15 | 16
17 | 16
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 4
3 | 1 | 7
4 | 2 | 0
5 | 2 | 3
6 | 2 | 4
7 | 2 | 7
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | -1
2 | 1 | 6
3 | 2 | 2
4 | 2 | 10
5 | 4 | 3
'''
)
t3 = t1.window_join_left(
t2, t1.t, t2.t, pw.window.session(lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1 | 1 | -1
1 | 4 | 6
1 | 7 | 6
2 | 0 | 2
2 | 3 | 2
2 | 4 | 2
2 | 7 |
3 | 4 |
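The multiplicity rule for unmatched left rows becomes concrete if you handle each window separately: a left row gets one padded output per window in which it has no partner.

The sketch below assumes windows are aligned at 0, and reads the sliding case as hop 1, duration 2. Helper names are hypothetical. It reproduces the unkeyed tumbling and sliding outputs above.

```python
import math
from collections import Counter


def window_starts(t, hop, duration, origin=0):
    # Assumed window layout: [start, start + duration) with start = origin + k * hop.
    k_min = math.floor((t - origin - duration) / hop) + 1
    k_max = math.floor((t - origin) / hop)
    return [origin + k * hop for k in range(k_min, k_max + 1)]


def window_join_left_model(left_times, right_times, hop, duration):
    """Left window join: unmatched left rows are padded with None once per window."""
    out = Counter()
    for lt in left_times:
        for start in window_starts(lt, hop, duration):
            matches = [rt for rt in right_times if start <= rt < start + duration]
            for rt in matches:
                out[(lt, rt)] += 1
            if not matches:
                out[(lt, None)] += 1
    return out


left_times = [1, 2, 3, 7, 13]
right_times = [2, 5, 6, 7]
tumbling = window_join_left_model(left_times, right_times, hop=2, duration=2)
sliding = window_join_left_model(left_times, right_times, hop=1, duration=2)
```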

window_join_outer(other, left_time_expression, right_time_expression, window, *on)

Performs a window outer join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

Rows from both sides that didn’t match any record on the other side in a given window are returned with missing values on the other side replaced with None. The multiplicity of such rows equals the number of windows they belong to in which they have no match.

  • Parameters
    • other (Table) – the right side of a join.
    • left_time_expression – time expression in self.
    • right_time_expression – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.
  • Return type
    WindowJoinResult

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 1
2 | 2
3 | 3
4 | 7
5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 2
2 | 5
3 | 6
4 | 7
'''
)
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.window.tumbling(2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
| 5
1 |
2 | 2
3 | 2
7 | 6
7 | 7
13 |
t4 = t1.window_join_outer(t2, t1.t, t2.t, pw.window.sliding(1, 2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
| 5
| 5
| 6
1 |
1 | 2
2 | 2
2 | 2
3 |
3 | 2
7 | 6
7 | 7
7 | 7
13 |
13 |
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 2
3 | 1 | 3
4 | 1 | 7
5 | 1 | 13
6 | 2 | 1
7 | 2 | 2
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 2
2 | 1 | 5
3 | 1 | 6
4 | 1 | 7
5 | 2 | 2
6 | 2 | 3
7 | 4 | 3
'''
)
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.window.tumbling(2), t1.a == t2.b).select(
key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1 | | 5
1 | 1 |
1 | 2 | 2
1 | 3 | 2
1 | 7 | 6
1 | 7 | 7
1 | 13 |
2 | 1 |
2 | 2 | 2
2 | 2 | 3
3 | 4 |
4 | | 3
t1 = pw.debug.table_from_markdown(
'''
| t
0 | 0
1 | 5
2 | 10
3 | 15
4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
0 | -3
1 | 2
2 | 3
3 | 6
4 | 16
'''
)
t3 = t1.window_join_outer(
t2, t1.t, t2.t, pw.window.session(lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
| -3
0 | 2
0 | 3
0 | 6
5 | 2
5 | 3
5 | 6
10 |
15 | 16
17 | 16
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 4
3 | 1 | 7
4 | 2 | 0
5 | 2 | 3
6 | 2 | 4
7 | 2 | 7
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | -1
2 | 1 | 6
3 | 2 | 2
4 | 2 | 10
5 | 4 | 3
'''
)
t3 = t1.window_join_outer(
t2, t1.t, t2.t, pw.window.session(lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1 | 1 | -1
1 | 4 | 6
1 | 7 | 6
2 | | 10
2 | 0 | 2
2 | 3 | 2
2 | 4 | 2
2 | 7 |
3 | 4 |
4 | | 3
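You can model session windows by sorting the records of both sides together and cutting wherever neighbours are too far apart. The sketch makes these assumptions:

- The session predicate compares consecutive records in time order, so `abs(a - b) <= 2` becomes `max_gap=2`.
- Helper names are hypothetical.
- Join conditions from `on` are not modelled.

This matches the unkeyed outer session output above.

```python
from collections import Counter


def sessions(events, max_gap):
    """Group (time, side) events into sessions.

    Events are sorted by time; neighbours stay in the same session when their
    times differ by at most max_gap.
    """
    events = sorted(events)
    if not events:
        return []
    groups = [[events[0]]]
    for prev, cur in zip(events, events[1:]):
        if cur[0] - prev[0] <= max_gap:
            groups[-1].append(cur)
        else:
            groups.append([cur])
    return groups


def window_join_outer_session_model(left_times, right_times, max_gap):
    out = Counter()
    events = [(t, "L") for t in left_times] + [(t, "R") for t in right_times]
    for group in sessions(events, max_gap):
        lefts = [t for t, side in group if side == "L"]
        rights = [t for t, side in group if side == "R"]
        if lefts and rights:
            # Every left record of the session is joined with every right record.
            for lt in lefts:
                for rt in rights:
                    out[(lt, rt)] += 1
        else:
            # One-sided session: its records are kept, padded with None.
            for lt in lefts:
                out[(lt, None)] += 1
            for rt in rights:
                out[(None, rt)] += 1
    return out


result = window_join_outer_session_model([0, 5, 10, 15, 17], [-3, 2, 3, 6, 16], max_gap=2)
```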

window_join_right(other, left_time_expression, right_time_expression, window, *on)

Performs a window right join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

Rows from the right side that didn’t match any record on the left side in a given window are returned with missing values on the left side replaced with None. The multiplicity of such rows equals the number of windows they belong to in which they have no match.

  • Parameters
    • other (Table) – the right side of a join.
    • left_time_expression – time expression in self.
    • right_time_expression – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnBinaryOpExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
  • Returns
    a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.
  • Return type
    WindowJoinResult

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 1
2 | 2
3 | 3
4 | 7
5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 2
2 | 5
3 | 6
4 | 7
'''
)
t3 = t1.window_join_right(t2, t1.t, t2.t, pw.window.tumbling(2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
| 5
2 | 2
3 | 2
7 | 6
7 | 7
t4 = t1.window_join_right(t2, t1.t, t2.t, pw.window.sliding(1, 2)).select(
left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
| 5
| 5
| 6
1 | 2
2 | 2
2 | 2
3 | 2
7 | 6
7 | 7
7 | 7
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 2
3 | 1 | 3
4 | 1 | 7
5 | 1 | 13
6 | 2 | 1
7 | 2 | 2
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 2
2 | 1 | 5
3 | 1 | 6
4 | 1 | 7
5 | 2 | 2
6 | 2 | 3
7 | 4 | 3
'''
)
t3 = t1.window_join_right(t2, t1.t, t2.t, pw.window.tumbling(2), t1.a == t2.b).select(
key=t2.b, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1 | | 5
1 | 2 | 2
1 | 3 | 2
1 | 7 | 6
1 | 7 | 7
2 | 2 | 2
2 | 2 | 3
4 | | 3
t1 = pw.debug.table_from_markdown(
'''
| t
0 | 0
1 | 5
2 | 10
3 | 15
4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
0 | -3
1 | 2
2 | 3
3 | 6
4 | 16
'''
)
t3 = t1.window_join_right(
t2, t1.t, t2.t, pw.window.session(lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
| -3
0 | 2
0 | 3
0 | 6
5 | 2
5 | 3
5 | 6
15 | 16
17 | 16
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 4
3 | 1 | 7
4 | 2 | 0
5 | 2 | 3
6 | 2 | 4
7 | 2 | 7
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | -1
2 | 1 | 6
3 | 2 | 2
4 | 2 | 10
5 | 4 | 3
'''
)
t3 = t1.window_join_right(
t2, t1.t, t2.t, pw.window.session(lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t2.b, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1 | 1 | -1
1 | 4 | 6
1 | 7 | 6
2 | | 10
2 | 0 | 2
2 | 3 | 2
2 | 4 | 2
4 | | 3
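With a join condition, sessions are built separately for each key value. The sketch below models the keyed session example of `window_join_right`. It assumes that neighbours are compared in time order and that a gap of at most 2 keeps them in one session. Left-only sessions vanish, and right-only ones are padded with None. Helper names are hypothetical.

```python
from collections import Counter


def sessions(events, max_gap):
    # Sort (time, side) events by time and split where neighbours are > max_gap apart.
    groups = []
    for event in sorted(events):
        if groups and event[0] - groups[-1][-1][0] <= max_gap:
            groups[-1].append(event)
        else:
            groups.append([event])
    return groups


def window_join_right_session_model(left, right, max_gap):
    """left/right: lists of (key, time); sessions are formed separately per key."""
    out = Counter()
    for key in {k for k, _ in left} | {k for k, _ in right}:
        events = [(t, "L") for k, t in left if k == key]
        events += [(t, "R") for k, t in right if k == key]
        for group in sessions(events, max_gap):
            lefts = [t for t, side in group if side == "L"]
            rights = [t for t, side in group if side == "R"]
            for rt in rights:
                if lefts:
                    for lt in lefts:
                        out[(key, lt, rt)] += 1
                else:
                    # Right record with no left partner in its session.
                    out[(key, None, rt)] += 1
            # Sessions with only left records produce nothing in a right join.
    return out


left = [(1, 1), (1, 4), (1, 7), (2, 0), (2, 3), (2, 4), (2, 7), (3, 4)]
right = [(1, -1), (1, 6), (2, 2), (2, 10), (4, 3)]
result = window_join_right_session_model(left, right, max_gap=2)
```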

windowby(time_expr, *, window, shard=None)

Create a GroupedTable by windowing the table (based on time_expr and window), optionally sharded with shard.

  • Parameters
    • time_expr (ColumnExpression) – Column expression used for windowing
    • window (Window) – type of window to use
    • shard (OptionalColumnExpression) – optional column expression to act as a shard key

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
| shard | t | v
1 | 0 | 1 | 10
2 | 0 | 2 | 1
3 | 0 | 4 | 3
4 | 0 | 8 | 2
5 | 0 | 9 | 4
6 | 0 | 10| 8
7 | 1 | 1 | 9
8 | 1 | 2 | 16
''')
result = t.windowby(
t.t, window=pw.window.session(predicate=lambda a, b: abs(a-b) <= 1), shard=t.shard
).reduce(
shard=pw.reducers.min(pw.this.shard),
min_t=pw.reducers.min(pw.this.t),
max_v=pw.reducers.max(pw.this.v),
count=pw.reducers.count(pw.this.t),
)
pw.debug.compute_and_print(result, include_id=False)
shard | min_t | max_v | count
0 | 1 | 10 | 2
0 | 4 | 3 | 1
0 | 8 | 8 | 3
1 | 1 | 16 | 2
  • Return type
    GroupedTable
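The following plain-Python sketch gives a rough model of what `windowby` with a session window followed by `reduce` computes in the example. It groups `(shard, t, v)` rows per shard. Within each shard, it starts a new session whenever consecutive sorted times differ by more than `max_gap`, which mirrors the predicate `abs(a-b) <= 1`. It then applies the same three reducers. The helpers are hypothetical and are not part of Pathway.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Row = Tuple[int, int, int]  # (shard, t, v)

def reduce_window(shard: int, window: List[Tuple[int, int]]) -> Tuple[int, int, int, int]:
    """Mimic the reducers in the example: min(t), max(v) and count."""
    return (shard, min(t for t, _ in window), max(v for _, v in window), len(window))

def session_windowby(rows: Iterable[Row], max_gap: int) -> List[Tuple[int, int, int, int]]:
    by_shard: Dict[int, List[Tuple[int, int]]] = defaultdict(list)
    for shard, t, v in rows:
        by_shard[shard].append((t, v))
    result = []
    for shard in sorted(by_shard):
        window: List[Tuple[int, int]] = []
        for t, v in sorted(by_shard[shard]):
            # Close the current session when the gap to the previous time is too large.
            if window and t - window[-1][0] > max_gap:
                result.append(reduce_window(shard, window))
                window = []
            window.append((t, v))
        if window:
            result.append(reduce_window(shard, window))
    return result

rows = [(0, 1, 10), (0, 2, 1), (0, 4, 3), (0, 8, 2), (0, 9, 4), (0, 10, 8), (1, 1, 9), (1, 2, 16)]
result = session_windowby(rows, max_gap=1)
```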

with_id(new_index)

Set new ids based on another column containing id-typed values.

To generate ids based on arbitrary valued columns, use with_id_from.

Values assigned must be row-wise unique.

  • Parameters
    new_index – column to be used as the new index.
  • Return type
    Table
  • Returns
    Table with updated ids.

Example:

import pytest; pytest.xfail("with_id is hard to test")
import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| new_id
1 | 2
2 | 3
3 | 4
''')
t3 = t1.promise_universe_is_subset_of(t2).with_id(t2.new_id)
pw.debug.compute_and_print(t3)
| age | owner | pet
^2 | 10 | Alice | 1
^3 | 9 | Bob | 1
^4 | 8 | Alice | 2

with_id_from(*args)

Compute new ids based on values in columns. Ids computed from columns must be row-wise unique.

  • Parameters
    columns – columns to be used as primary keys.
  • Returns
    self updated with recomputed ids.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = t1 + t1.select(old_id=t1.id)
t3 = t2.with_id_from(t2.age)
pw.debug.compute_and_print(t3)
| age | owner | pet | old_id
^... | 8 | Alice | 2 | ^...
^... | 9 | Bob | 1 | ^...
^... | 10 | Alice | 1 | ^...
t4 = t3.select(t3.age, t3.owner, t3.pet, same_as_old=(t3.id == t3.old_id),
same_as_new=(t3.id == t3.pointer_from(t3.age)))
pw.debug.compute_and_print(t4)
| age | owner | pet | same_as_old | same_as_new
^... | 8 | Alice | 2 | False | True
^... | 9 | Bob | 1 | False | True
^... | 10 | Alice | 1 | False | True
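The example checks that `same_as_new` is always True. This means the new id is a deterministic function of the key columns. Below is a minimal sketch of that idea. It uses a hypothetical `id_from` that hashes the values with SHA-256. Pathway's real pointer derivation is different and is not reproduced here. The sketch also enforces the row-wise uniqueness requirement.

```python
import hashlib
from typing import Any, Dict, Iterable, List

def id_from(*values: Any) -> str:
    """Hypothetical id: a truncated SHA-256 digest of the values' repr."""
    return hashlib.sha256(repr(values).encode()).hexdigest()[:16]

def with_id_from(rows: Iterable[Dict[str, Any]], columns: List[str]) -> Dict[str, Dict[str, Any]]:
    """Re-key rows by the values in `columns`, rejecting duplicate keys."""
    out: Dict[str, Dict[str, Any]] = {}
    for row in rows:
        key = tuple(row[c] for c in columns)
        new_id = id_from(*key)
        if new_id in out:
            raise ValueError(f"ids must be row-wise unique, duplicate key {key}")
        out[new_id] = row
    return out

rows = [
    {"age": 10, "owner": "Alice", "pet": 1},
    {"age": 9, "owner": "Bob", "pet": 1},
    {"age": 8, "owner": "Alice", "pet": 2},
]
by_age = with_id_from(rows, ["age"])  # ages are unique, so this succeeds
```

Keying by `owner` instead would fail, because "Alice" appears twice.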

with_universe_of(other)

Returns a copy of self with exactly the same universe as other.

Semantics: Requires the precondition self.universe == other.universe. Used in situations where Pathway cannot deduce that the universes are equal, but they are verified to be equal at runtime.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
| age
1 | 10
7 | 3
''').with_universe_of(t1)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10
  • Return type
    Table

without(*columns)

Selects all columns except the referenced ones.

  • Parameters
    columns (ColumnReference) – columns to be dropped, given as column references (table.column_name or pw.this.column_name).
  • Returns
    self without specified columns.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = t1.without(t1.age, pw.this.pet)
pw.debug.compute_and_print(t2, include_id=False)
owner
Alice
Alice
Bob

class pathway.TableLike(universe)

Interface class for table-likes: Table, GroupedTable and JoinResult. All of those contain universe info, and thus support universe-related asserts.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | dog
9 | Bob | dog
8 | Alice | cat
7 | Bob | dog
''')
g1 = t1.groupby(t1.owner)
t2 = t1.filter(t1.age >= 9)
pw.debug.compute_and_print(t2, include_id=False)
age | owner | pet
9 | Bob | dog
10 | Alice | dog
g2 = t2.groupby(t2.owner)
pw.universes.promise_is_subset_of(g2, g1) # t2 is a subset of t1, so this is safe

promise_universe_is_equal_to(other)

Asserts to Pathway that the universe of self is equal to the universe of other.

Semantics: Used in situations where Pathway cannot deduce that two universes are equal.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.parse_to_table('''
| age
1 | 10
7 | 3
''')
t1 = t1.promise_universe_is_equal_to(t2)
t3 = t1 + t2
pw.debug.compute_and_print(t3, include_id=False)
pet | age
Cat | 3
Dog | 10
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

promise_universe_is_subset_of(other)

Asserts to Pathway that the universe of self is a subset of the universe of other.

Semantics: Used in situations where Pathway cannot deduce that one universe is a subset of another.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 30
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)

promise_universes_are_disjoint(other)

Asserts to Pathway that the universe of self is disjoint from the universe of other.

Semantics: Used in situations where Pathway cannot deduce that the universes are disjoint.

Returns: self

Note: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
| age | owner | pet
1 | 10 | Alice | 1
2 | 9 | Bob | 1
3 | 8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
| age | owner | pet
11 | 11 | Alice | 30
12 | 12 | Tom | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8 | Alice | 2
9 | Bob | 1
10 | Alice | 1
11 | Alice | 30
12 | Tom | 40
  • Return type
    TypeVar(SelfTableLike, bound= TableLike)
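One way to read the three promises is to model a universe as the set of row ids of a table. The plain-Python sketch below uses hypothetical helpers and represents tables as dicts from id to row tuple. It shows which set relation each operation in the examples depends on: `+` needs equal universes, `<<` needs a subset, and `concat` needs disjoint universes. The helpers raise `ValueError` only to make each relation explicit. This says nothing about when or how Pathway itself checks a promise.

```python
from typing import Dict, Tuple

Table = Dict[int, Tuple]  # row id -> row values

def add_columns(a: Table, b: Table) -> Table:
    """Like `a + b`: both tables must have exactly the same ids."""
    if a.keys() != b.keys():
        raise ValueError("universes are not equal")
    return {k: a[k] + b[k] for k in a}

def update_rows(base: Table, patch: Table) -> Table:
    """Like `base << patch`: patch ids must be a subset of base ids."""
    if not patch.keys() <= base.keys():
        raise ValueError("universe of patch is not a subset of base")
    return {**base, **patch}

def concat(a: Table, b: Table) -> Table:
    """Like `a.concat(b)`: the two id sets must not overlap."""
    if a.keys() & b.keys():
        raise ValueError("universes are not disjoint")
    return {**a, **b}

pets = {1: ("Dog",), 7: ("Cat",)}
ages = {1: (10,), 7: (3,)}
t1 = {1: (10, "Alice", 1), 2: (9, "Bob", 1), 3: (8, "Alice", 2)}
```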

class pathway.WindowJoinResult(join_result, left_original, right_original, left_new, right_new)

Result of a window join between tables.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| t
1 | 1
2 | 2
3 | 3
4 | 7
5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
'''
| t
1 | 2
2 | 5
3 | 6
4 | 7
'''
)
join_result = t1.window_join_outer(t2, t1.t, t2.t, pw.window.tumbling(2))
isinstance(join_result, pw.WindowJoinResult)
True
pw.debug.compute_and_print(
join_result.select(left_t=t1.t, right_t=t2.t), include_id=False
)
left_t | right_t
| 5
1 |
2 | 2
3 | 2
7 | 6
7 | 7
13 |
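The output above follows from two steps: each time is assigned to a tumbling window of length 2, and rows are joined within each window. The sketch below reproduces this in plain Python. It assumes windows are aligned at 0, so `t` falls into `[2 * (t // 2), 2 * (t // 2) + 2)`. That alignment is consistent with the printed rows. The `window_join_outer` here is a hypothetical helper, not Pathway's method.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

def window_start(t: int, duration: int) -> int:
    """Start of the tumbling window [start, start + duration) containing t."""
    return (t // duration) * duration

def window_join_outer(
    left: List[int], right: List[int], duration: int
) -> List[Tuple[Optional[int], Optional[int]]]:
    windows: Dict[int, Tuple[List[int], List[int]]] = defaultdict(lambda: ([], []))
    for t in left:
        windows[window_start(t, duration)][0].append(t)
    for t in right:
        windows[window_start(t, duration)][1].append(t)
    pairs: List[Tuple[Optional[int], Optional[int]]] = []
    for start in sorted(windows):
        ls, rs = windows[start]
        if ls and rs:
            pairs.extend((l, r) for l in ls for r in rs)  # matched: cross product
        elif ls:
            pairs.extend((l, None) for l in ls)  # window with left rows only
        else:
            pairs.extend((None, r) for r in rs)  # window with right rows only
    return pairs

pairs = window_join_outer([1, 2, 3, 7, 13], [2, 5, 6, 7], duration=2)
```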

select(*args, **kwargs)

Computes a result of a window join.

  • Parameters
    • args (ColumnReference) – Column references.
    • kwargs (Any) – Column expressions with their new assigned names.

  • Returns
    Created table.
  • Return type
    Table

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
'''
| a | t
1 | 1 | 1
2 | 1 | 2
3 | 1 | 3
4 | 1 | 7
5 | 1 | 13
6 | 2 | 1
7 | 2 | 2
8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
'''
| b | t
1 | 1 | 2
2 | 1 | 5
3 | 1 | 6
4 | 1 | 7
5 | 2 | 2
6 | 2 | 3
7 | 4 | 3
'''
)
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.window.tumbling(2), t1.a == t2.b).select(
key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1 | | 5
1 | 1 |
1 | 2 | 2
1 | 3 | 2
1 | 7 | 6
1 | 7 | 7
1 | 13 |
2 | 1 |
2 | 2 | 2
2 | 2 | 3
3 | 4 |
4 | | 3


pathway.apply(fun, *args, **kwargs)

Applies a function to column expressions, column-wise. The output column type is deduced from the function's type annotations.

Example:

import pathway as pw
def concat(left: str, right: str) -> str:
    return left+right
t1 = pw.debug.parse_to_table('''
age owner pet
10 Alice dog
9 Bob dog
8 Alice cat
7 Bob dog''')
t2 = t1.select(col = pw.apply(concat, t1.owner, t1.pet))
pw.debug.compute_and_print(t2, include_id=False)
col
Alicecat
Alicedog
Bobdog
Bobdog

pathway.apply_async(fun, *args, **kwargs)

Applies a function asynchronously to column expressions, column-wise. The output column type is deduced from the function's type annotations. Either a regular or an async function can be passed.

Example:

import pathway as pw
import asyncio
async def concat(left: str, right: str) -> str:
    await asyncio.sleep(0.1)
    return left+right
t1 = pw.debug.parse_to_table('''
age owner pet
10 Alice dog
9 Bob dog
8 Alice cat
7 Bob dog''')
t2 = t1.select(col = pw.apply_async(concat, t1.owner, t1.pet))
pw.debug.compute_and_print(t2, include_id=False)
col
Alicecat
Alicedog
Bobdog
Bobdog
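The sketch below illustrates why an async function helps here. It uses plain asyncio, not Pathway internals. It awaits the same `concat` coroutine for all rows concurrently with `asyncio.gather`, so the four 0.1 s sleeps overlap instead of running one after another. The helper `apply_async_to_rows` is hypothetical. It also accepts a regular function, mirroring the statement that either kind can be passed.

```python
import asyncio
import inspect
from typing import Any, Callable, List, Sequence, Tuple

async def concat(left: str, right: str) -> str:
    await asyncio.sleep(0.1)
    return left + right

async def call(fun: Callable[..., Any], args: Tuple[Any, ...]) -> Any:
    """Await coroutine functions; call regular functions directly."""
    if inspect.iscoroutinefunction(fun):
        return await fun(*args)
    return fun(*args)

async def apply_async_to_rows(fun: Callable[..., Any], rows: Sequence[Tuple[Any, ...]]) -> List[Any]:
    # gather runs the calls concurrently and returns results in row order.
    return list(await asyncio.gather(*(call(fun, row) for row in rows)))

rows = [("Alice", "dog"), ("Bob", "dog"), ("Alice", "cat"), ("Bob", "dog")]
results = asyncio.run(apply_async_to_rows(concat, rows))
```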

pathway.apply_with_type(fun, ret_type, *args, **kwargs)

Applies a function to column expressions, column-wise. The output column type is provided explicitly.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age owner pet
1 10 Alice dog
2 9 Bob dog
3 8 Alice cat
4 7 Bob dog''')
t2 = t1.select(col = pw.apply_with_type(lambda left, right: left+right, str, t1.owner, t1.pet))
pw.debug.compute_and_print(t2, include_id=False)
col
Alicecat
Alicedog
Bobdog
Bobdog

pathway.attribute(func, **kwargs)

Decorator for creating attributes.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()
        @pw.attribute
        def attr(self) -> float:
            return self.arg*2
        @pw.output_attribute
        def ret(self) -> float:
            return self.attr + 1
t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7 | 15
8 | 17
9 | 19
10 | 21

pathway.cast(target_type, col)

Changes the type of the column to target_type and converts the data of this column.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
val
1 10
2 9
3 8
4 7''')
t1.schema.as_dict()
{'val': <class 'int'>}
pw.debug.compute_and_print(t1, include_id=False)
val
7
8
9
10
t2 = t1.select(val = pw.cast(float, t1.val))
t2.schema.as_dict()
{'val': <class 'float'>}
pw.debug.compute_and_print(t2, include_id=False)
val
7.0
8.0
9.0
10.0
  • Return type
    CastExpression

pathway.coalesce(*args)

For the argument list arg_1, arg_2, …, arg_n, returns the first non-None value.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
colA colB
| 10
2 |
|
4 | 7''')
t2 = t1.select(t1.colA, t1.colB, col=pw.coalesce(t1.colA, t1.colB))
pw.debug.compute_and_print(t2, include_id=False)
colA | colB | col
| |
| 10 | 10
2 | | 2
4 | 7 | 4
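Row by row, the semantics amount to the plain-Python function below. This is an illustrative sketch, not Pathway code. It tests `is not None` rather than truthiness, so a value such as `0` or `''` is returned rather than skipped. This is where it differs from Python's `a or b`.

```python
from typing import Any, Optional

def coalesce(*args: Any) -> Optional[Any]:
    """Return the first argument that is not None (None if there is none)."""
    return next((a for a in args if a is not None), None)

rows = [(None, 10), (2, None), (None, None), (4, 7)]  # (colA, colB) from the example
col = [coalesce(a, b) for a, b in rows]
```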

pathway.declare_type(target_type, col)

Changes the declared type of a column. Disclaimer: it only changes the type in the schema; it does not affect the stored values.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
val
1 10
2 9
3 8
4 7''')
t1.schema.as_dict()
{'val': <class 'int'>}
t2 = t1.select(val = pw.declare_type(int | float, t1.val))
t2.schema.as_dict()
{'val': int | float}
  • Return type
    DeclareTypeExpression

pathway.if_else(if_clause, then_clause, else_clause)

Equivalent to:

if (if_clause):
    return (then_clause)
else:
    return (else_clause)

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
colA colB
1 | 0
2 | 2
6 | 3''')
t2 = t1.select(res = pw.if_else(t1.colB != 0, t1.colA // t1.colB, 0))
pw.debug.compute_and_print(t2, include_id=False)
res
0
1
2
  • Return type
    IfElseExpression

pathway.input_attribute(type=<class 'float'>)

Returns a new input_attribute. To be used inside class transformers.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()
        @pw.output_attribute
        def ret(self) -> float:
            return self.arg + 1
t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7 | 8
8 | 9
9 | 10
10 | 11

pathway.input_method(type=<class 'float'>)

Decorator for defining input methods in class transformers.

Example:

import pathway as pw
@pw.transformer
class first_transformer:
    class table(pw.ClassArg):
        a: float = pw.input_attribute()
        @pw.method
        def fun(self, arg) -> int:
            return self.a * arg
@pw.transformer
class second_transformer:
    class table(pw.ClassArg):
        m = pw.input_method(int)
        @pw.output_attribute
        def val(self):
            return self.m(2)
t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = first_transformer(table=t1.select(a=t1.age)).table
t2.schema.as_dict()
{'fun': typing.Callable[..., int]}
t3 = second_transformer(table=t2.select(m=t2.fun)).table
pw.debug.compute_and_print(t1 + t3, include_id=False)
age | val
7 | 14
8 | 16
9 | 18
10 | 20

pathway.iterate(func, iteration_limit=None, **kwargs)

Iterates a function until a fixed point is reached. The function must take only named arguments, each of which is a Table, and return a dict of Tables. The initial arguments to the function are passed through kwargs.

Example:

import pathway as pw
def collatz_transformer(iterated):
    def collatz_step(x: int) -> int:
        if x == 1:
            return 1
        elif x % 2 == 0:
            return x // 2  # integer division keeps the declared int return type
        else:
            return 3 * x + 1
    new_iterated = iterated.select(val=pw.apply(collatz_step, iterated.val))
    return dict(iterated=new_iterated)
tab = pw.debug.parse_to_table('''
val
1
2
3
4
5
6
7
8''')
ret = pw.iterate(collatz_transformer, iterated=tab).iterated
pw.debug.compute_and_print(ret, include_id=False)
val
1
1
1
1
1
1
1
1
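`pw.iterate` repeats the step until the tables stop changing. Without tables and incrementality, the fixed-point loop looks like the plain-Python sketch below. The helper `iterate_to_fixed_point` is hypothetical. Treating `iteration_limit` as a cap on the number of steps is an assumption about that parameter, not documented behaviour.

```python
from typing import Callable, List, Optional

def collatz_step(x: int) -> int:
    if x == 1:
        return 1
    elif x % 2 == 0:
        return x // 2
    else:
        return 3 * x + 1

def iterate_to_fixed_point(
    step: Callable[[int], int], values: List[int], iteration_limit: Optional[int] = None
) -> List[int]:
    """Apply step to every value until nothing changes (or the assumed limit is hit)."""
    iterations = 0
    while iteration_limit is None or iterations < iteration_limit:
        new_values = [step(v) for v in values]
        iterations += 1
        if new_values == values:  # fixed point: another step changes nothing
            break
        values = new_values
    return values

result = iterate_to_fixed_point(collatz_step, list(range(1, 9)))
```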

class pathway.iterate_universe(table)


class pathway.left(*args, **kwargs)

Object for generating column references without holding the actual table in hand. Needs to be evaluated in the proper context. For Table.join() and JoinResult.select(), refers to the left input table. In all other situations, use the pw.this object.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join(t2, pw.left.pet == pw.right.pet, pw.left.owner == pw.right.owner).select(
age=pw.left.age, owner_name=pw.right.owner, size=pw.this.size
)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9 | Bob | L

pathway.make_tuple(*args)

Creates a tuple from the provided expressions.

  • Parameters
    args (Union[ColumnExpression, None, int, str, bool, BasePointer, Tuple[Any, ...]]) – a list of expressions to be put in a tuple
  • Return type
    MakeTupleExpression
  • Returns
    tuple

NOTE:

  • Each cell in the output column will be a tuple containing the corresponding values from the input columns.
  • The order of values in each tuple will match the order of the input columns.
  • If any of the input columns have missing values, the resulting tuples will contain None for those positions.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
'''
A | B | C
1 | 10 | a
2 | 20 |
3 | 30 | c
'''
)
table_with_tuple = table.select(res=pw.make_tuple(pw.this.A, pw.this.B, pw.this.C))
pw.debug.compute_and_print(table_with_tuple, include_id=False)
res
(1, 10, 'a')
(2, 20, None)
(3, 30, 'c')

pathway.method(func, **kwargs)

Decorator for creating methods in class transformers.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        a: float = pw.input_attribute()
        @pw.output_attribute
        def b(self) -> float:
            return self.fun(self.a)
        @pw.method
        def fun(self, arg) -> float:
            return self.a * arg
t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(a=t1.age)).table
t2.schema.as_dict()
{'b': <class 'float'>, 'fun': typing.Callable[..., float]}
pw.debug.compute_and_print(t1 + t2.select(t2.b), include_id=False)
age | b
7 | 49
8 | 64
9 | 81
10 | 100
pw.debug.compute_and_print(t1 + t2.select(out = t2.fun(t2.b)), include_id=False)
age | out
7 | 343
8 | 512
9 | 729
10 | 1000

pathway.numba_apply(fun, numba_signature, *args, **kwargs)

Applies a function to column expressions, column-wise. The function has to be Numba-compilable.

Currently only a few signatures are supported:

  • function has to be unary or binary
  • arguments and the return type have to be either int64 or float64

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
val
1 1
2 3
3 5
4 7''')
t2 = t1.select(col = pw.numba_apply(lambda x: x*x-2*x+1, "int64(int64,)", t1.val))
pw.debug.compute_and_print(t2, include_id=False)
col
0
4
16
36

pathway.output_attribute(func, **kwargs)

Decorator for creating output_attributes.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()
        @pw.output_attribute
        def ret(self) -> float:
            return self.arg + 1
t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7 | 8
8 | 9
9 | 10
10 | 11

pathway.pandas_transformer(output_schema, output_universe=None)

Decorator that turns a Python function operating on pandas.DataFrame into a Pathway transformer.

Input universes are converted into input DataFrame indexes. The resulting index is treated as the output universe, so it must maintain uniqueness and be of integer type.

  • Parameters
    • output_schema (TypeSchema) – Schema of the resulting table.
    • output_universe (Union[str, int, None]) – Index or name of an argument whose universe will be used in the resulting table. Defaults to None.
  • Returns
    Transformer that can be applied on Pathway tables.

pathway.require(val, *deps)

Returns val if and only if every dep in deps is not None. Returns None otherwise.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
colA colB
| 10
2 |
|
4 | 7''')
t2 = t1.select(t1.colA, t1.colB, col=pw.require(t1.colA + t1.colB, t1.colA, t1.colB))
pw.debug.compute_and_print(t2, include_id=False)
colA | colB | col
| |
| 10 |
2 | |
4 | 7 | 11
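In the example, `colA + colB` produces a value only for the row in which both inputs are present. Plain Python evaluates arguments eagerly, so the guarded expression has to be wrapped in a function to mirror this. The sketch below does that. It is hypothetical and not Pathway's implementation.

```python
from typing import Any, Callable, Optional

def require(make_val: Callable[[], Any], *deps: Any) -> Optional[Any]:
    """Return make_val() if no dependency is None, otherwise None.

    The value is passed as a zero-argument function so that it is only
    evaluated when that is safe (None + 10 would raise TypeError).
    """
    if any(d is None for d in deps):
        return None
    return make_val()

rows = [(None, 10), (2, None), (None, None), (4, 7)]  # (colA, colB) from the example
col = [require(lambda: a + b, a, b) for a, b in rows]
```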

class pathway.right(*args, **kwargs)

Object for generating column references without holding the actual table in hand. Needs to be evaluated in the proper context. For Table.join() and JoinResult.select(), refers to the right input table. In all other situations, use the pw.this object.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = pw.debug.parse_to_table('''
age | owner | pet | size
10 | Alice | 3 | M
9 | Bob | 1 | L
8 | Tom | 1 | XL
''')
t3 = t1.join(t2, pw.left.pet == pw.right.pet, pw.left.owner == pw.right.owner).select(
age=pw.left.age, owner_name=pw.right.owner, size=pw.this.size
)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9 | Bob | L

pathway.schema_from_types(_name=None, **kwargs)

Constructs a schema from kwargs of the form field=type.

Example:

import pathway as pw
s = pw.schema_from_types(foo=int, bar=str)
s.as_dict()
{'foo': <class 'int'>, 'bar': <class 'str'>}
issubclass(s, pw.Schema)
True
  • Return type
    TypeSchema

pathway.sql(query, **kwargs)

Run a SQL query on Pathway tables.

  • Parameters
    • query (str) – the SQL query to execute.
    • kwargs (Table) – name=table pairs used when executing the SQL query. Each pair links a Pathway table to the table name under which it is referenced in the SQL query.

Example:

import pathway as pw
t = pw.debug.table_from_markdown(
"""
A | B
1 | 2
4 | 3
4 | 7
"""
)
ret = pw.sql("SELECT * FROM tab WHERE A<B", tab=t)
pw.debug.compute_and_print(ret, include_id=False)
A | B
1 | 2
4 | 7

Supported SQL keywords and operations: SELECT, WHERE, boolean expressions, arithmetic operations, GROUP BY, HAVING, AS (alias), UNION, INTERSECT, JOIN, and WITH.

Table and column names are case-sensitive.

Specificities of Pathway:

  • id is a reserved keyword for columns: every Pathway table has a special column id. This column is not captured by * expressions in SQL.
  • Order of columns might not be preserved with respect to SELECT query.
  • Pathway reducers (pw.reducers.count, pw.reducers.sum, etc.) aggregate over None values, while SQL aggregation functions (COUNT, SUM, etc.) skip NULL values.
  • UNION requires matching column names.
  • INTERSECT requires matching column names.

Limited support:

  • Subqueries are supported but fragile – they depend on a set of query rewriting routines from the sqlglot library.
  • Additionally, using the id column in subqueries is fragile.
  • LIKE, ANY, ALL and EXISTS are either not supported or only weakly supported.

Unsupported operations:

  • ordering operations: ORDER BY, LIMIT, SELECT TOP
  • INSERT INTO (Pathway tables are immutable)
  • Pathway does not support anonymous columns: they might work but we do not guarantee their behavior.
  • INTERSECT does not support INTERSECT ALL.
  • COALESCE, IFNULL are not supported.
  • FULL JOIN and NATURAL JOIN are not supported.
  • CAST is not supported
  • Return type
    Table
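The note that SQL aggregates skip NULL refers to standard SQL semantics. The snippet below shows that behaviour in an ordinary SQL engine, along with the `A<B` filter from the example. It uses Python's built-in `sqlite3` module, not Pathway. The extra `(5, NULL)` row is added for illustration. `ORDER BY` is used only to make SQLite's output deterministic; Pathway itself does not support it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tab (A INTEGER, B INTEGER)")
conn.executemany("INSERT INTO tab VALUES (?, ?)", [(1, 2), (4, 3), (4, 7), (5, None)])

# Same filter as the pw.sql example; 5 < NULL is unknown, so that row is dropped too.
filtered = conn.execute("SELECT A, B FROM tab WHERE A < B ORDER BY A").fetchall()

# Standard SQL aggregates skip NULLs: COUNT(B) and SUM(B) ignore the (5, NULL) row,
# while COUNT(*) counts every row.
count_all, count_b, sum_b = conn.execute("SELECT COUNT(*), COUNT(B), SUM(B) FROM tab").fetchone()
conn.close()
```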

class pathway.this(*args, **kwargs)

Object for generating column references without holding the actual table in hand. Needs to be evaluated in the proper context. For most of the Table methods, it refers to self. For JoinResult, it refers to the left input table.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10 | Alice | 1
9 | Bob | 1
8 | Alice | 2
''')
t2 = t1.select(pw.this.owner, pw.this.age)
pw.debug.compute_and_print(t2, include_id=False)
owner | age
Alice | 8
Alice | 10
Bob | 9

pathway.transformer(cls)

Decorator that wraps the outer class when defining class transformers.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()
        @pw.output_attribute
        def ret(self) -> float:
            return self.arg + 1
t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7 | 8
8 | 9
9 | 10
10 | 11

Subpackages

Submodules

pathway.cli module

pathway.conftest module