Pathway API

Reference for all the Pathway classes and functions.

See Table API for the main Table class.

class pw.AsyncTransformer(input_table)

Allows performing asynchronous transformations on a table.

invoke() will be called asynchronously for each row of an input_table.

The output table can be accessed via result.

Example:

import pathway as pw
import asyncio
from typing import Any, Dict
class OutputSchema(pw.Schema):
    ret: int

class AsyncIncrementTransformer(pw.AsyncTransformer, output_schema=OutputSchema):
    async def invoke(self, value) -> Dict[str, Any]:
        await asyncio.sleep(0.1)
        return {"ret": value + 1 }

input = pw.debug.table_from_markdown('''
  | value
1 | 42
2 | 44
''')
result = AsyncIncrementTransformer(input_table=input).result
pw.debug.compute_and_print(result, include_id=False)
ret
43
45

close()

Called once at the end. Proper place for cleanup.

abstract async invoke(*args, **kwargs)

Called for every row of input_table. The arguments correspond to the columns of the input table.

Should return a dict of values matching output_schema.

open()

Called before the actual work. Suitable for one-time setup.

with_options(capacity=None, retry_strategy=None, cache_strategy=None)

Sets async options.

  • Parameters
    • capacity (int | None) – Maximum number of concurrent operations. Defaults to None, indicating no specific limit.
    • retry_strategy (AsyncRetryStrategy | None) – Strategy for handling retries in case of failures. Defaults to None.
    • cache_strategy (CacheStrategy | None) – Defines the caching mechanism. If set to None and a persistency is enabled, operations will be cached using the persistence layer. Defaults to None.
  • Returns
    self
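
To make the meaning of capacity concrete, here is a minimal plain-asyncio sketch of a concurrency cap. It is only a model of the idea of "maximum number of concurrent operations" and does not show how Pathway implements it; run_with_capacity and the peak counter are hypothetical names used for illustration.

```python
import asyncio


async def run_with_capacity(values, capacity):
    """Run one async call per value with at most `capacity` calls in flight."""
    semaphore = asyncio.Semaphore(capacity)
    in_flight = 0
    peak = 0  # highest number of simultaneous calls observed

    async def invoke(value):
        nonlocal in_flight, peak
        async with semaphore:  # blocks while `capacity` calls are already running
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for real asynchronous work
            in_flight -= 1
            return {"ret": value + 1}

    results = await asyncio.gather(*(invoke(v) for v in values))
    return results, peak


results, peak = asyncio.run(run_with_capacity([42, 44, 46, 48], capacity=2))
print([r["ret"] for r in results])
print(peak)
```

With capacity=None the description above indicates no specific limit, which in this model would correspond to not using a semaphore at all.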

property result: Table

Resulting table.

class pw.BaseCustomAccumulator

Utility class for defining custom accumulators, used for custom reducers. Custom accumulators should inherit from this class, and should implement `from_row`, `update` and `compute_result`. Optionally `neutral` and `retract` can be provided for more efficient processing on streams with changing data.

Example:

import pathway as pw
class CustomAvgAccumulator(pw.BaseCustomAccumulator):
  def __init__(self, sum, cnt):
    self.sum = sum
    self.cnt = cnt

  @classmethod
  def from_row(cls, row):
    [val] = row
    return CustomAvgAccumulator(val, 1)

  def update(self, other):
    self.sum += other.sum
    self.cnt += other.cnt

  def compute_result(self) -> float:
    return self.sum / self.cnt
custom_avg = pw.reducers.udf_reducer(CustomAvgAccumulator)
t1 = pw.debug.table_from_markdown('''
age | owner | pet | price
10  | Alice | dog | 100
9   | Bob   | cat | 80
8   | Alice | cat | 90
7   | Bob   | dog | 70
''')
t2 = t1.groupby(t1.owner).reduce(t1.owner, avg_price=custom_avg(t1.price))
pw.debug.compute_and_print(t2, include_id=False)
owner | avg_price
Alice | 95.0
Bob   | 75.0

abstract compute_result()

Mandatory function to finalize the computation. Used to extract the answer from the final state of the accumulator.

Narrowing the type of this function helps better type the output of the reducer.

abstract classmethod from_row(row)

Construct the accumulator from a row of data. The row will be passed as a list of values.

This is a mandatory function.

classmethod neutral()

Neutral element of the accumulator (aggregation of an empty list).

This function is optional, and allows for more efficient processing on streams with changing data.

retract(other)

Update the accumulator by removing the value of another one.

This function is optional, and allows more efficient reductions on streams with changing data.

abstract update(other)

Update the accumulator with another one. The method does not need to return anything; the change should be made in place.

This is a mandatory function.
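
The optional neutral and retract methods can be illustrated with a plain-Python model of the averaging accumulator. This is a sketch under assumptions: the class below deliberately does not inherit from pw.BaseCustomAccumulator, and the driver code at the bottom is a hypothetical stand-in for whatever the engine does when rows are added and removed.

```python
class AvgAccumulator:
    """Plain-Python model of an averaging accumulator (not a Pathway class)."""

    def __init__(self, total, cnt):
        self.total = total
        self.cnt = cnt

    @classmethod
    def from_row(cls, row):
        # The row arrives as a list of values; here it holds a single value.
        [val] = row
        return cls(val, 1)

    @classmethod
    def neutral(cls):
        # Aggregation of an empty list: nothing summed, nothing counted.
        return cls(0, 0)

    def update(self, other):
        # In-place merge of another accumulator.
        self.total += other.total
        self.cnt += other.cnt

    def retract(self, other):
        # In-place removal of another accumulator's contribution.
        self.total -= other.total
        self.cnt -= other.cnt

    def compute_result(self) -> float:
        return self.total / self.cnt


# Hypothetical driver: add three prices, then remove one of them.
acc = AvgAccumulator.neutral()
for price in (100, 90, 80):
    acc.update(AvgAccumulator.from_row([price]))
before = acc.compute_result()
acc.retract(AvgAccumulator.from_row([80]))
after = acc.compute_result()
print(before, after)
```

Without retract, removing a row would require recomputing the aggregate from the remaining rows, which is presumably why providing it helps on streams with changing data.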

class pw.ClassArg(ref: RowReference, ptr: Pointer)

Base class to inherit from when writing inner classes for class transformers.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()
        @pw.output_attribute
        def ret(self) -> int:
            return self.arg + 1
t1 = pw.debug.table_from_markdown('''
age
10
9
8
7
''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7   | 8
8   | 9
9   | 10
10  | 11

pointer_from(*args, optional=False)

Pseudo-random hash of its argument. Produces pointer types. Applied value-wise.

class pw.ColumnExpression


as_bool()

Converts value to a bool or None if not possible. Currently works for Json columns only.

Example:

import pathway as pw
import pandas as pd
class InputSchema(pw.Schema):
    data: dict
dt = pd.DataFrame(data={"data": [{"value": True}, {"value": False}]})
table = pw.debug.table_from_pandas(dt, schema=InputSchema)
result = table.select(result=pw.this.data.get("value").as_bool())
pw.debug.compute_and_print(result, include_id=False)
result
False
True

as_float()

Converts value to a float or None if not possible. Currently works for Json columns only.

Example:

import pathway as pw
import pandas as pd
class InputSchema(pw.Schema):
    data: dict
dt = pd.DataFrame(data={"data": [{"value": 1.5}, {"value": 3.14}]})
table = pw.debug.table_from_pandas(dt, schema=InputSchema)
result = table.select(result=pw.this.data.get("value").as_float())
pw.debug.compute_and_print(result, include_id=False)
result
1.5
3.14

as_int()

Converts value to an int or None if not possible. Currently works for Json columns only.

Example:

import pathway as pw
import pandas as pd
class InputSchema(pw.Schema):
    data: dict
dt = pd.DataFrame(data={"data": [{"value": 1}, {"value": 2}]})
table = pw.debug.table_from_pandas(dt, schema=InputSchema)
result = table.select(result=pw.this.data.get("value").as_int())
pw.debug.compute_and_print(result, include_id=False)
result
1
2

as_str()

Converts value to a string or None if not possible. Currently works for Json columns only.

Example:

import pathway as pw
import pandas as pd
class InputSchema(pw.Schema):
    data: dict
dt = pd.DataFrame(data={"data": [{"value": "dog"}, {"value": "cat"}]})
table = pw.debug.table_from_pandas(dt, schema=InputSchema)
result = table.select(result=pw.this.data.get("value").as_str())
pw.debug.compute_and_print(result, include_id=False)
result
cat
dog

get(index, default=None)

Extracts the element at index from an object. The object has to be a Tuple or Json. If no element is present at index, it returns the value specified by the default parameter.

The index must be an int for a Tuple, and an int or a str for Json. For a Tuple, a negative index accesses elements from the end, counting backwards.

  • Parameters
    • index (ColumnExpression | int | str) – Position to extract element at.
    • default (Union[ColumnExpression, None, int, float, str, bytes, bool, Pointer, datetime, timedelta, ndarray, Json, dict[str, Any], tuple[Any, ...]]) – Value returned when no element is at position index. Defaults to None.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
  | a | b | c
1 | 3 | 2 | 2
2 | 4 | 1 | 0
3 | 7 | 3 | 1
'''
)
t2 = t1.with_columns(tup=pw.make_tuple(pw.this.a, pw.this.b))
t3 = t2.select(
    x=pw.this.tup.get(1),
    y=pw.this.tup.get(3),
    z=pw.this.tup.get(pw.this.c),
    t=pw.this.tup.get(pw.this.c, default=100),
)
pw.debug.compute_and_print(t3, include_id=False)
x | y | z | t
1 |   | 4 | 4
2 |   |   | 100
3 |   | 3 | 3
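
The lookup rule can also be modelled in plain Python, which may help when reasoning about which cells end up empty. This is a sketch, not Pathway code: get_or_default is a hypothetical helper, tuples and dicts stand in for Tuple and Json values, and it assumes that an out-of-range negative index falls back to the default in the same way as a positive one.

```python
def get_or_default(values, index, default=None):
    """Return values[index], or `default` when no element is present at index."""
    try:
        return values[index]
    except (IndexError, KeyError):
        # IndexError covers tuples and lists, KeyError covers dict-like objects.
        return default


tup = (3, 2)
print(get_or_default(tup, 1))                # second element
print(get_or_default(tup, 3))                # out of range, falls back to None
print(get_or_default(tup, 2, default=100))   # out of range, falls back to 100
print(get_or_default(tup, -1))               # negative index counts from the end

json_like = {"value": 1}
print(get_or_default(json_like, "value"))
print(get_or_default(json_like, "missing", default=0))
```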

is_none()

Returns true if the value is None.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | owner | pet
1 | Alice | dog
2 | Bob   |
3 | Carol | cat
''')
t2 = t1.with_columns(has_no_pet=pw.this.pet.is_none())
pw.debug.compute_and_print(t2, include_id=False)
owner | pet | has_no_pet
Alice | dog | False
Bob   |     | True
Carol | cat | False

is_not_none()

Returns true if the value is not None.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | owner | pet
1 | Alice | dog
2 | Bob   |
3 | Carol | cat
''')
t2 = t1.with_columns(has_pet=pw.this.pet.is_not_none())
pw.debug.compute_and_print(t2, include_id=False)
owner | pet | has_pet
Alice | dog | True
Bob   |     | False
Carol | cat | True

to_string()

Changes the values to strings.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
val
1
2
3
4''')
t1.schema
<pathway.Schema types={'val': <class 'int'>}>
pw.debug.compute_and_print(t1, include_id=False)
val
1
2
3
4
t2 = t1.select(val = pw.this.val.to_string())
t2.schema
<pathway.Schema types={'val': <class 'str'>}>
pw.debug.compute_and_print(t2.select(val=pw.this.val + "a"), include_id=False)
val
1a
2a
3a
4a

class pw.ColumnReference()

Reference to the column.

Inherits from ColumnExpression.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
isinstance(t1.age, pw.ColumnReference)
True
isinstance(t1["owner"], pw.ColumnReference)
True

property name

Name of the referred column.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
t1.age.name
'age'

property table

Table to which the referred column belongs.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
t1.age.table is t1
True

class pw.DateTimeNaive(ts_input=<object object>, freq=None, tz=None, unit=None, year=None, month=None, day=None, hour=None, minute=None, second=None, microsecond=None, nanosecond=None, tzinfo=None, *, fold=None)

Type for storing datetime without timezone information. Extends pandas.Timestamp type.

class pw.DateTimeUtc(ts_input=<object object>, freq=None, tz=None, unit=None, year=None, month=None, day=None, hour=None, minute=None, second=None, microsecond=None, nanosecond=None, tzinfo=None, *, fold=None)

Type for storing datetime with default timezone. Extends pandas.Timestamp type.

class pw.Duration(value=<object object>, unit=None, **kwargs)

Type for storing duration of time. Extends pandas.Timedelta type.

class pw.GroupedJoinResult()


reduce(*args, **kwargs)

Reduces a grouped join result to a table.

  • Returns
    Table – Created table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   cost  owner  pet
1   100  Alice    1
2    90    Bob    1
3    80  Alice    2
''')
t2 = pw.debug.table_from_markdown('''
    cost  owner  pet size
11   100  Alice    3    M
12    90    Bob    1    L
13    80    Tom    1   XL
''')
result = (t1.join(t2, t1.owner==t2.owner).groupby(pw.this.owner)
    .reduce(pw.this.owner, pairs = pw.reducers.count()))
pw.debug.compute_and_print(result, include_id=False)
owner | pairs
Alice | 2
Bob   | 1

class pw.GroupedTable()

Result of a groupby operation on a Table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.groupby(t1.pet, t1.owner)
isinstance(t2, pw.GroupedTable)
True

reduce(*args, **kwargs)

Reduces a grouped table to a table.

  • Returns
    Table – Created table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.groupby(t1.pet, t1.owner).reduce(t1.owner, t1.pet, ageagg=pw.reducers.sum(t1.age))
pw.debug.compute_and_print(t2, include_id=False)
owner | pet | ageagg
Alice | cat | 8
Alice | dog | 10
Bob   | dog | 16

class pw.JoinMode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Enum used for controlling the type of a join when passed to a generic join function. Consists of the values JoinMode.INNER, JoinMode.LEFT, JoinMode.RIGHT and JoinMode.OUTER.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
inner_join = t1.join(
    t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(inner_join, include_id = False)
age | owner_name | size
9   | Bob        | L
outer_join = t1.join(
    t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.OUTER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(outer_join, include_id = False)
age | owner_name | size
    | Alice      | M
    | Tom        | XL
8   |            |
9   | Bob        | L
10  |            |

INNER = 0

Use inner join.

LEFT = 1

Use left join.

OUTER = 3

Use outer join.

RIGHT = 2

Use right join.
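
As a plain-Python model of which rows each of the four modes keeps, the sketch below joins two small lists of dicts on one key. It illustrates the semantics described in this reference and is not Pathway's implementation; join_rows and its lowercase how strings are hypothetical names.

```python
def join_rows(left, right, key_left, key_right, how="inner"):
    """Return (left_row, right_row) pairs; None marks a missing side."""
    pairs = []
    matched_right = set()
    for l_row in left:
        hits = [i for i, r_row in enumerate(right) if l_row[key_left] == r_row[key_right]]
        for i in hits:
            matched_right.add(i)
            pairs.append((l_row, right[i]))
        # Unmatched left rows survive only in left and outer joins.
        if not hits and how in ("left", "outer"):
            pairs.append((l_row, None))
    # Unmatched right rows survive only in right and outer joins.
    if how in ("right", "outer"):
        for i, r_row in enumerate(right):
            if i not in matched_right:
                pairs.append((None, r_row))
    return pairs


left = [{"a": 11, "b": 111}, {"a": 12, "b": 112}, {"a": 13, "b": 113}, {"a": 13, "b": 114}]
right = [{"c": 11, "d": 211}, {"c": 12, "d": 212}, {"c": 14, "d": 213}, {"c": 14, "d": 214}]

counts = {how: len(join_rows(left, right, "a", "c", how)) for how in ("inner", "left", "right", "outer")}
print(counts)
```

The input rows are the same as in the join_left, join_right and join_outer examples of pw.Joinable, so the row counts can be compared with the outputs printed there.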

class pw.JoinResult()

Result of a join between tables.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice    1
2    9    Bob    1
3    8  Alice    2
''')
t2 = pw.debug.table_from_markdown('''
    age  owner  pet size
11   10  Alice    3    M
12    9    Bob    1    L
13    8    Tom    1   XL
''')
joinresult = t1.join(t2, t1.pet == t2.pet, t1.owner == t2.owner)
isinstance(joinresult, pw.JoinResult)
True
pw.debug.compute_and_print(joinresult.select(t1.age, t2.size), include_id=False)
age | size
9   | L

filter(filter_expression)

Filters rows, keeping the ones satisfying the predicate.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice    1
2    9    Bob    1
3    8  Alice    2
''')
t2 = pw.debug.table_from_markdown('''
    age  owner  pet size
11   10  Alice    3    M
12    9    Bob    1    L
13    8    Tom    1   XL
''')
result = t1.join(t2).filter(t1.owner == t2.owner).select(t1.age, t2.size)
pw.debug.compute_and_print(result, include_id=False)
age | size
8   | M
9   | L
10  | M

groupby(*args, id=None)

Groups join result by columns from args.

NOTE: Usually followed by .reduce() that aggregates the result and returns a table.

  • Parameters
    • args (ColumnReference) – columns to group by.
    • id (ColumnReference | None) – if provided, the column used to set the ids of the rows of the result.
  • Returns
    GroupedJoinResult – Groupby object.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   cost  owner  pet
1   100  Alice    1
2    90    Bob    1
3    80  Alice    2
''')
t2 = pw.debug.table_from_markdown('''
    cost  owner  pet size
11   100  Alice    3    M
12    90    Bob    1    L
13    80    Tom    1   XL
''')
result = (t1.join(t2, t1.owner==t2.owner).groupby(pw.this.owner)
    .reduce(pw.this.owner, pairs = pw.reducers.count()))
pw.debug.compute_and_print(result, include_id=False)
owner | pairs
Alice | 2
Bob   | 1

reduce(*args, **kwargs)

Reduce a join result to a single row.

Equivalent to self.groupby().reduce(*args, **kwargs).

  • Parameters
    • args (ColumnReference) – reducer to reduce the table with
    • kwargs (ColumnExpression) – reducer to reduce the table with. Its key is the new name of a column.
  • Returns
    Table – Reduced table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   cost  owner  pet
1   100  Alice    1
2    90    Bob    1
3    80  Alice    2
''')
t2 = pw.debug.table_from_markdown('''
    cost  owner  pet size
11   100  Alice    3    M
12    90    Bob    1    L
13    80    Tom    1   XL
''')
result = t1.join(t2, t1.owner==t2.owner).reduce(total_pairs = pw.reducers.count())
pw.debug.compute_and_print(result, include_id=False)
total_pairs
3

select(*args, **kwargs)

Computes the result of a join.

  • Parameters
    • args (ColumnReference) – Column references.
    • kwargs (Any) – Column expressions with their new assigned names.
  • Returns
    Table – Created table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L

class pw.Joinable(context)


join(other, *on, id=None, how=JoinMode.INNER, left_instance=None, right_instance=None)

Join self with other using the given join expression.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
    • how (JoinMode) – by default, inner join is performed. Possible values are JoinMode.{INNER,LEFT,RIGHT,OUTER}, corresponding to inner, left, right and outer join respectively.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join(
    t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id = False)
age | owner_name | size
9   | Bob        | L

join_inner(other, *on, id=None, left_instance=None, right_instance=None)

Inner-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join_inner(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(
    age=t1.age, owner_name=t2.owner, size=t2.size
)
pw.debug.compute_and_print(t3, include_id = False)
age | owner_name | size
9   | Bob        | L

join_left(other, *on, id=None, left_instance=None, right_instance=None)

Left-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2
    • id (ColumnReference | None) – optional id column of the result
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Remarks: args cannot contain the id column from either of the tables, as the result table has an id column with auto-generated ids; it can be selected by assigning it to a column with a defined name (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • rows from the right side that were not matched with the left side are skipped
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(t1.join_left(t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(t1.b + t2.d, t2.id)),
include_id=False)
a  | t2_c | s
11 | 11   | 322
12 | 12   | 324
13 |      |
13 |      |

join_outer(other, *on, id=None, left_instance=None, right_instance=None)

Outer-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2
    • id (ColumnReference | None) – optional id column of the result
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Remarks: args cannot contain the id column from either of the tables, as the result table has an id column with auto-generated ids; it can be selected by assigning it to a column with a defined name (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(t1.join_outer(t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(t1.b + t2.d, t1.id, t2.id)),
include_id=False)
a  | t2_c | s
   | 14   |
   | 14   |
11 | 11   | 322
12 | 12   | 324
13 |      |
13 |      |

join_right(other, *on, id=None, left_instance=None, right_instance=None)

Right-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2
    • id (ColumnReference | None) – optional id column of the result
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Remarks: args cannot contain the id column from either of the tables, as the result table has an id column with auto-generated ids; it can be selected by assigning it to a column with a defined name (passed in kwargs).

Behavior:

  • rows from the left side that were not matched with the right side are skipped
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(t1.join_right(t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(pw.coalesce(t1.b,0) + t2.d,t1.id)),
include_id=False)
a  | t2_c | s
   | 14   |
   | 14   |
11 | 11   | 322
12 | 12   | 324

property C: ColumnNamespace

Returns the namespace of all the columns of a joinable. Allows accessing columns whose names would otherwise clash with reserved method names.

import pathway as pw
tab = pw.debug.table_from_markdown('''
age | owner | pet | filter
10  | Alice | dog | True
9   | Bob   | dog | True
8   | Alice | cat | False
7   | Bob   | dog | True
''')
isinstance(tab.C.age, pw.ColumnReference)
True
pw.debug.compute_and_print(tab.filter(tab.C.filter), include_id=False)
age | owner | pet | filter
7   | Bob   | dog | True
9   | Bob   | dog | True
10  | Alice | dog | True

class pw.Json()

Represents JSON values.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
a    | b | c
True | 2 | manul
''')
@pw.udf
def to_json(val) -> pw.Json:
    return pw.Json(val)
result = t1.select(**{c: to_json(pw.this[c]) for c in t1.column_names()})
pw.debug.compute_and_print(result, include_id=False)
a    | b | c
true | 2 | "manul"

as_bool()

Returns Json value as a bool if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> bool:
    return data["value"].as_bool()

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": True},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)
result
True

as_dict()

Returns Json value as a dict if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> tuple:
    return tuple(data["value"].as_dict().values())

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": {"inner": 42}},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)
result
(42,)

as_float()

Returns Json value as a float if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> float:
    return data["value"].as_float()

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": 3.14},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)
result
3.14

as_int()

Returns Json value as an int if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> int:
    return data["value"].as_int()

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": 42},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)
result
42

as_list()

Returns Json value as a list if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> int:
    return data["value"].as_list()[-1]

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": [1,2,3]},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)
result
3

as_str()

Returns Json value as a string if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> str:
    return data["value"].as_str()

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": "foo"},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)
result
foo

class pw.Schema

Base class to inherit from when creating schemas. All schemas should be subclasses of this one.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
t1.schema
<pathway.Schema types={'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>}>
issubclass(t1.schema, pw.Schema)
True
class NewSchema(pw.Schema):
  foo: int
SchemaSum = NewSchema | t1.schema
SchemaSum
<pathway.Schema types={'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>, 'foo': <class 'int'>}>

class pw.SchemaProperties(append_only=None)


class pw.Table()

Collection of named columns over identical universes.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
isinstance(t1, pw.Table)
True

asof_join(other, self_time, other_time, *on, how, behavior=None, defaults={}, direction=Direction.BACKWARD, left_instance=None, right_instance=None)

Perform an ASOF join of two tables.

  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join - features like delaying entries or ignoring late entries.
    • how (JoinMode) – mode of the join (LEFT, RIGHT, OUTER)
    • defaults (dict[ColumnReference, Any]) – dictionary column -> default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    how=pw.JoinMode.LEFT,
    defaults={t2.val: -1},
).select(
    pw.this.instance,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
instance | t  | val_left | val_right | sum
0        | 1  | 1        | -1        | 0
0        | 4  | 2        | 6         | 8
0        | 5  | 3        | 6         | 9
0        | 6  | 4        | 6         | 10
0        | 7  | 5        | 2         | 7
0        | 11 | 6        | 9         | 15
0        | 12 | 7        | 9         | 16
1        | 5  | 8        | 7         | 15
1        | 7  | 9        | 7         | 16
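
The BACKWARD direction used in the example above can be read as: for each row of self, take the latest row of other whose time is not greater than the row's own time. Below is a plain-Python sketch of that lookup for a single instance (K = 0). It assumes that equal times match, as the row with t = 7 in the example output suggests; asof_backward is a hypothetical helper and not part of Pathway.

```python
from bisect import bisect_right


def asof_backward(left_times, right_rows, default=None):
    """For each left time, pick the value of the latest right row with time <= it."""
    right_rows = sorted(right_rows)              # (time, value) pairs ordered by time
    right_times = [t for t, _ in right_rows]
    matches = []
    for t in left_times:
        pos = bisect_right(right_times, t)       # number of right rows with time <= t
        matches.append(right_rows[pos - 1][1] if pos else default)
    return matches


# Rows with K == 0 from the example: times of t1, and (t, val) pairs of t2.
left_times = [1, 4, 5, 6, 7, 11, 12]
right_rows = [(2, 0), (3, 6), (7, 2), (8, 3), (9, 9), (13, 7), (14, 4)]

matches = asof_backward(left_times, right_rows, default=-1)
print(matches)
```

The resulting list lines up with the val_right column for instance 0 in the output above, including the default of -1 for the first row, which has no predecessor.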

Setting behavior lets you control the temporal behavior of an asof join. Each side of the asof join then keeps track of the maximal already seen time (self_time and other_time). In the context of asof_join, the arguments of behavior are defined as follows:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records that have times lower than or equal to the above threshold. When cutoff is not set, the asof join will remember all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Examples without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      2   |      2     |     4
      3   |      5     |     6
      4   |      1     |     8
      5   |      7     |    14
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      42  |      1     |     2
       8  |      4     |    10
'''
)
result_without_cutoff = t1.asof_join(
    t2, t1.event_time, t2.event_time, how=pw.JoinMode.LEFT
).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
left_value | right_value | left_time | right_time | __time__ | __diff__
2          | 42          | 2         | 1          | 4        | 1
3          | 42          | 5         | 1          | 6        | 1
4          | 42          | 1         | 1          | 8        | 1
3          | 42          | 5         | 1          | 10       | -1
3          | 8           | 5         | 4          | 10       | 1
5          | 8           | 7         | 4          | 14       | 1

result_with_cutoff = t1.asof_join(
    t2,
    t1.event_time,
    t2.event_time,
    how=pw.JoinMode.LEFT,
    behavior=pw.temporal.common_behavior(cutoff=2),
).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)
left_value | right_value | left_time | right_time | __time__ | __diff__
2          | 42          | 2         | 1          | 4        | 1
3          | 42          | 5         | 1          | 6        | 1
3          | 42          | 5         | 1          | 10       | -1
3          | 8           | 5         | 4          | 10       | 1
5          | 8           | 7         | 4          | 14       | 1

The record with value=4 from table t1 was not joined because its event_time was less than or equal to the maximal already seen time minus cutoff (1 <= 5-2).
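
The cutoff rule can be sketched for a single side of the join in plain Python. This is a simplified model under stated assumptions, not Pathway's engine: the function name apply_cutoff is hypothetical, records are processed one at a time in arrival order, and the maximal seen time is updated after each kept record.

```python
def apply_cutoff(event_times, cutoff):
    """Hypothetical helper: split event times into kept and dropped lists."""
    max_seen = None
    kept, dropped = [], []
    for t in event_times:
        # A record is ignored when its time is <= (maximal seen time - cutoff).
        if max_seen is not None and t <= max_seen - cutoff:
            dropped.append(t)
            continue
        kept.append(t)
        max_seen = t if max_seen is None else max(max_seen, t)
    return kept, dropped


# event_time values of t1 in arrival order (ordered by __time__).
kept, dropped = apply_cutoff([2, 5, 1, 7], cutoff=2)
print(kept, dropped)
```

With cutoff=2 the record with event_time 1 arrives after event_time 5 has been seen, so the threshold is 3 and the record is dropped.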

asof_join_left(other, self_time, other_time, *on, behavior=None, defaults={}, direction=Direction.BACKWARD, left_instance=None, right_instance=None)

Perform a left ASOF join of two tables.

  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join - features like delaying entries or ignoring late entries.
    • defaults (dict[ColumnReference, Any]) – dictionary mapping column -> default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_left(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t2.val: -1},
).select(
    pw.this.instance,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
instance | t  | val_left | val_right | sum
0        | 1  | 1        | -1        | 0
0        | 4  | 2        | 6         | 8
0        | 5  | 3        | 6         | 9
0        | 6  | 4        | 6         | 10
0        | 7  | 5        | 2         | 7
0        | 11 | 6        | 9         | 15
0        | 12 | 7        | 9         | 16
1        | 5  | 8        | 7         | 15
1        | 7  | 9        | 7         | 16

Setting behavior lets you control the temporal behavior of an asof join. Each side of the asof join then keeps track of the maximal already seen time (self_time and other_time). In the context of asof_join, the arguments of behavior are defined as follows:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records that have times lower than or equal to the above threshold. When cutoff is not set, the asof join will remember all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Examples without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      2   |      2     |     4
      3   |      5     |     6
      4   |      1     |     8
      5   |      7     |    14
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      42  |      1     |     2
       8  |      4     |    10
'''
)
result_without_cutoff = t1.asof_join_left(t2, t1.event_time, t2.event_time).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
left_value | right_value | left_time | right_time | __time__ | __diff__
2          | 42          | 2         | 1          | 4        | 1
3          | 42          | 5         | 1          | 6        | 1
4          | 42          | 1         | 1          | 8        | 1
3          | 42          | 5         | 1          | 10       | -1
3          | 8           | 5         | 4          | 10       | 1
5          | 8           | 7         | 4          | 14       | 1

result_with_cutoff = t1.asof_join_left(
    t2,
    t1.event_time,
    t2.event_time,
    behavior=pw.temporal.common_behavior(cutoff=2),
).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)
left_value | right_value | left_time | right_time | __time__ | __diff__
2          | 42          | 2         | 1          | 4        | 1
3          | 42          | 5         | 1          | 6        | 1
3          | 42          | 5         | 1          | 10       | -1
3          | 8           | 5         | 4          | 10       | 1
5          | 8           | 7         | 4          | 14       | 1

The record with value=4 from table t1 was not joined because its event_time was less than or equal to the maximal already seen time minus cutoff (1 <= 5-2).

asof_join_outer(other, self_time, other_time, *on, behavior=None, defaults={}, direction=Direction.BACKWARD, left_instance=None, right_instance=None)

Perform an outer ASOF join of two tables.

  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join - features like delaying entries or ignoring late entries.
    • defaults (dict[ColumnReference, Any]) – dictionary mapping column -> default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_outer(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t1.val: -1, t2.val: -1},
).select(
    pw.this.instance,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
instance | t  | val_left | val_right | sum
0        | 1  | 1        | -1        | 0
0        | 2  | 1        | 0         | 1
0        | 3  | 1        | 6         | 7
0        | 4  | 2        | 6         | 8
0        | 5  | 3        | 6         | 9
0        | 6  | 4        | 6         | 10
0        | 7  | 5        | 2         | 7
0        | 7  | 5        | 6         | 11
0        | 8  | 5        | 3         | 8
0        | 9  | 5        | 9         | 14
0        | 11 | 6        | 9         | 15
0        | 12 | 7        | 9         | 16
0        | 13 | 7        | 7         | 14
0        | 14 | 7        | 4         | 11
1        | 2  | -1       | 7         | 6
1        | 5  | 8        | 7         | 15
1        | 7  | 9        | 7         | 16
1        | 8  | 9        | 3         | 12

asof_join_right(other, self_time, other_time, *on, behavior=None, defaults={}, direction=Direction.BACKWARD, left_instance=None, right_instance=None)

Perform a right ASOF join of two tables.

  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join - features like delaying entries or ignoring late entries.
    • defaults (dict[ColumnReference, Any]) – dictionary mapping column -> default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_right(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t1.val: -1},
).select(
    pw.this.instance,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
instance | t  | val_left | val_right | sum
0        | 2  | 1        | 0         | 1
0        | 3  | 1        | 6         | 7
0        | 7  | 5        | 2         | 7
0        | 8  | 5        | 3         | 8
0        | 9  | 5        | 9         | 14
0        | 13 | 7        | 7         | 14
0        | 14 | 7        | 4         | 11
1        | 2  | -1       | 7         | 6
1        | 8  | 9        | 3         | 12

Setting behavior lets you control the temporal behavior of an asof join. Each side of the asof join then keeps track of the maximal already seen time (self_time and other_time). In the context of asof_join, the arguments of behavior are defined as follows:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records that have times lower than or equal to the above threshold. When cutoff is not set, the asof join will remember all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Examples without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      42  |      1     |     2
       8  |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      2   |      2     |     4
      3   |      5     |     6
      4   |      1     |     8
      5   |      7     |    14
'''
)
result_without_cutoff = t1.asof_join_right(t2, t1.event_time, t2.event_time).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
left_value | right_value | left_time | right_time | __time__ | __diff__
42         | 2           | 1         | 2          | 4        | 1
42         | 3           | 1         | 5          | 6        | 1
42         | 4           | 1         | 1          | 8        | 1
42         | 3           | 1         | 5          | 10       | -1
8          | 3           | 4         | 5          | 10       | 1
8          | 5           | 4         | 7          | 14       | 1

result_with_cutoff = t1.asof_join_right(
    t2,
    t1.event_time,
    t2.event_time,
    behavior=pw.temporal.common_behavior(cutoff=2),
).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)
left_value | right_value | left_time | right_time | __time__ | __diff__
42         | 2           | 1         | 2          | 4        | 1
42         | 3           | 1         | 5          | 6        | 1
42         | 3           | 1         | 5          | 10       | -1
8          | 3           | 4         | 5          | 10       | 1
8          | 5           | 4         | 7          | 14       | 1

The record with value=4 from table t2 was not joined because its event_time was less than or equal to the maximal already seen time minus cutoff (1 <= 5-2).

asof_now_join(other, *on, how=JoinMode.INNER, id=None, left_instance=None, right_instance=None)

Performs an asof now join of self with other using join expressions. Each row of self is joined with rows from other at a given processing time. Rows from self are not stored. They are joined with rows of other at their processing time. If other is updated in the future, rows from self from the past won’t be updated. Rows from other are stored. They can be joined with future rows of self.

  • Parameters
    • other (Table) – the right side of a join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
    • how (JoinMode) – by default, inner join is performed. Possible values are JoinMode.{INNER,LEFT} which correspond to inner and left join respectively.
  • Returns
    AsofNowJoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
data = pw.debug.table_from_markdown(
    '''
    id | value | instance | __time__ | __diff__
     2 |   4   |    1     |     4    |     1
     2 |   4   |    1     |    10    |    -1
     5 |   5   |    1     |    10    |     1
     7 |   2   |    2     |    14    |     1
     7 |   2   |    2     |    22    |    -1
    11 |   3   |    2     |    26    |     1
     5 |   5   |    1     |    30    |    -1
    14 |   9   |    1     |    32    |     1
    '''
)
queries = pw.debug.table_from_markdown(
    '''
    value | instance | __time__
      1   |    1     |     2
      2   |    1     |     6
      4   |    1     |    12
      5   |    2     |    16
     10   |    1     |    26
    '''
)
result = queries.asof_now_join(
    data, pw.left.instance == pw.right.instance, how=pw.JoinMode.LEFT
).select(query=pw.left.value, ans=pw.right.value)
pw.debug.compute_and_print_update_stream(result, include_id=False)
query | ans | __time__ | __diff__
1     |     | 2        | 1
2     | 4   | 6        | 1
4     | 5   | 12       | 1
5     | 2   | 16       | 1
10    | 5   | 26       | 1
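
The behavior in this example can be modeled in plain Python by replaying both tables on a single timeline. This is a sketch under stated assumptions rather than Pathway's implementation: the function name asof_now_left_join is hypothetical, and the sketch assumes that data updates carrying the same processing time as a query are applied before that query is answered.

```python
def asof_now_left_join(data_events, queries):
    """Hypothetical helper.

    data_events: (id, value, instance, time, diff) tuples.
    queries: (value, instance, time) tuples.
    """
    # Kind 0 = data update, kind 1 = query; at equal times updates go first
    # (an assumption of this sketch).
    timeline = [(e[3], 0, e) for e in data_events]
    timeline += [(q[2], 1, q) for q in queries]
    timeline.sort(key=lambda item: (item[0], item[1]))

    state = {}    # current contents of the stored side: id -> (value, instance)
    results = []  # each answer is emitted once and never revised
    for time, kind, payload in timeline:
        if kind == 0:
            row_id, value, instance, _, diff = payload
            if diff > 0:
                state[row_id] = (value, instance)
            else:
                state.pop(row_id, None)
        else:
            q_value, q_instance, _ = payload
            matches = [v for v, inst in state.values() if inst == q_instance]
            # Left join: a query without matches still yields one row with None.
            for ans in matches or [None]:
                results.append((q_value, ans, time))
    return results


data_events = [(2, 4, 1, 4, 1), (2, 4, 1, 10, -1), (5, 5, 1, 10, 1),
               (7, 2, 2, 14, 1), (7, 2, 2, 22, -1), (11, 3, 2, 26, 1),
               (5, 5, 1, 30, -1), (14, 9, 1, 32, 1)]
query_rows = [(1, 1, 2), (2, 1, 6), (4, 1, 12), (5, 2, 16), (10, 1, 26)]

answers = asof_now_left_join(data_events, query_rows)
for row in answers:
    print(row)
```

The later updates to data (for example, the removal of id 5 at time 30) do not change answers that were already emitted, which is why the output stream contains no retractions.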

asof_now_join_inner(other, *on, id=None, left_instance=None, right_instance=None)

Performs an inner asof now join of self with other using join expressions. Each row of self is joined with rows from other at a given processing time. Rows from self are not stored. They are joined with rows of other at their processing time. If other is updated in the future, rows from self from the past won’t be updated. Rows from other are stored. They can be joined with future rows of self.

  • Parameters
    • other (Table) – the right side of a join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
  • Returns
    AsofNowJoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
data = pw.debug.table_from_markdown(
    '''
    id | value | instance | __time__ | __diff__
     2 |   4   |    1     |     4    |     1
     2 |   4   |    1     |    10    |    -1
     5 |   5   |    1     |    10    |     1
     7 |   2   |    2     |    14    |     1
     7 |   2   |    2     |    22    |    -1
    11 |   3   |    2     |    26    |     1
     5 |   5   |    1     |    30    |    -1
    14 |   9   |    1     |    32    |     1
    '''
)
queries = pw.debug.table_from_markdown(
    '''
    value | instance | __time__
      1   |    1     |     2
      2   |    1     |     6
      4   |    1     |    12
      5   |    2     |    16
     10   |    1     |    26
    '''
)
result = queries.asof_now_join_inner(
    data, pw.left.instance == pw.right.instance
).select(query=pw.left.value, ans=pw.right.value)
pw.debug.compute_and_print_update_stream(result, include_id=False)
query | ans | __time__ | __diff__
2     | 4   | 6        | 1
4     | 5   | 12       | 1
5     | 2   | 16       | 1
10    | 5   | 26       | 1

asof_now_join_left(other, *on, id=None, left_instance=None, right_instance=None)

Performs a left asof now join of self with other using join expressions. Each row of self is joined with rows from other at a given processing time. If there are no matching rows in other, missing values on the right side are replaced with None. Rows from self are not stored. They are joined with rows of other at their processing time. If other is updated in the future, rows from self from the past won’t be updated. Rows from other are stored. They can be joined with future rows of self.

  • Parameters
    • other (Table) – the right side of a join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
  • Returns
    AsofNowJoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
data = pw.debug.table_from_markdown(
    '''
    id | value | instance | __time__ | __diff__
     2 |   4   |    1     |     4    |     1
     2 |   4   |    1     |    10    |    -1
     5 |   5   |    1     |    10    |     1
     7 |   2   |    2     |    14    |     1
     7 |   2   |    2     |    22    |    -1
    11 |   3   |    2     |    26    |     1
     5 |   5   |    1     |    30    |    -1
    14 |   9   |    1     |    32    |     1
    '''
)
queries = pw.debug.table_from_markdown(
    '''
    value | instance | __time__
      1   |    1     |     2
      2   |    1     |     6
      4   |    1     |    12
      5   |    2     |    16
     10   |    1     |    26
    '''
)
result = queries.asof_now_join_left(
    data, pw.left.instance == pw.right.instance
).select(query=pw.left.value, ans=pw.right.value)
pw.debug.compute_and_print_update_stream(result, include_id=False)
query | ans | __time__ | __diff__
1     |     | 2        | 1
2     | 4   | 6        | 1
4     | 5   | 12       | 1
5     | 2   | 16       | 1
10    | 5   | 26       | 1

cast_to_types(**kwargs)

Casts columns to types.

concat(*others)

Concatenates self with every other ∊ others.

Semantics:

  • result.columns == self.columns == other.columns
  • result.id == self.id ∪ other.id

If self.id and other.id collide, throws an exception.

Requires:

  • other.columns == self.columns
  • self.id disjoint with other.id

  • Parameters
    others – the other tables.
  • Returns
    Table – The concatenated table. Ids of rows from the original tables are preserved.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''')
pw.universes.promise_are_pairwise_disjoint(t1, t2)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 1
11  | Alice | 30
12  | Tom   | 40
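
The id semantics of concat can be illustrated with a plain-Python model in which a table is a dict from row id to row. This is only a sketch: the function name concat_model and the dict representation are assumptions for illustration, and the exception type raised by Pathway may differ from the ValueError used here.

```python
def concat_model(*tables):
    """Hypothetical helper: union of id -> row dicts that rejects id collisions."""
    result = {}
    for table in tables:
        collisions = result.keys() & table.keys()
        if collisions:
            raise ValueError(f"ids collide: {sorted(collisions)}")
        result.update(table)
    return result


# Same rows as t1 and t2 above, keyed by id; rows are (age, owner, pet).
t1_model = {1: (10, "Alice", 1), 2: (9, "Bob", 1), 3: (8, "Alice", 2)}
t2_model = {11: (11, "Alice", 30), 12: (12, "Tom", 40)}

t3_model = concat_model(t1_model, t2_model)
print(sorted(t3_model))  # the original ids are preserved
```

In this model, concatenating a table with itself raises, which mirrors the requirement that the id sets be disjoint.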

concat_reindex(*tables)

Concatenate contents of several tables.

This is similar to PySpark union. All tables must have the same schema. Each row is reindexed.

  • Parameters
    tables (Table) – List of tables to concatenate. All tables must have the same schema.
  • Returns
    Table – The concatenated table. It will have new, synthetic ids.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.table_from_markdown('''
  | pet
1 | Manul
8 | Octopus
''')
t3 = t1.concat_reindex(t2)
pw.debug.compute_and_print(t3, include_id=False)
pet
Cat
Dog
Manul
Octopus

copy()

Returns a copy of a table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.copy()
pw.debug.compute_and_print(t2, include_id=False)
age | owner | pet
7   | Bob   | dog
8   | Alice | cat
9   | Bob   | dog
10  | Alice | dog
t1 is t2
False

deduplicate(*, value, instance=None, acceptor, persistent_id=None)

Deduplicates rows in self on value column using acceptor function.

It keeps rows which were accepted by the acceptor function. Acceptor operates on two arguments - current value and the previously accepted value.

  • Parameters
    • value (ColumnExpression) – column expression used for deduplication.
    • instance (ColumnExpression | None) – Grouping column. For rows with different values in this column, deduplication will be performed separately. Defaults to None.
    • acceptor (Callable[[T, T], bool], where T is bound to Union[None, int, float, str, bytes, bool, Pointer, datetime, timedelta, ndarray, Json, dict[str, Any], tuple[Any, ...]]) – callback that receives the current value and the previously accepted value and tells whether the two values are different, that is, whether the current value should be accepted.
    • persistent_id (str | None) – (unstable) An identifier, under which the state of the table will be persisted or None, if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This way it’s possible to configure the start of computations from the moment they were terminated last time.
  • Returns
    Table – the result of deduplication.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
    val | __time__
     1  |     2
     2  |     4
     3  |     6
     4  |     8
'''
)

def acceptor(new_value, old_value) -> bool:
    return new_value >= old_value + 2


result = table.deduplicate(value=pw.this.val, acceptor=acceptor)
pw.debug.compute_and_print_update_stream(result, include_id=False)
val | __time__ | __diff__
1   | 2        | 1
1   | 6        | -1
3   | 6        | 1

table = pw.debug.table_from_markdown(
    '''
    val | instance | __time__
     1  |     1    |     2
     2  |     1    |     4
     3  |     2    |     6
     4  |     1    |     8
     4  |     2    |     8
     5  |     1    |    10
'''
)

def acceptor(new_value, old_value) -> bool:
    return new_value >= old_value + 2


result = table.deduplicate(
    value=pw.this.val, instance=pw.this.instance, acceptor=acceptor
)
pw.debug.compute_and_print_update_stream(result, include_id=False)
val | instance | __time__ | __diff__
1   | 1        | 2        | 1
3   | 2        | 6        | 1
1   | 1        | 8        | -1
4   | 1        | 8        | 1
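
The update streams above can be reproduced with a plain-Python model of the acceptor loop. This is a sketch, not Pathway's implementation: the function name deduplicate_model is hypothetical, and it assumes (consistently with the outputs above) that the first value seen in each instance is always accepted without calling the acceptor.

```python
def acceptor(new_value, old_value) -> bool:
    return new_value >= old_value + 2


def deduplicate_model(rows, acceptor):
    """Hypothetical helper.

    rows: (value, instance) tuples in arrival order.
    Returns a list of (diff, value, instance) updates.
    """
    accepted = {}  # instance -> last accepted value
    updates = []
    for value, instance in rows:
        if instance in accepted:
            old = accepted[instance]
            if not acceptor(value, old):
                continue  # rejected: the previously accepted value stays
            updates.append((-1, old, instance))  # retract the old value
        updates.append((1, value, instance))
        accepted[instance] = value
    return updates


# Rows of the second example above, as (val, instance), ordered by __time__.
stream = [(1, 1), (2, 1), (3, 2), (4, 1), (4, 2), (5, 1)]
updates = deduplicate_model(stream, acceptor)
for update in updates:
    print(update)
```

Each accepted value after the first produces a retraction of the previous value followed by an insertion of the new one, matching the -1 and 1 entries in the __diff__ column.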

diff(timestamp, *values)

Compute the difference between the values in the values columns and the previous values according to the order defined by the column timestamp.

  • Parameters
    • timestamp – The column reference to the timestamp column on which the order is computed.
    • *values – Variable-length argument representing the column references to the values columns.
  • Returns
    Table – A new table where each column is replaced with a new column containing the difference and whose name is the concatenation of diff_ and the former name.
  • Raises
    ValueError – If the columns are not ColumnReference.

NOTE: The difference for the “first” row (the row with the lowest value in the timestamp column) is None.

Example:

import pathway as pw
table = pw.debug.table_from_markdown('''
timestamp | values
1         | 1
2         | 2
3         | 4
4         | 7
5         | 11
6         | 16
''')
table += table.diff(pw.this.timestamp, pw.this.values)
pw.debug.compute_and_print(table, include_id=False)
timestamp | values | diff_values
1         | 1      |
2         | 2      | 1
3         | 4      | 2
4         | 7      | 3
5         | 11     | 4
6         | 16     | 5
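
The computation can be sketched in plain Python: sort the rows by timestamp and subtract the previous value from each value. The function name diff_model and the tuple layout are assumptions made for this illustration only.

```python
def diff_model(rows):
    """Hypothetical helper.

    rows: (timestamp, value) tuples in any order.
    Returns (timestamp, value, diff_value) tuples ordered by timestamp.
    """
    out = []
    prev = None
    for ts, value in sorted(rows):
        # The first row in timestamp order has no predecessor, so its diff is None.
        out.append((ts, value, None if prev is None else value - prev))
        prev = value
    return out


# Same data as above, deliberately shuffled to show that only timestamps matter.
diffs = diff_model([(3, 4), (1, 1), (2, 2), (6, 16), (4, 7), (5, 11)])
for row in diffs:
    print(row)
```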

difference(other)

Restrict self universe to keys not appearing in the other table.

  • Parameters
    other (Table) – table with ids to remove from self.
  • Returns
    Table – table with restricted universe, with the same set of columns

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age  | owner  | pet
1 | 10   | Alice  | 1
2 | 9    | Bob    | 1
3 | 8    | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
  | cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.difference(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
10  | Alice | 1
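
In a plain-Python model where a table is a dict from row id to row, difference keeps the rows whose ids are absent from the other table. The dict representation and variable names are assumptions for illustration; the columns of the other table play no role.

```python
# Same rows as t1 and t2 above, keyed by id.
t1_by_id = {1: (10, "Alice", 1), 2: (9, "Bob", 1), 3: (8, "Alice", 2)}
t2_by_id = {2: 100, 3: 200, 4: 300}

# Keep only the rows of t1 whose id does not appear in t2.
t3_by_id = {row_id: row for row_id, row in t1_by_id.items() if row_id not in t2_by_id}
print(t3_by_id)
```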

empty(**kwargs)

Creates an empty table with a schema specified by kwargs.

  • Parameters
    kwargs (DType) – Dict whose keys are column names and values are column types.
  • Returns
    Table – Created empty table.

Example:

import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
pw.debug.compute_and_print(t1, include_id=False)
age | pet

filter(filter_expression)

Filter a table according to filter_expression condition.

  • Parameters
    filter_expression (ColumnExpression) – ColumnExpression that specifies the filtering condition.
  • Returns
    Table – Result has the same schema as self and its ids are a subset of self.id.

Example:

import pathway as pw
vertices = pw.debug.table_from_markdown('''
label outdegree
    1         3
    7         0
''')
filtered = vertices.filter(vertices.outdegree == 0)
pw.debug.compute_and_print(filtered, include_id=False)
label | outdegree
7     | 0

flatten(*args, **kwargs)

Performs a flatmap operation on a column or expression given as the first argument. The datatype of this column or expression has to be an iterable or a Json array. Other columns specified in the method arguments are duplicated as many times as the length of the iterable.

It is possible to get ids of source rows by using table.id column, e.g. table.flatten(table.column_to_be_flattened, original_id = table.id).

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | pet  |  age
1 | Dog  |   2
7 | Cat  |   5
''')
t2 = t1.flatten(t1.pet)
pw.debug.compute_and_print(t2, include_id=False)
pet
C
D
a
g
o
t
t3 = t1.flatten(t1.pet, t1.age)
pw.debug.compute_and_print(t3, include_id=False)
pet | age
C   | 5
D   | 2
a   | 5
g   | 2
o   | 2
t   | 5
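
The flatmap behavior can be modeled in plain Python: each element of the flattened value becomes its own row, and the remaining columns are repeated alongside it. The function name flatten_model and the tuple layout are assumptions for this sketch; the result is sorted only to make the printed order deterministic.

```python
def flatten_model(rows):
    """Hypothetical helper.

    rows: tuples whose first item is the iterable to flatten,
    followed by the other selected columns.
    """
    return sorted(
        (item, *rest)
        for to_flatten, *rest in rows
        for item in to_flatten
    )


# Same data as t1 above, as (pet, age).
flat = flatten_model([("Dog", 2), ("Cat", 5)])
for row in flat:
    print(row)
```

A string is iterable character by character, which is why flattening the pet column yields single letters.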

from_columns(**kwargs)

Build a table from columns.

All columns must have the same ids