Pathway API

Reference for all the Pathway classes and functions.

See Table API for the main Table class.

class pw.AsyncTransformer(input_table, *, instance=pw.this.id, autocommit_duration_ms=1500)

Performs asynchronous transformations on a table.

invoke() is called asynchronously for each row of input_table.

The output table can be accessed via successful (result is deprecated).

Example:

import asyncio
from typing import Any

import pathway as pw

class OutputSchema(pw.Schema):
    ret: int

class AsyncIncrementTransformer(pw.AsyncTransformer, output_schema=OutputSchema):
    # invoke() receives the columns of the input table as arguments.
    async def invoke(self, value) -> dict[str, Any]:
        await asyncio.sleep(0.1)
        return {"ret": value + 1}

input_table = pw.debug.table_from_markdown('''
  | value
1 | 42
2 | 44
''')
result = AsyncIncrementTransformer(input_table=input_table).successful
pw.debug.compute_and_print(result, include_id=False)

close()

Called once at the end. The proper place for cleanup.

abstract async invoke(*args, **kwargs)

Called for every row of input_table. The arguments correspond to the columns of the input table.

Should return a dict of values matching output_schema.

open()

Called before the actual work starts. Suitable for one-time setup.

with_options(capacity=None, timeout=None, retry_strategy=None, cache_strategy=None)

Sets async options.

  • Parameters
    • capacity (int | None) – Maximum number of concurrent operations. Defaults to None, indicating no specific limit.
    • timeout (float | None) – Maximum time (in seconds) to wait for the function result. Defaults to None, indicating no time limit.
    • retry_strategy (AsyncRetryStrategy | None) – Strategy for handling retries in case of failures. Defaults to None, meaning no retries.
    • cache_strategy (CacheStrategy | None) – Defines the caching mechanism. If set to None and persistence is enabled, operations will be cached using the persistence layer. Defaults to None.
  • Returns
    self
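
The meaning of capacity, timeout and retries can be pictured with plain asyncio. The following is a minimal sketch of the semantics only, not Pathway's implementation: run_with_options, slow_increment and the integer retries parameter are hypothetical names introduced for illustration (with_options takes a retry strategy object instead), and caching is left out.

```python
import asyncio


async def run_with_options(func, rows, *, capacity=None, timeout=None, retries=0):
    """Call `func` once per row, modelling capacity, timeout and retries."""
    # capacity=None means there is no limit on concurrent calls.
    semaphore = asyncio.Semaphore(capacity) if capacity is not None else None

    async def attempt(row):
        # timeout=None means the call may take arbitrarily long.
        if semaphore is None:
            return await asyncio.wait_for(func(row), timeout)
        async with semaphore:
            return await asyncio.wait_for(func(row), timeout)

    async def call_one(row):
        last_error = None
        for _ in range(retries + 1):
            try:
                return await attempt(row)
            except Exception as error:  # includes timeouts
                last_error = error
        # Every attempt failed: this sketch reports the error as the result.
        return last_error

    return await asyncio.gather(*(call_one(row) for row in rows))


async def slow_increment(value):
    await asyncio.sleep(0.01)
    return value + 1


results = asyncio.run(
    run_with_options(slow_increment, [42, 44], capacity=1, timeout=1.0)
)
print(results)
```

With capacity=1 the two calls run one after another; raising capacity or leaving it as None lets them overlap.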

property failed: Table

The resulting table containing only rows that failed during execution. If the instance argument is specified, it also contains rows that executed successfully but share an instance with at least one failed row whose time is less than or equal to theirs.

property finished: Table

The resulting table containing all rows that finished their execution. The column _async_status contains the state of each row: rows that finished successfully have the status “-SUCCESS-”, and rows that failed have the status “-FAILURE-”. If the instance argument is specified, rows that executed successfully but share an instance with at least one failed row whose time is less than or equal to theirs also have the status “-FAILURE-”.

To get only the rows that executed successfully, use the successful property instead.
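
The instance rule described for failed and finished can be sketched in plain Python. This is an illustrative model, not Pathway code: finished_statuses is a hypothetical helper, and each row is represented as a dict with assumed keys instance, time and ok.

```python
def finished_statuses(rows):
    """Return the _async_status of each finished row, in input order."""
    statuses = []
    for row in rows:
        # A row counts as failed if it failed itself, or if any row of the
        # same instance with a less-than-or-equal time failed.
        poisoned = any(
            other["instance"] == row["instance"]
            and other["time"] <= row["time"]
            and not other["ok"]
            for other in rows
        )
        statuses.append("-FAILURE-" if poisoned else "-SUCCESS-")
    return statuses


rows = [
    {"instance": "a", "time": 1, "ok": True},
    {"instance": "a", "time": 2, "ok": False},
    {"instance": "a", "time": 3, "ok": True},  # succeeded after a failure in "a"
    {"instance": "b", "time": 3, "ok": True},
]
print(finished_statuses(rows))
```

The third row succeeded on its own but is reported as a failure because an earlier row of the same instance failed; the row in instance "b" is unaffected.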

property output_table: Table

The resulting table containing all rows that started their execution. The column _async_status contains the state of each row: “-SUCCESS-” for rows that finished successfully, “-FAILURE-” for rows that failed, and “-PENDING-” for rows that are still being executed.

This property is recommended for debugging and presentation purposes only. For other purposes, prefer the successful property, which returns a Table containing only rows that were executed successfully.

property result: Table

The resulting table containing only rows that were executed successfully.

Deprecated. Use successful instead.

property successful: Table

The resulting table containing only rows that were executed successfully.

class pw.BaseCustomAccumulator

Utility class for defining custom accumulators, used for stateful reducers. Custom accumulators should inherit from this class and implement from_row, update and compute_result. Optionally, neutral and retract can be provided for more efficient processing of streams with changing data. Additionally, serialize and deserialize can be customized. By default they use the pickle module, but they can be overridden if the accumulator state can be serialized to a Pathway value type more easily.

import pathway as pw

class CustomAvgAccumulator(pw.BaseCustomAccumulator):
    def __init__(self, sum, cnt):
        self.sum = sum
        self.cnt = cnt

    @classmethod
    def from_row(cls, row):
        # A row is passed as a list of values; here it holds a single price.
        [val] = row
        return cls(val, 1)

    def update(self, other):
        # Merge another accumulator into this one in place.
        self.sum += other.sum
        self.cnt += other.cnt

    def compute_result(self) -> float:
        return self.sum / self.cnt

custom_avg = pw.reducers.udf_reducer(CustomAvgAccumulator)
t1 = pw.debug.table_from_markdown('''
age | owner | pet | price
10  | Alice | dog | 100
9   | Bob   | cat | 80
8   | Alice | cat | 90
7   | Bob   | dog | 70
''')
t2 = t1.groupby(t1.owner).reduce(t1.owner, avg_price=custom_avg(t1.price))
pw.debug.compute_and_print(t2, include_id=False)

abstract compute_result()

Mandatory function to finalize the computation. Used to extract the answer from the final state of the accumulator.

Narrowing the type of this function helps better type the output of the reducer.

classmethod deserialize(val)

Deserialize state from a Pathway value type.

abstract classmethod from_row(row)

Construct the accumulator from a row of data. The row is passed as a list of values.

This is a mandatory function.

classmethod neutral()

Neutral element of the accumulator (aggregation of an empty list).

This function is optional, and allows for more efficient processing on streams with changing data.

retract(other)

Update the accumulator by removing the value of another one.

This function is optional, and allows more efficient reductions on streams with changing data.

serialize()

Serialize state to a Pathway value type.

abstract update(other)

Update the accumulator with another one. The method does not need to return anything; the change should be made in place.

This is a mandatory function.
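
How the mandatory and optional methods fit together can be sketched without Pathway. The class below is a plain-Python stand-in that only mirrors the method names documented here; AvgAccumulator and the driver loop are hypothetical, and the way the engine actually schedules these calls is not described in this reference.

```python
class AvgAccumulator:
    """Plain-Python stand-in mirroring the accumulator method names."""

    def __init__(self, total, count):
        self.total = total
        self.count = count

    @classmethod
    def from_row(cls, row):
        # The row is a list of values; this accumulator expects exactly one.
        [value] = row
        return cls(value, 1)

    @classmethod
    def neutral(cls):
        # Aggregation of an empty list.
        return cls(0, 0)

    def update(self, other):
        # In-place merge with another accumulator.
        self.total += other.total
        self.count += other.count

    def retract(self, other):
        # In-place removal of a previously merged accumulator.
        self.total -= other.total
        self.count -= other.count

    def compute_result(self):
        return self.total / self.count


# Fold three rows into the neutral element.
state = AvgAccumulator.neutral()
for price in [100, 90, 80]:
    state.update(AvgAccumulator.from_row([price]))
print(state.compute_result())

# A row is deleted from the stream: retract it instead of recomputing.
state.retract(AvgAccumulator.from_row([80]))
print(state.compute_result())
```

With retract available, removing a row adjusts the existing state in constant time; without it, the remaining rows would have to be folded again from scratch.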

class pw.ColumnExpression

as_bool()

Converts the value to a bool, or to None if the conversion is not possible. Currently works for Json columns only.

Example:

import pathway as pw
import pandas as pd
class InputSchema(pw.Schema):
    data: dict
dt = pd.DataFrame(data={"data": [{"value": True}, {"value": False}]})
table = pw.debug.table_from_pandas(dt, schema=InputSchema)
result = table.select(result=pw.this.data.get("value").as_bool())
pw.debug.compute_and_print(result, include_id=False)

as_float()

Converts the value to a float, or to None if the conversion is not possible. Currently works for Json columns only.

Example:

import pathway as pw
import pandas as pd
class InputSchema(pw.Schema):
    data: dict
dt = pd.DataFrame(data={"data": [{"value": 1.5}, {"value": 3.14}]})
table = pw.debug.table_from_pandas(dt, schema=InputSchema)
result = table.select(result=pw.this.data.get("value").as_float())
pw.debug.compute_and_print(result, include_id=False)

as_int()

Converts the value to an int, or to None if the conversion is not possible. Currently works for Json columns only.

Example:

import pathway as pw
import pandas as pd
class InputSchema(pw.Schema):
    data: dict
dt = pd.DataFrame(data={"data": [{"value": 1}, {"value": 2}]})
table = pw.debug.table_from_pandas(dt, schema=InputSchema)
result = table.select(result=pw.this.data.get("value").as_int())
pw.debug.compute_and_print(result, include_id=False)

as_str()

Converts the value to a string, or to None if the conversion is not possible. Currently works for Json columns only.

Example:

import pathway as pw
import pandas as pd
class InputSchema(pw.Schema):
    data: dict
dt = pd.DataFrame(data={"data": [{"value": "dog"}, {"value": "cat"}]})
table = pw.debug.table_from_pandas(dt, schema=InputSchema)
result = table.select(result=pw.this.data.get("value").as_str())
pw.debug.compute_and_print(result, include_id=False)

get(index, default=None)

Extracts the element at index from an object. The object has to be a Tuple or Json. If no element is present at index, the value specified by the default parameter is returned.

The index must be an int for a Tuple, and an int or str for Json. For Tuples, a negative index accesses elements from the end, counting backwards.

  • Parameters
    • index (ColumnExpression | int | str) – Position to extract element at.
    • default (Union[ColumnExpression, None, int, float, str, bytes, bool, Pointer, datetime, timedelta, ndarray, Json, dict[str, Any], tuple[Any, ...], Error]) – Value returned when no element is at position index. Defaults to None.
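
For Tuples, the lookup rule reads much like indexing a Python tuple with a fallback. The helper below is a plain-Python model of that rule, not Pathway code; the name get_or_default is hypothetical, Json lookups by str key are not covered, and the sample rows reuse the a, b, c values from the example in this section.

```python
def get_or_default(sequence, index, default=None):
    """Return sequence[index], or `default` when no element is at index."""
    try:
        return sequence[index]
    except IndexError:
        return default


rows = [(3, 2, 2), (4, 1, 0), (7, 3, 1)]  # columns a, b, c
for a, b, c in rows:
    tup = (a, b)
    print(
        get_or_default(tup, 1),               # second element
        get_or_default(tup, 3),               # out of range, falls back to None
        get_or_default(tup, c),               # index taken from column c
        get_or_default(tup, c, default=100),  # same, with an explicit default
        get_or_default(tup, -1),              # negative index counts from the end
    )
```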

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
  | a | b | c
1 | 3 | 2 | 2
2 | 4 | 1 | 0
3 | 7 | 3 | 1
'''
)
t2 = t1.with_columns(tup=pw.make_tuple(pw.this.a, pw.this.b))
t3 = t2.select(
    x=pw.this.tup.get(1),
    y=pw.this.tup.get(3),
    z=pw.this.tup.get(pw.this.c),
    t=pw.this.tup.get(pw.this.c, default=100),
)
pw.debug.compute_and_print(t3, include_id=False)

is_none()

Returns true if the value is None.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | owner | pet
1 | Alice | dog
2 | Bob   |
3 | Carol | cat
''')
t2 = t1.with_columns(has_no_pet=pw.this.pet.is_none())
pw.debug.compute_and_print(t2, include_id=False)

is_not_none()

Returns true if the value is not None.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | owner | pet
1 | Alice | dog
2 | Bob   |
3 | Carol | cat
''')
t2 = t1.with_columns(has_pet=pw.this.pet.is_not_none())
pw.debug.compute_and_print(t2, include_id=False)

to_string()

Changes the values to strings.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
val
1
2
3
4''')
t1.schema
pw.debug.compute_and_print(t1, include_id=False)
t2 = t1.select(val = pw.this.val.to_string())
t2.schema
pw.debug.compute_and_print(t2.select(val=pw.this.val + "a"), include_id=False)

class pw.ColumnReference()

Reference to the column.

Inherits from ColumnExpression.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
isinstance(t1.age, pw.ColumnReference)
isinstance(t1["owner"], pw.ColumnReference)

property name

Name of the referred column.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
t1.age.name

property table

Table to which the referred column belongs.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
t1.age.table is t1

class pw.DateTimeNaive(ts_input=<object object>, year=None, month=None, day=None, hour=None, minute=None, second=None, microsecond=None, tzinfo=None, *, nanosecond=None, tz=None, unit=None, fold=None)

Type for storing datetime without timezone information. Extends pandas.Timestamp type.

class pw.DateTimeUtc(ts_input=<object object>, year=None, month=None, day=None, hour=None, minute=None, second=None, microsecond=None, tzinfo=None, *, nanosecond=None, tz=None, unit=None, fold=None)

Type for storing datetime with default timezone. Extends pandas.Timestamp type.

class pw.Duration(value=<object object>, unit=None, **kwargs)

Type for storing duration of time. Extends pandas.Timedelta type.

class pw.GroupedJoinResult()

reduce(*args, **kwargs)

Reduces the grouped join result to a table.

  • Returns
    Table – Created table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   cost  owner  pet
1   100  Alice    1
2    90    Bob    1
3    80  Alice    2
''')
t2 = pw.debug.table_from_markdown('''
    cost  owner  pet size
11   100  Alice    3    M
12    90    Bob    1    L
13    80    Tom    1   XL
''')
result = (t1.join(t2, t1.owner==t2.owner).groupby(pw.this.owner)
    .reduce(pw.this.owner, pairs = pw.reducers.count()))
pw.debug.compute_and_print(result, include_id=False)

class pw.GroupedTable()

Result of a groupby operation on a Table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.groupby(t1.pet, t1.owner)
isinstance(t2, pw.GroupedTable)

reduce(*args, **kwargs)

Reduces a grouped table to a table.

  • Returns
    Table – Created table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.groupby(t1.pet, t1.owner).reduce(t1.owner, t1.pet, ageagg=pw.reducers.sum(t1.age))
pw.debug.compute_and_print(t2, include_id=False)

class pw.JoinMode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Enum that controls the type of a join when passed to a generic join function. Its values are JoinMode.INNER, JoinMode.LEFT, JoinMode.RIGHT and JoinMode.OUTER.

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
inner_join = t1.join(
    t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(inner_join, include_id = False)
outer_join = t1.join(
    t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.OUTER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(outer_join, include_id = False)

INNER = 0

Use inner join.

LEFT = 1

Use left join.

OUTER = 3

Use outer join.

RIGHT = 2

Use right join.

class pw.JoinResult()

Result of a join between tables.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice    1
2    9    Bob    1
3    8  Alice    2
''')
t2 = pw.debug.table_from_markdown('''
    age  owner  pet size
11   10  Alice    3    M
12    9    Bob    1    L
13    8    Tom    1   XL
''')
joinresult = t1.join(t2, t1.pet == t2.pet, t1.owner == t2.owner)
isinstance(joinresult, pw.JoinResult)
pw.debug.compute_and_print(joinresult.select(t1.age, t2.size), include_id=False)

filter(filter_expression)

Filters rows, keeping the ones satisfying the predicate.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice    1
2    9    Bob    1
3    8  Alice    2
''')
t2 = pw.debug.table_from_markdown('''
    age  owner  pet size
11   10  Alice    3    M
12    9    Bob    1    L
13    8    Tom    1   XL
''')
result = t1.join(t2).filter(t1.owner == t2.owner).select(t1.age, t2.size)
pw.debug.compute_and_print(result, include_id=False)

groupby(*args, id=None)

Groups the join result by the columns from args.

NOTE: Usually followed by .reduce() that aggregates the result and returns a table.

  • Parameters
    • args (ColumnReference) – columns to group by.
    • id (ColumnReference | None) – if provided, the column used to set the ids of the rows of the result
  • Returns
    GroupedJoinResult – Groupby object.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   cost  owner  pet
1   100  Alice    1
2    90    Bob    1
3    80  Alice    2
''')
t2 = pw.debug.table_from_markdown('''
    cost  owner  pet size
11   100  Alice    3    M
12    90    Bob    1    L
13    80    Tom    1   XL
''')
result = (t1.join(t2, t1.owner==t2.owner).groupby(pw.this.owner)
    .reduce(pw.this.owner, pairs = pw.reducers.count()))
pw.debug.compute_and_print(result, include_id=False)

reduce(*args, **kwargs)

Reduces a join result to a single row.

Equivalent to self.groupby().reduce(*args, **kwargs).

  • Parameters
    • args (ColumnReference) – reducer to reduce the table with
    • kwargs (ColumnExpression) – reducer to reduce the table with. Its key is the new name of a column.
  • Returns
    Table – Reduced table.
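
The difference between groupby(...).reduce(...) and a bare reduce(...) with a count reducer can be sketched in plain Python. This models the semantics only and does not call Pathway; the owner lists reuse the values from the examples in this section, and the variable names are hypothetical.

```python
from collections import Counter

t1_owners = ["Alice", "Bob", "Alice"]
t2_owners = ["Alice", "Bob", "Tom"]

# Inner join on owner: one pair for every (left, right) combination that matches.
joined = [(left, right) for left in t1_owners for right in t2_owners if left == right]

# groupby(owner).reduce(owner, pairs=count()): one output row per owner.
pairs_per_owner = Counter(left for left, _ in joined)
print(dict(pairs_per_owner))

# reduce(total_pairs=count()) with no grouping: a single row for the whole join.
total_pairs = len(joined)
print(total_pairs)
```

Tom has no partner in the left table, so he does not appear in either result.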

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   cost  owner  pet
1   100  Alice    1
2    90    Bob    1
3    80  Alice    2
''')
t2 = pw.debug.table_from_markdown('''
    cost  owner  pet size
11   100  Alice    3    M
12    90    Bob    1    L
13    80    Tom    1   XL
''')
result = t1.join(t2, t1.owner==t2.owner).reduce(total_pairs = pw.reducers.count())
pw.debug.compute_and_print(result, include_id=False)

select(*args, **kwargs)

Computes the result of a join.

  • Parameters
    • args (ColumnReference) – Column references.
    • kwargs (Any) – Column expressions with their new assigned names.
  • Returns
    Table – Created table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(
    age=t1.age, owner_name=t2.owner, size=t2.size
)
pw.debug.compute_and_print(t3, include_id=False)

class pw.Joinable(context)

join(other, *on, id=None, how=JoinMode.INNER, left_instance=None, right_instance=None)

Joins self with other using the given join expression.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
    • how (JoinMode) – by default, an inner join is performed. Possible values are JoinMode.{INNER,LEFT,RIGHT,OUTER}, corresponding to inner, left, right and outer join respectively.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.
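
The four join modes differ only in what happens to unmatched rows. The following plain-Python sketch models that difference on lists of dicts; it is not Pathway code, the function join_rows and its string-valued how argument are hypothetical, and the sample values are illustrative.

```python
def join_rows(left, right, left_key, right_key, how="inner"):
    """Join two lists of dicts on left[left_key] == right[right_key].

    Returns (left_row, right_row) pairs; a missing side is None.
    """
    pairs = []
    matched_right = set()
    for l_row in left:
        found = False
        for i, r_row in enumerate(right):
            if l_row[left_key] == r_row[right_key]:
                pairs.append((l_row, r_row))
                matched_right.add(i)
                found = True
        # Left and outer joins keep unmatched left rows, padded with None.
        if not found and how in ("left", "outer"):
            pairs.append((l_row, None))
    # Right and outer joins keep unmatched right rows, padded with None.
    if how in ("right", "outer"):
        for i, r_row in enumerate(right):
            if i not in matched_right:
                pairs.append((None, r_row))
    return pairs


t1 = [{"a": 11, "b": 111}, {"a": 12, "b": 112}, {"a": 13, "b": 113}, {"a": 13, "b": 114}]
t2 = [{"c": 11, "d": 211}, {"c": 12, "d": 212}, {"c": 14, "d": 213}, {"c": 14, "d": 214}]

for how in ("inner", "left", "right", "outer"):
    print(how, len(join_rows(t1, t2, "a", "c", how)))
```

Only the keys 11 and 12 match, so the inner join has two rows; left and right joins each add the two unmatched rows of their own side, and the outer join adds both sets.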

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join(
    t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id = False)

join_inner(other, *on, id=None, left_instance=None, right_instance=None)

Inner-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join_inner(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(
    age=t1.age, owner_name=t2.owner, size=t2.size
)
pw.debug.compute_and_print(t3, include_id = False)

join_left(other, *on, id=None, left_instance=None, right_instance=None)

Left-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2
    • id (ColumnReference | None) – optional id column of the result
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Remarks: args cannot contain the id column from either table, because the result table has an id column with auto-generated ids; an id column can still be selected by assigning it to a named column (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • rows from the right side that were not matched with the left side are skipped
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
result = t1.join_left(t2, t1.a == t2.c).select(
    t1.a, t2_c=t2.c, s=pw.require(t1.b + t2.d, t2.id)
)
pw.debug.compute_and_print(result, include_id=False)

join_outer(other, *on, id=None, left_instance=None, right_instance=None)

Outer-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2
    • id (ColumnReference | None) – optional id column of the result
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Remarks: args cannot contain the id column from either table, because the result table has an id column with auto-generated ids; an id column can still be selected by assigning it to a named column (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
result = t1.join_outer(t2, t1.a == t2.c).select(
    t1.a, t2_c=t2.c, s=pw.require(t1.b + t2.d, t1.id, t2.id)
)
pw.debug.compute_and_print(result, include_id=False)

join_right(other, *on, id=None, left_instance=None, right_instance=None)

Right-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2
    • id (ColumnReference | None) – optional id column of the result
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Remarks: args cannot contain the id column from either table, because the result table has an id column with auto-generated ids; an id column can still be selected by assigning it to a named column (passed in kwargs).

Behavior:

  • rows from the left side that were not matched with the right side are skipped
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
result = t1.join_right(t2, t1.a == t2.c).select(
    t1.a, t2_c=t2.c, s=pw.require(pw.coalesce(t1.b, 0) + t2.d, t1.id)
)
pw.debug.compute_and_print(result, include_id=False)
  • Returns
    OuterJoinResult object

property C: ColumnNamespace

Returns the namespace of all the columns of a joinable. Allows accessing columns whose names would otherwise clash with reserved method names.

import pathway as pw
tab = pw.debug.table_from_markdown('''
age | owner | pet | filter
10  | Alice | dog | True
9   | Bob   | dog | True
8   | Alice | cat | False
7   | Bob   | dog | True
''')
isinstance(tab.C.age, pw.ColumnReference)
pw.debug.compute_and_print(tab.filter(tab.C.filter), include_id=False)

class pw.Json()

Represents JSON values.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
a    | b | c
True | 2 | manul
''')
@pw.udf
def to_json(val) -> pw.Json:
    return pw.Json(val)
result = t1.select(**{c: to_json(pw.this[c]) for c in t1.column_names()})
pw.debug.compute_and_print(result, include_id=False)

as_bool()

Returns the Json value as a bool if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> bool:
    return data["value"].as_bool()

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": True},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)

as_dict()

Returns the Json value as a dict if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> tuple:
    return tuple(data["value"].as_dict().values())

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": {"inner": 42}},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)

as_float()

Returns the Json value as a float if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> float:
    return data["value"].as_float()

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": 3.14},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)

as_int()

Returns the Json value as an int if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> int:
    return data["value"].as_int()

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": 42},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)

as_list()

Returns the Json value as a list if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> int:
    return data["value"].as_list()[-1]

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": [1,2,3]},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)

as_str()

Returns the Json value as a string if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> str:
    return data["value"].as_str()

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": "foo"},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)

class pw.LiveTable()

class pw.Schema

Base class to inherit from when creating schemas. All schemas should be subclasses of this one.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
t1.schema
issubclass(t1.schema, pw.Schema)
class NewSchema(pw.Schema):
  foo: int
SchemaSum = NewSchema | t1.schema
SchemaSum

class pw.SchemaProperties(append_only=None)

class pw.Table()

Collection of named columns over identical universes.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
isinstance(t1, pw.Table)

asof_join(other, self_time, other_time, *on, how, behavior=None, defaults={}, direction=Direction.BACKWARD, left_instance=None, right_instance=None)

Performs an ASOF join of two tables.

  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join - features like delaying entries or ignoring late entries.
    • how (JoinMode) – mode of the join (LEFT, RIGHT, OUTER)
    • defaults (dict[ColumnReference, Any]) – dictionary mapping columns to default values. Entries in the resulting table that do not have a predecessor in the join are set to this default value. If no default is provided, None is used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
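
The three directions can be pictured with a small plain-Python model that, for every left time, picks a right time. This is only a sketch of the general ASOF idea, not Pathway's implementation: asof_match is a hypothetical helper, instances and defaults are left out, and the handling of exactly equal times and of equal distances is an assumption of the sketch that may differ from Pathway, so the sample data avoids both cases.

```python
import bisect


def asof_match(left_times, right_times, direction="backward"):
    """For each left time, return the matched right time, or None.

    right_times must be sorted in increasing order.
    """
    matches = []
    for t in left_times:
        pos_after = bisect.bisect_right(right_times, t)  # first index with time > t
        pos_from = bisect.bisect_left(right_times, t)    # first index with time >= t
        before = right_times[pos_after - 1] if pos_after > 0 else None
        after = right_times[pos_from] if pos_from < len(right_times) else None
        if direction == "backward":
            matches.append(before)
        elif direction == "forward":
            matches.append(after)
        elif before is None or after is None:
            # nearest with only one candidate: take whichever side exists
            matches.append(before if after is None else after)
        else:
            # nearest with two candidates: take the closer one
            matches.append(before if t - before <= after - t else after)
    return matches


right = [2, 3, 7, 8, 9, 13, 14]
left = [1, 4, 6, 12]
print(asof_match(left, right, "backward"))
print(asof_match(left, right, "forward"))
print(asof_match(left, right, "nearest"))
```

A left row with no candidate in the chosen direction gets None here, which is the situation the defaults parameter is meant to cover.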

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    how=pw.JoinMode.LEFT,
    defaults={t2.val: -1},
).select(
    pw.this.instance,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)

Setting behavior lets you control the temporal behavior of an asof join. Each side of the asof join then keeps track of the maximal already seen time (self_time and other_time). In the context of asof_join, the arguments of behavior are defined as follows:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records that have times lower than or equal to the above threshold. When cutoff is not set, the asof join will remember all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.
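
The cutoff rule for one side of the join can be sketched in plain Python. This is a model of the rule as stated above, not Pathway code: apply_cutoff is a hypothetical helper, delay and keep_results are not modelled, and the arrival order reuses the event_time values of t1 from the example in this section, ordered by their __time__.

```python
def apply_cutoff(event_times, cutoff):
    """Return the event times that are kept, given their arrival order."""
    kept = []
    max_seen = None
    for t in event_times:
        # Records at or below (maximal seen time - cutoff) are ignored.
        if max_seen is not None and t <= max_seen - cutoff:
            continue
        kept.append(t)
        max_seen = t if max_seen is None else max(max_seen, t)
    return kept


arrivals = [2, 5, 1, 7]  # event_time values in order of arrival
print(apply_cutoff(arrivals, cutoff=2))
```

The record with event_time 1 arrives after 5 has been seen, and 1 <= 5 - 2, so it is dropped.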

Examples without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      2   |      2     |     4
      3   |      5     |     6
      4   |      1     |     8
      5   |      7     |    14
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      42  |      1     |     2
       8  |      4     |    10
'''
)
result_without_cutoff = t1.asof_join(
    t2, t1.event_time, t2.event_time, how=pw.JoinMode.LEFT
).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)

result_with_cutoff = t1.asof_join(
    t2,
    t1.event_time,
    t2.event_time,
    how=pw.JoinMode.LEFT,
    behavior=pw.temporal.common_behavior(cutoff=2),
).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)

The record with value=4 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (1 <= 5-2).
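
The cutoff rule can be illustrated without Pathway. The following is a minimal plain-Python sketch, not Pathway's implementation: the helper name apply_cutoff_sketch is hypothetical, and it assumes the maximal seen time is updated after every single record, whereas the engine may update it once per batch of records sharing a processing time.

```python
def apply_cutoff_sketch(event_times, cutoff):
    """Return the event times that survive the cutoff rule, in arrival order."""
    kept = []
    max_seen = None  # maximal event time seen so far on this side
    for t in event_times:
        # A record is ignored when its time is <= (max seen time - cutoff).
        if max_seen is not None and t <= max_seen - cutoff:
            continue
        kept.append(t)
        max_seen = t if max_seen is None else max(max_seen, t)
    return kept


# event_time values of t1, ordered by their __time__ column
print(apply_cutoff_sketch([2, 5, 1, 7], cutoff=2))  # [2, 5, 7]
```

The record with event_time 1 arrives after event_time 5 has been seen, so it falls under the threshold 5 - 2 = 3 and is dropped.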

asof_join_left(other, self_time, other_time, *on, behavior=None, defaults={}, direction=Direction.BACKWARD, left_instance=None, right_instance=None)

Perform a left ASOF join of two tables.

  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join - features like delaying entries or ignoring late entries.
    • defaults (dict[ColumnReference, Any]) – dictionary mapping each column to its default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
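
As a rough illustration of the direction parameter, the plain-Python sketch below picks a match for a single self_time among sorted other_time values. It is not Pathway code: the function name asof_pick_sketch and the string directions (standing in for Direction.BACKWARD, Direction.FORWARD and Direction.NEAREST) are assumptions, and the sample data avoids equal timestamps because this section does not state how such ties are resolved.

```python
from bisect import bisect_left


def asof_pick_sketch(self_time, other_times, direction="backward"):
    """Pick the matching time from sorted other_times, or None if there is none."""
    i = bisect_left(other_times, self_time)
    before = other_times[i - 1] if i > 0 else None  # closest earlier time
    after = other_times[i] if i < len(other_times) else None  # closest later time
    if direction == "backward":
        return before
    if direction == "forward":
        return after
    # "nearest": whichever existing neighbour is closer in time
    candidates = [t for t in (before, after) if t is not None]
    return min(candidates, key=lambda t: abs(t - self_time), default=None)


other_times = [2, 3, 8, 9, 13, 14]
print(asof_pick_sketch(5, other_times, "backward"))  # 3
print(asof_pick_sketch(5, other_times, "forward"))   # 8
print(asof_pick_sketch(1, other_times, "backward"))  # None
```

A None result corresponds to a row without a match, which is where the defaults argument applies.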

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_left(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t2.val: -1},
).select(
    pw.this.instance,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)

Setting behavior lets you control the temporal behavior of an asof join. When it is set, each side of the asof join keeps track of the maximal already seen time (self_time and other_time). In the context of asof_join, the arguments of behavior are defined as follows:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records that have times lower than or equal to the above threshold. When cutoff is not set, the asof join will remember all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Examples without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      2   |      2     |     4
      3   |      5     |     6
      4   |      1     |     8
      5   |      7     |    14
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      42  |      1     |     2
       8  |      4     |    10
'''
)
result_without_cutoff = t1.asof_join_left(t2, t1.event_time, t2.event_time).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)

result_with_cutoff = t1.asof_join_left(
    t2,
    t1.event_time,
    t2.event_time,
    behavior=pw.temporal.common_behavior(cutoff=2),
).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)

The record with value=4 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (1 <= 5-2).

asof_join_outer(other, self_time, other_time, *on, behavior=None, defaults={}, direction=Direction.BACKWARD, left_instance=None, right_instance=None)

Perform an outer ASOF join of two tables.

  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join - features like delaying entries or ignoring late entries.
    • defaults (dict[ColumnReference, Any]) – dictionary mapping each column to its default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_outer(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t1.val: -1, t2.val: -1},
).select(
    pw.this.instance,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)

asof_join_right(other, self_time, other_time, *on, behavior=None, defaults={}, direction=Direction.BACKWARD, left_instance=None, right_instance=None)

Perform a right ASOF join of two tables.

  • Parameters
    • other (Table) – Table to join with self, both must contain a column val
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join - features like delaying entries or ignoring late entries.
    • defaults (dict[ColumnReference, Any]) – dictionary mapping each column to its default value. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_right(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t1.val: -1},
).select(
    pw.this.instance,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)

Setting behavior lets you control the temporal behavior of an asof join. When it is set, each side of the asof join keeps track of the maximal already seen time (self_time and other_time). In the context of asof_join, the arguments of behavior are defined as follows:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records that have times lower than or equal to the above threshold. When cutoff is not set, the asof join will remember all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Examples without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      42  |      1     |     2
       8  |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      2   |      2     |     4
      3   |      5     |     6
      4   |      1     |     8
      5   |      7     |    14
'''
)
result_without_cutoff = t1.asof_join_right(t2, t1.event_time, t2.event_time).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
result_with_cutoff = t1.asof_join_right(
    t2,
    t1.event_time,
    t2.event_time,
    behavior=pw.temporal.common_behavior(cutoff=2),
).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)

The record with value=4 from table t2 was not joined because its event_time was less than the maximal already seen time minus cutoff (1 <= 5-2).

asof_now_join(other, *on, how=JoinMode.INNER, id=None, left_instance=None, right_instance=None)

Performs an asof now join of self with other using join expressions. Each row of self is joined with the rows of other as they are at that row's processing time. Rows from self are not stored: if other is updated in the future, results for past rows of self are not updated. Rows from other are stored, so they can be joined with future rows of self.

  • Parameters
    • other (Table) – the right side of a join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
    • how (JoinMode) – by default, inner join is performed. Possible values are JoinMode.{INNER,LEFT} which correspond to inner and left join respectively.
  • Returns
    AsofNowJoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
data = pw.debug.table_from_markdown(
    '''
    id | value | instance | __time__ | __diff__
     2 |   4   |    1     |     4    |     1
     2 |   4   |    1     |    10    |    -1
     5 |   5   |    1     |    10    |     1
     7 |   2   |    2     |    14    |     1
     7 |   2   |    2     |    22    |    -1
    11 |   3   |    2     |    26    |     1
     5 |   5   |    1     |    30    |    -1
    14 |   9   |    1     |    32    |     1
    '''
)
queries = pw.debug.table_from_markdown(
    '''
    value | instance | __time__
      1   |    1     |     2
      2   |    1     |     6
      4   |    1     |    12
      5   |    2     |    16
     10   |    1     |    26
    '''
)
result = queries.asof_now_join(
    data, pw.left.instance == pw.right.instance, how=pw.JoinMode.LEFT
).select(query=pw.left.value, ans=pw.right.value)
pw.debug.compute_and_print_update_stream(result, include_id=False)
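
The semantics described above can be mimicked in plain Python by replaying both streams in time order. This is only a sketch under stated assumptions, not how Pathway executes the join: the function name asof_now_join_left_sketch and the tuple layouts are hypothetical, and data updates are applied before queries that share the same timestamp. The inputs reuse the data and queries tables from the example.

```python
def asof_now_join_left_sketch(queries, data_updates):
    """Replay both streams in time order and answer each query immediately.

    queries:      list of (time, instance, value)
    data_updates: list of (time, diff, row_id, instance, value)
    """
    # Kind 0 = data update, kind 1 = query. Sorting by (time, kind) applies
    # data updates before queries with the same timestamp (sketch assumption).
    events = [(t, 0, rest) for t, *rest in data_updates]
    events += [(t, 1, rest) for t, *rest in queries]
    events.sort(key=lambda event: (event[0], event[1]))

    stored = {}  # row_id -> (instance, value); only rows of `other` are stored
    output = []
    for time, kind, payload in events:
        if kind == 0:
            diff, row_id, instance, value = payload
            if diff > 0:
                stored[row_id] = (instance, value)
            else:
                stored.pop(row_id, None)
        else:
            instance, value = payload
            answers = [v for inst, v in stored.values() if inst == instance]
            # Left join: a query without a match is emitted once with None.
            for answer in answers or [None]:
                output.append((value, answer, time))
    return output


data_updates = [
    (4, 1, 2, 1, 4), (10, -1, 2, 1, 4), (10, 1, 5, 1, 5), (14, 1, 7, 2, 2),
    (22, -1, 7, 2, 2), (26, 1, 11, 2, 3), (30, -1, 5, 1, 5), (32, 1, 14, 1, 9),
]
queries = [(2, 1, 1), (6, 1, 2), (12, 1, 4), (16, 2, 5), (26, 1, 10)]
for row in asof_now_join_left_sketch(queries, data_updates):
    print(row)  # (query, ans, time)
```

Because queries are never stored, the later removal of row 5 at time 30 does not change the answer already produced for the query at time 26.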

asof_now_join_inner(other, *on, id=None, left_instance=None, right_instance=None)

Performs an inner asof now join of self with other using join expressions. Each row of self is joined with the rows of other as they are at that row's processing time. Rows from self are not stored: if other is updated in the future, results for past rows of self are not updated. Rows from other are stored, so they can be joined with future rows of self.

  • Parameters
    • other (Table) – the right side of a join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
  • Returns
    AsofNowJoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
data = pw.debug.table_from_markdown(
    '''
    id | value | instance | __time__ | __diff__
     2 |   4   |    1     |     4    |     1
     2 |   4   |    1     |    10    |    -1
     5 |   5   |    1     |    10    |     1
     7 |   2   |    2     |    14    |     1
     7 |   2   |    2     |    22    |    -1
    11 |   3   |    2     |    26    |     1
     5 |   5   |    1     |    30    |    -1
    14 |   9   |    1     |    32    |     1
    '''
)
queries = pw.debug.table_from_markdown(
    '''
    value | instance | __time__
      1   |    1     |     2
      2   |    1     |     6
      4   |    1     |    12
      5   |    2     |    16
     10   |    1     |    26
    '''
)
result = queries.asof_now_join_inner(
    data, pw.left.instance == pw.right.instance
).select(query=pw.left.value, ans=pw.right.value)
pw.debug.compute_and_print_update_stream(result, include_id=False)

asof_now_join_left(other, *on, id=None, left_instance=None, right_instance=None)

Performs a left asof now join of self with other using join expressions. Each row of self is joined with the rows of other as they are at that row's processing time. If there are no matching rows in other, missing values on the right side are replaced with None. Rows from self are not stored: if other is updated in the future, results for past rows of self are not updated. Rows from other are stored, so they can be joined with future rows of self.

  • Parameters
    • other (Table) – the right side of a join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
  • Returns
    AsofNowJoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
data = pw.debug.table_from_markdown(
    '''
    id | value | instance | __time__ | __diff__
     2 |   4   |    1     |     4    |     1
     2 |   4   |    1     |    10    |    -1
     5 |   5   |    1     |    10    |     1
     7 |   2   |    2     |    14    |     1
     7 |   2   |    2     |    22    |    -1
    11 |   3   |    2     |    26    |     1
     5 |   5   |    1     |    30    |    -1
    14 |   9   |    1     |    32    |     1
    '''
)
queries = pw.debug.table_from_markdown(
    '''
    value | instance | __time__
      1   |    1     |     2
      2   |    1     |     6
      4   |    1     |    12
      5   |    2     |    16
     10   |    1     |    26
    '''
)
result = queries.asof_now_join_left(
    data, pw.left.instance == pw.right.instance
).select(query=pw.left.value, ans=pw.right.value)
pw.debug.compute_and_print_update_stream(result, include_id=False)

cast_to_types(**kwargs)

Casts columns to types.

concat(*others)

Concatenates self with every other ∊ others.

Semantics:

  • result.columns == self.columns == other.columns
  • result.id == self.id ∪ other.id

If self.id and other.id collide, an exception is thrown.

Requires:

  • other.columns == self.columns
  • self.id disjoint with other.id
  • Parameters
    others – the other tables.
  • Returns
    Table – The concatenated table. Ids of rows from the original tables are preserved.
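
The requirements above amount to a union of row sets with disjoint ids. A minimal plain-Python sketch, modelling a table as a dict from id to row (the name concat_sketch and the choice of ValueError are assumptions, not Pathway's actual error type):

```python
def concat_sketch(*tables):
    """Union of tables modelled as {row_id: row}; ids must be pairwise disjoint."""
    result = {}
    for table in tables:
        collisions = result.keys() & table.keys()
        if collisions:
            raise ValueError(f"id collision: {sorted(collisions)}")
        result.update(table)  # original ids are preserved
    return result


t1 = {1: ("Alice", 10), 2: ("Bob", 9), 3: ("Alice", 8)}
t2 = {11: ("Alice", 11), 12: ("Tom", 12)}
print(sorted(concat_sketch(t1, t2)))  # [1, 2, 3, 11, 12]
```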

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''')
pw.universes.promise_are_pairwise_disjoint(t1, t2)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)

concat_reindex(*tables)

Concatenate contents of several tables.

This is similar to PySpark union. All tables must have the same schema. Each row is reindexed.

  • Parameters
    tables (Table) – List of tables to concatenate. All tables must have the same schema.
  • Returns
    Table – The concatenated table. It will have new, synthetic ids.
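
In contrast to concat, colliding ids are not a problem here because every row gets a new id. A plain-Python sketch of that idea, with tables modelled as dicts from id to row; the name concat_reindex_sketch is hypothetical and the consecutive integers only stand in for the synthetic ids, whose real form is not described in this section:

```python
def concat_reindex_sketch(*tables):
    """Concatenate tables modelled as {row_id: row}, assigning fresh ids."""
    rows = [row for table in tables for row in table.values()]
    return dict(enumerate(rows))  # position in the result is the new id


t1 = {1: "Dog", 7: "Cat"}
t2 = {1: "Manul", 8: "Octopus"}  # id 1 collides with t1, which is fine here
print(list(concat_reindex_sketch(t1, t2).values()))
# ['Dog', 'Cat', 'Manul', 'Octopus']
```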

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.table_from_markdown('''
  | pet
1 | Manul
8 | Octopus
''')
t3 = t1.concat_reindex(t2)
pw.debug.compute_and_print(t3, include_id=False)

copy()

Returns a copy of a table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.copy()
pw.debug.compute_and_print(t2, include_id=False)
print(t1 is t2)  # identity check: the copy is expected to be a distinct Table object

deduplicate(*, value, instance=None, acceptor, persistent_id=None)

Deduplicates rows in self on the value column using the acceptor function.

It keeps rows that were accepted by the acceptor function. The acceptor takes two arguments: the current value and the previous value.

  • Parameters
    • value (Union[ColumnExpression, None, int, float, str, bytes, bool, Pointer, datetime, timedelta, ndarray, Json, dict[str, Any], tuple[Any, ...], Error]) – column expression used for deduplication.
    • instance (ColumnExpression | None) – Grouping column. For rows with different values in this column, deduplication will be performed separately. Defaults to None.
    • acceptor (Callable[[TypeVar(T, bound= Union[None, int, float, str, bytes, bool, Pointer, datetime, timedelta, ndarray, Json, dict[str, Any], tuple[Any, ...], Error]), TypeVar(T, bound= Union[None, int, float, str, bytes, bool, Pointer, datetime, timedelta, ndarray, Json, dict[str, Any], tuple[Any, ...], Error])], bool]) – callback telling whether two values are different.
    • persistent_id (str | None) – (unstable) An identifier, under which the state of the table will be persisted or None, if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This way it’s possible to configure the start of computations from the moment they were terminated last time.
  • Returns
    Table – the result of deduplication.
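
The role of the acceptor can be sketched in plain Python. This is an illustration under assumptions, not Pathway's implementation: the name deduplicate_sketch is hypothetical, the first value of each instance is assumed to be always accepted, and the previous value passed to the acceptor is assumed to be the last accepted one.

```python
_MISSING = object()  # sentinel: nothing accepted yet for this instance


def deduplicate_sketch(rows, acceptor):
    """rows: iterable of (value, instance) in arrival order.

    Returns the accepted (value, instance) pairs in arrival order.
    """
    last_accepted = {}  # instance -> last accepted value
    accepted = []
    for value, instance in rows:
        previous = last_accepted.get(instance, _MISSING)
        if previous is _MISSING or acceptor(value, previous):
            last_accepted[instance] = value
            accepted.append((value, instance))
    return accepted


def acceptor(new_value, old_value) -> bool:
    return new_value >= old_value + 2


rows = [(1, 1), (2, 1), (3, 2), (4, 1), (4, 2), (5, 1)]
print(deduplicate_sketch(rows, acceptor))  # [(1, 1), (3, 2), (4, 1)]
```

Each newly accepted value replaces the previously accepted one for its instance, so at any moment at most one row per instance is current.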

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
    val | __time__
     1  |     2
     2  |     4
     3  |     6
     4  |     8
'''
)

def acceptor(new_value, old_value) -> bool:
    return new_value >= old_value + 2


result = table.deduplicate(value=pw.this.val, acceptor=acceptor)
pw.debug.compute_and_print_update_stream(result, include_id=False)

table = pw.debug.table_from_markdown(
    '''
    val | instance | __time__
     1  |     1    |     2
     2  |     1    |     4
     3  |     2    |     6
     4  |     1    |     8
     4  |     2    |     8
     5  |     1    |    10
'''
)

def acceptor(new_value, old_value) -> bool:
    return new_value >= old_value + 2


result = table.deduplicate(
    value=pw.this.val, instance=pw.this.instance, acceptor=acceptor
)
pw.debug.compute_and_print_update_stream(result, include_id=False)

diff(timestamp, *values, instance=None)

Compute the difference between the values in the values columns and the previous values according to the order defined by the column timestamp.

  • Parameters
    • timestamp (ColumnReference) – The column reference to the timestamp column on which the order is computed.
    • *values (ColumnReference) – Variable-length argument representing the column references to the values columns.
    • instance (ColumnReference | None) – Can be used to group the values. The difference is only computed between rows with the same instance value.
  • Returns
    Table – A new table where each column is replaced with a new column containing the difference and whose name is the concatenation of diff_ and the former name.
  • Raises
    ValueError – If the columns are not ColumnReference.

NOTE: The difference for the “first” row (the row with the lowest value in the timestamp column) is None.
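
The computation can be pictured as sorting by timestamp and subtracting each previous value. A minimal plain-Python sketch (the name diff_sketch and the tuple layout are assumptions; grouping by instance is left out for brevity):

```python
def diff_sketch(rows):
    """rows: list of (timestamp, value).

    Returns (timestamp, value, diff) tuples ordered by timestamp; the diff of
    the first row is None because it has no predecessor.
    """
    ordered = sorted(rows, key=lambda row: row[0])
    result = []
    for index, (timestamp, value) in enumerate(ordered):
        if index == 0:
            delta = None
        else:
            delta = value - ordered[index - 1][1]
        result.append((timestamp, value, delta))
    return result


rows = [(1, 1), (2, 2), (3, 4), (4, 7), (5, 11), (6, 16)]
print([delta for _, _, delta in diff_sketch(rows)])  # [None, 1, 2, 3, 4, 5]
```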

Example:

import pathway as pw
table = pw.debug.table_from_markdown('''
timestamp | values
1         | 1
2         | 2
3         | 4
4         | 7
5         | 11
6         | 16
''')
table += table.diff(pw.this.timestamp, pw.this.values)
pw.debug.compute_and_print(table, include_id=False)
table = pw.debug.table_from_markdown(
    '''
timestamp | instance | values
1         | 0        | 1
2         | 1        | 2
3         | 1        | 4
3         | 0        | 7
6         | 1        | 11
6         | 0        | 16
'''
)
table += table.diff(pw.this.timestamp, pw.this.values, instance=pw.this.instance)
pw.debug.compute_and_print(table, include_id=False)

difference(other)

Restrict self universe to keys not appearing in the other table.

  • Parameters
    other (Table) – table with ids to remove from self.
  • Returns
    Table – table with restricted universe, with the same set of columns
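
Conceptually this is a set difference on row ids that keeps the columns of self. A plain-Python sketch with tables modelled as dicts from id to row (difference_sketch is a hypothetical name):

```python
def difference_sketch(table, other):
    """Keep the rows of `table` whose id does not appear in `other`."""
    return {row_id: row for row_id, row in table.items() if row_id not in other}


t1 = {1: (10, "Alice", 1), 2: (9, "Bob", 1), 3: (8, "Alice", 2)}
t2 = {2: 100, 3: 200, 4: 300}
print(difference_sketch(t1, t2))  # {1: (10, 'Alice', 1)}
```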

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age  | owner  | pet
1 | 10   | Alice  | 1
2 | 9    | Bob    | 1
3 | 8    | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
  | cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.difference(t2)
pw.debug.compute_and_print(t3, include_id=False)

empty(**kwargs)

Creates an empty table with a schema specified by kwargs.

  • Parameters
    kwargs (DType) – Dict whose keys are column names and values are column types.
  • Returns
    Table – Created empty table.

Example:

import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
pw.debug.compute_and_print(t1, include_id=False)

filter(filter_expression)

Filter a table according to filter_expression condition.

  • Parameters
    filter_expression (ColumnExpression) – ColumnExpression that specifies the filtering condition.
  • Returns
    Table – Result has the same schema as self and its ids are a subset of self.id.

Example:

import pathway as pw
vertices = pw.debug.table_from_markdown('''
label outdegree
    1         3
    7         0
''')
filtered = vertices.filter(vertices.outdegree == 0)
pw.debug.compute_and_print(filtered, include_id=False)

flatten(to_flatten, *, origin_id=None)

Performs a flatmap operation on a column or expression given as the first argument. The data type of this column or expression must be iterable or a Json array. Other columns of the table are duplicated as many times as the length of the iterable.

It is possible to get ids of source rows by passing origin_id argument, which is a new name of the column with the source ids.
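
The flatmap behaviour, including the optional origin_id column, can be sketched in plain Python. The function flatten_sketch and the dict-based table model are illustrative assumptions, not Pathway internals:

```python
def flatten_sketch(table, column, origin_id=None):
    """table: {row_id: {column_name: value}}; `column` must hold an iterable.

    Returns one output row per element of the iterable; the other columns are
    copied. If origin_id is given, it names a column holding the source id.
    """
    output = []
    for row_id, row in table.items():
        for element in row[column]:
            new_row = {**row, column: element}
            if origin_id is not None:
                new_row[origin_id] = row_id
            output.append(new_row)
    return output


t1 = {1: {"pet": "Dog", "age": 2}, 7: {"pet": "Cat", "age": 5}}
for row in flatten_sketch(t1, "pet", origin_id="source_id"):
    print(row)
# first row printed: {'pet': 'D', 'age': 2, 'source_id': 1}
```

A string is iterable, so flattening the pet column yields one row per character.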

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | pet  |  age
1 | Dog  |   2
7 | Cat  |   5
''')
t2 = t1.flatten(t1.pet)
pw.debug.compute_and_print(t2, include_id=False)

from_columns(**kwargs)

Build a table from columns.

All columns must have the same ids. Columns’ names must be pairwise distinct.

Example:

import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
t2 = pw.Table.empty(foo=float, bar=float).with_universe_of(t1)
t3 = pw.Table.from_columns(t1.pet, qux=t2.foo)
pw.debug.compute_and_print(t3, include_id=False)

groupby(*args, id=None, sort_by=None, instance=None)

Groups table by columns from args.

NOTE: Usually followed by .reduce() that aggregates the result and returns a table.

  • Parameters
    • args (ColumnReference) – columns to group by.
    • id (ColumnReference | None) – if provided, is the column used to set ids of the rows of the result
    • sort_by (ColumnReference | None) – if provided, column values are used as sorting keys for particular reducers
    • instance (ColumnReference | None) – optional argument describing partitioning of the data into separate instances
  • Returns
    GroupedTable – Groupby object.
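
The usual groupby followed by reduce pattern corresponds to bucketing rows by key and aggregating each bucket. A plain-Python sketch of a sum aggregation (groupby_sum_sketch is a hypothetical helper, not part of Pathway):

```python
from collections import defaultdict


def groupby_sum_sketch(rows, keys, value):
    """Group dict rows by the `keys` columns and sum the `value` column."""
    totals = defaultdict(int)
    for row in rows:
        group = tuple(row[key] for key in keys)
        totals[group] += row[value]
    return dict(totals)


rows = [
    {"age": 10, "owner": "Alice", "pet": "dog"},
    {"age": 9, "owner": "Bob", "pet": "dog"},
    {"age": 8, "owner": "Alice", "pet": "cat"},
    {"age": 7, "owner": "Bob", "pet": "dog"},
]
print(groupby_sum_sketch(rows, keys=("pet", "owner"), value="age"))
# {('dog', 'Alice'): 10, ('dog', 'Bob'): 16, ('cat', 'Alice'): 8}
```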

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.groupby(t1.pet, t1.owner).reduce(t1.owner, t1.pet, ageagg=pw.reducers.sum(t1.age))
pw.debug.compute_and_print(t2, include_id=False)

interpolate(timestamp, *values, mode=InterpolateMode.LINEAR)

Interpolates missing values in a column using the previous and next values based on a timestamp column.

  • Parameters
    • timestamp (ColumnReference) – Reference to the column containing timestamps.
    • *values (ColumnReference) – References to the columns containing values to be interpolated.
    • mode (InterpolateMode, optional) – The interpolation mode. Currently, only InterpolateMode.LINEAR is supported. Default is InterpolateMode.LINEAR.
  • Returns
    Table – A new table with the interpolated values.
  • Raises
    ValueError – If the columns are not ColumnReference or if the interpolation mode is not supported.

NOTE:

  • The interpolation is performed based on linear interpolation between the previous and next values.
  • If a value is missing at the beginning or end of the column, no interpolation is performed.
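
Linear interpolation between the previous and next known values can be written out in plain Python as follows. This is a sketch, not Pathway's code: interpolate_linear_sketch is a hypothetical name, and leaving leading or trailing gaps as None is just one reading of the note above.

```python
def interpolate_linear_sketch(timestamps, values):
    """Fill None entries in `values` by linear interpolation over `timestamps`."""
    known = [(t, v) for t, v in zip(timestamps, values) if v is not None]
    result = []
    for t, v in zip(timestamps, values):
        if v is not None:
            result.append(v)
            continue
        # Closest known points before and after t (None if there is none).
        prev = max((k for k in known if k[0] < t), key=lambda k: k[0], default=None)
        nxt = min((k for k in known if k[0] > t), key=lambda k: k[0], default=None)
        if prev is None or nxt is None:
            result.append(None)  # gap at the beginning or end: left as is
        else:
            (t0, v0), (t1, v1) = prev, nxt
            result.append(v0 + (v1 - v0) * (t - t0) / (t1 - t0))
    return result


timestamps = [1, 2, 3, 4, 5, 6]
print(interpolate_linear_sketch(timestamps, [1, None, 3, None, None, 6]))
# [1, 2.0, 3, 4.0, 5.0, 6]
print(interpolate_linear_sketch(timestamps, [10, None, None, None, None, 60]))
# [10, 20.0, 30.0, 40.0, 50.0, 60]
```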

Example:

import pathway as pw
table = pw.debug.table_from_markdown('''
timestamp | values_a | values_b
1         | 1        | 10
2         |          |
3         | 3        |
4         |          |
5         |          |
6         | 6        | 60
''')
table = table.interpolate(pw.this.timestamp, pw.this.values_a, pw.this.values_b)
pw.debug.compute_and_print(table, include_id=False)

intersect(*tables)

Restrict self universe to keys appearing in all of the tables.

  • Parameters
    tables (Table) – tables whose keys are used to restrict the universe.
  • Returns
    Table – table with restricted universe, with the same set of columns
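
Conceptually this is a set intersection on row ids across all given tables, keeping only the columns of self. A plain-Python sketch with tables modelled as dicts from id to row (intersect_sketch is a hypothetical name):

```python
def intersect_sketch(table, *others):
    """Keep the rows of `table` whose id appears in every table in `others`."""
    return {
        row_id: row
        for row_id, row in table.items()
        if all(row_id in other for other in others)
    }


t1 = {1: (10, "Alice", 1), 2: (9, "Bob", 1), 3: (8, "Alice", 2)}
t2 = {2: 100, 3: 200, 4: 300}
print(sorted(intersect_sketch(t1, t2)))  # [2, 3]
```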

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age  | owner  | pet
1 | 10   | Alice  | 1
2 | 9    | Bob    | 1
3 | 8    | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
  | cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.intersect(t2)
pw.debug.compute_and_print(t3, include_id=False)

interval_join(other, self_time, other_time, interval, *on, behavior=None, how=JoinMode.INNER, left_instance=None, right_instance=None)

Performs an interval join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (pw.ColumnExpression[int | float | datetime]) – time expression in self.
    • other_time (pw.ColumnExpression[int | float | datetime]) – time expression in other.
    • interval – the allowed time difference between other_time and self_time, built as pw.temporal.interval(lower_bound, upper_bound).
    • lower_bound – a lower bound on time difference between other_time and self_time.
    • upper_bound – an upper bound on time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join - features like delaying entries or ignoring late entries. You can see examples below or read more in the temporal behavior of interval join tutorial.
    • how (JoinMode) – decides whether to run interval_join_inner, interval_join_left, interval_join_right or interval_join_outer. Default is INNER.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
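
The join condition stated above can be checked by hand with a few lines of plain Python. The sketch below is a naive nested loop over time values only (interval_join_sketch is a hypothetical name, and the extra on conditions are omitted):

```python
def interval_join_sketch(self_times, other_times, lower_bound, upper_bound):
    """Return (self_time, other_time) pairs satisfying the interval condition."""
    return [
        (s, o)
        for s in self_times
        for o in other_times
        if s + lower_bound <= o <= s + upper_bound
    ]


print(interval_join_sketch([3, 4, 5, 11], [0, 1, 4, 7], -2, 1))
# [(3, 1), (3, 4), (4, 4), (5, 4)]
```

With bounds -2 and 1, a row of self at time 3 matches rows of other with times between 1 and 4, while the row at time 11 has no partner.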

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b, how=pw.JoinMode.INNER
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)

Setting behavior lets you control the temporal behavior of an interval join. When it is set, each side of the interval join keeps track of the maximal already seen time (self_time and other_time). In the context of an interval join, the arguments of behavior have the following meaning:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records that have times lower than or equal to the above threshold. When cutoff is not set, interval join will remember all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.
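
The delay rule from the list above can be sketched in plain Python as a buffer that releases an entry once the maximal seen time has reached its time plus delay. This is a simplified model with a hypothetical helper name (delay_buffer_sketch); it updates the maximal seen time after every record, which may differ from how the engine batches updates.

```python
def delay_buffer_sketch(event_times, delay):
    """Return (released_per_step, still_buffered) for times in arrival order."""
    buffered = []
    released_per_step = []
    max_seen = None
    for t in event_times:
        buffered.append(t)
        max_seen = t if max_seen is None else max(max_seen, t)
        # Release everything whose time + delay has been reached.
        ready = [b for b in buffered if max_seen >= b + delay]
        buffered = [b for b in buffered if max_seen < b + delay]
        released_per_step.append(ready)
    return released_per_step, buffered


released, pending = delay_buffer_sketch([0, 2, 4, 8], delay=3)
print(released)  # [[], [], [0], [2, 4]]
print(pending)   # [8]
```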

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)

# The same join with cutoff=6: records that arrive too late are ignored.
result_with_cutoff = t1.interval_join(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)

The record with value=5 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time minus cutoff (4 <= 10-6).

interval_join_inner(other, self_time, other_time, interval, *on, behavior=None, left_instance=None, right_instance=None)

Performs an interval join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • interval – the time interval of the join, created with pw.temporal.interval(lower_bound, upper_bound), where lower_bound and upper_bound are the lower and upper bounds on the time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines temporal behavior of a join - features like delaying entries or ignoring late entries.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
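
The join condition can be checked by hand with a few lines of plain Python. The sketch below uses ordinary lists instead of Pathway tables and only illustrates the inequality self_time + lower_bound <= other_time <= self_time + upper_bound; the helper name interval_join_inner_sketch is hypothetical.

```python
def interval_join_inner_sketch(left_times, right_times, lower_bound, upper_bound):
    """Return (left_t, right_t) pairs satisfying the interval condition."""
    return [
        (lt, rt)
        for lt in left_times
        for rt in right_times
        if lt + lower_bound <= rt <= lt + upper_bound
    ]


# Times from a left table and a right table, joined with the interval (-2, 1).
pairs = interval_join_inner_sketch([3, 4, 5, 11], [0, 1, 4, 7], -2, 1)
print(pairs)  # [(3, 1), (3, 4), (4, 4), (5, 4)]
```

The left time 11 has no partner because no right time falls in [9, 12], so an inner join drops it.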

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_inner(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)

# Second example: the same interval join with an additional equality condition (t1.a == t2.b).
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_inner(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)

Setting behavior lets you control the temporal behavior of an interval join. When it is set, each side of the interval join keeps track of the maximal time it has already seen (self_time and other_time). In the context of an interval join, the arguments of behavior have the following meaning:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; the same threshold is used to garbage collect stored records. When cutoff is not set, the interval join remembers all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal already seen time minus cutoff.

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join_inner(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)

# The same join with cutoff=6: records that arrive too late are ignored.
result_with_cutoff = t1.interval_join_inner(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)

The record with value=5 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time minus cutoff (4 <= 10-6).
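
The two dropped records can be reproduced with a simplified model of the cutoff rule in plain Python. The sketch assumes that rows sharing a __time__ form one batch and that the maximal seen event time is updated only after a batch is processed; this batching is an assumption of the sketch, and the helper dropped_by_cutoff is hypothetical. Each row is a (value, event_time, __time__) tuple taken from the example tables.

```python
from itertools import groupby


def dropped_by_cutoff(rows, cutoff):
    """Return the values of rows ignored on one side of the join."""
    dropped = []
    max_seen = None  # maximal event time seen so far on this side
    ordered = sorted(rows, key=lambda r: r[2])  # order by processing time
    for _, batch in groupby(ordered, key=lambda r: r[2]):
        batch = list(batch)
        for value, event_time, _ in batch:
            # A record is ignored if it is at or below the threshold.
            if max_seen is not None and event_time <= max_seen - cutoff:
                dropped.append(value)
        newest = max(event_time for _, event_time, _ in batch)
        max_seen = newest if max_seen is None else max(max_seen, newest)
    return dropped


t1_rows = [(1, 0, 2), (2, 2, 4), (3, 4, 4), (4, 8, 8), (5, 0, 10), (6, 4, 10)]
t2_rows = [(42, 2, 2), (8, 10, 14), (10, 4, 30)]

print(dropped_by_cutoff(t1_rows, cutoff=6))  # [5]
print(dropped_by_cutoff(t2_rows, cutoff=6))  # [10]
```

The record with value=6 arrives in the same batch as value=5 but survives, because its event_time of 4 is above the threshold 8 - 6 = 2.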

interval_join_left(other, self_time, other_time, interval, *on, behavior=None, left_instance=None, right_instance=None)

Performs an interval left join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined. Rows from the left side that haven’t been matched with the right side are returned with missing values on the right side replaced with None.

  • Parameters
    • other (Table) – the right side of the join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • interval – the time interval of the join, created with pw.temporal.interval(lower_bound, upper_bound), where lower_bound and upper_bound are the lower and upper bounds on the time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines temporal behavior of a join - features like delaying entries or ignoring late entries.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
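
The left-join semantics, including an equality condition from on, can be sketched in plain Python. The code below works on lists of (key, time) tuples rather than Pathway tables, and the helper name interval_join_left_sketch is hypothetical; it only illustrates how unmatched left rows are kept with None on the right side.

```python
def interval_join_left_sketch(left_rows, right_rows, lower_bound, upper_bound):
    """Return (key, left_t, right_t) rows; right_t is None for unmatched left rows."""
    result = []
    for a, lt in left_rows:
        matches = [
            rt
            for b, rt in right_rows
            if a == b and lt + lower_bound <= rt <= lt + upper_bound
        ]
        if matches:
            result.extend((a, lt, rt) for rt in matches)
        else:
            result.append((a, lt, None))  # keep the left row, pad the right side
    return result


left_rows = [(1, 3), (1, 4), (1, 5), (1, 11), (2, 2), (2, 3), (3, 4)]
right_rows = [(1, 0), (1, 1), (1, 4), (1, 7), (2, 0), (2, 2), (4, 2)]

for row in interval_join_left_sketch(left_rows, right_rows, -2, 1):
    print(row)
```

With these inputs the rows (1, 11) and (3, 4) have no partner and come back as (1, 11, None) and (3, 4, None).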

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_left(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)

# Second example: the same interval join with an additional equality condition (t1.a == t2.b).
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_left(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)

Setting behavior lets you control the temporal behavior of an interval join. When it is set, each side of the interval join keeps track of the maximal time it has already seen (self_time and other_time). In the context of an interval join, the arguments of behavior have the following meaning:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; the same threshold is used to garbage collect stored records. When cutoff is not set, the interval join remembers all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal already seen time minus cutoff.

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join_left(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)

# The same join with cutoff=6: records that arrive too late are ignored.
result_with_cutoff = t1.interval_join_left(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)

The record with value=5 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time minus cutoff (4 <= 10-6).

Notice also the entries with __diff__=-1. These are deletion entries caused by the arrival of matching entries on the right side of the join: each first match removes the entry whose right-side fields were empty and inserts an entry with those fields filled in.
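
The origin of the deletion entries can be illustrated with a small plain-Python model of the update stream for a single left row. It assumes the left row arrives before any right row; the helper left_join_updates is hypothetical, and the third tuple element plays the role of __diff__.

```python
def left_join_updates(left_time, right_arrivals, lower_bound, upper_bound):
    """Return (left_t, right_t, diff) updates for one left row as right rows arrive."""
    updates = [(left_time, None, +1)]  # no match yet: right side is None
    matched = False
    for rt in right_arrivals:
        if left_time + lower_bound <= rt <= left_time + upper_bound:
            if not matched:
                # First match: retract the entry with the empty right side.
                updates.append((left_time, None, -1))
                matched = True
            updates.append((left_time, rt, +1))
    return updates


print(left_join_updates(4, [2], -2, 2))
# [(4, None, 1), (4, None, -1), (4, 2, 1)]
```

If no right row ever falls inside the interval, the stream contains only the initial insertion with None.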

interval_join_outer(other, self_time, other_time, interval, *on, behavior=None, left_instance=None, right_instance=None)

Performs an interval outer join of self with other using a time difference and join expressions. If self_time + lower_bound <= other_time <= self_time + upper_bound and conditions in on are satisfied, the rows are joined. Rows that haven’t been matched with the other side are returned with missing values on the other side replaced with None.

  • Parameters
    • other (Table) – the right side of the join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • interval – the time interval of the join, created with pw.temporal.interval(lower_bound, upper_bound), where lower_bound and upper_bound are the lower and upper bounds on the time difference between other_time and self_time.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines temporal behavior of a join - features like delaying entries or ignoring late entries.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.
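
The outer-join semantics can be sketched in plain Python as the matched pairs plus the unmatched rows from both sides. The code uses lists of times rather than Pathway tables; the helper name interval_join_outer_sketch and the output ordering are choices of this sketch only.

```python
def interval_join_outer_sketch(left_times, right_times, lower_bound, upper_bound):
    """Return (left_t, right_t) pairs; None marks the missing side."""
    result = []
    matched_right = set()
    for lt in left_times:
        matches = [rt for rt in right_times if lt + lower_bound <= rt <= lt + upper_bound]
        if matches:
            result.extend((lt, rt) for rt in matches)
            matched_right.update(matches)
        else:
            result.append((lt, None))  # unmatched left row
    # Unmatched right rows are returned with None on the left side.
    result.extend((None, rt) for rt in right_times if rt not in matched_right)
    return result


pairs = interval_join_outer_sketch([3, 4, 5, 11], [0, 1, 4, 7], -2, 1)
print(pairs)
# [(3, 1), (3, 4), (4, 4), (5, 4), (11, None), (None, 0), (None, 7)]
```

Tracking matched right rows in a set is sufficient here because the right times are distinct; with duplicate times, row identifiers would be needed instead.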

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_outer(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)

# Second example: the same interval join with an additional equality condition (t1.a == t2.b).
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_outer(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)