Pathway API

Reference for all the Pathway classes and functions.

See Table API for the main Table class.

class pw.AsyncTransformer(input_table, *, instance=pw.this.id)

Allows performing asynchronous transformations on a table.

invoke() is called asynchronously for each row of input_table.

The output table can be accessed via successful (result is a deprecated alias).

Example:

import pathway as pw
import asyncio
from typing import Any
class OutputSchema(pw.Schema):
    ret: int

class AsyncIncrementTransformer(pw.AsyncTransformer, output_schema=OutputSchema):
    async def invoke(self, value) -> dict[str, Any]:
        await asyncio.sleep(0.1)
        return {"ret": value + 1}

input_table = pw.debug.table_from_markdown('''
  | value
1 | 42
2 | 44
''')
result = AsyncIncrementTransformer(input_table=input_table).successful
pw.debug.compute_and_print(result, include_id=False)
ret
43
45

close()

Called once at the end. Proper place for cleanup.

abstract async invoke(*args, **kwargs)

Called for every row of input_table. The arguments correspond to the columns of the input table.

Should return a dict of values matching output_schema.

open()

Called before the actual work starts. Suitable for one-time setup.
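
The three hooks form a simple lifecycle: open() once, invoke() concurrently for each row, close() once at the end. The sketch below models that order in plain asyncio so it runs without Pathway. ModelTransformer and run_model are hypothetical names; the real AsyncTransformer also handles retries, caching and streaming updates, which this sketch ignores.

```python
import asyncio
from typing import Any


class ModelTransformer:
    """Hypothetical stand-in mirroring open/invoke/close; not pw.AsyncTransformer."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def open(self) -> None:
        # One-time setup, e.g. creating a client session.
        self.events.append("open")

    async def invoke(self, value: int) -> dict[str, Any]:
        await asyncio.sleep(0)  # stands in for real asynchronous I/O
        self.events.append(f"invoke({value})")
        return {"ret": value + 1}

    def close(self) -> None:
        # Cleanup once all rows were processed.
        self.events.append("close")


async def run_model(transformer: ModelTransformer, rows: list[int]) -> list[dict[str, Any]]:
    transformer.open()
    try:
        # invoke() runs concurrently, once per row; gather keeps input order.
        return await asyncio.gather(*(transformer.invoke(v) for v in rows))
    finally:
        transformer.close()  # runs even if some invoke() raised


t = ModelTransformer()
print(asyncio.run(run_model(t, [42, 44])))  # [{'ret': 43}, {'ret': 45}]
```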

with_options(capacity=None, timeout=None, retry_strategy=None, cache_strategy=None)

Sets async options.

  • Parameters
    • capacity (int | None) – Maximum number of concurrent operations. Defaults to None, indicating no specific limit.
    • timeout (float | None) – Maximum time (in seconds) to wait for the function result. Defaults to None, indicating no time limit.
    • retry_strategy (AsyncRetryStrategy | None) – Strategy for handling retries in case of failures. Defaults to None, meaning no retries.
    • cache_strategy (CacheStrategy | None) – Defines the caching mechanism. If set to None and persistence is enabled, operations are cached using the persistence layer. Defaults to None.
  • Returns
    self
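
The capacity and timeout options can be pictured with plain asyncio primitives: a semaphore caps the number of concurrent calls, and asyncio.wait_for bounds how long each call may take. The sketch below is a model under those assumptions, not Pathway's implementation. call_with_options and slow_increment are hypothetical, and returning None for a timed-out call is only this model's convention.

```python
import asyncio


async def call_with_options(func, args_list, capacity=None, timeout=None):
    """Plain-asyncio model of `capacity` and `timeout`; not Pathway's implementation."""
    # capacity=None means no limit; otherwise a semaphore caps concurrent calls.
    semaphore = asyncio.Semaphore(capacity) if capacity is not None else None
    in_flight = 0
    peak = 0

    async def run_one(arg):
        nonlocal in_flight, peak
        if semaphore is not None:
            await semaphore.acquire()
        in_flight += 1
        peak = max(peak, in_flight)
        try:
            # timeout=None means wait indefinitely for the result.
            return await asyncio.wait_for(func(arg), timeout)
        except asyncio.TimeoutError:
            return None  # this model's convention for a timed-out call
        finally:
            in_flight -= 1
            if semaphore is not None:
                semaphore.release()

    results = await asyncio.gather(*(run_one(a) for a in args_list))
    return results, peak


async def slow_increment(x):
    # Hypothetical workload: large inputs take much longer.
    await asyncio.sleep(0.01 if x < 100 else 1.0)
    return x + 1


print(asyncio.run(call_with_options(slow_increment, [1, 2, 3, 4], capacity=2)))
```

With capacity=2 at most two calls are in flight at once, while with the default of no limit all four start together.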

property failed: Table

The resulting table containing only the rows that failed during execution. If the instance argument is specified, it also contains rows that executed successfully but for which at least one row of the same instance with a less than or equal time failed.

property finished: Table

The resulting table containing all rows that finished their execution. The column _async_status contains the state of the row. Rows that finished successfully have their status set to “-SUCCESS-”. Rows that failed have their status set to “-FAILURE-”. If the instance argument is specified, rows that executed successfully but for which at least one row of the same instance with a less than or equal time failed also have their status set to “-FAILURE-”.

To get only the rows that executed successfully, use the successful property instead.

property output_table: Table

The resulting table containing all rows that started their execution. The column _async_status contains the state of the row. Rows that finished successfully have their status set to “-SUCCESS-”. Rows that failed have their status set to “-FAILURE-”. Rows that are still being executed have their status set to “-PENDING-”.

It is recommended to use this property for debugging or presentation purposes only. For other purposes, the successful property should be preferred; it returns a Table containing only the rows that executed successfully.

property result: Table

The resulting table containing only rows that were executed successfully.

Deprecated. Use successful instead.

property successful: Table

The resulting table containing only rows that were executed successfully.
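
The interplay between failed, finished and successful when instance is set can be checked with a small model of the rule stated above: a row is reported as failed if it failed itself, or if some row of the same instance with a less than or equal time failed. RowRun and final_statuses are hypothetical names used only for illustration; the sketch does not call Pathway.

```python
from dataclasses import dataclass


@dataclass
class RowRun:
    row_id: int
    instance: str
    time: int
    ok: bool  # did invoke() succeed for this row?


def final_statuses(runs: list[RowRun]) -> dict[int, str]:
    """Model of the documented rule: a successful row is reported as failed
    if a row of the same instance with time <= its time failed."""
    statuses = {}
    for run in runs:
        poisoned = any(
            not other.ok and other.instance == run.instance and other.time <= run.time
            for other in runs
        )
        statuses[run.row_id] = "-SUCCESS-" if run.ok and not poisoned else "-FAILURE-"
    return statuses


runs = [
    RowRun(1, "A", 1, True),
    RowRun(2, "A", 2, False),
    RowRun(3, "A", 3, True),  # succeeded, but row 2 (same instance, earlier) failed
    RowRun(4, "B", 3, True),  # different instance, unaffected
]
print(final_statuses(runs))
# {1: '-SUCCESS-', 2: '-FAILURE-', 3: '-FAILURE-', 4: '-SUCCESS-'}
```

In this model, successful corresponds to the rows marked “-SUCCESS-”, failed to those marked “-FAILURE-”, and finished to both.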

class pw.BaseCustomAccumulator

Utility class for defining custom accumulators, used for custom reducers. Custom accumulators should inherit from this class and must implement `from_row`, `update` and `compute_result`. Optionally, `neutral` and `retract` can be provided for more efficient processing on streams with changing data. Additionally, `serialize` and `deserialize` can be customized. By default they use the `pickle` module, but if the accumulator state can be converted to a Pathway value type in a simpler way, they can be overridden.

Example:

import pathway as pw
class CustomAvgAccumulator(pw.BaseCustomAccumulator):
  def __init__(self, sum, cnt):
    self.sum = sum
    self.cnt = cnt

  @classmethod
  def from_row(cls, row):
    [val] = row
    return cls(val, 1)

  def update(self, other):
    self.sum += other.sum
    self.cnt += other.cnt

  def compute_result(self) -> float:
    return self.sum / self.cnt
custom_avg = pw.reducers.udf_reducer(CustomAvgAccumulator)
t1 = pw.debug.table_from_markdown('''
age | owner | pet | price
10  | Alice | dog | 100
9   | Bob   | cat | 80
8   | Alice | cat | 90
7   | Bob   | dog | 70
''')
t2 = t1.groupby(t1.owner).reduce(t1.owner, avg_price=custom_avg(t1.price))
pw.debug.compute_and_print(t2, include_id=False)
owner | avg_price
Alice | 95.0
Bob   | 75.0
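
To see what the reducer does with these methods, here is a plain-Python sketch that folds the example rows group by group: one accumulator per row via from_row, merged in place via update, finalized via compute_result. The driver reduce_groups is hypothetical and only illustrates the contract; Pathway's engine schedules these calls itself and may merge accumulators in a different order.

```python
class CustomAvgAccumulator:
    # Same methods as the example above, without the pw.BaseCustomAccumulator base.
    def __init__(self, total, cnt):
        self.total = total
        self.cnt = cnt

    @classmethod
    def from_row(cls, row):
        [val] = row
        return cls(val, 1)

    def update(self, other):
        self.total += other.total
        self.cnt += other.cnt

    def compute_result(self) -> float:
        return self.total / self.cnt


def reduce_groups(rows, key, value):
    """Hypothetical driver: one accumulator per group, merged with update()."""
    accs = {}
    for row in rows:
        acc = CustomAvgAccumulator.from_row([row[value]])
        if row[key] in accs:
            accs[row[key]].update(acc)  # in-place merge, nothing returned
        else:
            accs[row[key]] = acc
    return {k: acc.compute_result() for k, acc in accs.items()}


rows = [
    {"owner": "Alice", "price": 100},
    {"owner": "Bob", "price": 80},
    {"owner": "Alice", "price": 90},
    {"owner": "Bob", "price": 70},
]
print(reduce_groups(rows, "owner", "price"))  # {'Alice': 95.0, 'Bob': 75.0}
```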

abstract compute_result()

Mandatory method that finalizes the computation. Used to extract the answer from the final state of the accumulator.

Narrowing the return type of this method allows the output of the reducer to be typed more precisely.

classmethod deserialize(val)

Deserializes the state from a Pathway value type.

abstract classmethod from_row(row)

Constructs the accumulator from a row of data. The row is passed as a list of values.

This is a mandatory method.

classmethod neutral()

Neutral element of the accumulator (the aggregation of an empty list).

This method is optional and allows more efficient processing on streams with changing data.

retract(other)

Updates the accumulator by removing the value of another one.

This method is optional and allows more efficient reductions on streams with changing data.
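
The benefit of neutral and retract shows up when a row is deleted from the stream: instead of recomputing the aggregate from scratch, the deleted row's contribution can be undone. The plain-Python sketch below illustrates that idea with a hypothetical SumCountAccumulator; it does not use Pathway, and returning None from compute_result for an empty state is this sketch's own choice.

```python
class SumCountAccumulator:
    """Hypothetical accumulator showing neutral() and retract(); plain Python."""

    def __init__(self, total, cnt):
        self.total = total
        self.cnt = cnt

    @classmethod
    def neutral(cls):
        # Aggregation of an empty list.
        return cls(0, 0)

    @classmethod
    def from_row(cls, row):
        [val] = row
        return cls(val, 1)

    def update(self, other):
        self.total += other.total
        self.cnt += other.cnt

    def retract(self, other):
        # Exact inverse of update(): removes a previously added contribution.
        self.total -= other.total
        self.cnt -= other.cnt

    def compute_result(self):
        return self.total / self.cnt if self.cnt else None


state = SumCountAccumulator.neutral()
for price in [100, 90, 50]:
    state.update(SumCountAccumulator.from_row([price]))
# The row with price 50 is deleted: undo its contribution instead of recomputing.
state.retract(SumCountAccumulator.from_row([50]))
print(state.compute_result())  # 95.0
```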

serialize()

Serializes the state to a Pathway value type.

abstract update(other)

Updates the accumulator with another one. The method does not need to return anything; the change should be made in place.

This is a mandatory method.

class pw.ClassArg(ref: RowReference, ptr: Pointer)

Base class to inherit from when writing inner classes for class transformers.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()
        @pw.output_attribute
        def ret(self) -> int:
            return self.arg + 1
t1 = pw.debug.table_from_markdown('''
age
10
9
8
7
''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7   | 8
8   | 9
9   | 10
10  | 11

pointer_from(*args, optional=False)

Computes a pseudo-random hash of its arguments, producing values of pointer type. Applied value-wise.

class pw.ColumnExpression

as_bool()

Converts the value to a bool, or None if that is not possible. Currently works for Json columns only.

Example:

import pathway as pw
import pandas as pd
class InputSchema(pw.Schema):
    data: dict
dt = pd.DataFrame(data={"data": [{"value": True}, {"value": False}]})
table = pw.debug.table_from_pandas(dt, schema=InputSchema)
result = table.select(result=pw.this.data.get("value").as_bool())
pw.debug.compute_and_print(result, include_id=False)
result
False
True

as_float()

Converts the value to a float, or None if that is not possible. Currently works for Json columns only.

Example:

import pathway as pw
import pandas as pd
class InputSchema(pw.Schema):
    data: dict
dt = pd.DataFrame(data={"data": [{"value": 1.5}, {"value": 3.14}]})
table = pw.debug.table_from_pandas(dt, schema=InputSchema)
result = table.select(result=pw.this.data.get("value").as_float())
pw.debug.compute_and_print(result, include_id=False)
result
1.5
3.14

as_int()

Converts the value to an int, or None if that is not possible. Currently works for Json columns only.

Example:

import pathway as pw
import pandas as pd
class InputSchema(pw.Schema):
    data: dict
dt = pd.DataFrame(data={"data": [{"value": 1}, {"value": 2}]})
table = pw.debug.table_from_pandas(dt, schema=InputSchema)
result = table.select(result=pw.this.data.get("value").as_int())
pw.debug.compute_and_print(result, include_id=False)
result
1
2

as_str()

Converts the value to a string, or None if that is not possible. Currently works for Json columns only.

Example:

import pathway as pw
import pandas as pd
class InputSchema(pw.Schema):
    data: dict
dt = pd.DataFrame(data={"data": [{"value": "dog"}, {"value": "cat"}]})
table = pw.debug.table_from_pandas(dt, schema=InputSchema)
result = table.select(result=pw.this.data.get("value").as_str())
pw.debug.compute_and_print(result, include_id=False)
result
cat
dog
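
All four conversions share one contract: return the typed value, or None when the value has a different type. The plain-Python sketch below illustrates that contract on values parsed with the standard json module. It is not Pathway code, and the exact coercion rules are assumptions: in particular, that as_float accepts a JSON integer and that as_int rejects booleans.

```python
import json
from typing import Any, Optional


def as_int(value: Any) -> Optional[int]:
    # bool is a subclass of int in Python, so booleans are rejected explicitly.
    if isinstance(value, int) and not isinstance(value, bool):
        return value
    return None


def as_float(value: Any) -> Optional[float]:
    # Assumption: JSON integers are accepted and widened to float.
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return float(value)
    return None


def as_str(value: Any) -> Optional[str]:
    return value if isinstance(value, str) else None


def as_bool(value: Any) -> Optional[bool]:
    return value if isinstance(value, bool) else None


doc = json.loads('{"n": 1, "x": 1.5, "s": "dog", "b": true}')
print(as_int(doc["n"]), as_float(doc["x"]), as_str(doc["s"]), as_bool(doc["b"]))
print(as_int(doc["s"]))  # None: "dog" is not an integer
```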

get(index, default=None)

Extracts the element at index from an object. The object has to be a Tuple or Json. If no element is present at index, the value specified by the default parameter is returned.

Index can be an int for a Tuple, and an int or a str for Json. For Tuples, negative indices can be used to access elements counting backwards from the end.

  • Parameters
    • index (ColumnExpression | int | str) – Position to extract element at.
    • default (Union[ColumnExpression, None, int, float, str, bytes, bool, Pointer, datetime, timedelta, ndarray, Json, dict[str, Any], tuple[Any, ...]]) – Value returned when no element is at position index. Defaults to None.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
  | a | b | c
1 | 3 | 2 | 2
2 | 4 | 1 | 0
3 | 7 | 3 | 1
'''
)
t2 = t1.with_columns(tup=pw.make_tuple(pw.this.a, pw.this.b))
t3 = t2.select(
    x=pw.this.tup.get(1),
    y=pw.this.tup.get(3),
    z=pw.this.tup.get(pw.this.c),
    t=pw.this.tup.get(pw.this.c, default=100),
)
pw.debug.compute_and_print(t3, include_id=False)
x | y | z | t
1 |   | 4 | 4
2 |   |   | 100
3 |   | 3 | 3
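
The lookup rules can be modeled in a few lines of plain Python: tuples and JSON arrays are indexed by int (negative values count from the end), JSON objects by str key, and anything out of range falls back to default. get_model is a hypothetical helper, not part of Pathway. Run on the example's rows, it reproduces the x, y, z and t columns (in input order rather than the sorted printed order).

```python
from typing import Any


def get_model(obj: Any, index: Any, default: Any = None) -> Any:
    """Plain-Python model of get(): tuples/lists by int index, dicts by str key."""
    if isinstance(obj, dict):
        # JSON objects: only str keys can match.
        return obj.get(index, default) if isinstance(index, str) else default
    if isinstance(index, int) and not isinstance(index, bool):
        # Tuples and JSON arrays: negative indices count back from the end.
        if -len(obj) <= index < len(obj):
            return obj[index]
    return default


rows = [(3, 2, 2), (4, 1, 0), (7, 3, 1)]  # (a, b, c) from the example above
for a, b, c in rows:
    tup = (a, b)
    print(get_model(tup, 1), get_model(tup, 3), get_model(tup, c), get_model(tup, c, default=100))
```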

is_none()

Returns True if the value is None.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | owner | pet
1 | Alice | dog
2 | Bob   |
3 | Carol | cat
''')
t2 = t1.with_columns(has_no_pet=pw.this.pet.is_none())
pw.debug.compute_and_print(t2, include_id=False)
owner | pet | has_no_pet
Alice | dog | False
Bob   |     | True
Carol | cat | False

is_not_none()

Returns True if the value is not None.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | owner | pet
1 | Alice | dog
2 | Bob   |
3 | Carol | cat
''')
t2 = t1.with_columns(has_pet=pw.this.pet.is_not_none())
pw.debug.compute_and_print(t2, include_id=False)
owner | pet | has_pet
Alice | dog | True
Bob   |     | False
Carol | cat | True

to_string()

Converts the values to strings.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
val
1
2
3
4''')
t1.schema
<pathway.Schema types={'val': <class 'int'>}>
pw.debug.compute_and_print(t1, include_id=False)
val
1
2
3
4
t2 = t1.select(val = pw.this.val.to_string())
t2.schema
<pathway.Schema types={'val': <class 'str'>}>
pw.debug.compute_and_print(t2.select(val=pw.this.val + "a"), include_id=False)
val
1a
2a
3a
4a

class pw.ColumnReference()

Reference to a column.

Inherits from ColumnExpression.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
isinstance(t1.age, pw.ColumnReference)
True
isinstance(t1["owner"], pw.ColumnReference)
True

property name

Name of the referred column.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
t1.age.name
'age'

property table

Table to which the referred column belongs.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
t1.age.table is t1
True

class pw.DateTimeNaive(ts_input=<object object>, year=None, month=None, day=None, hour=None, minute=None, second=None, microsecond=None, tzinfo=None, *, nanosecond=None, tz=None, unit=None, fold=None)

Type for storing datetime without timezone information. Extends pandas.Timestamp type.

class pw.DateTimeUtc(ts_input=<object object>, year=None, month=None, day=None, hour=None, minute=None, second=None, microsecond=None, tzinfo=None, *, nanosecond=None, tz=None, unit=None, fold=None)

Type for storing timezone-aware datetime, expressed in UTC. Extends pandas.Timestamp type.

class pw.Duration(value=<object object>, unit=None, **kwargs)

Type for storing duration of time. Extends pandas.Timedelta type.

class pw.GroupedJoinResult()

reduce(*args, **kwargs)

Reduces a grouped join result to a table.

  • Returns
    Table – Created table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   cost  owner  pet
1   100  Alice    1
2    90    Bob    1
3    80  Alice    2
''')
t2 = pw.debug.table_from_markdown('''
    cost  owner  pet size
11   100  Alice    3    M
12    90    Bob    1    L
13    80    Tom    1   XL
''')
result = (t1.join(t2, t1.owner==t2.owner).groupby(pw.this.owner)
    .reduce(pw.this.owner, pairs = pw.reducers.count()))
pw.debug.compute_and_print(result, include_id=False)
owner | pairs
Alice | 2
Bob   | 1

class pw.GroupedTable()

Result of a groupby operation on a Table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.groupby(t1.pet, t1.owner)
isinstance(t2, pw.GroupedTable)
True

reduce(*args, **kwargs)

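Reduces a grouped table to a table.

  • Parameters
    • args (ColumnReference) – columns to carry into the result, such as the grouping columns.
    • kwargs (ColumnExpression) – reducer expressions; each key is the name of a new column.
  • Returns
    Table – Created table.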

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.groupby(t1.pet, t1.owner).reduce(t1.owner, t1.pet, ageagg=pw.reducers.sum(t1.age))
pw.debug.compute_and_print(t2, include_id=False)
owner | pet | ageagg
Alice | cat | 8
Alice | dog | 10
Bob   | dog | 16
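
As a mental model (not how Pathway executes it, which is incremental), groupby(...).reduce(...) buckets rows by the grouping key and emits one row per bucket. The plain-Python sketch below reproduces the ageagg column of the example with a dict of lists.

```python
from collections import defaultdict

rows = [
    {"age": 10, "owner": "Alice", "pet": "dog"},
    {"age": 9, "owner": "Bob", "pet": "dog"},
    {"age": 8, "owner": "Alice", "pet": "cat"},
    {"age": 7, "owner": "Bob", "pet": "dog"},
]

# groupby(t1.pet, t1.owner): one bucket per distinct (pet, owner) pair.
groups = defaultdict(list)
for row in rows:
    groups[(row["pet"], row["owner"])].append(row)

# reduce(t1.owner, t1.pet, ageagg=pw.reducers.sum(t1.age)): one output row per bucket.
result = sorted(
    (
        {"owner": owner, "pet": pet, "ageagg": sum(r["age"] for r in members)}
        for (pet, owner), members in groups.items()
    ),
    key=lambda r: (r["owner"], r["pet"]),
)
print(result)
```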

class pw.JoinMode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Enum used for controlling the type of a join when passed to a generic join function. Consists of the values JoinMode.INNER, JoinMode.LEFT, JoinMode.RIGHT and JoinMode.OUTER.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
inner_join = t1.join(
    t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(inner_join, include_id = False)
age | owner_name | size
9   | Bob        | L
outer_join = t1.join(
    t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.OUTER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(outer_join, include_id = False)
age | owner_name | size
    | Alice      | M
    | Tom        | XL
8   |            |
9   | Bob        | L
10  |            |

INNER = 0

Use inner join.

LEFT = 1

Use left join.

OUTER = 3

Use outer join.

RIGHT = 2

Use right join.
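
The four modes differ only in what happens to unmatched rows. The plain-Python sketch below models that on lists of dicts and reproduces the inner and outer results of the example above. join_rows is a hypothetical helper, the strings "inner", "left", "right" and "outer" stand in for the JoinMode members, and row order is not meaningful (Pathway prints results sorted).

```python
def join_rows(left, right, on, how="inner"):
    """Plain-Python model of inner/left/right/outer join semantics.

    `on` lists (left_key, right_key) column pairs that must all be equal.
    """
    def matches(l, r):
        return all(l[lk] == r[rk] for lk, rk in on)

    out = []
    matched_right = set()
    for l in left:
        hits = [i for i, r in enumerate(right) if matches(l, r)]
        for i in hits:
            matched_right.add(i)
            out.append((l, right[i]))
        if not hits and how in ("left", "outer"):
            out.append((l, None))  # unmatched left row, right side padded with None
    if how in ("right", "outer"):
        # Unmatched right rows, left side padded with None.
        out.extend((None, r) for i, r in enumerate(right) if i not in matched_right)
    return out


def select_age_owner_size(pairs):
    # Mirrors .select(age=t1.age, owner_name=t2.owner, size=t2.size)
    return [
        (
            l["age"] if l is not None else None,
            r["owner"] if r is not None else None,
            r["size"] if r is not None else None,
        )
        for l, r in pairs
    ]


t1 = [{"age": 10, "owner": "Alice", "pet": 1}, {"age": 9, "owner": "Bob", "pet": 1},
      {"age": 8, "owner": "Alice", "pet": 2}]
t2 = [{"age": 10, "owner": "Alice", "pet": 3, "size": "M"},
      {"age": 9, "owner": "Bob", "pet": 1, "size": "L"},
      {"age": 8, "owner": "Tom", "pet": 1, "size": "XL"}]
on = [("pet", "pet"), ("owner", "owner")]

print(select_age_owner_size(join_rows(t1, t2, on, "inner")))  # [(9, 'Bob', 'L')]
print(select_age_owner_size(join_rows(t1, t2, on, "outer")))
```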

class pw.JoinResult()

Result of a join between tables.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice    1
2    9    Bob    1
3    8  Alice    2
''')
t2 = pw.debug.table_from_markdown('''
    age  owner  pet size
11   10  Alice    3    M
12    9    Bob    1    L
13    8    Tom    1   XL
''')
joinresult = t1.join(t2, t1.pet == t2.pet, t1.owner == t2.owner)
isinstance(joinresult, pw.JoinResult)
True
pw.debug.compute_and_print(joinresult.select(t1.age, t2.size), include_id=False)
age | size
9   | L

filter(filter_expression)

Filters rows, keeping the ones that satisfy the predicate.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice    1
2    9    Bob    1
3    8  Alice    2
''')
t2 = pw.debug.table_from_markdown('''
    age  owner  pet size
11   10  Alice    3    M
12    9    Bob    1    L
13    8    Tom    1   XL
''')
result = t1.join(t2).filter(t1.owner == t2.owner).select(t1.age, t2.size)
pw.debug.compute_and_print(result, include_id=False)
age | size
8   | M
9   | L
10  | M
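
A join without conditions pairs every row of t1 with every row of t2 (3 × 3 = 9 pairs here), and filter then keeps the pairs that satisfy the predicate. This plain-Python sketch, using itertools.product, reproduces the example's result; the same three pairs would also be produced by passing t1.owner == t2.owner to join directly.

```python
from itertools import product

t1 = [(10, "Alice", 1), (9, "Bob", 1), (8, "Alice", 2)]                # (age, owner, pet)
t2 = [(10, "Alice", 3, "M"), (9, "Bob", 1, "L"), (8, "Tom", 1, "XL")]  # (age, owner, pet, size)

# t1.join(t2) with no conditions: every pair of rows (cross product).
cross = list(product(t1, t2))
# .filter(t1.owner == t2.owner).select(t1.age, t2.size)
filtered = sorted((l[0], r[3]) for l, r in cross if l[1] == r[1])
print(len(cross), filtered)  # 9 [(8, 'M'), (9, 'L'), (10, 'M')]
```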

groupby(*args, id=None)

Groups the join result by the columns from args.

NOTE: Usually followed by .reduce() that aggregates the result and returns a table.

  • Parameters
    • args (ColumnReference) – columns to group by.
    • id (ColumnReference | None) – if provided, the column used to set the ids of the rows of the result
  • Returns
    GroupedJoinResult – Groupby object.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   cost  owner  pet
1   100  Alice    1
2    90    Bob    1
3    80  Alice    2
''')
t2 = pw.debug.table_from_markdown('''
    cost  owner  pet size
11   100  Alice    3    M
12    90    Bob    1    L
13    80    Tom    1   XL
''')
result = (t1.join(t2, t1.owner==t2.owner).groupby(pw.this.owner)
    .reduce(pw.this.owner, pairs = pw.reducers.count()))
pw.debug.compute_and_print(result, include_id=False)
owner | pairs
Alice | 2
Bob   | 1

reduce(*args, **kwargs)

Reduces a join result to a single row.

Equivalent to self.groupby().reduce(*args, **kwargs).

  • Parameters
    • args (ColumnReference) – reducers to reduce the table with.
    • kwargs (ColumnExpression) – reducers to reduce the table with. Each key is the new name of a column.
  • Returns
    Table – Reduced table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   cost  owner  pet
1   100  Alice    1
2    90    Bob    1
3    80  Alice    2
''')
t2 = pw.debug.table_from_markdown('''
    cost  owner  pet size
11   100  Alice    3    M
12    90    Bob    1    L
13    80    Tom    1   XL
''')
result = t1.join(t2, t1.owner==t2.owner).reduce(total_pairs = pw.reducers.count())
pw.debug.compute_and_print(result, include_id=False)
total_pairs
3

select(*args, **kwargs)

Computes the result of a join.

  • Parameters
    • args (ColumnReference) – Column references.
    • kwargs (Any) – Column expressions with their new assigned names.
  • Returns
    Table – Created table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(
    age=t1.age, owner_name=t2.owner, size=t2.size
)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L

class pw.Joinable(context)

join(other, *on, id=None, how=JoinMode.INNER, left_instance=None, right_instance=None)

Joins self with other using the given join expression.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for the id of the result; can only be self.id or other.id
    • how (JoinMode) – by default, an inner join is performed. Possible values are JoinMode.{INNER,LEFT,RIGHT,OUTER}, which correspond to inner, left, right and outer join respectively.
    • left_instance/right_instance – optional arguments describing the partitioning of the data into separate instances
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join(
    t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id = False)
age | owner_name | size
9   | Bob        | L

join_inner(other, *on, id=None, left_instance=None, right_instance=None)

Inner-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for the id of the result; can only be self.id or other.id
    • left_instance/right_instance – optional arguments describing the partitioning of the data into separate instances
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join_inner(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(
    age=t1.age, owner_name=t2.owner, size=t2.size
)
pw.debug.compute_and_print(t3, include_id = False)
age | owner_name | size
9   | Bob        | L

join_left(other, *on, id=None, left_instance=None, right_instance=None)

Left-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2
    • id (ColumnReference | None) – optional id column of the result
    • left_instance/right_instance – optional arguments describing the partitioning of the data into separate instances
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Remarks: args cannot contain the id column of either table, as the result table has an id column with auto-generated ids; it can be selected by assigning it to a column with a defined name (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • rows from the right side that were not matched with the left side are skipped
  • for rows that were matched, the behavior is the same as that of an inner join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(t1.join_left(t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(t1.b + t2.d, t2.id)),
include_id=False)
a  | t2_c | s
11 | 11   | 322
12 | 12   | 324
13 |      |
13 |      |
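
In the example above, pw.require(t1.b + t2.d, t2.id) yields None for rows without a match, because t2.id is None there. The plain-Python sketch below models that together with the left-join padding. The require function here is hypothetical, and passing the value as a zero-argument callable is only a device to avoid evaluating 113 + None in Python.

```python
from typing import Any, Callable, Optional


def require(value_fn: Callable[[], Any], *args: Any) -> Optional[Any]:
    """Model of pw.require: None if any of `args` is None, otherwise the value."""
    if any(arg is None for arg in args):
        return None
    return value_fn()


t1 = [(1, 11, 111), (2, 12, 112), (3, 13, 113), (4, 13, 114)]  # (id, a, b)
t2 = [(1, 11, 211), (2, 12, 212), (3, 14, 213), (4, 14, 214)]  # (id, c, d)

rows = []
for t1_id, a, b in t1:
    hits = [(rid, c, d) for rid, c, d in t2 if c == a]
    # Left join: an unmatched left row is padded with None on the right side.
    for t2_id, c, d in hits or [(None, None, None)]:
        rows.append((a, c, require(lambda: b + d, t2_id)))
print(rows)  # [(11, 11, 322), (12, 12, 324), (13, None, None), (13, None, None)]
```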

join_outer(other, *on, id=None, left_instance=None, right_instance=None)

Outer-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2
    • id (ColumnReference | None) – optional id column of the result
    • left_instance/right_instance – optional arguments describing the partitioning of the data into separate instances
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Remarks: args cannot contain the id column of either table, as the result table has an id column with auto-generated ids; it can be selected by assigning it to a column with a defined name (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched, the behavior is the same as that of an inner join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(t1.join_outer(t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(t1.b + t2.d, t1.id, t2.id)),
include_id=False)
a  | t2_c | s
   | 14   |
   | 14   |
11 | 11   | 322
12 | 12   | 324
13 |      |
13 |      |

join_right(other, *on, id=None, left_instance=None, right_instance=None)

Right-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2
    • id (ColumnReference | None) – optional id column of the result
    • left_instance/right_instance – optional arguments describing the partitioning of the data into separate instances
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Remarks: args cannot contain the id column of either table, as the result table has an id column with auto-generated ids; it can be selected by assigning it to a column with a defined name (passed in kwargs).

Behavior:

  • rows from the left side that were not matched with the right side are skipped
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched, the behavior is the same as that of an inner join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(t1.join_right(t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(pw.coalesce(t1.b,0) + t2.d,t1.id)),
include_id=False)
a  | t2_c | s
   | 14   |
   | 14   |
11 | 11   | 322
12 | 12   | 324

property C: ColumnNamespace

Returns the namespace of all the columns of a joinable. Allows accessing columns whose names might otherwise clash with reserved method names (such as filter).

Example:

import pathway as pw
tab = pw.debug.table_from_markdown('''
age | owner | pet | filter
10  | Alice | dog | True
9   | Bob   | dog | True
8   | Alice | cat | False
7   | Bob   | dog | True
''')
isinstance(tab.C.age, pw.ColumnReference)
True
pw.debug.compute_and_print(tab.filter(tab.C.filter), include_id=False)
age | owner | pet | filter
7   | Bob   | dog | True
9   | Bob   | dog | True
10  | Alice | dog | True

class pw.Json()

Represents JSON values.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
a    | b | c
True | 2 | manul
''')
@pw.udf
def to_json(val) -> pw.Json:
    return pw.Json(val)
result = t1.select(**{c: to_json(pw.this[c]) for c in t1.column_names()})
pw.debug.compute_and_print(result, include_id=False)
a    | b | c
true | 2 | "manul"

as_bool()

Returns the Json value as a bool if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> bool:
    return data["value"].as_bool()

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": True},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)
result
True

as_dict()

Returns the Json value as a dict if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> tuple:
    return tuple(data["value"].as_dict().values())

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": {"inner": 42}},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)
result
(42,)

as_float()

Returns the Json value as a float if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> float:
    return data["value"].as_float()

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": 3.14},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)
result
3.14

as_int()

Returns the Json value as an int if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> int:
    return data["value"].as_int()

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": 42},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)
result
42

as_list()

Returns the Json value as a list if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> int:
    return data["value"].as_list()[-1]

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": [1,2,3]},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)
result
3

as_str()

Returns the Json value as a string if possible.

Example:

import pathway as pw
class InputSchema(pw.Schema):
    data: pw.Json

@pw.udf
def extract(data: pw.Json) -> str:
    return data["value"].as_str()

table = pw.debug.table_from_rows(schema=InputSchema, rows=[({"value": "foo"},)])
result = table.select(result=extract(pw.this.data))
pw.debug.compute_and_print(result, include_id=False)
result
foo

class pw.LiveTable()

class pw.Schema

Base class to inherit from when creating schemas. All schemas should be subclasses of this one.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
t1.schema
<pathway.Schema types={'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>}>
issubclass(t1.schema, pw.Schema)
True
class NewSchema(pw.Schema):
  foo: int
SchemaSum = NewSchema | t1.schema
SchemaSum
<pathway.Schema types={'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>, 'foo': <class 'int'>}>

class pw.SchemaProperties(append_only=None)

class pw.Table()

Collection of named columns over identical universes.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
isinstance(t1, pw.Table)
True

asof_join(other, self_time, other_time, *on, how, behavior=None, defaults={}, direction=Direction.BACKWARD, left_instance=None, right_instance=None)

Performs an ASOF join of two tables.

  • Parameters
    • other (Table) – Table to join with self.
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join, such as delaying entries or ignoring late entries.
    • how (JoinMode) – mode of the join (JoinMode.LEFT, JoinMode.RIGHT or JoinMode.OUTER)
    • defaults (dict[ColumnReference, Any]) – dictionary mapping columns to default values. Entries in the resulting table that do not have a match in the join are set to this default value. If no default is provided, None is used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST
    • left_instance/right_instance – optional arguments describing the partitioning of the data into separate instances

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    how=pw.JoinMode.LEFT,
    defaults={t2.val: -1},
).select(
    pw.this.instance,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
instance | t  | val_left | val_right | sum
0        | 1  | 1        | -1        | 0
0        | 4  | 2        | 6         | 8
0        | 5  | 3        | 6         | 9
0        | 6  | 4        | 6         | 10
0        | 7  | 5        | 2         | 7
0        | 11 | 6        | 9         | 15
0        | 12 | 7        | 9         | 16
1        | 5  | 8        | 7         | 15
1        | 7  | 9        | 7         | 16
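The BACKWARD matching rule in the output above can be reproduced with a small plain-Python model, which may help when predicting results by hand. This is a sketch under stated assumptions, not Pathway's implementation: rows are plain (K, val, t) tuples, the only join condition is equality on K, and the function name asof_left_backward is hypothetical.

```python
from bisect import bisect_right


def asof_left_backward(left, right, default=None):
    """Illustrative model of a LEFT, BACKWARD asof join on equal keys.

    left, right: lists of (key, val, t) tuples.
    Returns sorted (key, t, val_left, val_right) tuples; a left row with no
    right row at or before its time gets `default` as val_right.
    """
    # Group right rows by key, each group sorted by time.
    groups = {}
    for key, val, t in right:
        groups.setdefault(key, []).append((t, val))
    for rows in groups.values():
        rows.sort()

    result = []
    for key, val, t in left:
        rows = groups.get(key, [])
        times = [rt for rt, _ in rows]
        # Last right row whose time is <= t (equal times match, as for t=7 above).
        i = bisect_right(times, t) - 1
        val_right = rows[i][1] if i >= 0 else default
        result.append((key, t, val, val_right))
    return sorted(result)


# The example tables as (K, val, t) tuples.
t1 = [(0, 1, 1), (0, 2, 4), (0, 3, 5), (0, 4, 6), (0, 5, 7),
      (0, 6, 11), (0, 7, 12), (1, 8, 5), (1, 9, 7)]
t2 = [(1, 7, 2), (1, 3, 8), (0, 0, 2), (0, 6, 3), (0, 2, 7),
      (0, 3, 8), (0, 9, 9), (0, 7, 13), (0, 4, 14)]
for key, t, val_left, val_right in asof_left_backward(t1, t2, default=-1):
    print(key, t, val_left, val_right, val_left + val_right)
```

The printed rows follow the same (instance, t, val_left, val_right, sum) order as the Pathway output above.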

Setting behavior lets you control the temporal behavior of an asof join. Each side of the asof join then keeps track of the maximal already seen time (self_time and other_time). In the context of asof_join, the arguments of behavior are defined as follows:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records with times less than or equal to that threshold. When cutoff is not set, the asof join remembers all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.
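The cutoff rule for one side of the join can be sketched in plain Python as follows. This is an illustrative model only: it assumes records arrive one at a time in processing-time order and that the maximal seen time is updated after each record, and it ignores delay, keep_results and garbage collection. The function name split_by_cutoff is hypothetical.

```python
def split_by_cutoff(stream, cutoff):
    """Illustrative model of the cutoff rule for one side of an asof join.

    stream: (value, event_time) pairs in arrival (processing-time) order.
    Returns (kept_values, ignored_values).
    """
    max_seen = None
    kept, ignored = [], []
    for value, event_time in stream:
        # Ignored if its time is <= (maximal already seen time - cutoff).
        if max_seen is not None and event_time <= max_seen - cutoff:
            ignored.append(value)
        else:
            kept.append(value)
        max_seen = event_time if max_seen is None else max(max_seen, event_time)
    return kept, ignored


# Left-hand stream of the examples below, ordered by __time__.
left_stream = [(2, 2), (3, 5), (4, 1), (5, 7)]
print(split_by_cutoff(left_stream, cutoff=2))  # ([2, 3, 5], [4])
```

The record with value=4 is ignored because 1 <= 5 - 2, which matches the row missing from the cutoff example's output.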

Examples without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      2   |      2     |     4
      3   |      5     |     6
      4   |      1     |     8
      5   |      7     |    14
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      42  |      1     |     2
       8  |      4     |    10
'''
)
result_without_cutoff = t1.asof_join(
    t2, t1.event_time, t2.event_time, how=pw.JoinMode.LEFT
).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
left_value | right_value | left_time | right_time | __time__ | __diff__
2          | 42          | 2         | 1          | 4        | 1
3          | 42          | 5         | 1          | 6        | 1
4          | 42          | 1         | 1          | 8        | 1
3          | 42          | 5         | 1          | 10       | -1
3          | 8           | 5         | 4          | 10       | 1
5          | 8           | 7         | 4          | 14       | 1

result_with_cutoff = t1.asof_join(
    t2,
    t1.event_time,
    t2.event_time,
    how=pw.JoinMode.LEFT,
    behavior=pw.temporal.common_behavior(cutoff=2),
).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)
left_value | right_value | left_time | right_time | __time__ | __diff__
2          | 42          | 2         | 1          | 4        | 1
3          | 42          | 5         | 1          | 6        | 1
3          | 42          | 5         | 1          | 10       | -1
3          | 8           | 5         | 4          | 10       | 1
5          | 8           | 7         | 4          | 14       | 1

The record with value=4 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (1 <= 5-2).

asof_join_left(other, self_time, other_time, *on, behavior=None, defaults={}, direction=Direction.BACKWARD, left_instance=None, right_instance=None)

Perform a left ASOF join of two tables.

  • Parameters
    • other (Table) – Table to join with self.
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join - features like delaying entries or ignoring late entries.
    • defaults (dict[ColumnReference, Any]) – dictionary mapping columns to default values. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
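The three direction values can be illustrated with a plain-Python model of how a row at time t picks its partner among the other table's times. This sketch assumes that BACKWARD picks the last time at or before t, FORWARD the first time at or after t, and NEAREST the closer of the two (ties going to the earlier one, an assumption of this sketch); the function name match_time and the string direction names standing in for pw.Direction members are hypothetical.

```python
from bisect import bisect_left, bisect_right


def match_time(times, t, direction):
    """Illustrative model: pick the matching time from `times` for a row at time t.

    times: sorted event times of the other table's rows (same instance).
    Returns the matched time, or None if there is no match.
    """
    i_back = bisect_right(times, t) - 1  # last index with times[i] <= t
    i_fwd = bisect_left(times, t)        # first index with times[i] >= t
    back = times[i_back] if i_back >= 0 else None
    fwd = times[i_fwd] if i_fwd < len(times) else None
    if direction == "BACKWARD":
        return back
    if direction == "FORWARD":
        return fwd
    if direction == "NEAREST":
        if back is None or fwd is None:
            return fwd if back is None else back
        # Assumption of this sketch: equal distances resolve to the earlier row.
        return back if t - back <= fwd - t else fwd
    raise ValueError(f"unknown direction: {direction!r}")


times = [2, 3, 7, 8, 9, 13, 14]  # times of instance K=0 of t2 in the example below
for direction in ("BACKWARD", "FORWARD", "NEAREST"):
    print(direction, match_time(times, 4, direction))
# BACKWARD 3 / FORWARD 7 / NEAREST 3
```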

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_left(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t2.val: -1},
).select(
    pw.this.instance,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
instance | t  | val_left | val_right | sum
0        | 1  | 1        | -1        | 0
0        | 4  | 2        | 6         | 8
0        | 5  | 3        | 6         | 9
0        | 6  | 4        | 6         | 10
0        | 7  | 5        | 2         | 7
0        | 11 | 6        | 9         | 15
0        | 12 | 7        | 9         | 16
1        | 5  | 8        | 7         | 15
1        | 7  | 9        | 7         | 16

Setting behavior lets you control the temporal behavior of an asof join. Each side of the asof join then keeps track of the maximal already seen time (self_time and other_time). In the context of asof_join, the arguments of behavior are defined as follows:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records with times less than or equal to that threshold. When cutoff is not set, the asof join remembers all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Examples without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      2   |      2     |     4
      3   |      5     |     6
      4   |      1     |     8
      5   |      7     |    14
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      42  |      1     |     2
       8  |      4     |    10
'''
)
result_without_cutoff = t1.asof_join_left(t2, t1.event_time, t2.event_time).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
left_value | right_value | left_time | right_time | __time__ | __diff__
2          | 42          | 2         | 1          | 4        | 1
3          | 42          | 5         | 1          | 6        | 1
4          | 42          | 1         | 1          | 8        | 1
3          | 42          | 5         | 1          | 10       | -1
3          | 8           | 5         | 4          | 10       | 1
5          | 8           | 7         | 4          | 14       | 1

result_with_cutoff = t1.asof_join_left(
    t2,
    t1.event_time,
    t2.event_time,
    behavior=pw.temporal.common_behavior(cutoff=2),
).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)
left_value | right_value | left_time | right_time | __time__ | __diff__
2          | 42          | 2         | 1          | 4        | 1
3          | 42          | 5         | 1          | 6        | 1
3          | 42          | 5         | 1          | 10       | -1
3          | 8           | 5         | 4          | 10       | 1
5          | 8           | 7         | 4          | 14       | 1

The record with value=4 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (1 <= 5-2).

asof_join_outer(other, self_time, other_time, *on, behavior=None, defaults={}, direction=Direction.BACKWARD, left_instance=None, right_instance=None)

Perform an outer ASOF join of two tables.

  • Parameters
    • other (Table) – Table to join with self.
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join - features like delaying entries or ignoring late entries.
    • defaults (dict[ColumnReference, Any]) – dictionary mapping columns to default values. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_outer(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t1.val: -1, t2.val: -1},
).select(
    pw.this.instance,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
instance | t  | val_left | val_right | sum
0        | 1  | 1        | -1        | 0
0        | 2  | 1        | 0         | 1
0        | 3  | 1        | 6         | 7
0        | 4  | 2        | 6         | 8
0        | 5  | 3        | 6         | 9
0        | 6  | 4        | 6         | 10
0        | 7  | 5        | 2         | 7
0        | 7  | 5        | 6         | 11
0        | 8  | 5        | 3         | 8
0        | 9  | 5        | 9         | 14
0        | 11 | 6        | 9         | 15
0        | 12 | 7        | 9         | 16
0        | 13 | 7        | 7         | 14
0        | 14 | 7        | 4         | 11
1        | 2  | -1       | 7         | 6
1        | 5  | 8        | 7         | 15
1        | 7  | 9        | 7         | 16
1        | 8  | 9        | 3         | 12

asof_join_right(other, self_time, other_time, *on, behavior=None, defaults={}, direction=Direction.BACKWARD, left_instance=None, right_instance=None)

Perform a right ASOF join of two tables.

  • Parameters
    • other (Table) – Table to join with self.
    • self_time (ColumnExpression) – time-like column expression to do the join against
    • other_time (ColumnExpression) – time-like column expression to do the join against
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join - features like delaying entries or ignoring late entries.
    • defaults (dict[ColumnReference, Any]) – dictionary mapping columns to default values. Entries in the resulting table that do not have a predecessor in the join will be set to this default value. If no default is provided, None will be used.
    • direction (Direction) – direction of the join, accepted values: Direction.BACKWARD, Direction.FORWARD, Direction.NEAREST
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | K | val |  t
    1   | 0 | 1   |  1
    2   | 0 | 2   |  4
    3   | 0 | 3   |  5
    4   | 0 | 4   |  6
    5   | 0 | 5   |  7
    6   | 0 | 6   |  11
    7   | 0 | 7   |  12
    8   | 1 | 8   |  5
    9   | 1 | 9   |  7
'''
)
t2 = pw.debug.table_from_markdown(
    '''
         | K | val | t
    21   | 1 | 7  | 2
    22   | 1 | 3  | 8
    23   | 0 | 0  | 2
    24   | 0 | 6  | 3
    25   | 0 | 2  | 7
    26   | 0 | 3  | 8
    27   | 0 | 9  | 9
    28   | 0 | 7  | 13
    29   | 0 | 4  | 14
    '''
)
res = t1.asof_join_right(
    t2,
    t1.t,
    t2.t,
    t1.K == t2.K,
    defaults={t1.val: -1},
).select(
    pw.this.instance,
    pw.this.t,
    val_left=t1.val,
    val_right=t2.val,
    sum=t1.val + t2.val,
)
pw.debug.compute_and_print(res, include_id=False)
instance | t  | val_left | val_right | sum
0        | 2  | 1        | 0         | 1
0        | 3  | 1        | 6         | 7
0        | 7  | 5        | 2         | 7
0        | 8  | 5        | 3         | 8
0        | 9  | 5        | 9         | 14
0        | 13 | 7        | 7         | 14
0        | 14 | 7        | 4         | 11
1        | 2  | -1       | 7         | 6
1        | 8  | 9        | 3         | 12

Setting behavior lets you control the temporal behavior of an asof join. Each side of the asof join then keeps track of the maximal already seen time (self_time and other_time). In the context of asof_join, the arguments of behavior are defined as follows:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records with times less than or equal to that threshold. When cutoff is not set, the asof join remembers all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Examples without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      42  |      1     |     2
       8  |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | event_time | __time__
      2   |      2     |     4
      3   |      5     |     6
      4   |      1     |     8
      5   |      7     |    14
'''
)
result_without_cutoff = t1.asof_join_right(t2, t1.event_time, t2.event_time).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
left_value | right_value | left_time | right_time | __time__ | __diff__
42         | 2           | 1         | 2          | 4        | 1
42         | 3           | 1         | 5          | 6        | 1
42         | 4           | 1         | 1          | 8        | 1
42         | 3           | 1         | 5          | 10       | -1
8          | 3           | 4         | 5          | 10       | 1
8          | 5           | 4         | 7          | 14       | 1

result_with_cutoff = t1.asof_join_right(
    t2,
    t1.event_time,
    t2.event_time,
    behavior=pw.temporal.common_behavior(cutoff=2),
).select(
    left_value=t1.value,
    right_value=t2.value,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)
left_value | right_value | left_time | right_time | __time__ | __diff__
42         | 2           | 1         | 2          | 4        | 1
42         | 3           | 1         | 5          | 6        | 1
42         | 3           | 1         | 5          | 10       | -1
8          | 3           | 4         | 5          | 10       | 1
8          | 5           | 4         | 7          | 14       | 1

The record with value=4 from table t2 was not joined because its event_time was less than the maximal already seen time minus cutoff (1 <= 5-2).

asof_now_join(other, *on, how=JoinMode.INNER, id=None, left_instance=None, right_instance=None)

Performs an asof now join of self with other using join expressions. Each row of self is joined with rows from other at a given processing time. Rows from self are not stored: they are joined with the rows of other present at their processing time, and if other is updated in the future, past rows from self won’t be updated. Rows from other are stored, so they can be joined with future rows of self.

  • Parameters
    • other (Table) – the right side of a join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
    • how (JoinMode) – by default, inner join is performed. Possible values are JoinMode.{INNER,LEFT} which correspond to inner and left join respectively.
  • Returns
    AsofNowJoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
data = pw.debug.table_from_markdown(
    '''
    id | value | instance | __time__ | __diff__
     2 |   4   |    1     |     4    |     1
     2 |   4   |    1     |    10    |    -1
     5 |   5   |    1     |    10    |     1
     7 |   2   |    2     |    14    |     1
     7 |   2   |    2     |    22    |    -1
    11 |   3   |    2     |    26    |     1
     5 |   5   |    1     |    30    |    -1
    14 |   9   |    1     |    32    |     1
    '''
)
queries = pw.debug.table_from_markdown(
    '''
    value | instance | __time__
      1   |    1     |     2
      2   |    1     |     6
      4   |    1     |    12
      5   |    2     |    16
     10   |    1     |    26
    '''
)
result = queries.asof_now_join(
    data, pw.left.instance == pw.right.instance, how=pw.JoinMode.LEFT
).select(query=pw.left.value, ans=pw.right.value)
pw.debug.compute_and_print_update_stream(result, include_id=False)
query | ans | __time__ | __diff__
1     |     | 2        | 1
2     | 4   | 6        | 1
4     | 5   | 12       | 1
5     | 2   | 16       | 1
10    | 5   | 26       | 1
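The "answered once, never revised" behavior can be simulated in plain Python by replaying both tables' updates in time order. This is an illustrative model only, not Pathway's implementation: the join condition is equality on instance, rows are plain tuples, the function name asof_now_join_model and its how strings are hypothetical, and at equal times data updates are assumed to be applied before queries.

```python
def asof_now_join_model(data_events, queries, how="inner"):
    """Illustrative model of an asof now join matching on `instance`.

    data_events: (time, row_id, value, instance, diff) updates of the right table.
    queries: (time, value, instance) rows of the left table.
    Returns (query_value, answer_value, time) rows. Each query is answered
    once, against the right table as it is at the query's time; later
    updates to the right table never revise these answers.
    """
    # Assumption of this sketch: at equal times, data updates are applied first.
    timeline = sorted(
        [(ev[0], 0, ev) for ev in data_events] + [(q[0], 1, q) for q in queries],
        key=lambda item: (item[0], item[1]),
    )
    state = {}  # row_id -> (value, instance): current contents of the right table
    results = []
    for _, kind, event in timeline:
        if kind == 0:
            _, row_id, value, instance, diff = event
            if diff > 0:
                state[row_id] = (value, instance)
            else:
                state.pop(row_id, None)
        else:
            time, q_value, q_instance = event
            answers = [v for v, inst in state.values() if inst == q_instance]
            if not answers and how == "left":
                answers = [None]  # LEFT join: unmatched queries are padded with None
            results.extend((q_value, a, time) for a in answers)
    return results


data = [(4, 2, 4, 1, 1), (10, 2, 4, 1, -1), (10, 5, 5, 1, 1), (14, 7, 2, 2, 1),
        (22, 7, 2, 2, -1), (26, 11, 3, 2, 1), (30, 5, 5, 1, -1), (32, 14, 9, 1, 1)]
queries = [(2, 1, 1), (6, 2, 1), (12, 4, 1), (16, 5, 2), (26, 10, 1)]
print(asof_now_join_model(data, queries, how="left"))
# [(1, None, 2), (2, 4, 6), (4, 5, 12), (5, 2, 16), (10, 5, 26)]
```

With how="inner" the unmatched first query is dropped, which mirrors the asof_now_join_inner example.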

asof_now_join_inner(other, *on, id=None, left_instance=None, right_instance=None)

Performs an asof now join of self with other using join expressions. Each row of self is joined with rows from other at a given processing time. Rows from self are not stored: they are joined with the rows of other present at their processing time, and if other is updated in the future, past rows from self won’t be updated. Rows from other are stored, so they can be joined with future rows of self.

  • Parameters
    • other (Table) – the right side of a join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
  • Returns
    AsofNowJoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
data = pw.debug.table_from_markdown(
    '''
    id | value | instance | __time__ | __diff__
     2 |   4   |    1     |     4    |     1
     2 |   4   |    1     |    10    |    -1
     5 |   5   |    1     |    10    |     1
     7 |   2   |    2     |    14    |     1
     7 |   2   |    2     |    22    |    -1
    11 |   3   |    2     |    26    |     1
     5 |   5   |    1     |    30    |    -1
    14 |   9   |    1     |    32    |     1
    '''
)
queries = pw.debug.table_from_markdown(
    '''
    value | instance | __time__
      1   |    1     |     2
      2   |    1     |     6
      4   |    1     |    12
      5   |    2     |    16
     10   |    1     |    26
    '''
)
result = queries.asof_now_join_inner(
    data, pw.left.instance == pw.right.instance
).select(query=pw.left.value, ans=pw.right.value)
pw.debug.compute_and_print_update_stream(result, include_id=False)
query | ans | __time__ | __diff__
2     | 4   | 6        | 1
4     | 5   | 12       | 1
5     | 2   | 16       | 1
10    | 5   | 26       | 1

asof_now_join_left(other, *on, id=None, left_instance=None, right_instance=None)

Performs an asof now join of self with other using join expressions. Each row of self is joined with rows from other at a given processing time. If there are no matching rows in other, missing values on the right side are replaced with None. Rows from self are not stored: they are joined with the rows of other present at their processing time, and if other is updated in the future, past rows from self won’t be updated. Rows from other are stored, so they can be joined with future rows of self.

  • Parameters
    • other (Table) – the right side of a join.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
  • Returns
    AsofNowJoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
data = pw.debug.table_from_markdown(
    '''
    id | value | instance | __time__ | __diff__
     2 |   4   |    1     |     4    |     1
     2 |   4   |    1     |    10    |    -1
     5 |   5   |    1     |    10    |     1
     7 |   2   |    2     |    14    |     1
     7 |   2   |    2     |    22    |    -1
    11 |   3   |    2     |    26    |     1
     5 |   5   |    1     |    30    |    -1
    14 |   9   |    1     |    32    |     1
    '''
)
queries = pw.debug.table_from_markdown(
    '''
    value | instance | __time__
      1   |    1     |     2
      2   |    1     |     6
      4   |    1     |    12
      5   |    2     |    16
     10   |    1     |    26
    '''
)
result = queries.asof_now_join_left(
    data, pw.left.instance == pw.right.instance
).select(query=pw.left.value, ans=pw.right.value)
pw.debug.compute_and_print_update_stream(result, include_id=False)
query | ans | __time__ | __diff__
1     |     | 2        | 1
2     | 4   | 6        | 1
4     | 5   | 12       | 1
5     | 2   | 16       | 1
10    | 5   | 26       | 1

cast_to_types(**kwargs)

Casts columns to types.

concat(*others)

Concatenates self with each table other in others.

Semantics:

  • result.columns == self.columns == other.columns
  • result.id == self.id ∪ other.id

If self.id and other.id collide, an exception is raised.

Requires:

  • other.columns == self.columns
  • self.id disjoint with other.id

  • Parameters
    others (Table) – the tables to concatenate with self.
  • Returns
    Table – The concatenated table. Row ids from the original tables are preserved.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''')
pw.universes.promise_are_pairwise_disjoint(t1, t2)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 1
11  | Alice | 30
12  | Tom   | 40
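The semantics listed above (same columns, union of ids, error on collision) can be modeled in plain Python with tables stored as dictionaries keyed by row id. This is an illustrative sketch, not Pathway's implementation; the dictionary representation and the function name concat_model are assumptions, and Pathway's actual exception type may differ from the KeyError used here.

```python
def concat_model(*tables):
    """Illustrative model of Table.concat on tables stored as {row_id: {column: value}}."""
    result, columns = {}, None
    for table in tables:
        for row_id, row in table.items():
            if columns is None:
                columns = set(row)
            elif set(row) != columns:
                raise ValueError(f"column mismatch: {sorted(row)} != {sorted(columns)}")
            if row_id in result:
                raise KeyError(f"id collision: {row_id!r}")
            result[row_id] = row  # ids are preserved, not regenerated
    return result


t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {
    11: {"age": 11, "owner": "Alice", "pet": 30},
    12: {"age": 12, "owner": "Tom", "pet": 40},
}
t3 = concat_model(t1, t2)
print(sorted(t3))  # [1, 2, 3, 11, 12]
```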

concat_reindex(*tables)

Concatenate the contents of several tables.

This is similar to PySpark union. All tables must have the same schema. Each row is reindexed.

  • Parameters
    tables (Table) – List of tables to concatenate. All tables must have the same schema.
  • Returns
    Table – The concatenated table. It will have new, synthetic ids.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.table_from_markdown('''
  | pet
1 | Manul
8 | Octopus
''')
t3 = t1.concat_reindex(t2)
pw.debug.compute_and_print(t3, include_id=False)
pet
Cat
Dog
Manul
Octopus

copy()

Returns a copy of a table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.copy()
pw.debug.compute_and_print(t2, include_id=False)
age | owner | pet
7   | Bob   | dog
8   | Alice | cat
9   | Bob   | dog
10  | Alice | dog
t1 is t2
False

deduplicate(*, value, instance=None, acceptor, persistent_id=None)

Deduplicates rows in self on the value column using the acceptor function.

It keeps rows that were accepted by the acceptor function. The acceptor takes two arguments: the current value and the previously accepted value.

  • Parameters
    • value (ColumnExpression) – column expression used for deduplication.
    • instance (ColumnExpression | None) – Grouping column. For rows with different values in this column, deduplication will be performed separately. Defaults to None.
    • acceptor (Callable[[T, T], bool]) – callback telling whether the current value (first argument) should be accepted, given the previously accepted value (second argument). T is bound to Union[None, int, float, str, bytes, bool, Pointer, datetime, timedelta, ndarray, Json, dict[str, Any], tuple[Any, ...]].
    • persistent_id (str | None) – (unstable) An identifier, under which the state of the table will be persisted or None, if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This way it’s possible to configure the start of computations from the moment they were terminated last time.
  • Returns
    Table – the result of deduplication.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
    val | __time__
     1  |     2
     2  |     4
     3  |     6
     4  |     8
'''
)

def acceptor(new_value, old_value) -> bool:
    return new_value >= old_value + 2


result = table.deduplicate(value=pw.this.val, acceptor=acceptor)
pw.debug.compute_and_print_update_stream(result, include_id=False)
val | __time__ | __diff__
1   | 2        | 1
1   | 6        | -1
3   | 6        | 1

table = pw.debug.table_from_markdown(
    '''
    val | instance | __time__
     1  |     1    |     2
     2  |     1    |     4
     3  |     2    |     6
     4  |     1    |     8
     4  |     2    |     8
     5  |     1    |    10
'''
)

def acceptor(new_value, old_value) -> bool:
    return new_value >= old_value + 2


result = table.deduplicate(
    value=pw.this.val, instance=pw.this.instance, acceptor=acceptor
)
pw.debug.compute_and_print_update_stream(result, include_id=False)
val | instance | __time__ | __diff__
1   | 1        | 2        | 1
3   | 2        | 6        | 1
1   | 1        | 8        | -1
4   | 1        | 8        | 1
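The update streams above can be reproduced with a plain-Python model that keeps one accepted value per instance. This is an illustrative sketch, not Pathway's implementation: rows are (value, instance, time) tuples in time order, the first value of each instance is assumed to be accepted without calling the acceptor (consistent with both examples), and the function name deduplicate_model is hypothetical.

```python
def deduplicate_model(rows, acceptor):
    """Illustrative model of Table.deduplicate.

    rows: (value, instance, time) tuples in time order.
    Returns (value, instance, time, diff) update-stream entries.
    """
    accepted = {}  # instance -> currently accepted value
    updates = []
    for value, instance, time in rows:
        if instance not in accepted:
            # First value of an instance: accepted without calling acceptor.
            accepted[instance] = value
            updates.append((value, instance, time, 1))
        elif acceptor(value, accepted[instance]):
            # Replace the previously accepted value: retract it, insert the new one.
            updates.append((accepted[instance], instance, time, -1))
            updates.append((value, instance, time, 1))
            accepted[instance] = value
    return updates


def acceptor(new_value, old_value):
    return new_value >= old_value + 2


rows = [(1, 1, 2), (2, 1, 4), (3, 2, 6), (4, 1, 8), (4, 2, 8), (5, 1, 10)]
for update in deduplicate_model(rows, acceptor):
    print(update)
# (1, 1, 2, 1)
# (3, 2, 6, 1)
# (1, 1, 8, -1)
# (4, 1, 8, 1)
```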

diff(timestamp, *values)

Compute the difference between the values in the values columns and the previous values according to the order defined by the column timestamp.

  • Parameters
    • timestamp (ColumnReference) – The column reference to the timestamp column on which the order is computed.
    • *values (ColumnReference) – Variable-length argument representing the column references to the values columns.
  • Returns
    Table – A new table where each column is replaced with a new column containing the difference and whose name is the concatenation of diff_ and the former name.
  • Raises
    ValueError – If the columns are not ColumnReference.

NOTE: The diff for the “first” row (the row with the lowest value in the timestamp column) is None.

Example:

import pathway as pw
table = pw.debug.table_from_markdown('''
timestamp | values
1         | 1
2         | 2
3         | 4
4         | 7
5         | 11
6         | 16
''')
table += table.diff(pw.this.timestamp, pw.this.values)
pw.debug.compute_and_print(table, include_id=False)
timestamp | values | diff_values
1         | 1      |
2         | 2      | 1
3         | 4      | 2
4         | 7      | 3
5         | 11     | 4
6         | 16     | 5
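For a single values column, the computation amounts to sorting by timestamp and subtracting each value's predecessor. A minimal plain-Python model, assuming rows are dictionaries (the function name diff_column is hypothetical and this is not Pathway's implementation):

```python
def diff_column(rows, timestamp, column):
    """Illustrative model of Table.diff for a single values column.

    rows: list of dicts. Returns copies ordered by `timestamp`, each with an
    extra f"diff_{column}" key; the first row's diff is None.
    """
    ordered = sorted(rows, key=lambda row: row[timestamp])
    return [
        {**row, f"diff_{column}": None if i == 0 else row[column] - ordered[i - 1][column]}
        for i, row in enumerate(ordered)
    ]


table = [{"timestamp": t, "values": v} for t, v in zip(range(1, 7), [1, 2, 4, 7, 11, 16])]
print([row["diff_values"] for row in diff_column(table, "timestamp", "values")])
# [None, 1, 2, 3, 4, 5]
```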

difference(other)

Restricts the universe of self to keys not appearing in the other table.

  • Parameters
    other (Table) – table with ids to remove from self.
  • Returns
    Table – table with restricted universe, with the same set of columns

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age  | owner  | pet
1 | 10   | Alice  | 1
2 | 9    | Bob    | 1
3 | 8    | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
  | cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.difference(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
10  | Alice | 1
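Only the ids of other matter here; its columns are never read. A plain-Python model of this restriction, assuming tables are dictionaries keyed by row id (the function name difference_model is hypothetical):

```python
def difference_model(table, other):
    """Illustrative model of Table.difference: tables are {row_id: row}."""
    # Keep rows of `table` whose id does not occur in `other`.
    return {row_id: row for row_id, row in table.items() if row_id not in other}


t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {2: {"cost": 100}, 3: {"cost": 200}, 4: {"cost": 300}}
print(difference_model(t1, t2))  # {1: {'age': 10, 'owner': 'Alice', 'pet': 1}}
```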

empty(**kwargs)

Creates an empty table with a schema specified by kwargs.

  • Parameters
    kwargs (DType) – Dict whose keys are column names and values are column types.
  • Returns
    Table – Created empty table.

Example:

import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
pw.debug.compute_and_print(t1, include_id=False)
age | pet

filter(filter_expression)

Filter a table according to the filter_expression condition.

  • Parameters
    filter_expression (ColumnExpression) – ColumnExpression that specifies the filtering condition.
  • Returns
    Table – Result has the same schema as self and its ids are a subset of self.id.

Example:

import pathway as pw
vertices = pw.debug.table_from_markdown('''
label outdegree
    1         3
    7         0
''')
filtered = vertices.filter(vertices.outdegree == 0)
pw.debug.compute_and_print(filtered, include_id=False)
label | outdegree
7     | 0

flatten(*args, **kwargs)

Performs a flatmap operation on a column or expression given as the first argument. The datatype of this column or expression has to be an iterable or a Json array. Other columns specified in the method arguments are duplicated as many times as the length of the iterable.

It is possible to get ids of source rows by using table.id column, e.g. table.flatten(table.column_to_be_flattened, original_id = table.id).

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | pet  |  age
1 | Dog  |   2
7 | Cat  |   5
''')
t2 = t1.flatten(t1.pet)
pw.debug.compute_and_print(t2, include_id=False)
pet
C
D
a
g
o
t
t3 = t1.flatten(t1.pet, t1.age)
pw.debug.compute_and_print(t3, include_id=False)
pet | age
C   | 5
D   | 2
a   | 5
g   | 2
o   | 2
t   | 5
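The duplication rule can be sketched in plain Python. This is not Pathway code. It assumes a table is a list of dict rows, and that any Python iterable (here a string) can be flattened:

```python
# Plain-Python model of Table.flatten (not Pathway code).
# Assumption: a table is a list of dicts; any iterable value can be flattened.

def flatten(rows, flat_col, *carried_cols):
    """Emit one output row per element of row[flat_col]; copy carried columns."""
    out = []
    for row in rows:
        for element in row[flat_col]:
            new_row = {flat_col: element}
            for col in carried_cols:
                new_row[col] = row[col]  # duplicated once per element
            out.append(new_row)
    return out


rows = [{"pet": "Dog", "age": 2}, {"pet": "Cat", "age": 5}]

t2 = flatten(rows, "pet")
print(sorted(r["pet"] for r in t2))  # ['C', 'D', 'a', 'g', 'o', 't']

t3 = flatten(rows, "pet", "age")
print(sorted((r["pet"], r["age"]) for r in t3))
# [('C', 5), ('D', 2), ('a', 5), ('g', 2), ('o', 2), ('t', 5)]
```

Each three-letter pet name yields three rows, so the carried age value appears three times per source row.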

from_columns(*args, **kwargs)

Builds a table from columns.

All columns must have the same ids. Columns’ names must be pairwise distinct.

Example:

import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
t2 = pw.Table.empty(foo=float, bar=float).with_universe_of(t1)
t3 = pw.Table.from_columns(t1.pet, qux=t2.foo)
pw.debug.compute_and_print(t3, include_id=False)
pet | qux

groupby(*args, id=None, sort_by=None, instance=None)

Groups the table by the columns given in args.

NOTE: Usually followed by .reduce() that aggregates the result and returns a table.

  • Parameters
    • args (ColumnReference) – columns to group by.
    • id (ColumnReference | None) – if provided, is the column used to set the ids of the rows of the result
    • sort_by (ColumnReference | None) – if provided, column values are used as sorting keys for particular reducers
    • instance (ColumnReference | None) – optional argument describing partitioning of the data into separate instances
  • Returns
    GroupedTable – Groupby object.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.groupby(t1.pet, t1.owner).reduce(t1.owner, t1.pet, ageagg=pw.reducers.sum(t1.age))
pw.debug.compute_and_print(t2, include_id=False)
owner | pet | ageagg
Alice | cat | 8
Alice | dog | 10
Bob   | dog | 16
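What groupby(...).reduce(..., ageagg=pw.reducers.sum(...)) computes in the example above can be modelled in plain Python as a dict of running sums keyed by the grouping columns. This is an illustration only, not Pathway's implementation:

```python
# Plain-Python model of
#   t1.groupby(t1.pet, t1.owner).reduce(t1.owner, t1.pet, ageagg=pw.reducers.sum(t1.age))
# (not Pathway code): one output row per distinct (owner, pet) pair.
from collections import defaultdict


def groupby_sum(rows, key_cols, value_col):
    sums = defaultdict(int)
    for row in rows:
        group = tuple(row[col] for col in key_cols)
        sums[group] += row[value_col]  # sum reducer over each group
    return dict(sums)


rows = [
    {"age": 10, "owner": "Alice", "pet": "dog"},
    {"age": 9, "owner": "Bob", "pet": "dog"},
    {"age": 8, "owner": "Alice", "pet": "cat"},
    {"age": 7, "owner": "Bob", "pet": "dog"},
]
result = groupby_sum(rows, ("owner", "pet"), "age")
for (owner, pet), ageagg in sorted(result.items()):
    print(owner, pet, ageagg)
# Alice cat 8
# Alice dog 10
# Bob dog 16
```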

having(*indexers)

Removes rows so that indexed.ix(indexer) is possible when some rows are missing, for each indexer in indexers.

interpolate(timestamp, *values, mode=InterpolateMode.LINEAR)

Interpolates missing values in a column using the previous and next values, based on a timestamp column.

  • Parameters
    • timestamp (ColumnReference) – Reference to the column containing timestamps.
    • *values (ColumnReference) – References to the columns containing values to be interpolated.
    • mode (InterpolateMode, optional) – The interpolation mode. Currently, only InterpolateMode.LINEAR is supported. Default is InterpolateMode.LINEAR.
  • Returns
    Table – A new table with the interpolated values.
  • Raises
    ValueError – If the columns are not ColumnReference or if the interpolation mode is not supported.

NOTE:

  • The interpolation is performed based on linear interpolation between the previous and next values.
  • If a value is missing at the beginning or end of the column, no interpolation is performed.

Example:

import pathway as pw
table = pw.debug.table_from_markdown('''
timestamp | values_a | values_b
1         | 1        | 10
2         |          |
3         | 3        |
4         |          |
5         |          |
6         | 6        | 60
''')
table = table.interpolate(pw.this.timestamp, pw.this.values_a, pw.this.values_b)
pw.debug.compute_and_print(table, include_id=False)
timestamp | values_a | values_b
1         | 1        | 10
2         | 2.0      | 20.0
3         | 3        | 30.0
4         | 4.0      | 40.0
5         | 5.0      | 50.0
6         | 6        | 60
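The linear rule described in the note can be checked with a small plain-Python model. This is not Pathway code. It assumes the rows are already sorted by timestamp and that missing values are represented as None:

```python
# Plain-Python model of linear interpolation as described for Table.interpolate
# (not Pathway code). Assumptions: rows are sorted by timestamp and missing
# values are None.

def interpolate_linear(timestamps, values):
    known = [i for i, v in enumerate(values) if v is not None]
    result = list(values)
    # Fill each gap between two consecutive known values.
    for prev, nxt in zip(known, known[1:]):
        t0, t1 = timestamps[prev], timestamps[nxt]
        v0, v1 = values[prev], values[nxt]
        for i in range(prev + 1, nxt):
            fraction = (timestamps[i] - t0) / (t1 - t0)
            result[i] = v0 + fraction * (v1 - v0)
    # Leading/trailing None values have only one neighbour: left unchanged.
    return result


timestamps = [1, 2, 3, 4, 5, 6]
values_a = interpolate_linear(timestamps, [1, None, 3, None, None, 6])
values_b = interpolate_linear(timestamps, [10, None, None, None, None, 60])
print(values_a)
print(values_b)
print(interpolate_linear([1, 2, 3], [None, 5, None]))  # [None, 5, None]
```

The interpolated values are weighted by timestamp distance rather than by row position, which only matters when timestamps are unevenly spaced.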

intersect(*tables)

Restricts the universe of self to keys appearing in all of the tables.

  • Parameters
    tables (Table) – tables whose keys are used to restrict the universe.
  • Returns
    Table – table with restricted universe, with the same set of columns

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age  | owner  | pet
1 | 10   | Alice  | 1
2 | 9    | Bob    | 1
3 | 8    | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
  | cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.intersect(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
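As with difference, the key-only semantics of intersect can be sketched in plain Python. Tables are modelled as dicts keyed by row id, and t4 below is a hypothetical extra table added to show the variadic form. This is not Pathway code:

```python
# Plain-Python model of Table.intersect (not Pathway code).
# Assumption: a table is a dict mapping row ids to rows (dicts of columns).

def intersect(table: dict, *others: dict) -> dict:
    """Keep rows of `table` whose id appears in every table in `others`."""
    return {
        row_id: row
        for row_id, row in table.items()
        if all(row_id in other for other in others)
    }


t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {2: {"cost": 100}, 3: {"cost": 200}, 4: {"cost": 300}}
t4 = {3: {"note": "x"}}  # hypothetical third table

print(sorted(intersect(t1, t2)))      # [2, 3]
print(sorted(intersect(t1, t2, t4)))  # [3]
```

Only the columns of self survive. Id 4 of t2 does not appear, because it is not a key of t1.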

interval_join(other, self_time, other_time, interval, *on, behavior=None, how=JoinMode.INNER, left_instance=None, right_instance=None)

Performs an interval join of self with other using a time difference and join expressions. With interval=pw.temporal.interval(lower_bound, upper_bound), two rows are joined if self_time + lower_bound <= other_time <= self_time + upper_bound and all conditions in on are satisfied.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (pw.ColumnExpression[int | float | datetime]) – time expression in self.
    • other_time (pw.ColumnExpression[int | float | datetime]) – time expression in other.
    • interval – the allowed range of the time difference other_time - self_time, created with pw.temporal.interval(lower_bound, upper_bound); both bounds are inclusive.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines the temporal behavior of a join - features like delaying entries or ignoring late entries. You can see examples below or read more in the temporal behavior of interval join tutorial.
    • how (JoinMode) – decides whether to run interval_join_inner, interval_join_left, interval_join_right or interval_join_outer. Default is INNER.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b, how=pw.JoinMode.INNER
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2
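The matching rule used by both examples can be reproduced with a naive nested-loop model in plain Python. This is only an illustration of the condition. Pathway does not evaluate interval joins this way, and the helper name below is hypothetical:

```python
# Plain-Python model of the interval-join matching rule (not Pathway code).
# A pair (l, r) matches if l_time + lower <= r_time <= l_time + upper
# and, optionally, the join keys are equal (like t1.a == t2.b).

def interval_join_inner(left, right, lower, upper, left_key=None, right_key=None):
    """left/right: lists of dicts with a "t" column; returns matching pairs."""
    result = []
    for l in left:
        for r in right:
            in_interval = l["t"] + lower <= r["t"] <= l["t"] + upper
            keys_equal = left_key is None or l[left_key] == r[right_key]
            if in_interval and keys_equal:
                result.append((l, r))
    return result


# First example: no key condition, interval(-2, 1).
left = [{"t": t} for t in (3, 4, 5, 11)]
right = [{"t": t} for t in (0, 1, 4, 7)]
pairs = sorted((l["t"], r["t"]) for l, r in interval_join_inner(left, right, -2, 1))
print(pairs)  # [(3, 1), (3, 4), (4, 4), (5, 4)]

# Second example: additionally require t1.a == t2.b.
left2 = [{"a": a, "t": t} for a, t in [(1, 3), (1, 4), (1, 5), (1, 11), (2, 2), (2, 3), (3, 4)]]
right2 = [{"b": b, "t": t} for b, t in [(1, 0), (1, 1), (1, 4), (1, 7), (2, 0), (2, 2), (4, 2)]]
keyed = sorted(
    (l["a"], l["t"], r["t"])
    for l, r in interval_join_inner(left2, right2, -2, 1, "a", "b")
)
print(keyed)
# [(1, 3, 1), (1, 3, 4), (1, 4, 4), (1, 5, 4), (2, 2, 0), (2, 2, 2), (2, 3, 2)]
```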

Setting behavior lets you control the temporal behavior of an interval join. Each side of the join then keeps track of the maximal time it has seen so far (in self_time and other_time, respectively). In the context of an interval join, the arguments of behavior mean the following:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage-collect records whose times are less than or equal to that threshold. When cutoff is not set, the interval join remembers all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal already seen time minus cutoff.

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
5          | 42          | 1        | 0         | 2          | 10       | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          | 8           | 2        | 8         | 10         | 14       | 1
2          | 10          | 2        | 2         | 4          | 30       | 1
result_with_cutoff = t1.interval_join(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          | 8           | 2        | 8         | 10         | 14       | 1

The record with value=5 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time minus cutoff (4 <= 10-6).
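The two ignored records can be reproduced with a simplified plain-Python model of the cutoff rule. This is not Pathway code. It assumes that each side tracks the maximal event time it has seen across all instances, processes records in __time__ order, and drops a record whose event time is at or below that maximum minus cutoff:

```python
# Simplified plain-Python model of the cutoff rule (not Pathway code).
# Assumptions: each side is handled on its own, in processing-time order
# (__time__); a record is ignored if its event time is <= (maximal event time
# already seen on that side) - cutoff.

def ignored_by_cutoff(events, cutoff):
    """events: list of (processing_time, event_time, value) tuples for one side."""
    max_seen = None
    ignored = []
    for _, event_time, value in sorted(events, key=lambda e: e[0]):
        if max_seen is not None and event_time <= max_seen - cutoff:
            ignored.append(value)  # too late: neither joined nor stored
            continue
        max_seen = event_time if max_seen is None else max(max_seen, event_time)
    return ignored


# (__time__, event_time, value), taken from t1 and t2 in the example above
t1_events = [(2, 0, 1), (4, 2, 2), (4, 4, 3), (8, 8, 4), (10, 0, 5), (10, 4, 6)]
t2_events = [(2, 2, 42), (14, 10, 8), (30, 4, 10)]

print(ignored_by_cutoff(t1_events, cutoff=6))  # [5]   since 0 <= 8 - 6
print(ignored_by_cutoff(t2_events, cutoff=6))  # [10]  since 4 <= 10 - 6
```

Value 6 survives even though it arrives in the same batch as value 5, because its event time 4 is above the threshold 8 - 6 = 2.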

interval_join_inner(other, self_time, other_time, interval, *on, behavior=None, left_instance=None, right_instance=None)

Performs an interval inner join of self with other using a time difference and join expressions. With interval=pw.temporal.interval(lower_bound, upper_bound), two rows are joined if self_time + lower_bound <= other_time <= self_time + upper_bound and all conditions in on are satisfied.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • interval – the allowed range of the time difference other_time - self_time, created with pw.temporal.interval(lower_bound, upper_bound); both bounds are inclusive.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines temporal behavior of a join - features like delaying entries or ignoring late entries.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_inner(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_inner(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2

Setting behavior lets you control the temporal behavior of an interval join. Each side of the join then keeps track of the maximal time it has seen so far (in self_time and other_time, respectively). In the context of an interval join, the arguments of behavior mean the following:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage-collect records whose times are less than or equal to that threshold. When cutoff is not set, the interval join remembers all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal already seen time minus cutoff.

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join_inner(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
5          | 42          | 1        | 0         | 2          | 10       | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          | 8           | 2        | 8         | 10         | 14       | 1
2          | 10          | 2        | 2         | 4          | 30       | 1
result_with_cutoff = t1.interval_join_inner(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          | 8           | 2        | 8         | 10         | 14       | 1

The record with value=5 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time minus cutoff (4 <= 10-6).

interval_join_left(other, self_time, other_time, interval, *on, behavior=None, left_instance=None, right_instance=None)

Performs an interval left join of self with other using a time difference and join expressions. With interval=pw.temporal.interval(lower_bound, upper_bound), two rows are joined if self_time + lower_bound <= other_time <= self_time + upper_bound and all conditions in on are satisfied. Rows from the left side that have not been matched with the right side are returned with the right-side values replaced with None.

  • Parameters
    • other (Table) – the right side of the join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • interval – the allowed range of the time difference other_time - self_time, created with pw.temporal.interval(lower_bound, upper_bound); both bounds are inclusive.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines temporal behavior of a join - features like delaying entries or ignoring late entries.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_left(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
3      | 1
3      | 4
4      | 4
5      | 4
11     |
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_left(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
1 | 11     |
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2
3 | 4      |
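A left interval join differs from the inner one only in what happens to left rows without a partner. The plain-Python sketch below (not Pathway code, hypothetical helper name) pads those rows with None, which reproduces the two unmatched rows of the keyed example:

```python
# Plain-Python model of a left interval join (not Pathway code).
# Every left row appears at least once; unmatched left rows are paired with None.

def interval_join_left(left, right, lower, upper, left_key=None, right_key=None):
    result = []
    for l in left:
        matched = False
        for r in right:
            in_interval = l["t"] + lower <= r["t"] <= l["t"] + upper
            keys_equal = left_key is None or l[left_key] == r[right_key]
            if in_interval and keys_equal:
                result.append((l, r))
                matched = True
        if not matched:
            result.append((l, None))  # right-side values become None
    return result


left = [{"a": a, "t": t} for a, t in [(1, 3), (1, 4), (1, 5), (1, 11), (2, 2), (2, 3), (3, 4)]]
right = [{"b": b, "t": t} for b, t in [(1, 0), (1, 1), (1, 4), (1, 7), (2, 0), (2, 2), (4, 2)]]

rows = [
    (l["a"], l["t"], r["t"] if r is not None else None)
    for l, r in interval_join_left(left, right, -2, 1, "a", "b")
]
print([row for row in rows if row[2] is None])  # [(1, 11, None), (3, 4, None)]
```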

Setting behavior lets you control the temporal behavior of an interval join. Each side of the join then keeps track of the maximal time it has seen so far (in self_time and other_time, respectively). In the context of an interval join, the arguments of behavior mean the following:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage-collect records whose times are less than or equal to that threshold. When cutoff is not set, the interval join remembers all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal already seen time minus cutoff.

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join_left(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
2          |             | 2        | 2         |            | 4        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
4          |             | 2        | 8         |            | 8        | 1
5          | 42          | 1        | 0         | 2          | 10       | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          |             | 2        | 8         |            | 14       | -1
4          | 8           | 2        | 8         | 10         | 14       | 1
2          |             | 2        | 2         |            | 30       | -1
2          | 10          | 2        | 2         | 4          | 30       | 1
result_with_cutoff = t1.interval_join_left(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
2          |             | 2        | 2         |            | 4        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
4          |             | 2        | 8         |            | 8        | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          |             | 2        | 8         |            | 14       | -1
4          | 8           | 2        | 8         | 10         | 14       | 1

The record with value=5 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time minus cutoff (4 <= 10-6).

Notice also the entries with __diff__=-1. They are deletions caused by the arrival of matching entries on the right side of the join: each match removes the previously emitted entry with empty right-side fields and inserts an entry with those fields filled in.
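One way to see where the -1 entries come from is a naive model. The model recomputes the whole left join after every __time__ and emits the difference between consecutive snapshots. This is a plain-Python illustration only. Pathway computes its update stream incrementally, and the helper names here are hypothetical:

```python
# Naive plain-Python model of an update stream (not how Pathway computes it):
# recompute the whole left interval join after every __time__ and emit the
# differences between consecutive snapshots as (row, __time__, __diff__).
from collections import Counter


def left_join_snapshot(left, right, lower=-2, upper=2):
    """left/right: (value, instance, event_time) tuples; joins on equal instance."""
    snapshot = Counter()
    for lv, li, lt in left:
        matches = [rv for rv, ri, rt in right if ri == li and lt + lower <= rt <= lt + upper]
        if matches:
            for rv in matches:
                snapshot[(lv, rv)] += 1
        else:
            snapshot[(lv, None)] += 1  # unmatched left row padded with None
    return snapshot


def update_stream(left_events, right_events):
    """Events are (__time__, value, instance, event_time) tuples."""
    stream, previous = [], Counter()
    for now in sorted({e[0] for e in left_events + right_events}):
        left = [e[1:] for e in left_events if e[0] <= now]
        right = [e[1:] for e in right_events if e[0] <= now]
        current = left_join_snapshot(left, right)
        for row in set(previous) | set(current):
            diff = current[row] - previous[row]
            if diff:
                stream.append((row, now, diff))
        previous = current
    return stream


t1_events = [(2, 1, 1, 0), (4, 2, 2, 2), (4, 3, 1, 4), (8, 4, 2, 8), (10, 5, 1, 0), (10, 6, 1, 4)]
t2_events = [(2, 42, 1, 2), (14, 8, 2, 10), (30, 10, 2, 4)]

for row, time, diff in update_stream(t1_events, t2_events):
    if diff == -1:
        print(row, time, diff)  # (4, None) 14 -1, then (2, None) 30 -1
```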

interval_join_outer(other, self_time, other_time, interval, *on, behavior=None, left_instance=None, right_instance=None)

Performs an interval outer join of self with other using a time difference and join expressions. With interval=pw.temporal.interval(lower_bound, upper_bound), two rows are joined if self_time + lower_bound <= other_time <= self_time + upper_bound and all conditions in on are satisfied. Rows from either side that have not been matched with the other side are returned with the other side's values replaced with None.

  • Parameters
    • other (Table) – the right side of the join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • interval – the allowed range of the time difference other_time - self_time, created with pw.temporal.interval(lower_bound, upper_bound); both bounds are inclusive.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines temporal behavior of a join - features like delaying entries or ignoring late entries.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_outer(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | 0
       | 7
3      | 1
3      | 4
4      | 4
5      | 4
11     |
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_outer(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
  |        | 0
  |        | 2
  |        | 7
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
1 | 11     |
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2
3 | 4      |
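An outer interval join adds both kinds of unmatched rows. The plain-Python sketch below (not Pathway code, hypothetical helper name) records which right rows found a partner and appends the rest with None on the left. It reproduces the 12 rows of the keyed example:

```python
# Plain-Python model of an outer interval join (not Pathway code).

def interval_join_outer(left, right, lower, upper, left_key=None, right_key=None):
    result = []
    matched_right = set()  # indices of right rows that found a partner
    for l in left:
        matched = False
        for j, r in enumerate(right):
            in_interval = l["t"] + lower <= r["t"] <= l["t"] + upper
            keys_equal = left_key is None or l[left_key] == r[right_key]
            if in_interval and keys_equal:
                result.append((l, r))
                matched = True
                matched_right.add(j)
        if not matched:
            result.append((l, None))  # left-only row
    for j, r in enumerate(right):
        if j not in matched_right:
            result.append((None, r))  # right-only row
    return result


left = [{"a": a, "t": t} for a, t in [(1, 3), (1, 4), (1, 5), (1, 11), (2, 2), (2, 3), (3, 4)]]
right = [{"b": b, "t": t} for b, t in [(1, 0), (1, 1), (1, 4), (1, 7), (2, 0), (2, 2), (4, 2)]]

rows = [
    (
        l["a"] if l is not None else None,
        l["t"] if l is not None else None,
        r["t"] if r is not None else None,
    )
    for l, r in interval_join_outer(left, right, -2, 1, "a", "b")
]
print(len(rows))  # 12
print([row for row in rows if row[0] is None])
# [(None, None, 0), (None, None, 7), (None, None, 2)]
```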

Setting behavior lets you control the temporal behavior of an interval join. Each side of the join then keeps track of the maximal time it has seen so far (in self_time and other_time, respectively). In the context of an interval join, the arguments of behavior mean the following:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage-collect records whose times are less than or equal to that threshold. When cutoff is not set, the interval join remembers all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal already seen time minus cutoff.

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join_outer(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
2          |             | 2        | 2         |            | 4        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
4          |             | 2        | 8         |            | 8        | 1
5          | 42          | 1        | 0         | 2          | 10       | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          |             | 2        | 8         |            | 14       | -1
4          | 8           | 2        | 8         | 10         | 14       | 1
2          |             | 2        | 2         |            | 30       | -1
2          | 10          | 2        | 2         | 4          | 30       | 1
result_with_cutoff = t1.interval_join_outer(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
2          |             | 2        | 2         |            | 4        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
4          |             | 2        | 8         |            | 8        | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          |             | 2        | 8         |            | 14       | -1
4          | 8           | 2        | 8         | 10         | 14       | 1

The record with value=5 from table t1 was not joined because its event_time was less than the maximal already seen time minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time minus cutoff (4 <= 10-6).

Notice also the entries with __diff__=-1. They are deletions caused by the arrival of matching entries on the right side of the join: each match removes the previously emitted entry with empty right-side fields and inserts an entry with those fields filled in.

interval_join_right(other, self_time, other_time, interval, *on, behavior=None, left_instance=None, right_instance=None)

Performs an interval right join of self with other using a time difference and join expressions. With interval=pw.temporal.interval(lower_bound, upper_bound), two rows are joined if self_time + lower_bound <= other_time <= self_time + upper_bound and all conditions in on are satisfied. Rows from the right side that have not been matched with the left side are returned with the left-side values replaced with None.

  • Parameters
    • other (Table) – the right side of the join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • interval – the allowed range of the time difference other_time - self_time, created with pw.temporal.interval(lower_bound, upper_bound); both bounds are inclusive.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • behavior (CommonBehavior | None) – defines temporal behavior of a join - features like delaying entries or ignoring late entries.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    IntervalJoinResult – a result of the interval join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 3
  2 | 4
  3 | 5
  4 | 11
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 0
  2 | 1
  3 | 4
  4 | 7
'''
)
t3 = t1.interval_join_right(t2, t1.t, t2.t, pw.temporal.interval(-2, 1)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | 0
       | 7
3      | 1
3      | 4
4      | 4
5      | 4
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 3
  2 | 1 | 4
  3 | 1 | 5
  4 | 1 | 11
  5 | 2 | 2
  6 | 2 | 3
  7 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 0
  2 | 1 | 1
  3 | 1 | 4
  4 | 1 | 7
  5 | 2 | 0
  6 | 2 | 2
  7 | 4 | 2
'''
)
t3 = t1.interval_join_right(
    t2, t1.t, t2.t, pw.temporal.interval(-2, 1), t1.a == t2.b
).select(t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
a | left_t | right_t
  |        | 0
  |        | 2
  |        | 7
1 | 3      | 1
1 | 3      | 4
1 | 4      | 4
1 | 5      | 4
2 | 2      | 0
2 | 2      | 2
2 | 3      | 2
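
The join condition can be checked by hand with a plain-Python model of the first example. This is a sketch, not Pathway code: `interval_join_right_model` is a hypothetical helper that works on bare lists of times and ignores ids, on conditions and behavior. It keeps every pair with self_time + lower_bound <= other_time <= self_time + upper_bound and adds right rows that found no partner with None on the left.

```python
def interval_join_right_model(left_times, right_times, lower_bound, upper_bound):
    """Hypothetical model of an interval right join on plain lists of times."""
    result = []
    for rt in right_times:
        matched = False
        for lt in left_times:
            # Join condition: lt + lower_bound <= rt <= lt + upper_bound
            if lt + lower_bound <= rt <= lt + upper_bound:
                result.append((lt, rt))
                matched = True
        if not matched:
            # Right rows without a partner are kept, left side filled with None
            result.append((None, rt))
    return result


pairs = interval_join_right_model([3, 4, 5, 11], [0, 1, 4, 7], -2, 1)
print(pairs)
# [(None, 0), (3, 1), (3, 4), (4, 4), (5, 4), (None, 7)]
```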

Setting behavior lets you control the temporal behavior of an interval join. When it is set, each side of the interval join keeps track of its maximal already seen time (in self_time and other_time, respectively). In the context of an interval join, the arguments of behavior mean the following:

  • delay - buffers results until the maximal already seen time is greater than or equal to their time plus delay.
  • cutoff - ignores records with times less than or equal to the maximal already seen time minus cutoff; it is also used to garbage collect records with times less than or equal to that threshold. When cutoff is not set, the interval join remembers all records from both sides.
  • keep_results - if set to True, keeps all results of the operator. If set to False, keeps only results that are newer than the maximal seen time minus cutoff.

Example without and with forgetting:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      1   |     1    |      0     |     2
      2   |     2    |      2     |     4
      3   |     1    |      4     |     4
      4   |     2    |      8     |     8
      5   |     1    |      0     |    10
      6   |     1    |      4     |    10
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    value | instance | event_time | __time__
      42  |     1    |      2     |     2
       8  |     2    |     10     |    14
      10  |     2    |      4     |    30
'''
)
result_without_cutoff = t1.interval_join_right(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_without_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
5          | 42          | 1        | 0         | 2          | 10       | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          | 8           | 2        | 8         | 10         | 14       | 1
2          | 10          | 2        | 2         | 4          | 30       | 1
result_with_cutoff = t1.interval_join_right(
    t2,
    t1.event_time,
    t2.event_time,
    pw.temporal.interval(-2, 2),
    t1.instance == t2.instance,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    left_value=t1.value,
    right_value=t2.value,
    instance=t1.instance,
    left_time=t1.event_time,
    right_time=t2.event_time,
)
pw.debug.compute_and_print_update_stream(result_with_cutoff, include_id=False)
left_value | right_value | instance | left_time | right_time | __time__ | __diff__
1          | 42          | 1        | 0         | 2          | 2        | 1
3          | 42          | 1        | 4         | 2          | 4        | 1
6          | 42          | 1        | 4         | 2          | 10       | 1
4          | 8           | 2        | 8         | 10         | 14       | 1

The record with value=5 from table t1 was not joined because its event_time was less than or equal to the maximal already seen time in t1 minus cutoff (0 <= 8-6). The record with value=10 from table t2 was not joined because its event_time was equal to the maximal already seen time in t2 minus cutoff (4 <= 10-6).
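
The filtering step that drops these two records can be replayed in plain Python. This is a simplified sketch under stated assumptions, not Pathway's implementation: `surviving_records` is a hypothetical helper; batches are processed in __time__ order; each side compares a record against the maximal event_time seen in earlier batches on that same side; garbage collection of stored records and the join itself are left out.

```python
from itertools import groupby


def surviving_records(records, cutoff):
    """Return the values of records that are not dropped as late.

    records: list of (value, event_time, processing_time) tuples for ONE side
    of the join. Hypothetical helper; it only models the cutoff filter.
    """
    kept = []
    max_seen = None  # maximal event_time seen in earlier batches on this side
    ordered = sorted(records, key=lambda r: r[2])
    for _, batch in groupby(ordered, key=lambda r: r[2]):
        batch = list(batch)
        for value, event_time, _ in batch:
            late = max_seen is not None and event_time <= max_seen - cutoff
            if not late:
                kept.append(value)
        # Advance the maximal seen time only after the whole batch is handled
        batch_max = max(r[1] for r in batch)
        max_seen = batch_max if max_seen is None else max(max_seen, batch_max)
    return kept


# (value, event_time, __time__) rows of t1 and t2 from the example
t1 = [(1, 0, 2), (2, 2, 4), (3, 4, 4), (4, 8, 8), (5, 0, 10), (6, 4, 10)]
t2 = [(42, 2, 2), (8, 10, 14), (10, 4, 30)]
print(surviving_records(t1, cutoff=6))  # [1, 2, 3, 4, 6]
print(surviving_records(t2, cutoff=6))  # [42, 8]
```

Record 2 survives the filter but still has no partner, because its only candidate (value=10 in t2) is dropped.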

ix(expression, *, optional=False, context=None)

Reindexes the table using expression values as keys. Uses keys from context, or tries to infer the proper context from the expression. If optional is True, None values in the expression result in None values in the result columns. Expression values that are missing from the table's keys result in a RuntimeError.

Context can be anything that allows for select or reduce, or the pathway.this construct (the latter returns a delayed operation and should only be used when using ix inside a join().select() or groupby().reduce() sequence).

  • Returns
    Reindexed table with the same set of columns.

Example:

import pathway as pw
t_animals = pw.debug.table_from_markdown('''
  | epithet    | genus
1 | upupa      | epops
2 | acherontia | atropos
3 | bubo       | scandiacus
4 | dynastes   | hercules
''')
t_birds = pw.debug.table_from_markdown('''
  | desc
2 | hoopoe
4 | owl
''')
ret = t_birds.select(t_birds.desc, latin=t_animals.ix(t_birds.id).genus)
pw.debug.compute_and_print(ret, include_id=False)
desc   | latin
hoopoe | atropos
owl    | hercules
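
The lookup rules above can be modelled in plain Python. In this sketch (not Pathway code) a dict keyed by row id stands in for t_animals, and `ix_model` is a hypothetical helper that mirrors the documented rules: with optional=True a None key yields None, and a key missing from the table raises a RuntimeError.

```python
def ix_model(table, keys, optional=False):
    """table: dict mapping row key -> row dict; keys: key values to look up."""
    rows = []
    for key in keys:
        if key is None and optional:
            # With optional=True, a None key produces a None row
            rows.append(None)
        elif key in table:
            rows.append(table[key])
        else:
            # Mirrors the RuntimeError documented for keys missing from the table
            raise RuntimeError(f"key {key!r} not found")
    return rows


t_animals = {
    1: {"epithet": "upupa", "genus": "epops"},
    2: {"epithet": "acherontia", "genus": "atropos"},
    3: {"epithet": "bubo", "genus": "scandiacus"},
    4: {"epithet": "dynastes", "genus": "hercules"},
}
print([row["genus"] for row in ix_model(t_animals, [2, 4])])  # ['atropos', 'hercules']
print(ix_model(t_animals, [2, None], optional=True)[1])  # None
```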

ix_ref(*args, optional=False, context=None, instance=None)

Reindexes the table using expressions as primary keys. Uses keys from context, or tries to infer the proper context from the expression. If optional is True, None values in the expressions result in None values in the result columns. Values that are missing from the table's keys result in a RuntimeError.

Context can be anything that allows for select or reduce, or the pathway.this construct (the latter returns a delayed operation and should only be used when using ix inside a join().select() or groupby().reduce() sequence).

  • Parameters
    args (Union[ColumnExpression, None, int, float, str, bytes, bool, Pointer, datetime, timedelta, ndarray, Json, dict[str, Any], tuple[Any, ...]]) – Column references.
  • Returns
    Row – indexed row.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
name   | pet
Alice  | dog
Bob    | cat
Carole | cat
David  | dog
''')
t2 = t1.with_id_from(pw.this.name)
t2 = t2.select(*pw.this, new_value=pw.this.ix_ref("Alice").pet)
pw.debug.compute_and_print(t2, include_id=False)
name   | pet | new_value
Alice  | dog | dog
Bob    | cat | dog
Carole | cat | dog
David  | dog | dog

Tables obtained by a groupby/reduce scheme always have primary keys:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
name   | pet
Alice  | dog
Bob    | cat
Carole | cat
David  | cat
''')
t2 = t1.groupby(pw.this.pet).reduce(pw.this.pet, count=pw.reducers.count())
t3 = t1.select(*pw.this, new_value=t2.ix_ref(t1.pet).count)
pw.debug.compute_and_print(t3, include_id=False)
name   | pet | new_value
Alice  | dog | 1
Bob    | cat | 3
Carole | cat | 3
David  | cat | 3
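
Conceptually, the reduced table t2 is keyed by its grouping value, so t2.ix_ref(t1.pet) is a lookup by pet. A rough plain-Python analogy (a dict stands in for the keyed table; the names are illustrative, not Pathway API):

```python
from collections import Counter

# Rows of t1 as (name, pet) tuples
t1 = [("Alice", "dog"), ("Bob", "cat"), ("Carole", "cat"), ("David", "cat")]

# Analogue of t1.groupby(pw.this.pet).reduce(pw.this.pet, count=pw.reducers.count()):
# one entry per pet, keyed by the grouping value
t2 = dict(Counter(pet for _, pet in t1))

# Analogue of t2.ix_ref(t1.pet).count: look up each row's group by its pet value.
# A plain dict raises KeyError for a missing key; the text above documents a RuntimeError.
t3 = [(name, pet, t2[pet]) for name, pet in t1]
print(t3)
# [('Alice', 'dog', 1), ('Bob', 'cat', 3), ('Carole', 'cat', 3), ('David', 'cat', 3)]
```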

Single-row tables can be accessed via ix_ref():

import pathway as pw
t1 = pw.debug.table_from_markdown('''
name   | pet
Alice  | dog
Bob    | cat
Carole | cat
David  | cat
''')
t2 = t1.reduce(count=pw.reducers.count())
t3 = t1.select(*pw.this, new_value=t2.ix_ref(context=t1).count)
pw.debug.compute_and_print(t3, include_id=False)
name   | pet | new_value
Alice  | dog | 4
Bob    | cat | 4
Carole | cat | 4
David  | cat | 4

plot(plotting_function, sorting_col=None)

Allows for plotting the contents of the table visually, e.g. in Jupyter. If the table depends only on bounded data sources, the plot is generated right away. Otherwise (in a streaming scenario), the plot auto-updates after pw.run() is called.

  • Parameters
    • self (pw.Table) – a table serving as a source of data
    • plotting_function (Callable[[ColumnDataSource], Plot]) – function for creating plot from ColumnDataSource
  • Returns
    pn.Column – visualization which can be displayed immediately or passed as a dashboard widget

Example:

import pandas as pd
import pathway as pw
from bokeh.plotting import figure
def func(source):
    plot = figure(height=400, width=400, title="CPU usage over time")
    plot.scatter('a', 'b', source=source, line_width=3, line_alpha=0.6)
    return plot
viz = pw.debug.table_from_pandas(pd.DataFrame({"a":[1,2,3],"b":[3,1,2]})).plot(func)
type(viz)
<class 'panel.layout.base.Column'>

pointer_from(*args, optional=False, instance=None)

Pseudo-random hash of its argument. Produces pointer types. Applied column-wise.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
g = t1.groupby(t1.owner).reduce(refcol = t1.pointer_from(t1.owner)) # g.id == g.refcol
pw.debug.compute_and_print(g.select(test = (g.id == g.refcol)), include_id=False)
test
True
True

reduce(*args, **kwargs)

Reduce a table to a single row.

Equivalent to self.groupby().reduce(*args, **kwargs).

  • Parameters
    • args (ColumnReference) – reducer to reduce the table with
    • kwargs (ColumnExpression) – reducer to reduce the table with. Its key is the new name of a column.
  • Returns
    Table – Reduced table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.reduce(ageagg=pw.reducers.argmin(t1.age))
pw.debug.compute_and_print(t2, include_id=False)
ageagg
^...
t3 = t2.select(t1.ix(t2.ageagg).age, t1.ix(t2.ageagg).pet)
pw.debug.compute_and_print(t3, include_id=False)
age | pet
7   | dog
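
The two steps of this example (argmin returning a row id, then ix fetching that row) can be mimicked in plain Python. The integer row ids below are hypothetical; the markdown table assigns its own ids.

```python
# Rows of t1 keyed by hypothetical row ids
t1 = {
    1: {"age": 10, "owner": "Alice", "pet": "dog"},
    2: {"age": 9, "owner": "Bob", "pet": "dog"},
    3: {"age": 8, "owner": "Alice", "pet": "cat"},
    4: {"age": 7, "owner": "Bob", "pet": "dog"},
}
# Analogue of t1.reduce(ageagg=pw.reducers.argmin(t1.age)): id of the row with the smallest age
ageagg = min(t1, key=lambda row_id: t1[row_id]["age"])
# Analogue of t1.ix(t2.ageagg): fetch that row by its id
row = t1[ageagg]
print(row["age"], row["pet"])  # 7 dog
```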

rename(names_mapping=None, **kwargs)

Rename columns according to either a dictionary or kwargs.

If a mapping is provided using a dictionary, rename_by_dict is used. Otherwise, rename_columns is used with kwargs. Columns that are not renamed are left unchanged. The new name of a column must not be id.

  • Parameters
    • names_mapping (dict[str | ColumnReference, str] | None) – mapping from old column names to new names.
    • kwargs (ColumnExpression) – mapping from new column names to the old columns.
  • Returns
    Table – self with columns renamed.

rename_by_dict(names_mapping)

Rename columns according to a dictionary.

Columns not in keys(names_mapping) are not changed. The new name of a column must not be id.

  • Parameters
    names_mapping (dict[str | ColumnReference, str]) – mapping from old column names to new names.
  • Returns
    Table – self with columns renamed.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.rename_by_dict({"age": "years_old", t1.pet: "animal"})
pw.debug.compute_and_print(t2, include_id=False)
owner | years_old | animal
Alice | 8         | 2
Alice | 10        | 1
Bob   | 9         | 1

rename_columns(**kwargs)

Rename columns according to kwargs.

Columns not referenced in values(kwargs) are not changed. The new name of a column must not be id.

  • Parameters
    kwargs (str | ColumnReference) – mapping from new column names to old columns (given as names or column references).
  • Returns
    Table – self with columns renamed.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.rename_columns(years_old=t1.age, animal=t1.pet)
pw.debug.compute_and_print(t2, include_id=False)
owner | years_old | animal
Alice | 8         | 2
Alice | 10        | 1
Bob   | 9         | 1
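
The direction of the keyword arguments (new_name=old column) is easy to mix up, so here is a plain-Python model of the renaming on a dict of columns. `rename_columns_model` is a hypothetical helper, and the ValueError is illustrative; the documentation only states that id is not allowed as a new name.

```python
def rename_columns_model(columns, **kwargs):
    """columns: dict mapping column name -> list of values.
    kwargs: new_name=old_name pairs, like rename_columns(years_old=t1.age)."""
    if "id" in kwargs:
        # The exception type is an assumption made for this sketch
        raise ValueError("the new name of a column must not be id")
    renamed = set(kwargs.values())
    # Columns that are not renamed are kept as they are
    result = {name: values for name, values in columns.items() if name not in renamed}
    for new_name, old_name in kwargs.items():
        result[new_name] = columns[old_name]
    return result


t1 = {"age": [10, 9, 8], "owner": ["Alice", "Bob", "Alice"], "pet": [1, 1, 2]}
t2 = rename_columns_model(t1, years_old="age", animal="pet")
print(list(t2))  # ['owner', 'years_old', 'animal']
```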

restrict(other)

Restrict the universe of self to the keys appearing in other.

  • Parameters
    other (TableLike) – table whose universe is used to restrict the universe of self.
  • Returns
    Table – table with restricted universe, with the same set of columns

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
  | age  | owner  | pet
1 | 10   | Alice  | 1
2 | 9    | Bob    | 1
3 | 8    | Alice  | 2
'''
)
t2 = pw.debug.table_from_markdown(
    '''
  | cost
2 | 100
3 | 200
'''
)
t2.promise_universe_is_subset_of(t1)
<pathway.Table schema={'cost': <class 'int'>}>
t3 = t1.restrict(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
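
In plain-Python terms, restrict keeps the rows of t1 (with t1's columns only) whose key also appears in t2. A dict-based analogy, where dict keys stand in for row ids:

```python
t1 = {1: (10, "Alice", 1), 2: (9, "Bob", 1), 3: (8, "Alice", 2)}
t2 = {2: 100, 3: 200}
# What promise_universe_is_subset_of asserts: every key of t2 is also a key of t1
assert t2.keys() <= t1.keys()
# Analogue of t1.restrict(t2): t1's rows whose key appears in t2
t3 = {key: row for key, row in t1.items() if key in t2}
print(t3)  # {2: (9, 'Bob', 1), 3: (8, 'Alice', 2)}
```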

select(*args, **kwargs)

Build a new table with columns specified by kwargs.

Output column names are keys(kwargs); values(kwargs) can be raw values, boxed values, or columns. Assigning to id reindexes the table.

  • Parameters
    • args (ColumnReference) – Column references.
    • kwargs (Any) – Column expressions with their new assigned names.
  • Returns
    Table – Created table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
pet
Dog
Cat
''')
t2 = t1.select(animal=t1.pet, desc="fluffy")
pw.debug.compute_and_print(t2, include_id=False)
animal | desc
Cat    | fluffy
Dog    | fluffy

show(*, snapshot=True, include_id=True, short_pointers=True, sorters=None)

Allows for displaying the table visually, e.g. in Jupyter. If the table depends only on bounded data sources, the table preview is generated right away. Otherwise (in a streaming scenario), the table auto-updates after pw.run() is called.

  • Parameters
    • self (pw.Table) – a table to be displayed
    • snapshot (bool, optional) – whether only current snapshot or all changes to the table should be displayed. Defaults to True.
    • include_id (bool, optional) – whether to show ids of rows. Defaults to True.
    • short_pointers (bool, optional) – whether to shorten printed ids. Defaults to True.
  • Returns
    pn.Column – visualization which can be displayed immediately or passed as a dashboard widget

Example:

import pandas as pd
import pathway as pw
table_viz = pw.debug.table_from_pandas(pd.DataFrame({"a":[1,2,3],"b":[3,1,2]})).show()
type(table_viz)
<class 'panel.layout.base.Column'>

sort(key, instance=None)

Sorts a table by the specified keys.

  • Parameters
    • table (pw.Table) – the table to be sorted.
    • key (ColumnExpression[int | float | datetime | str | bytes]) – An expression to sort by.
    • instance (ColumnExpression | None) – an expression with the instance (a ColumnReference or None). Rows are sorted within an instance; the prev and next columns only point to rows that have the same instance.
  • Returns
    pw.Table – The sorted table. Contains two columns: prev and next, containing the pointers to the previous and next rows.

Example:

import pathway as pw
table = pw.debug.table_from_markdown('''
name     | age | score
Alice    | 25  | 80
Bob      | 20  | 90
Charlie  | 30  | 80
''')
table = table.with_id_from(pw.this.name)
table += table.sort(key=pw.this.age)
pw.debug.compute_and_print(table, include_id=True)
            | name    | age | score | prev        | next
^GBSDEEW... | Alice   | 25  | 80    | ^EDPSSB1... | ^DS9AT95...
^EDPSSB1... | Bob     | 20  | 90    |             | ^GBSDEEW...
^DS9AT95... | Charlie | 30  | 80    | ^GBSDEEW... |
table = pw.debug.table_from_markdown('''
name     | age | score
Alice    | 25  | 80
Bob      | 20  | 90
Charlie  | 30  | 80
David    | 35  | 90
Eve      | 15  | 80
''')
table = table.with_id_from(pw.this.name)
table += table.sort(key=pw.this.age, instance=pw.this.score)
pw.debug.compute_and_print(table, include_id=True)
            | name    | age | score | prev        | next
^GBSDEEW... | Alice   | 25  | 80    | ^T0B95XH... | ^DS9AT95...
^EDPSSB1... | Bob     | 20  | 90    |             | ^RT0AZWX...
^DS9AT95... | Charlie | 30  | 80    | ^GBSDEEW... |
^RT0AZWX... | David   | 35  | 90    | ^EDPSSB1... |
^T0B95XH... | Eve     | 15  | 80    |             | ^GBSDEEW...
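
The prev/next pointers can be modelled in plain Python by sorting each instance separately and linking neighbours. This is a sketch: names stand in for row ids (as with_id_from(pw.this.name) does above), ties in key are not handled specially, and `sort_model` is a hypothetical helper.

```python
def sort_model(rows, key, instance=None):
    """rows: dict mapping row id -> dict of column values.

    Returns a dict mapping row id -> (prev_id, next_id). Hypothetical helper.
    """
    groups = {}
    for row_id, row in rows.items():
        group = row[instance] if instance is not None else None
        groups.setdefault(group, []).append(row_id)
    links = {}
    for ids in groups.values():
        # Sort each instance separately; prev/next never cross instances
        ids.sort(key=lambda row_id: rows[row_id][key])
        for pos, row_id in enumerate(ids):
            prev_id = ids[pos - 1] if pos > 0 else None
            next_id = ids[pos + 1] if pos + 1 < len(ids) else None
            links[row_id] = (prev_id, next_id)
    return links


people = {
    "Alice": {"age": 25, "score": 80},
    "Bob": {"age": 20, "score": 90},
    "Charlie": {"age": 30, "score": 80},
    "David": {"age": 35, "score": 90},
    "Eve": {"age": 15, "score": 80},
}
links = sort_model(people, key="age", instance="score")
print(links["Alice"])  # ('Eve', 'Charlie')
```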

split(split_expression)

Split a table according to split_expression condition.

  • Parameters
    split_expression (ColumnExpression) – ColumnExpression that specifies the split condition.
  • Returns
    positive_table, negative_table – tuple of tables, with the same schemas as self and with ids that are subsets of self.id, and provably disjoint.

Example:

import pathway as pw
vertices = pw.debug.table_from_markdown('''
label outdegree
    1         3
    7         0
''')
positive, negative = vertices.split(vertices.outdegree == 0)
pw.debug.compute_and_print(positive, include_id=False)
label | outdegree
7     | 0
pw.debug.compute_and_print(negative, include_id=False)
label | outdegree
1     | 3
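
The two outputs of split partition the ids of the input, as a plain-Python analogy shows. `split_model` is a hypothetical helper working on a dict keyed by row id.

```python
def split_model(table, predicate):
    """Return (positive, negative) dicts with disjoint keys covering the table."""
    positive = {k: row for k, row in table.items() if predicate(row)}
    negative = {k: row for k, row in table.items() if not predicate(row)}
    return positive, negative


vertices = {1: {"label": 1, "outdegree": 3}, 2: {"label": 7, "outdegree": 0}}
positive, negative = split_model(vertices, lambda row: row["outdegree"] == 0)
print(positive)  # {2: {'label': 7, 'outdegree': 0}}
```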

typehints()

Return the types of the columns as a dictionary.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t1.typehints()
mappingproxy({'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>})

update_cells(other)

Updates cells of self, breaking ties in favor of the values in other.

Semantics:

  • result.columns == self.columns
  • result.id == self.id
  • conflicts are resolved preferring other’s values

Requires:

  • other.columns ⊆ self.columns
  • other.id ⊆ self.id
  • Parameters
    other (Table) – the other table.
  • Returns
    Table – self updated with cells from other.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 30
''')
pw.universes.promise_is_subset_of(t2, t1)
t3 = t1.update_cells(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 30
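
The semantics listed above can be sketched in plain Python: ids and columns come from self, and any cell present in other wins. `update_cells_model` is a hypothetical helper; per the Requires list, other may carry only a subset of the columns, which the sketch uses to update just pet.

```python
def update_cells_model(table, other):
    """table/other: dicts mapping row id -> dict of column values."""
    # Documented requirements: other.id ⊆ self.id and other.columns ⊆ self.columns
    assert other.keys() <= table.keys()
    assert all(row.keys() <= table[key].keys() for key, row in other.items())
    # Same ids and columns as table; wherever other has a cell, its value wins
    return {key: {**row, **other.get(key, {})} for key, row in table.items()}


t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {1: {"pet": 30}}  # only the column being updated
t3 = update_cells_model(t1, t2)
print(t3[1])  # {'age': 10, 'owner': 'Alice', 'pet': 30}
```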

update_rows(other)

Updates rows of self, breaking ties in favor of the rows in other.

Semantics:

  • result.columns == self.columns == other.columns
  • result.id == self.id ∪ other.id

Requires:

  • other.columns == self.columns
  • Parameters
    other (Table[TypeVar(TSchema, bound= Schema)]) – the other table.
  • Returns
    Table – self updated with rows from other.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
   | age | owner | pet
1  | 10  | Alice | 30
12 | 12  | Tom   | 40
''')
t3 = t1.update_rows(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 30
12  | Tom   | 40
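
Unlike update_cells, update_rows may add new ids. A plain-Python sketch with dicts keyed by row id (`update_rows_model` is a hypothetical helper):

```python
def update_rows_model(table, other):
    """table/other: dicts mapping row id -> dict of column values."""
    # Documented requirement: both tables have exactly the same columns
    columns = {frozenset(row) for row in [*table.values(), *other.values()]}
    assert len(columns) <= 1
    result = dict(table)
    result.update(other)  # rows of other replace rows with the same id; new ids are added
    return result


t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {
    1: {"age": 10, "owner": "Alice", "pet": 30},
    12: {"age": 12, "owner": "Tom", "pet": 40},
}
t3 = update_rows_model(t1, t2)
print(sorted(t3))  # [1, 2, 3, 12]
```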

update_types(**kwargs)

Updates types in schema. Has no effect on the runtime.

window_join(other, self_time, other_time, window, *on, how=JoinMode.INNER, left_instance=None, right_instance=None)

Performs a window join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • how (JoinMode) – decides whether to run window_join_inner, window_join_left, window_join_right or window_join_outer. Default is INNER.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
2      | 2
3      | 2
7      | 6
7      | 7
t4 = t1.window_join(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
1      | 2
2      | 2
2      | 2
3      | 2
7      | 6
7      | 7
7      | 7
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
2   | 2      | 2
2   | 2      | 3
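
The tumbling and sliding results above can be reproduced with a plain-Python model. Assumptions (not taken from Pathway's source): windows are half-open intervals [start, start + duration) that start at integer multiples of hop; tumbling(2) is hop = duration = 2; sliding(1, 2) is read as hop=1, duration=2, which agrees with the outputs above. The helper names are hypothetical and no on condition is modelled.

```python
def window_starts(t, hop, duration):
    """Start points of every window [start, start + duration) that contains t."""
    first = (t - duration) // hop + 1
    last = t // hop
    return [k * hop for k in range(first, last + 1)]


def window_join_inner_model(left, right, hop, duration):
    pairs = []
    for lt in left:
        for start in window_starts(lt, hop, duration):
            # Every right record in the same window joins with lt
            pairs.extend((lt, rt) for rt in right if start <= rt < start + duration)
    return sorted(pairs)


left, right = [1, 2, 3, 7, 13], [2, 5, 6, 7]
print(window_join_inner_model(left, right, hop=2, duration=2))  # tumbling(2)
# [(2, 2), (3, 2), (7, 6), (7, 7)]
print(window_join_inner_model(left, right, hop=1, duration=2))  # sliding(1, 2)
# [(1, 2), (2, 2), (2, 2), (3, 2), (7, 6), (7, 7), (7, 7)]
```

The duplicated (2, 2) and (7, 7) rows come from pairs that share two sliding windows, matching the multiplicity rule stated above.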

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2
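
The session behavior described above (concatenating records from both sides, then joining every left record of a session with every right record of it) can be modelled in plain Python. This sketch assumes the predicate is applied to adjacent records in time order; `session_join_model` is a hypothetical helper for a single instance.

```python
def session_join_model(left, right, predicate):
    """left/right: lists of times for a single instance (one value of the on key)."""
    # Merge both sides into one time-ordered stream, remembering each record's side
    events = sorted([(t, "left") for t in left] + [(t, "right") for t in right])
    sessions = []
    for t, side in events:
        # Extend the current session if the predicate accepts the adjacent pair
        if sessions and predicate(sessions[-1][-1][0], t):
            sessions[-1].append((t, side))
        else:
            sessions.append([(t, side)])
    pairs = []
    for session in sessions:
        lefts = [t for t, side in session if side == "left"]
        rights = [t for t, side in session if side == "right"]
        # Every left record of a session is joined with every right record of it
        pairs.extend((lt, rt) for lt in lefts for rt in rights)
    return sorted(pairs)


def close(a, b):
    return abs(a - b) <= 2


print(session_join_model([0, 5, 10, 15, 17], [-3, 2, 3, 6, 16], close))
# [(0, 2), (0, 3), (0, 6), (5, 2), (5, 3), (5, 6), (15, 16), (17, 16)]
```

With t1.a == t2.b the model is applied once per key: key 1 gives session_join_model([1, 4, 7], [-1, 6], close) == [(1, -1), (4, 6), (7, 6)].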

window_join_inner(other, self_time, other_time, window, *on, left_instance=None, right_instance=None)

Performs a window join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
2      | 2
3      | 2
7      | 6
7      | 7
t4 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
1      | 2
2      | 2
2      | 2
3      | 2
7      | 6
7      | 7
7      | 7
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_inner(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
2   | 2      | 2
2   | 2      | 3

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_inner(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_inner(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2

window_join_left(other, self_time, other_time, window, *on, left_instance=None, right_instance=None)

Performs a window left join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

Rows from the left side that didn’t match any record on the right side in a given window are returned with missing values on the right side replaced with None. The multiplicity of such rows equals the number of windows they belong to and have no match in.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
1      |
2      | 2
3      | 2
7      | 6
7      | 7
13     |
t4 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
1      |
1      | 2
2      | 2
2      | 2
3      |
3      | 2
7      | 6
7      | 7
7      | 7
13     |
13     |
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_left(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t1.a, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      |
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
1   | 13     |
2   | 1      |
2   | 2      | 2
2   | 2      | 3
3   | 4      |

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_left(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
10     |
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_left(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t1.a, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2
2   | 7      |
3   | 4      |
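
The multiplicity of the None rows follows from counting unmatched windows per left record. A plain-Python sketch, under the same assumptions as a simple window model (windows [start, start + duration) starting at multiples of hop; sliding(1, 2) read as hop=1, duration=2; no on condition); the helper names are hypothetical:

```python
def window_starts(t, hop, duration):
    """Start points of every window [start, start + duration) that contains t."""
    first = (t - duration) // hop + 1
    return [k * hop for k in range(first, t // hop + 1)]


def window_join_left_model(left, right, hop, duration):
    rows = []
    for lt in left:
        for start in window_starts(lt, hop, duration):
            matches = [rt for rt in right if start <= rt < start + duration]
            if matches:
                rows.extend((lt, rt) for rt in matches)
            else:
                # One (left, None) row for every window in which lt has no match
                rows.append((lt, None))
    return rows


print(window_join_left_model([1, 2, 3, 7, 13], [2, 5, 6, 7], hop=1, duration=2))
# [(1, None), (1, 2), (2, 2), (2, 2), (3, 2), (3, None), (7, 6), (7, 7), (7, 7), (13, None), (13, None)]
```

The record 13 lies in two sliding windows, neither of which contains a right record, hence the two (13, None) rows.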

window_join_outer(other, self_time, other_time, window, *on, left_instance=None, right_instance=None)

Performs a window outer join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they will be joined. Note that if a sliding window is used and there are pairs of matching records that appear in more than one window, they will be included in the result multiple times (equal to the number of windows they appear in).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

Rows from either side that didn’t match any record on the other side in a given window are returned with missing values on the other side replaced with None. The multiplicity of such rows equals the number of windows they belong to and have no match in.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • left_instance/right_instance – optional arguments describing the partitioning of the data into separate instances.
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.
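As a rough illustration of these semantics, the following plain-Python model (not Pathway code) performs a tumbling-window outer join on two lists of integer times with no on conditions. The helper names are hypothetical, and each tumbling window is assumed to cover [k*duration, (k+1)*duration). With the inputs of the first example below, it yields the same seven pairs.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple


def tumbling_start(t: int, duration: int) -> int:
    # Start of the tumbling window [start, start + duration) that contains t.
    return (t // duration) * duration


def window_join_outer_model(
    left: List[int], right: List[int], duration: int
) -> List[Tuple[Optional[int], Optional[int]]]:
    # Group the times of each side by the window they fall into.
    left_by_win: Dict[int, List[int]] = defaultdict(list)
    right_by_win: Dict[int, List[int]] = defaultdict(list)
    for t in left:
        left_by_win[tumbling_start(t, duration)].append(t)
    for t in right:
        right_by_win[tumbling_start(t, duration)].append(t)

    result: List[Tuple[Optional[int], Optional[int]]] = []
    for win in sorted(set(left_by_win) | set(right_by_win)):
        ls, rs = left_by_win.get(win, []), right_by_win.get(win, [])
        if ls and rs:
            # Without on conditions, every left/right pair in the window matches.
            result.extend((lt, rt) for lt in ls for rt in rs)
        else:
            # A side with no partner in this window is padded with None.
            result.extend((lt, None) for lt in ls)
            result.extend((None, rt) for rt in rs)
    return result


rows = window_join_outer_model([1, 2, 3, 7, 13], [2, 5, 6, 7], duration=2)
print(rows)
```

The model returns rows grouped by window, whereas compute_and_print sorts its output, so only the set of pairs (and their multiplicities) should be compared.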

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | 5
1      |
2      | 2
3      | 2
7      | 6
7      | 7
13     |
t4 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
       | 5
       | 5
       | 6
1      |
1      | 2
2      | 2
2      | 2
3      |
3      | 2
7      | 6
7      | 7
7      | 7
13     |
13     |
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_outer(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   |        | 5
1   | 1      |
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
1   | 13     |
2   | 1      |
2   | 2      | 2
2   | 2      | 3
3   | 4      |
4   |        | 3

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_outer(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | -3
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
10     |
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_outer(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=pw.coalesce(t1.a, t2.b), left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   |        | 10
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2
2   | 7      |
3   | 4      |
4   |        | 3

window_join_right(other, self_time, other_time, window, *on, left_instance=None, right_instance=None)

Performs a window right join of self with other using a window and join expressions. If two records belong to the same window and meet the conditions specified in the on clause, they are joined. Note that if a sliding window is used and a pair of matching records appears in more than one window, the pair is included in the result multiple times (once for each window in which it appears).

When using a session window, the function creates sessions by concatenating records from both sides of a join. Only pairs of records that meet the conditions specified in the on clause can be part of the same session. The result of a given session will include all records from the left side of a join that belong to this session, joined with all records from the right side of a join that belong to this session.

Rows from the right side that did not match any record from the left side in a given window are returned with the missing values from the left side replaced with None. The multiplicity of such a row equals the number of windows it belongs to without having a match.

  • Parameters
    • other (Table) – the right side of a join.
    • self_time (ColumnExpression) – time expression in self.
    • other_time (ColumnExpression) – time expression in other.
    • window (Window) – a window to use.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top-level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • left_instance/right_instance – optional arguments describing the partitioning of the data into separate instances.
  • Returns
    WindowJoinResult – a result of the window join. A method .select() can be called on it to extract relevant columns from the result of a join.
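The duplication rule for sliding windows can be illustrated with a plain-Python model (not Pathway code). It assumes integer times and that pw.temporal.sliding(1, 2) means hop=1 and duration=2, so that each window covers [s, s + duration) for every s that is a multiple of hop. The helper names are hypothetical. With these settings, the model reproduces the ten rows of the sliding-window example below, including the duplicated (None, 5) and (7, 7) rows.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple


def sliding_starts(t: int, hop: int, duration: int) -> List[int]:
    # Starts s (multiples of hop) of every window [s, s + duration) containing t.
    first = ((t - duration) // hop + 1) * hop
    return list(range(first, t + 1, hop))


def window_join_right_model(
    left: List[int], right: List[int], hop: int, duration: int
) -> List[Tuple[Optional[int], int]]:
    # A record is copied into every sliding window it belongs to.
    left_by_win: Dict[int, List[int]] = defaultdict(list)
    right_by_win: Dict[int, List[int]] = defaultdict(list)
    for t in left:
        for s in sliding_starts(t, hop, duration):
            left_by_win[s].append(t)
    for t in right:
        for s in sliding_starts(t, hop, duration):
            right_by_win[s].append(t)

    result: List[Tuple[Optional[int], int]] = []
    # Only windows containing right-side records can produce output in a right join.
    for win in sorted(right_by_win):
        ls = left_by_win.get(win, [])
        for rt in right_by_win[win]:
            if ls:
                result.extend((lt, rt) for lt in ls)
            else:
                # Right rows without a partner in this window keep None on the left.
                result.append((None, rt))
    return result


rows = window_join_right_model([1, 2, 3, 7, 13], [2, 5, 6, 7], hop=1, duration=2)
print(rows)
```

For example, the right-side time 5 lies in windows [4, 6) and [5, 7), and neither window contains a left-side time, so (None, 5) appears twice.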

Examples:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 1
  2 | 2
  3 | 3
  4 | 7
  5 | 13
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | t
  1 | 2
  2 | 5
  3 | 6
  4 | 7
'''
)
t3 = t1.window_join_right(t2, t1.t, t2.t, pw.temporal.tumbling(2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | 5
2      | 2
3      | 2
7      | 6
7      | 7
t4 = t1.window_join_right(t2, t1.t, t2.t, pw.temporal.sliding(1, 2)).select(
    left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t4, include_id=False)
left_t | right_t
       | 5
       | 5
       | 6
1      | 2
2      | 2
2      | 2
3      | 2
7      | 6
7      | 7
7      | 7
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 2
  3 | 1 | 3
  4 | 1 | 7
  5 | 1 | 13
  6 | 2 | 1
  7 | 2 | 2
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | 2
  2 | 1 | 5
  3 | 1 | 6
  4 | 1 | 7
  5 | 2 | 2
  6 | 2 | 3
  7 | 4 | 3
'''
)
t3 = t1.window_join_right(t2, t1.t, t2.t, pw.temporal.tumbling(2), t1.a == t2.b).select(
    key=t2.b, left_t=t1.t, right_t=t2.t
)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   |        | 5
1   | 2      | 2
1   | 3      | 2
1   | 7      | 6
1   | 7      | 7
2   | 2      | 2
2   | 2      | 3
4   |        | 3

t1 = pw.debug.table_from_markdown(
    '''
      | t
    0 | 0
    1 | 5
    2 | 10
    3 | 15
    4 | 17
'''
)
t2 = pw.debug.table_from_markdown(
    '''
      | t
    0 | -3
    1 | 2
    2 | 3
    3 | 6
    4 | 16
'''
)
t3 = t1.window_join_right(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2)
).select(left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
left_t | right_t
       | -3
0      | 2
0      | 3
0      | 6
5      | 2
5      | 3
5      | 6
15     | 16
17     | 16
t1 = pw.debug.table_from_markdown(
    '''
    | a | t
  1 | 1 | 1
  2 | 1 | 4
  3 | 1 | 7
  4 | 2 | 0
  5 | 2 | 3
  6 | 2 | 4
  7 | 2 | 7
  8 | 3 | 4
'''
)
t2 = pw.debug.table_from_markdown(
    '''
    | b | t
  1 | 1 | -1
  2 | 1 | 6
  3 | 2 | 2
  4 | 2 | 10
  5 | 4 | 3
'''
)
t3 = t1.window_join_right(
    t2, t1.t, t2.t, pw.temporal.session(predicate=lambda a, b: abs(a - b) <= 2), t1.a == t2.b
).select(key=t2.b, left_t=t1.t, right_t=t2.t)
pw.debug.compute_and_print(t3, include_id=False)
key | left_t | right_t
1   | 1      | -1
1   | 4      | 6
1   | 7      | 6
2   |        | 10
2   | 0      | 2
2   | 3      | 2
2   | 4      | 2
4   |        | 3

windowby(time_expr, *, window, behavior=None, instance=None)

Creates a GroupedTable by windowing the table based on time_expr and window, optionally partitioned by instance.

  • Parameters
    • time_expr (pw.ColumnExpression[int | float | datetime]) – Column expression used for windowing
    • window (Window) – the window to use (for example, a tumbling, sliding, or session window)
    • instance (ColumnExpression | None) – optional column expression to act as a shard key

Examples:

import pathway as pw
t = pw.debug.table_from_markdown(
'''
    | instance |  t |  v
1   | 0        |  1 |  10
2   | 0        |  2 |  1
3   | 0        |  4 |  3
4   | 0        |  8 |  2
5   | 0        |  9 |  4
6   | 0        |  10|  8
7   | 1        |  1 |  9
8   | 1        |  2 |  16
''')
result = t.windowby(
    t.t, window=pw.temporal.session(predicate=lambda a, b: abs(a-b) <= 1), instance=t.instance
).reduce(
    pw.this.instance,
    min_t=pw.reducers.min(pw.this.t),
    max_v=pw.reducers.max(pw.this.v),
    count=pw.reducers.count(),
)
pw.debug.compute_and_print(result, include_id=False)
instance | min_t | max_v | count
0        | 1     | 10    | 2
0        | 4     | 3     | 1
0        | 8     | 8     | 3
1        | 1     | 16    | 2
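The following is a plain-Python model (not Pathway code) of the session windowing in this example. As a simplification, it assumes that a session is formed by sorting the times within an instance and merging each time into the current session whenever the predicate holds for it and the previous time. The helper names are hypothetical. The model reproduces the four rows printed above.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple


def split_sessions(
    points: List[Tuple[int, int]], predicate: Callable[[int, int], bool]
) -> List[List[Tuple[int, int]]]:
    # Sort (t, v) points by time; a neighbour that fails the predicate opens a new session.
    sessions: List[List[Tuple[int, int]]] = []
    for t, v in sorted(points):
        if sessions and predicate(sessions[-1][-1][0], t):
            sessions[-1].append((t, v))
        else:
            sessions.append([(t, v)])
    return sessions


def session_reduce(
    rows: List[Tuple[int, int, int]], predicate: Callable[[int, int], bool]
) -> List[Tuple[int, int, int, int]]:
    # rows are (instance, t, v); sessions never cross instance boundaries.
    by_instance: Dict[int, List[Tuple[int, int]]] = defaultdict(list)
    for instance, t, v in rows:
        by_instance[instance].append((t, v))
    out: List[Tuple[int, int, int, int]] = []
    for instance in sorted(by_instance):
        for session in split_sessions(by_instance[instance], predicate):
            min_t = session[0][0]  # sessions are sorted by time
            max_v = max(v for _, v in session)
            out.append((instance, min_t, max_v, len(session)))
    return out


data = [(0, 1, 10), (0, 2, 1), (0, 4, 3), (0, 8, 2), (0, 9, 4), (0, 10, 8), (1, 1, 9), (1, 2, 16)]
result = session_reduce(data, lambda a, b: abs(a - b) <= 1)
print(result)
```

Passing instance=t.instance is what keeps the times 1 and 2 of instance 1 out of the first session of instance 0.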

with_columns(*args, **kwargs)

Updates the columns of self according to args and kwargs; columns that do not exist in self yet are added. See Table.select for how args and kwargs are evaluated.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
  | owner | pet | size
1 | Tom   | 1   | 10
2 | Bob   | 1   | 9
3 | Tom   | 2   | 8
''')
t3 = t1.with_columns(*t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet | size
8   | Tom   | 2   | 8
9   | Bob   | 1   | 9
10  | Tom   | 1   | 10

with_id(new_index)

Sets new ids based on another column containing id-typed values.

To generate ids based on columns with arbitrary values, use with_id_from.

The assigned values must be row-wise unique.

  • Parameters
    new_index – column to be used as the new index.
  • Returns
    Table with updated ids.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
  | new_id
1 | 2
2 | 3
3 | 4
''')
t3 = t1.promise_universe_is_subset_of(t2).with_id(t2.new_id)
pw.debug.compute_and_print(t3)
    age  owner  pet
^2   10  Alice    1
^3    9    Bob    1
^4    8  Alice    2

with_id_from(*args, instance=None)

Computes new ids based on the values in the given columns. The computed ids must be row-wise unique.

  • Parameters
    columns – columns to be used as primary keys.
  • Returns
    Table – self updated with recomputed ids.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   | age | owner  | pet
 1 | 10  | Alice  | 1
 2 | 9   | Bob    | 1
 3 | 8   | Alice  | 2
''')
t2 = t1 + t1.select(old_id=t1.id)
t3 = t2.with_id_from(t2.age)
pw.debug.compute_and_print(t3)
     | age | owner | pet | old_id
^... | 8   | Alice | 2   | ^...
^... | 9   | Bob   | 1   | ^...
^... | 10  | Alice | 1   | ^...
t4 = t3.select(t3.age, t3.owner, t3.pet, same_as_old=(t3.id == t3.old_id),
    same_as_new=(t3.id == t3.pointer_from(t3.age)))
pw.debug.compute_and_print(t4)
     | age | owner | pet | same_as_old | same_as_new
^... | 8   | Alice | 2   | False       | True
^... | 9   | Bob   | 1   | False       | True
^... | 10  | Alice | 1   | False       | True

with_prefix(prefix)

Renames columns by prepending prefix to each column name.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.with_prefix("u_")
pw.debug.compute_and_print(t2, include_id=False)
u_age | u_owner | u_pet
8     | Alice   | 2
9     | Bob     | 1
10    | Alice   | 1

with_suffix(suffix)

Renames columns by appending suffix to each column name.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.with_suffix("_current")
pw.debug.compute_and_print(t2, include_id=False)
age_current | owner_current | pet_current
8           | Alice         | 2
9           | Bob           | 1
10          | Alice         | 1

with_universe_of(other)

Returns a copy of self with exactly the same universe as other.

Semantics: requires self.universe == other.universe as a precondition. Used in situations where Pathway cannot deduce that the universes are equal, but they are, as verified at runtime.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.table_from_markdown('''
  | age
1 | 10
7 | 3
8 | 100
''')
t3 = t2.filter(pw.this.age < 30).with_universe_of(t1)
t4 = t1 + t3
pw.debug.compute_and_print(t4, include_id=False)
pet | age
Cat | 3
Dog | 10

without(*columns)

Selects all columns except the specified ones.

  • Parameters
    columns (str | ColumnReference) – columns to drop, given by name or as column references (for example, table.column_name or pw.this.column_name).
  • Returns
    Table – self without specified columns.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = t1.without(t1.age, pw.this.pet)
pw.debug.compute_and_print(t2, include_id=False)
owner
Alice
Alice
Bob

property id: ColumnReference

Gets a reference to the pseudocolumn containing the ids of the table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.select(ids = t1.id)
t2.typehints()['ids']
<class 'pathway.engine.Pointer'>
pw.debug.compute_and_print(t2.select(test=t2.id == t2.ids), include_id=False)
test
True
True
True
True

property schema: type[pathway.internals.schema.Schema]

Gets the schema of the table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t1.schema
<pathway.Schema types={'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>}>
t1.typehints()['age']
<class 'int'>

property slice: TableSlice

Creates a collection of references to the columns of self. Supports basic column manipulation methods.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t1.slice.without("age")
TableSlice({'owner': <table1>.owner, 'pet': <table1>.pet})

class pw.TableLike(context)

Interface class for table-likes: Table, GroupedTable and JoinResult. All of those contain universe info, and thus support universe-related asserts.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
g1 = t1.groupby(t1.owner)
t2 = t1.filter(t1.age >= 9)
pw.debug.compute_and_print(t2, include_id=False)
age | owner | pet
9   | Bob   | dog
10  | Alice | dog
g2 = t2.groupby(t2.owner)
pw.universes.promise_is_subset_of(g2, g1) # t2 is a subset of t1, so this is safe

promise_universe_is_equal_to(other)

Asserts to Pathway that the universe of self is equal to the universe of other.

Semantics: Used in situations where Pathway cannot deduce that two universes are equal.

  • Returns
    self

NOTE: The assertion works in place.

Example:

import pathway as pw
import pytest
t1 = pw.debug.table_from_markdown(
    '''
  | age | owner | pet
1 | 8   | Alice | cat
2 | 9   | Bob   | dog
3 | 15  | Alice | tortoise
4 | 99  | Bob   | seahorse
'''
).filter(pw.this.age<30)
t2 = pw.debug.table_from_markdown(
    '''
  | age | owner
1 | 11  | Alice
2 | 12  | Tom
3 | 7   | Eve
'''
)
t3 = t2.filter(pw.this.age > 10)
with pytest.raises(ValueError):
    t1.update_cells(t3)
t1 = t1.promise_universe_is_equal_to(t2)
result = t1.update_cells(t3)
pw.debug.compute_and_print(result, include_id=False)
age | owner | pet
11  | Alice | cat
12  | Tom   | dog
15  | Alice | tortoise

promise_universe_is_subset_of(other)

Asserts to Pathway that the universe of self is a subset of the universe of other.

Semantics: Used in situations where Pathway cannot deduce one universe being a subset of another.

  • Returns
    self

NOTE: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 30
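The precondition behind the t1 << t2 update in this example can be modelled in plain Python (not Pathway code) by treating a universe as the set of row ids, that is, the keys of a dict. update_by_id is a hypothetical helper that checks the subset condition eagerly, purely for illustration, and then overwrites the matching rows with the values from other.

```python
from typing import Any, Dict

Row = Dict[str, Any]


def update_by_id(base: Dict[int, Row], other: Dict[int, Row]) -> Dict[int, Row]:
    # The condition the promise states: every row id of other also exists in base.
    if not other.keys() <= base.keys():
        raise ValueError("universe of other is not a subset of universe of base")
    result = {key: dict(row) for key, row in base.items()}
    for key, row in other.items():
        result[key].update(row)  # values from other take precedence
    return result


t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {1: {"age": 10, "owner": "Alice", "pet": 30}}
t3 = update_by_id(t1, t2)
print(t3)
```

Passing a dict with a row id missing from t1, such as {4: {"age": 1}}, raises ValueError in this model; in Pathway, the promise is how you tell the engine that this situation cannot occur.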

promise_universes_are_disjoint(other)

Asserts to Pathway that the universe of self is disjoint from the universe of other.

Semantics: Used in situations where Pathway cannot deduce that the universes are disjoint.

  • Returns
    self

NOTE: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
age | owner | pet
8   | Alice | 2
9   | Bob   | 1
10  | Alice | 1
11  | Alice | 30
12  | Tom   | 40

class pw.TableSlice(mapping, table)

Collection of references to Table columns. Created by Table.slice method, or automatically by using left/right/this constructs. Supports basic column manipulation methods.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t1.slice.without("age").with_suffix("_col")
TableSlice({'owner_col': <table1>.owner, 'pet_col': <table1>.pet})

class pw.UDF(*, return_type=Ellipsis, deterministic=False, propagate_none=False, executor=AutoExecutor(), cache_strategy=None)

Base class for Pathway UDF (user-defined functions).

Use the udf wrapper to create UDFs from Python functions. To define UDFs using Python classes, subclass this class and implement the __wrapped__ method.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
a | b
1 | 2
3 | 4
5 | 6
'''
)

class VerySophisticatedUDF(pw.UDF):
    exponent: float
    def __init__(self, exponent: float) -> None:
        super().__init__()
        self.exponent = exponent
    def __wrapped__(self, a: int, b: int) -> float:
        intermediate = (a * b) ** self.exponent
        return round(intermediate, 2)

func = VerySophisticatedUDF(1.5)
res = table.select(result=func(table.a, table.b))
pw.debug.compute_and_print(res, include_id=False)
result
2.83
41.57
164.32

class pw.UDFAsync(*, capacity=None, retry_strategy=None, cache_strategy=None)

Deprecated. Subclass `UDF` instead.

UDFs that are executed as python async functions.

To implement your own UDF as a class, implement the async __wrapped__ function.

class pw.UDFSync(*, return_type=Ellipsis, deterministic=False, propagate_none=False, executor=AutoExecutor(), cache_strategy=None)

Deprecated. Subclass `UDF` instead.

UDFs that are executed as regular python functions.

To implement your own UDF as a class, implement the __wrapped__ function.

class pw.iterate_universe(table)

class pw.left

Object for generating column references without holding the actual table in hand. Needs to be evaluated in the proper context. For Table.join() and JoinResult.select(), refers to the left input table. In all other situations, use the pw.this object.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join(t2, pw.left.pet == pw.right.pet, pw.left.owner == pw.right.owner).select(
    age=pw.left.age, owner_name=pw.right.owner, size=pw.this.size
)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L

class pw.right

Object for generating column references without holding the actual table in hand. Needs to be evaluated in the proper context. For Table.join() and JoinResult.select(), refers to the right input table. In all other situations, use the pw.this object.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join(t2, pw.left.pet == pw.right.pet, pw.left.owner == pw.right.owner).select(
    age=pw.left.age, owner_name=pw.right.owner, size=pw.this.size
)
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L

class pw.this

Object for generating column references without holding the actual table in hand. Needs to be evaluated in the proper context. For most of the Table methods, it refers to self. For JoinResult, it refers to the left input table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.select(pw.this.owner, pw.this.age)
pw.debug.compute_and_print(t2, include_id=False)
owner | age
Alice | 8
Alice | 10
Bob   | 9

pw.apply(fun, *args, **kwargs)

Applies fun to column expressions row by row. The output column type is deduced from the type annotations of fun.

Example:

import pathway as pw
def concat(left: str, right: str) -> str:
  return left+right
t1 = pw.debug.table_from_markdown('''
age  owner  pet
 10  Alice  dog
  9    Bob  dog
  8  Alice  cat
  7    Bob  dog''')
t2 = t1.select(col = pw.apply(concat, t1.owner, t1.pet))
pw.debug.compute_and_print(t2, include_id=False)
col
Alicecat
Alicedog
Bobdog
Bobdog

pw.apply_async(fun, *args, **kwargs)

Applies fun asynchronously to column expressions row by row. The output column type is deduced from the type annotations of fun. Either a regular or an async function can be passed.

Example:

import pathway as pw
import asyncio
async def concat(left: str, right: str) -> str:
  await asyncio.sleep(0.1)
  return left+right
t1 = pw.debug.table_from_markdown('''
age  owner  pet
 10  Alice  dog
  9    Bob  dog
  8  Alice  cat
  7    Bob  dog''')
t2 = t1.select(col = pw.apply_async(concat, t1.owner, t1.pet))
pw.debug.compute_and_print(t2, include_id=False)
col
Alicecat
Alicedog
Bobdog
Bobdog

pw.apply_with_type(fun, ret_type, *args, **kwargs)

Applies fun to column expressions row by row. The output column type is provided explicitly via ret_type.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
t2 = t1.select(col = pw.apply_with_type(lambda left, right: left+right, str, t1.owner, t1.pet))
pw.debug.compute_and_print(t2, include_id=False)
col
Alicecat
Alicedog
Bobdog
Bobdog

pw.assert_table_has_schema(table, schema, *, allow_superset=True, ignore_primary_keys=True)

Asserts that the schema of the table is equivalent to the schema given as an argument.

  • Parameters
    • table (Table) – the table whose schema is checked.
    • schema (type[Schema]) – the schema that table is expected to have.
    • allow_superset (bool) – if True, the columns of the table can be a superset of columns in schema. The default value is True.
    • ignore_primary_keys (bool) – if True, the assert won’t check whether table and schema have the same primary keys. The default value is True.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.select(pw.this.owner, age = pw.cast(float, pw.this.age))
schema = pw.schema_builder(
    {"age": pw.column_definition(dtype=float), "owner": pw.column_definition(dtype=str)}
)
pw.assert_table_has_schema(t2, schema)
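The effect of allow_superset can be sketched with a simplified plain-Python model (not Pathway code) in which a schema is a dict from column name to Python type. has_schema is a hypothetical helper. Unlike the real function, it returns a bool instead of raising, compares types with a plain identity check (Pathway's type handling is richer), and ignores primary keys.

```python
from typing import Dict


def has_schema(
    table_types: Dict[str, type], schema: Dict[str, type], allow_superset: bool = True
) -> bool:
    # With allow_superset=True the table may have extra columns; otherwise names must match exactly.
    if allow_superset:
        if not schema.keys() <= table_types.keys():
            return False
    elif table_types.keys() != schema.keys():
        return False
    # Every column named in the schema must have the same type in the table.
    return all(table_types[name] is dtype for name, dtype in schema.items())


t2_types = {"owner": str, "age": float}  # column types of t2 in the example above
schema = {"age": float, "owner": str}
print(has_schema(t2_types, schema))
```

Column order does not matter in this model, which is consistent with the example: t2 lists owner before age, while the schema lists age first.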

pw.attribute(func, **kwargs)

Decorator for creating attributes.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()

        @pw.attribute
        def attr(self) -> float:
            return self.arg*2

        @pw.output_attribute
        def ret(self) -> float:
            return self.attr + 1

t1 = pw.debug.table_from_markdown('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7   | 15
8   | 17
9   | 19
10  | 21

pw.cast(target_type, col)

Changes the type of the column to target_type and converts the data of this column.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  val
1   10
2    9
3    8
4    7''')
t1.schema
<pathway.Schema types={'val': <class 'int'>}>
pw.debug.compute_and_print(t1, include_id=False)
val
7
8
9
10
t2 = t1.select(val = pw.cast(float, t1.val))
t2.schema
<pathway.Schema types={'val': <class 'float'>}>
pw.debug.compute_and_print(t2, include_id=False)
val
7.0
8.0
9.0
10.0

pw.coalesce(*args)

Given arguments arg_1, arg_2, …, arg_n, returns the first value that is not None.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
colA   colB
     |   10
   2 |
     |
   4 |    7''')
t2 = t1.select(t1.colA, t1.colB, col=pw.coalesce(t1.colA, t1.colB))
pw.debug.compute_and_print(t2, include_id=False)
colA | colB | col
     |      |
     | 10   | 10
2    |      | 2
4    | 7    | 4
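For each row, the behaviour shown above matches this plain-Python function (a sketch, not Pathway's implementation). Because the function tests against None rather than falsiness, a value such as 0 is returned unchanged in this sketch.

```python
from typing import Any


def coalesce(*args: Any) -> Any:
    # Return the first argument that is not None, or None if all of them are None.
    for value in args:
        if value is not None:
            return value
    return None


# (colA, colB) pairs from the example table; None marks an empty cell.
rows = [(None, 10), (2, None), (None, None), (4, 7)]
col = [coalesce(a, b) for a, b in rows]
print(col)
```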

pw.column_definition(*, primary_key=False, default_value=undefined, dtype=None, name=None, append_only=None, description=None, example=None)

Creates a column definition.

  • Parameters
    • primary_key (bool) – whether the column is part of the primary key.
    • default_value (Any | None) – default value replacing blank entries. The default value of the column must be specified explicitly; otherwise, there is no default value.
    • dtype (Any | None) – data type. When used in a schema class, it is deduced from the type annotation.
    • name (str | None) – name of the column. When used in a schema class, it is deduced from the attribute name.
    • append_only (bool | None) – whether the column is append-only. If unspecified, defaults to False or to the value specified at the schema definition level.
  • Returns
    Column definition.

Example:

import pathway as pw
class NewSchema(pw.Schema):
  key: int = pw.column_definition(primary_key=True)
  timestamp: str = pw.column_definition(name="@timestamp")
  data: str
NewSchema
<pathway.Schema types={'key': <class 'int'>, '@timestamp': <class 'str'>, 'data': <class 'str'>}>

pw.declare_type(target_type, col)

Changes the declared type of a column to target_type. Note that it only changes the type in the schema; it does not convert the stored values.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   val
1   10
2    9.5
3    8
4    7''')
t1.schema
<pathway.Schema types={'val': <class 'float'>}>
t2 = t1.filter(t1.val == pw.cast(int, t1.val))
t2.schema
<pathway.Schema types={'val': <class 'float'>}>
t3 = t2.select(val = pw.declare_type(int, t2.val))
t3.schema
<pathway.Schema types={'val': <class 'int'>}>

pw.fill_error(col, replacement)

Replaces Error values with replacement. Only useful if program termination on error is disabled (PATHWAY_TERMINATE_ON_ERROR=0).

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    a | b
    3 | 3
    4 | 0
    5 | 5
    6 | 2
    '''
)
res_with_errors = t1.with_columns(c=pw.this.a // pw.this.b)
pw.debug.compute_and_print(res_with_errors, include_id=False, terminate_on_error=False)
a | b | c
3 | 3 | 1
4 | 0 | Error
5 | 5 | 1
6 | 2 | 3
res_wo_errors = res_with_errors.with_columns(c=pw.fill_error(pw.this.c, -1))
pw.debug.compute_and_print(res_wo_errors, include_id=False, terminate_on_error=False)
a | b | c
3 | 3 | 1
4 | 0 | -1
5 | 5 | 1
6 | 2 | 3

pw.groupby(grouped, *args, id=None, **kwargs)

Groups a table or a join result by the columns given in args.

NOTE: Usually followed by .reduce() that aggregates the result and returns a table.

  • Parameters
    • grouped (Table | JoinResult) – the Table or JoinResult to group.
    • args – columns to group by.
    • id (ColumnReference | None) – if provided, the column used to set the ids of the rows of the result.
    • **kwargs – extra arguments, see respective documentation for Table.groupby and JoinResult.groupby
  • Returns
    Groupby object of GroupedJoinResult or GroupedTable type.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = pw.groupby(t1, t1.pet, t1.owner).reduce(
    t1.owner, t1.pet, ageagg=pw.reducers.sum(t1.age)
)
pw.debug.compute_and_print(t2, include_id=False)
owner | pet | ageagg
Alice | cat | 8
Alice | dog | 10
Bob   | dog | 16
t3 = pw.debug.table_from_markdown('''
   cost  owner  pet
1   100  Alice    1
2    90    Bob    1
3    80  Alice    2
''')
t4 = pw.debug.table_from_markdown('''
    cost  owner  pet size
11   100  Alice    3    M
12    90    Bob    1    L
13    80    Tom    1   XL
''')
join_result = t3.join(t4, t3.owner==t4.owner)
result = pw.groupby(join_result, pw.this.owner).reduce(
    pw.this.owner, pairs=pw.reducers.count()
)
pw.debug.compute_and_print(result, include_id=False)
owner | pairs
Alice | 2
Bob   | 1

pw.if_else(if_clause, then_clause, else_clause)

Equivalent to the following, evaluated for each row:

if (if_clause):
    return (then_clause)
else:
    return (else_clause)

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
colA   colB
   1 |    0
   2 |    2
   6 |    3''')
t2 = t1.select(res = pw.if_else(t1.colB != 0, t1.colA // t1.colB, 0))
pw.debug.compute_and_print(t2, include_id=False)
res
0
1
2

pw.input_attribute(type=<class 'float'>)

Returns a new input attribute. To be used inside class transformers.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()

        @pw.output_attribute
        def ret(self) -> float:
            return self.arg + 1

t1 = pw.debug.table_from_markdown('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7   | 8
8   | 9
9   | 10
10  | 11

pw.input_method(type=<class 'float'>)

Decorator for defining input methods in class transformers.

Example:

import pathway as pw
@pw.transformer
class first_transformer:
    class table(pw.ClassArg):
        a: float = pw.input_attribute()

        @pw.method
        def fun(self, arg) -> int:
            return self.a * arg

@pw.transformer
class second_transformer:
    class table(pw.ClassArg):
        m = pw.input_method(int)

        @pw.output_attribute
        def val(self):
            return self.m(2)

t1 = pw.debug.table_from_markdown('''
age
10
9
8
7''')
t2 = first_transformer(table=t1.select(a=t1.age)).table
t2.schema
<pathway.Schema types={'fun': typing.Callable[..., int]}>
t3 = second_transformer(table=t2.select(m=t2.fun)).table
pw.debug.compute_and_print(t1 + t3, include_id=False)
age | val
7   | 14
8   | 16
9   | 18
10  | 20

pw.iterate(func, iteration_limit=None, **kwargs)

Iterates func until a fixed point is reached. func must take only Table arguments and must return a single Table, a tuple of Tables, or a dict of Tables. iterate returns a result of the same shape as func: a single Table, a tuple of Tables, or a dict of Tables, respectively. The initial arguments to func are passed through kwargs.

Example:

import pathway as pw
def collatz_transformer(iterated):
    @pw.udf(deterministic=True)
    def collatz_step(x: int) -> int:
        if x == 1:
            return 1
        elif x % 2 == 0:
            return x // 2
        else:
            return 3 * x + 1
    return iterated.select(val=collatz_step(iterated.val))
tab = pw.debug.table_from_markdown('''
val
  1
  2
  3
  4
  5
  6
  7
  8''')
ret = pw.iterate(collatz_transformer, iterated=tab)
pw.debug.compute_and_print(ret, include_id=False)
val
1
1
1
1
1
1
1
1
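The fixed-point semantics can be modeled in plain Python, outside Pathway: apply the step function repeatedly and stop once its output equals its input. This is a minimal sketch, not Pathway's implementation; the list of ints stands in for the table, and iterate_to_fixed_point and its max_steps guard are hypothetical names (max_steps is not a stand-in for the documented iteration_limit, whose exact semantics are not described here).

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def iterate_to_fixed_point(
    step: Callable[[T], T], value: T, max_steps: Optional[int] = None
) -> T:
    """Apply `step` until its output equals its input (a fixed point).

    `max_steps` is a hypothetical guard for this sketch only: it stops the
    loop early and returns the last value computed.
    """
    steps = 0
    while max_steps is None or steps < max_steps:
        next_value = step(value)
        steps += 1
        if next_value == value:
            return value
        value = next_value
    return value


def collatz_step(x: int) -> int:
    # Same rule as the UDF in the example above; 1 is a fixed point.
    if x == 1:
        return 1
    elif x % 2 == 0:
        return x // 2
    else:
        return 3 * x + 1


# Model the table as a list of values and apply the step to every row.
result = iterate_to_fixed_point(
    lambda vals: [collatz_step(v) for v in vals], list(range(1, 9))
)
print(result)  # [1, 1, 1, 1, 1, 1, 1, 1]
```

As in the Pathway example, every row converges to 1, after which another step changes nothing and the iteration stops.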

pw.join(left, right, *on, id=None, how=JoinMode.INNER, left_instance=None, right_instance=None)

Joins left with right using the given join expressions.

  • Parameters
    • left (Joinable) – the left side of the join, Table or JoinResult.
    • right (Joinable) – the right side of the join, Table or JoinResult.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for the id of the result; can only be left.id or right.id
    • how (JoinMode) – by default, an inner join is performed. Possible values JoinMode.{INNER,LEFT,RIGHT,OUTER} correspond to inner, left, right and outer join, respectively.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = pw.join(
    t1, t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id = False)
age | owner_name | size
9   | Bob        | L
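The four JoinMode values differ only in what happens to rows without a match. The following plain-Python sketch (not Pathway's implementation; join_rows and the list-of-dicts row model are hypothetical) pairs rows as described in the Behavior sections of join_left, join_right and join_outer below, using the same data as those examples.

```python
from typing import Any, Callable, Optional

Row = dict[str, Any]
Pair = tuple[Optional[Row], Optional[Row]]


def join_rows(
    left: list[Row],
    right: list[Row],
    on: Callable[[Row, Row], bool],
    how: str = "inner",
) -> list[Pair]:
    """Pair rows following JoinMode.{INNER,LEFT,RIGHT,OUTER} semantics.

    Unmatched rows that are kept get None on the missing side.
    """
    if how not in ("inner", "left", "right", "outer"):
        raise ValueError(f"unknown join mode: {how}")
    pairs: list[Pair] = []
    matched_right: set[int] = set()
    for l_row in left:
        matched = False
        for i, r_row in enumerate(right):
            if on(l_row, r_row):
                pairs.append((l_row, r_row))
                matched_right.add(i)
                matched = True
        # LEFT and OUTER keep left rows that found no partner.
        if not matched and how in ("left", "outer"):
            pairs.append((l_row, None))
    # RIGHT and OUTER keep right rows that found no partner.
    if how in ("right", "outer"):
        for i, r_row in enumerate(right):
            if i not in matched_right:
                pairs.append((None, r_row))
    return pairs


def a_equals_c(l_row: Row, r_row: Row) -> bool:
    # Plain-Python counterpart of the join condition t1.a == t2.c.
    return l_row["a"] == r_row["c"]


t1 = [{"a": 11, "b": 111}, {"a": 12, "b": 112}, {"a": 13, "b": 113}, {"a": 13, "b": 114}]
t2 = [{"c": 11, "d": 211}, {"c": 12, "d": 212}, {"c": 14, "d": 213}, {"c": 14, "d": 214}]

for mode in ("inner", "left", "right", "outer"):
    print(mode, len(join_rows(t1, t2, a_equals_c, how=mode)))
# Prints one line per mode: inner 2, left 4, right 4, outer 6
```

The row counts match the sizes of the join_left, join_right and join_outer outputs shown below.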

pw.join_inner(left, right, *on, id=None, left_instance=None, right_instance=None)

Inner-joins two tables or join results.

  • Parameters
    • left (Joinable) – the left side of the join, Table or JoinResult.
    • right (Joinable) – the right side of the join, Table or JoinResult.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for the id of the result; can only be left.id or right.id
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = pw.join_inner(t1, t2, t1.pet == t2.pet, t1.owner == t2.owner).select(
    age=t1.age, owner_name=t2.owner, size=t2.size
)
pw.debug.compute_and_print(t3, include_id = False)
age | owner_name | size
9   | Bob        | L

pw.join_left(left, right, *on, id=None, left_instance=None, right_instance=None)

Left-joins two tables or join results.

  • Parameters
    • left – the left side of the join, Table or JoinResult.
    • right – the right side of the join, Table or JoinResult.
    • *on (ColumnExpression) – Columns to join, syntax left.col1 == right.col2
    • id (ColumnReference | None) – optional id column of the result
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Remarks: args cannot contain the id column from either table, because the result table has an id column with auto-generated ids; an input id can still be selected by assigning it to a column with an explicit name (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • rows from the right side that were not matched with the left side are skipped
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(pw.join_left(t1, t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(t1.b + t2.d, t2.id)),
include_id=False)
a  | t2_c | s
11 | 11   | 322
12 | 12   | 324
13 |      |
13 |      |

pw.join_outer(left, right, *on, id=None, left_instance=None, right_instance=None)

Outer-joins two tables or join results.

  • Parameters
    • left – the left side of the join, Table or JoinResult.
    • right – the right side of the join, Table or JoinResult.
    • *on (ColumnExpression) – Columns to join, syntax left.col1 == right.col2
    • id (ColumnReference | None) – optional id column of the result
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Remarks: args cannot contain the id column from either table, because the result table has an id column with auto-generated ids; an input id can still be selected by assigning it to a column with an explicit name (passed in kwargs).

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(pw.join_outer(t1, t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(t1.b + t2.d, t1.id, t2.id)),
include_id=False)
a  | t2_c | s
   | 14   |
   | 14   |
11 | 11   | 322
12 | 12   | 324
13 |      |
13 |      |

pw.join_right(left, right, *on, id=None, left_instance=None, right_instance=None)

Right-joins two tables or join results.

  • Parameters
    • left – the left side of the join, Table or JoinResult.
    • right – the right side of the join, Table or JoinResult.
    • *on (ColumnExpression) – Columns to join, syntax left.col1 == right.col2
    • id (ColumnReference | None) – optional id column of the result
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Remarks: args cannot contain the id column from either table, because the result table has an id column with auto-generated ids; an input id can still be selected by assigning it to a column with an explicit name (passed in kwargs).

Behavior:

  • rows from the left side that were not matched with the right side are skipped
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(pw.join_right(t1, t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(pw.coalesce(t1.b,0) + t2.d,t1.id)),
include_id=False)
a  | t2_c | s
   | 14   |
   | 14   |
11 | 11   | 322
12 | 12   | 324

  • Returns
    OuterJoinResult object

pw.make_tuple(*args)

Creates a tuple from the provided expressions.

  • Parameters
    args (Union[ColumnExpression, None, int, float, str, bytes, bool, Pointer, datetime, timedelta, ndarray, Json, dict[str, Any], tuple[Any, ...]]) – a list of expressions to be put in a tuple
  • Returns
    tuple

NOTE:

  • Each cell in the output column will be a tuple containing the corresponding values from the input columns.
  • The order of values in each tuple will match the order of the input columns.
  • If any of the input columns have missing values, the resulting tuples will contain None for those positions.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
a | b  | c
1 | 10 | a
2 | 20 |
3 | 30 | c
'''
)
table_with_tuple = table.select(res=pw.make_tuple(pw.this.a, pw.this.b, pw.this.c))
pw.debug.compute_and_print(table_with_tuple, include_id=False)
res
(1, 10, 'a')
(2, 20, None)
(3, 30, 'c')

pw.method(func, **kwargs)

Decorator for creating methods in class transformers.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        a: float = pw.input_attribute()

        @pw.output_attribute
        def b(self) -> float:
            return self.fun(self.a)

        @pw.method
        def fun(self, arg) -> float:
            return self.a * arg

t1 = pw.debug.table_from_markdown('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(a=t1.age)).table
t2.schema
<pathway.Schema types={'b': <class 'float'>, 'fun': typing.Callable[..., float]}>
pw.debug.compute_and_print(t1 + t2.select(t2.b), include_id=False)
age | b
7   | 49
8   | 64
9   | 81
10  | 100
pw.debug.compute_and_print(t1 + t2.select(out = t2.fun(t2.b)), include_id=False)
age | out
7   | 343
8   | 512
9   | 729
10  | 1000

pw.numba_apply(fun, numba_signature, *args, **kwargs)

Applies a function to column expressions, column-wise. The function has to be compilable by numba.

Currently only a few signatures are supported:

  • the function has to be unary or binary
  • the arguments and the return type have to be either int64 or float64

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   val
1    1
2    3
3    5
4    7''')
t2 = t1.select(col = pw.numba_apply(lambda x: x*x-2*x+1, "int64(int64,)", t1.val))
pw.debug.compute_and_print(t2, include_id=False)
col
0
4
16
36

pw.output_attribute(func, **kwargs)

Decorator for creating output_attributes.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()

        @pw.output_attribute
        def ret(self) -> float:
            return self.arg + 1

t1 = pw.debug.table_from_markdown('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7   | 8
8   | 9
9   | 10
10  | 11

pw.pandas_transformer(output_schema, output_universe=None)

Decorator that turns a Python function operating on pandas.DataFrame into a Pathway transformer.

Input universes are converted into input DataFrame indexes. The resulting index is treated as the output universe, so it must maintain uniqueness and be of integer type.

  • Parameters
    • output_schema (type[Schema]) – Schema of a resulting table.
    • output_universe (str | int | None) – Index or name of an argument whose universe will be used in resulting table. Defaults to None.
  • Returns
    Transformer that can be applied on Pathway tables.

Example:

import pandas as pd
import pathway as pw
input = pw.debug.table_from_markdown(
    '''
    | foo  | bar
0   | 10   | 100
1   | 20   | 200
2   | 30   | 300
'''
)
class Output(pw.Schema):
    sum: int
@pw.pandas_transformer(output_schema=Output)
def sum_cols(t: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame(t.sum(axis=1))
output = sum_cols(input)
pw.debug.compute_and_print(output, include_id=False)
sum
110
220
330

pw.require(val, *deps)

Returns val if every dep in deps is not None, and None otherwise.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
colA   colB
     |   10
   2 |
     |
   4 |    7''')
t2 = t1.select(t1.colA, t1.colB, col=pw.require(t1.colA + t1.colB, t1.colA, t1.colB))
pw.debug.compute_and_print(t2, include_id=False)
colA | colB | col
     |      |
     | 10   |
2    |      |
4    | 7    | 11
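A plain-Python model of pw.require, as a sketch rather than Pathway's implementation. Passing val as a zero-argument callable is an assumption of this model: it captures the way the join examples above use pw.require to guard t1.b + t2.d, an expression that only makes sense when both sides are present.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def require(val: Callable[[], T], *deps: object) -> Optional[T]:
    """Return val() only if every dependency is not None, else None.

    val is never called when a dependency is missing, so it may assume
    that all of its inputs are present.
    """
    if any(dep is None for dep in deps):
        return None
    return val()


# Same (colA, colB) pairs as the example above.
rows = [(None, None), (None, 10), (2, None), (4, 7)]
col = [require(lambda a=a, b=b: a + b, a, b) for a, b in rows]
print(col)  # [None, None, None, 11]
```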

pw.run(*, debug=False, monitoring_level=MonitoringLevel.AUTO, with_http_server=False, default_logging=True, persistence_config=None, runtime_typechecking=None, license_key=None, terminate_on_error=None)

Runs the computation graph.

  • Parameters
    • debug (bool) – enables output from table.debug() operators.
    • monitoring_level (MonitoringLevel) – the verbosity of the stats monitoring mechanism. One of pathway.MonitoringLevel.NONE, pathway.MonitoringLevel.IN_OUT, pathway.MonitoringLevel.ALL. If unset, pathway will choose between NONE and IN_OUT based on output interactivity.
    • with_http_server (bool) – whether to start an HTTP server with runtime metrics. Learn more in a tutorial.
    • default_logging (bool) – whether to allow pathway to set its own logging handler. Set it to False if you want to set your own logging handler.
    • persistence_config (Config | None) – the configuration for persisting state, if persistence is required.
    • runtime_typechecking (bool | None) – enables additional strict type checking at runtime.
    • terminate_on_error (bool | None) – whether to terminate the computation if a data or user-logic error occurs.

pw.run_all(*, debug=False, monitoring_level=MonitoringLevel.AUTO, with_http_server=False, default_logging=True, persistence_config=None, runtime_typechecking=None, license_key=None, terminate_on_error=None)

Runs the computation graph with the tree-shaking optimization disabled.

  • Parameters
    • debug (bool) – enables output from table.debug() operators.
    • monitoring_level (MonitoringLevel) – the verbosity of the stats monitoring mechanism. One of pathway.MonitoringLevel.NONE, pathway.MonitoringLevel.IN_OUT, pathway.MonitoringLevel.ALL. If unset, pathway will choose between NONE and IN_OUT based on output interactivity.
    • with_http_server (bool) – whether to start an HTTP server with runtime metrics. Learn more in a tutorial.
    • default_logging (bool) – whether to allow pathway to set its own logging handler. Set it to False if you want to set your own logging handler.
    • persistence_config (Config | None) – the configuration for persisting state, if persistence is required.
    • runtime_typechecking (bool | None) – enables additional strict type checking at runtime.
    • terminate_on_error (bool | None) – whether to terminate the computation if a data or user-logic error occurs.

pw.schema_builder(columns, *, name=None, properties=SchemaProperties(append_only=None))

Builds a schema inline from a dictionary of column definitions.

  • Parameters
    • columns (dict[str, ColumnDefinition]) – dictionary of column definitions.
    • name (str | None) – schema name.
    • properties (SchemaProperties) – schema properties.
  • Returns
    Schema

Example:

import pathway as pw
pw.schema_builder(columns={
  'key': pw.column_definition(dtype=int, primary_key=True),
  'data': pw.column_definition(dtype=int, default_value=0)
}, name="my_schema")
<pathway.Schema types={'key': <class 'int'>, 'data': <class 'int'>}>

pw.schema_from_csv(path, *, name=None, properties=SchemaProperties(append_only=None), delimiter=',', quote='"', comment_character=None, escape=None, double_quote_escapes=True, num_parsed_rows=None)

Generates a schema based on a CSV file. The names of the columns are taken from the header of the CSV file. Column types are inferred from the values by checking whether they can be parsed. Currently supported types are str, int and float.

  • Parameters
    • path (str) – path to the CSV file.
    • name (str | None) – schema name.
    • properties (SchemaProperties) – schema properties.
    • delimiter (str) – delimiter used in CSV file. Defaults to “,”.
    • quote (str) – quote character used in CSV file. Defaults to ‘”’.
    • comment_character (str | None) – character used in CSV file to denote comments. Defaults to None.
    • escape (str | None) – escape character used in CSV file. Defaults to None.
    • double_quote_escapes (bool) – enable escapes of double quotes. Defaults to True.
    • num_parsed_rows (int | None) – number of rows, which will be parsed when inferring types. When set to None, all rows will be parsed. When set to 0, types of all columns will be set to str. Defaults to None.
  • Returns
    Schema
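To illustrate the kind of type inference described above, here is a standard-library sketch. It is not Pathway's parser: the rule order (int, then float, then str), the requirement that every inspected value parse, and the name infer_csv_types are assumptions made for illustration, including the num_parsed_rows=0 case falling back to str.

```python
import csv
import io
from typing import Optional


def _parses_as(kind: type, value: str) -> bool:
    # True if kind(value) succeeds, e.g. int("3") or float("2.5").
    try:
        kind(value)
        return True
    except ValueError:
        return False


def infer_csv_types(
    text: str, *, delimiter: str = ",", num_parsed_rows: Optional[int] = None
) -> dict[str, type]:
    """Guess int, float or str for each column of a CSV document.

    Column names come from the header row. A column is int if every
    inspected value parses as int, float if every value parses as float,
    and str otherwise (including when no rows are inspected).
    """
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    header = next(reader)
    rows = list(reader)
    if num_parsed_rows is not None:
        rows = rows[:num_parsed_rows]
    types: dict[str, type] = {}
    for i, name in enumerate(header):
        values = [row[i] for row in rows]
        if values and all(_parses_as(int, v) for v in values):
            types[name] = int
        elif values and all(_parses_as(float, v) for v in values):
            types[name] = float
        else:
            types[name] = str
    return types


sample = "key,price,label\n1,2.5,a\n2,3,b\n"
print(infer_csv_types(sample))
# {'key': <class 'int'>, 'price': <class 'float'>, 'label': <class 'str'>}
print(infer_csv_types(sample, num_parsed_rows=0))  # every column is str
```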

pw.schema_from_dict(columns, *, name=None, properties=SchemaProperties(append_only=None))

Builds a schema inline from a dictionary of column definitions. Compared to pw.schema_builder, it uses a simpler dictionary structure, which allows the definitions to be loaded from a JSON file.

  • Parameters
    • columns (dict) – dictionary of column definitions. The keys in this dictionary are names of the columns, and the values are either:
      • type of the column
      • dictionary with keys “dtype”, “primary_key”, “default_value”, whose values are, respectively, the type of the column, whether it is a primary key, and the column’s default value. The type can be given either as a Python class or as a string with the class name; that is, both int and “int” are accepted.
    • name (str | None) – schema name.
    • properties (dict | SchemaProperties) – schema properties, given either as instance of SchemaProperties class or a dict specifying arguments of SchemaProperties class.
  • Returns
    Schema

Example:

import pathway as pw
pw.schema_from_dict(columns={
  'key': {"dtype": "int", "primary_key": True},
  'data': {"dtype": "int", "default_value": 0}
}, name="my_schema")
<pathway.Schema types={'key': <class 'int'>, 'data': <class 'int'>}>

pw.schema_from_types(**kwargs)

Constructs a schema from kwargs of the form field=type.

Example:

import pathway as pw
s = pw.schema_from_types(foo=int, bar=str)
s
<pathway.Schema types={'foo': <class 'int'>, 'bar': <class 'str'>}>
issubclass(s, pw.Schema)
True

pw.sql(query, **kwargs)

Runs a SQL query on Pathway tables.

  • Parameters
    • query (str) – the SQL query to execute.
    • kwargs (Table) – the tables used when executing the SQL query. Each name=table pair links a Pathway table to a table name used in the SQL query.

Example:

import pathway as pw
t = pw.debug.table_from_markdown(
    """
      A  | B
      1  | 2
      4  | 3
      4  | 7
    """
)
ret = pw.sql("SELECT * FROM tab WHERE A<B", tab=t)
pw.debug.compute_and_print(ret, include_id=False)
A | B
1 | 2
4 | 7

Supported SQL keywords and operations: SELECT, WHERE, boolean expressions, arithmetic operations, GROUP BY, HAVING, AS (alias), UNION, INTERSECT, JOIN, and WITH.

Table and column names are case-sensitive.

Specificities of Pathway:

  • id is a reserved key word for columns, every Pathway table has a special column id. This column is not captured by * expressions in SQL.
  • Order of columns might not be preserved with respect to SELECT query.
  • Pathway reducers (pw.count, pw.sum, etc.) aggregate over None values, while SQL aggregation functions (COUNT, SUM, etc.) skip NULL values.
  • UNION requires matching column names.
  • INTERSECT requires matching column names.
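The SQL side of the None versus NULL difference can be checked with Python's built-in sqlite3 module. This shows standard SQL behavior only, not Pathway's: COUNT(A) and SUM(A) skip the NULL row, while COUNT(*) counts every row. Per the bullet above, Pathway reducers handle None differently, so results from pw.sql over columns containing None may not match a conventional SQL engine.

```python
import sqlite3

# Standard SQL semantics, demonstrated with SQLite (not Pathway).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tab (A INTEGER)")
conn.executemany("INSERT INTO tab (A) VALUES (?)", [(1,), (None,), (4,)])

count_a, count_rows, sum_a = conn.execute(
    "SELECT COUNT(A), COUNT(*), SUM(A) FROM tab"
).fetchone()
conn.close()

print(count_a, count_rows, sum_a)  # 2 3 5
```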

Limited support:

  • Subqueries are supported but fragile – they depend on a set of query rewriting routines from the sqlglot library.
  • Additionally, using the id column in subqueries is fragile.
  • LIKE, ANY, ALL, and EXISTS are either unsupported or only weakly supported.

Unsupported operations:

  • ordering operations: ORDER BY, LIMIT, SELECT TOP
  • INSERT INTO (Pathway tables are immutable)
  • Pathway does not support anonymous columns: they might work but we do not guarantee their behavior.
  • INTERSECT does not support INTERSECT ALL.
  • COALESCE, IFNULL are not supported.
  • FULL JOIN and NATURAL JOIN are not supported.
  • CAST is not supported

pw.table_transformer(func=None, *, allow_superset=True, ignore_primary_keys=True, locals=None)

Marks a function that performs operations on Tables.

As a consequence, arguments and return values annotated with type pw.Table[S] are checked to verify that they indeed have schema S.

  • Parameters
    • allow_superset (bool | Mapping[str, bool]) – if True, the columns of the table can be a superset of the columns in the schema. Can be given either as a bool, which is then used for all tables, or per argument, as a dict whose keys are argument names and whose values are bools specifying allow_superset for that argument. In the latter case, use the key “return” to set the value for the return value. The default value is True.
    • ignore_primary_keys (bool | Mapping[str, bool]) – if True, the check won’t verify whether the table and schema have the same primary keys. Can be given either as a bool, which is then used for all tables, or per argument, as a dict whose keys are argument names and whose values are bools specifying ignore_primary_keys for that argument. The default value is True.
    • locals (dict[str, Any] | None) – when a Schema class used as a parameter to pw.Table is defined locally, you need to pass locals() as the locals argument.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
A | B
1 | 6
3 | 8
5 | 2
''')
schema = pw.schema_from_types(A=int, B=int)
result_schema = pw.schema_from_types(A=int, B=int, C=int)
@pw.table_transformer
def sum_columns(t: pw.Table[schema]) -> pw.Table[result_schema]:
    result = t.with_columns(C=pw.this.A + pw.this.B)
    return result
pw.debug.compute_and_print(sum_columns(t1), include_id=False)
A | B | C
1 | 6 | 7
3 | 8 | 11
5 | 2 | 7

pw.transformer(cls)

Decorator that wraps the outer class when defining class transformers.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()

        @pw.output_attribute
        def ret(self) -> float:
            return self.arg + 1

t1 = pw.debug.table_from_markdown('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7   | 8
8   | 9
9   | 10
10  | 11

pw.udf(fun, /, *, return_type=Ellipsis, deterministic=False, propagate_none=False, executor=AutoExecutor(), cache_strategy=None)

Creates a Python UDF (user-defined function) out of a callable.

The output column type is deduced from the function's type annotations. It can be applied to a regular or an asynchronous function.

  • Parameters
    • return_type (Any) – The return type of the function. Can be passed here or as a return type annotation. Defaults to ..., meaning that the return type will be inferred from type annotation.
    • deterministic (bool) – Whether the provided function is deterministic, meaning that it always returns the same value for the same arguments. If it is not deterministic, Pathway memoizes the results until the row is deleted. If your function is deterministic, you are strongly encouraged to set this to True, as it improves performance. Defaults to False, meaning that the function is treated as non-deterministic and its results are kept.
    • propagate_none (bool) – If True, the function is not called when any of its arguments is None, and None is returned instead, as for increment in the example below. Defaults to False.
    • executor (Executor) – Defines the executor of the UDF. It determines if the execution is synchronous or asynchronous. Defaults to AutoExecutor(), meaning that the execution strategy will be inferred from the function annotation. By default, if the function is a coroutine, then it is executed asynchronously. Otherwise it is executed synchronously.
    • cache_strategy (CacheStrategy | None) – Defines the caching mechanism. Defaults to None.

Example:

import pathway as pw
import asyncio
table = pw.debug.table_from_markdown(
    '''
age | owner | pet
 10 | Alice | dog
  9 |   Bob | dog
    | Alice | cat
  7 |   Bob | dog
'''
)

@pw.udf
def concat(left: str, right: str) -> str:
    return left + "-" + right

@pw.udf(propagate_none=True)
def increment(age: int) -> int:
    assert age is not None
    return age + 1

res1 = table.select(
    owner_with_pet=concat(table.owner, table.pet), new_age=increment(table.age)
)
pw.debug.compute_and_print(res1, include_id=False)
owner_with_pet | new_age
Alice-cat      |
Alice-dog      | 11
Bob-dog        | 8
Bob-dog        | 10

@pw.udf
async def sleeping_concat(left: str, right: str) -> str:
    await asyncio.sleep(0.1)
    return left + "-" + right

res2 = table.select(col=sleeping_concat(table.owner, table.pet))
pw.debug.compute_and_print(res2, include_id=False)
col
Alice-cat
Alice-dog
Bob-dog
Bob-dog
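The effect of propagate_none=True on increment above can be modeled with a plain-Python decorator: when any argument is None, the wrapped function is not called and None is returned. This sketch only reproduces the behavior visible in the example output; the decorator name propagate_none here is hypothetical and unrelated to Pathway's internals.

```python
import functools
from typing import Any, Callable, Optional


def propagate_none(fn: Callable[..., Any]) -> Callable[..., Optional[Any]]:
    """Skip the call and return None if any argument is None."""

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
        if any(a is None for a in args) or any(v is None for v in kwargs.values()):
            return None
        return fn(*args, **kwargs)

    return wrapper


@propagate_none
def increment(age: int) -> int:
    assert age is not None  # never fails: None never reaches the body
    return age + 1


print([increment(age) for age in [10, 9, None, 7]])  # [11, 10, None, 8]
```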

pw.udf_async(fun=None, *, capacity=None, retry_strategy=None, cache_strategy=None)

Deprecated. Use udf instead.

Creates a Python asynchronous UDF (user-defined function) out of a callable.

The output column type is deduced from the function's type annotations. It can be applied to a regular or an asynchronous function.

  • Parameters
    • capacity (int | None) – Maximum number of concurrent operations allowed. Defaults to None, indicating no specific limit.
    • retry_strategy (AsyncRetryStrategy | None) – Strategy for handling retries in case of failures. Defaults to None, meaning no retries.
    • cache_strategy (CacheStrategy | None) – Defines the caching mechanism. Defaults to None.

Example:

import pathway as pw
import asyncio
@pw.udf_async
async def concat(left: str, right: str) -> str:
  await asyncio.sleep(0.1)
  return left+right
t1 = pw.debug.table_from_markdown('''
age  owner  pet
 10  Alice  dog
  9    Bob  dog
  8  Alice  cat
  7    Bob  dog''')
t2 = t1.select(col = concat(t1.owner, t1.pet))
pw.debug.compute_and_print(t2, include_id=False)
col
Alicecat
Alicedog
Bobdog
Bobdog

pw.unwrap(col)

Changes the type of the column from Optional[T] to T. If there is any None in the column, this operation raises an exception.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
colA | colB
1    | 5
2    | 9
3    | None
4    | 15''')
t1.schema
<pathway.Schema types={'colA': <class 'int'>, 'colB': int | None}>
pw.debug.compute_and_print(t1, include_id=False)
colA | colB
1    | 5
2    | 9
3    |
4    | 15
t2 = t1.filter(t1.colA < 3)
t2.schema
<pathway.Schema types={'colA': <class 'int'>, 'colB': int | None}>
pw.debug.compute_and_print(t2, include_id=False)
colA | colB
1    | 5
2    | 9
t3 = t2.select(colB = pw.unwrap(t2.colB))
t3.schema
<pathway.Schema types={'colB': <class 'int'>}>
pw.debug.compute_and_print(t3, include_id=False)
colB
5
9
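Conceptually, pw.unwrap applies the following check to every value in the column. This is a plain-Python sketch; the documentation above only says that an exception is raised, so the choice of ValueError is an assumption.

```python
from typing import Optional, TypeVar

T = TypeVar("T")


def unwrap(value: Optional[T]) -> T:
    """Narrow Optional[T] to T, failing loudly on None."""
    if value is None:
        raise ValueError("unwrap() received None")
    return value


col_b = [5, 9]  # colB after filtering out the row that held None
print([unwrap(v) for v in col_b])  # [5, 9]
```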

class pw.MonitoringLevel

[source]
Specifies the verbosity of the Pathway monitoring mechanism.

AUTO = 0

Automatically sets IN_OUT in an interactive terminal or Jupyter notebook. Sets NONE otherwise.

AUTO_ALL = 1

Automatically sets ALL in an interactive terminal or Jupyter notebook. Sets NONE otherwise.

NONE = 2

No monitoring.

IN_OUT = 3

Monitor input connectors and input and output latency. The latency is measured as the difference between the time when the operator processed the data and the time when pathway acquired the data.

ALL = 4

Monitor input connectors and latency for each operator in the execution graph. The latency is measured as the difference between the time when the operator processed the data and the time when pathway acquired the data.

class pw.DateTimeNamespace(expression)

[source]
A module containing methods related to DateTimes. They can be called using a dt attribute of an expression.

Typical use:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |         t1
   1 | 2023-05-15T14:13:00
'''
)
table_with_datetime = table.select(t1=table.t1.dt.strptime("%Y-%m-%dT%H:%M:%S"))
table_with_days = table_with_datetime.select(day=table_with_datetime.t1.dt.day())

add_duration_in_timezone(duration, timezone)

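Adds a Duration to a DateTimeNaive, taking the time zone into account. Because the addition is performed in the given time zone, daylight saving time transitions are respected: in the example below, adding 2 hours to 2023-03-26 01:23:00 in “Europe/Warsaw” gives 04:23:00, not 03:23:00.

  • Parameters
    • duration – the Duration to add.
    • timezone – the time zone in which the addition is performed.
  • Returns
    DateTimeNaive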

Example:

import pathway as pw
import datetime
t1 = pw.debug.table_from_markdown(
    '''
     |        date
   1 | 2023-03-26T01:23:00
   2 | 2023-03-27T01:23:00
   3 | 2023-10-29T01:23:00
   4 | 2023-10-30T01:23:00
'''
)
fmt = "%Y-%m-%dT%H:%M:%S"
t2 = t1.select(date=pw.this.date.dt.strptime(fmt=fmt))
t3 = t2.with_columns(
    new_date=pw.this.date.dt.add_duration_in_timezone(
        datetime.timedelta(hours=2), timezone="Europe/Warsaw"
    ),
)
pw.debug.compute_and_print(t3, include_id=False)
date                | new_date
2023-03-26 01:23:00 | 2023-03-26 04:23:00
2023-03-27 01:23:00 | 2023-03-27 03:23:00
2023-10-29 01:23:00 | 2023-10-29 02:23:00
2023-10-30 01:23:00 | 2023-10-30 03:23:00

day()

Extracts the day from a DateTime.

  • Returns
    Day as int. 1 <= day <= 31 (depending on the month)

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |               t1
   1 | 1974-03-12T00:00:00
   2 | 2023-03-25T12:00:00
   3 | 2023-05-15T14:13:00
'''
)
table_with_datetime = table.select(t1=table.t1.dt.strptime("%Y-%m-%dT%H:%M:%S"))
table_with_days = table_with_datetime.select(day=table_with_datetime.t1.dt.day())
pw.debug.compute_and_print(table_with_days, include_id=False)
day
12
15
25

days()

The total number of days in a Duration.

  • Returns
    Days as int.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |         t1          |         t2
   0 | 2023-03-15T00:00:00 | 2023-05-15T10:13:23
   1 | 2023-04-15T00:00:00 | 2023-05-15T10:00:00
   2 | 2023-05-01T10:00:00 | 2023-05-15T10:00:00
   3 | 2023-05-15T10:00:00 | 2023-05-15T09:00:00
   4 | 2023-05-15T10:00:00 | 2023-05-15T11:00:00
   5 | 2023-05-16T12:13:00 | 2023-05-15T10:00:00
   6 | 2024-05-15T14:13:23 | 2023-05-15T10:00:00
'''
)
fmt = "%Y-%m-%dT%H:%M:%S"
table_with_datetimes = table.select(
    t1=pw.this.t1.dt.strptime(fmt=fmt), t2=pw.this.t2.dt.strptime(fmt=fmt)
)
table_with_diff = table_with_datetimes.select(diff=pw.this.t1 - pw.this.t2)
table_with_days = table_with_diff.select(days=pw.this["diff"].dt.days())
pw.debug.compute_and_print(table_with_days, include_id=False)
days
-61
-30
-14
0
0
1
366
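The example shows that days() truncates toward zero: a difference of minus one hour gives 0, and one of minus 61 days and about 10 hours gives -61. Python's timedelta.days floors instead (timedelta(hours=-1).days is -1), so a plain-Python port of this logic needs explicit truncation. A minimal standard-library sketch, with total_days as a hypothetical helper name:

```python
from datetime import datetime, timedelta

_US_PER_DAY = 86_400 * 1_000_000


def total_days(d: timedelta) -> int:
    """Number of whole days in a duration, truncated toward zero."""
    # Work in integer microseconds to avoid float rounding.
    total_us = (d.days * 86_400 + d.seconds) * 1_000_000 + d.microseconds
    whole_days = abs(total_us) // _US_PER_DAY
    return whole_days if total_us >= 0 else -whole_days


fmt = "%Y-%m-%dT%H:%M:%S"
rows = [  # (t1, t2) pairs from the example above
    ("2023-03-15T00:00:00", "2023-05-15T10:13:23"),
    ("2023-04-15T00:00:00", "2023-05-15T10:00:00"),
    ("2023-05-01T10:00:00", "2023-05-15T10:00:00"),
    ("2023-05-15T10:00:00", "2023-05-15T09:00:00"),
    ("2023-05-15T10:00:00", "2023-05-15T11:00:00"),
    ("2023-05-16T12:13:00", "2023-05-15T10:00:00"),
    ("2024-05-15T14:13:23", "2023-05-15T10:00:00"),
]
days = [
    total_days(datetime.strptime(t1, fmt) - datetime.strptime(t2, fmt))
    for t1, t2 in rows
]
print(days)  # [-61, -30, -14, 0, 0, 1, 366]
print(timedelta(hours=-1).days, total_days(timedelta(hours=-1)))  # -1 0
```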

floor(duration)

Truncates a DateTime to the precision specified by the duration argument.

  • Parameters
    duration (ColumnExpression | Timedelta | str) – truncation precision

NOTE: Duration can be given as a string; in that case, only Pandas aliases that represent a fixed duration are accepted, so e.g. “M” is not accepted. For frequencies without a fixed length, use other methods, e.g. column.dt.month() instead of column.dt.floor("1M").

  • Returns
    DateTimeNaive or DateTimeUtc depending on the type of an object the method was called on

Examples:

import pathway as pw
import datetime
t1 = pw.debug.table_from_markdown(
    '''
     |         date
   1 | 2023-05-15T12:23:12
   2 | 2023-05-15T12:33:21
   3 | 2023-05-15T13:20:35
   4 | 2023-05-15T13:51:41
'''
)
fmt = "%Y-%m-%dT%H:%M:%S"
t2 = t1.select(date=pw.this.date.dt.strptime(fmt=fmt))
res = t2.with_columns(
    truncated_to_hours=pw.this.date.dt.floor(datetime.timedelta(hours=1)),
    truncated_to_10_min=pw.this.date.dt.floor(datetime.timedelta(minutes=10)),
    truncated_to_15_s=pw.this.date.dt.floor(datetime.timedelta(seconds=15)),
)
pw.debug.compute_and_print(res, include_id=False)
date                | truncated_to_hours  | truncated_to_10_min | truncated_to_15_s
2023-05-15 12:23:12 | 2023-05-15 12:00:00 | 2023-05-15 12:20:00 | 2023-05-15 12:23:00
2023-05-15 12:33:21 | 2023-05-15 12:00:00 | 2023-05-15 12:30:00 | 2023-05-15 12:33:15
2023-05-15 13:20:35 | 2023-05-15 13:00:00 | 2023-05-15 13:20:00 | 2023-05-15 13:20:30
2023-05-15 13:51:41 | 2023-05-15 13:00:00 | 2023-05-15 13:50:00 | 2023-05-15 13:51:30
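
For a fixed-length duration, flooring can be pictured as subtracting the remainder of the time elapsed since a fixed origin. The plain-Python sketch below (standard datetime module only, not Pathway's implementation) reproduces the values above. Aligning the buckets to 1970-01-01 is an assumption; it only matters for durations that do not divide a day evenly.

```python
from datetime import datetime, timedelta

# Assumption: buckets are aligned to the naive epoch 1970-01-01T00:00:00.
EPOCH = datetime(1970, 1, 1)


def floor_datetime(dt: datetime, step: timedelta) -> datetime:
    """Truncate dt down to the nearest multiple of step, counted from EPOCH."""
    # For a positive step, timedelta % timedelta is a non-negative remainder.
    return dt - (dt - EPOCH) % step


ts = datetime(2023, 5, 15, 12, 33, 21)
print(floor_datetime(ts, timedelta(hours=1)))     # 2023-05-15 12:00:00
print(floor_datetime(ts, timedelta(minutes=10)))  # 2023-05-15 12:30:00
print(floor_datetime(ts, timedelta(seconds=15)))  # 2023-05-15 12:33:15
```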

from_timestamp(unit)

Converts a timestamp represented as an int or float to a DateTimeNaive.

  • Parameters
    unit (str) – unit of a timestamp. It has to be one of ‘s’, ‘ms’, ‘us’, ‘ns’.
  • Returns
    DateTimeNaive

Example:

import pathway as pw
timestamps_1 = pw.debug.table_from_markdown(
    '''
  | timestamp
1 |    10
2 | 1685969950
'''
)
datetimes_1 = timestamps_1.select(date=pw.this.timestamp.dt.from_timestamp(unit="s"))
pw.debug.compute_and_print(datetimes_1, include_id=False)
date
1970-01-01 00:00:10
2023-06-05 12:59:10

timestamps_2 = pw.debug.table_from_markdown(
    '''
  |   timestamp
1 |    10.123
2 | 1685969950.4567
'''
)
datetimes_2 = timestamps_2.select(date=pw.this.timestamp.dt.from_timestamp(unit="s"))
pw.debug.compute_and_print(datetimes_2, include_id=False)
date
1970-01-01 00:00:10.123000
2023-06-05 12:59:10.456700160
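
The unit argument only fixes how many timestamp units make up one second. A plain-Python sketch of the conversion, assuming a naive epoch of 1970-01-01; the helper name naive_from_timestamp is hypothetical, and Python's datetime stores at most microseconds, so nanosecond precision is lost:

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # naive epoch, no time zone attached

# Number of timestamp units in one second, for the accepted units.
UNITS_PER_SECOND = {"s": 1, "ms": 1_000, "us": 1_000_000, "ns": 1_000_000_000}


def naive_from_timestamp(value: float, unit: str) -> datetime:
    """Interpret value as a count of `unit` since EPOCH and return a naive datetime."""
    if unit not in UNITS_PER_SECOND:
        raise ValueError(f"unit must be one of {sorted(UNITS_PER_SECOND)}")
    return EPOCH + timedelta(seconds=value / UNITS_PER_SECOND[unit])


print(naive_from_timestamp(1685969950, "s"))      # 2023-06-05 12:59:10
print(naive_from_timestamp(1685969950000, "ms"))  # 2023-06-05 12:59:10
```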

hour()

Extracts the hour from a DateTime.

  • Returns
    Hour as int. 0 <= hour < 24

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |               t1
   1 | 2023-05-15T00:00:00
   2 | 2023-05-15T12:00:00
   3 | 2023-05-15T14:13:00
'''
)
table_with_datetime = table.select(t1=table.t1.dt.strptime("%Y-%m-%dT%H:%M:%S"))
table_with_hours = table_with_datetime.select(hour=table_with_datetime.t1.dt.hour())
pw.debug.compute_and_print(table_with_hours, include_id=False)
hour
0
12
14

hours()

The total number of whole hours in a Duration, truncated toward zero.

  • Returns
    Hours as int.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |         t1          |         t2
   0 | 2023-05-15T00:00:00 | 2023-05-15T10:13:23
   1 | 2023-05-15T00:00:00 | 2023-05-15T10:00:00
   2 | 2023-05-15T10:00:00 | 2023-05-15T10:00:00
   3 | 2023-05-15T10:00:23 | 2023-05-15T10:00:00
   4 | 2023-05-15T12:13:00 | 2023-05-15T10:00:00
   5 | 2023-05-15T14:13:23 | 2023-05-15T10:00:00
   6 | 2023-05-16T10:13:23 | 2023-05-15T10:00:00
'''
)
fmt = "%Y-%m-%dT%H:%M:%S"
table_with_datetimes = table.select(
    t1=pw.this.t1.dt.strptime(fmt=fmt), t2=pw.this.t2.dt.strptime(fmt=fmt)
)
table_with_diff = table_with_datetimes.select(diff=pw.this.t1 - pw.this.t2)
table_with_hours = table_with_diff.select(hours=pw.this["diff"].dt.hours())
pw.debug.compute_and_print(table_with_hours, include_id=False)
hours
-10
-10
0
0
2
4
24
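
In the examples, totals of negative durations (e.g. -10 hours for -10 h 13 min 23 s, or -13 minutes for -13 min 23 s) are truncated toward zero rather than floored. A plain-Python reading of that behaviour using datetime.timedelta (a sketch, not Pathway's code) also shows why Python's own floor division is not a drop-in replacement:

```python
from datetime import timedelta


def total_in_units(duration: timedelta, unit: timedelta) -> int:
    """Number of whole `unit`s in `duration`, truncated toward zero."""
    # timedelta // timedelta floors, which rounds negative results away from zero,
    # so divide the magnitude and reapply the sign.
    whole = abs(duration) // unit
    return whole if duration >= timedelta(0) else -whole


diff = timedelta(hours=-10, minutes=-13, seconds=-23)
print(total_in_units(diff, timedelta(hours=1)))  # -10, as in the hours() example
print(diff // timedelta(hours=1))                # -11, plain floor division
```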

microsecond()

Extracts the fractional part of the second from a DateTime, expressed in whole microseconds.

  • Returns
    Microsecond as int. 0 <= microsecond < 1_000_000

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |               t1
   1 | 2023-05-15T10:13:00.000000000
   2 | 2023-05-15T10:13:00.000012000
   3 | 2023-05-15T10:13:00.123456789
   4 | 2023-05-15T10:13:23.123456789
'''
)
table_with_datetime = table.select(t1=table.t1.dt.strptime("%Y-%m-%dT%H:%M:%S.%f"))
table_with_microseconds = table_with_datetime.select(
    microsecond=table_with_datetime.t1.dt.microsecond()
)
pw.debug.compute_and_print(table_with_microseconds, include_id=False)
microsecond
0
12
123456
123456

microseconds()

The total number of whole microseconds in a Duration, truncated toward zero.

  • Returns
    Microseconds as int.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |               t1              |               t2
   0 | 2023-05-15T10:13:00.000000000 | 2023-05-15T10:13:23.123456789
   1 | 2023-05-15T10:13:00.000000000 | 2023-05-15T10:13:00.000000000
   2 | 2023-05-15T10:13:00.000012000 | 2023-05-15T10:13:00.000000000
   3 | 2023-05-15T10:13:00.123456789 | 2023-05-15T10:13:00.000000000
   4 | 2023-05-15T10:13:23.123456789 | 2023-05-15T10:13:00.000000000
   5 | 2023-05-16T10:13:23.123456789 | 2023-05-15T10:13:00.000000000
'''
)
fmt = "%Y-%m-%dT%H:%M:%S.%f"
table_with_datetimes = table.select(
    t1=pw.this.t1.dt.strptime(fmt=fmt), t2=pw.this.t2.dt.strptime(fmt=fmt)
)
table_with_diff = table_with_datetimes.select(diff=pw.this.t1 - pw.this.t2)
table_with_microseconds = table_with_diff.select(
    microseconds=pw.this["diff"].dt.microseconds()
)
pw.debug.compute_and_print(table_with_microseconds, include_id=False)
microseconds
-23123456
0
12
123456
23123456
86423123456

millisecond()

Extracts the fractional part of the second from a DateTime, expressed in whole milliseconds.

  • Returns
    Millisecond as int. 0 <= millisecond < 1_000

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |               t1
   1 | 2023-05-15T10:13:00.000000000
   2 | 2023-05-15T10:13:00.012000000
   3 | 2023-05-15T10:13:00.123456789
   4 | 2023-05-15T10:13:23.123456789
'''
)
table_with_datetime = table.select(t1=table.t1.dt.strptime("%Y-%m-%dT%H:%M:%S.%f"))
table_with_milliseconds = table_with_datetime.select(
    millisecond=table_with_datetime.t1.dt.millisecond()
)
pw.debug.compute_and_print(table_with_milliseconds, include_id=False)
millisecond
0
12
123
123

milliseconds()

The total number of whole milliseconds in a Duration, truncated toward zero.

  • Returns
    Milliseconds as int.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |               t1              |               t2
   0 | 2023-05-15T10:13:00.000000000 | 2023-05-15T10:13:23.123456789
   1 | 2023-05-15T10:13:00.000000000 | 2023-05-15T10:13:00.000000000
   2 | 2023-05-15T10:13:00.012000000 | 2023-05-15T10:13:00.000000000
   3 | 2023-05-15T10:13:00.123456789 | 2023-05-15T10:13:00.000000000
   4 | 2023-05-15T10:13:23.123456789 | 2023-05-15T10:13:00.000000000
   5 | 2023-05-16T10:13:23.123456789 | 2023-05-15T10:13:00.000000000
'''
)
fmt = "%Y-%m-%dT%H:%M:%S.%f"
table_with_datetimes = table.select(
    t1=pw.this.t1.dt.strptime(fmt=fmt), t2=pw.this.t2.dt.strptime(fmt=fmt)
)
table_with_diff = table_with_datetimes.select(diff=pw.this.t1 - pw.this.t2)
table_with_milliseconds = table_with_diff.select(
    milliseconds=pw.this["diff"].dt.milliseconds()
)
pw.debug.compute_and_print(table_with_milliseconds, include_id=False)
milliseconds
-23123
0
12
123
23123
86423123

minute()

Extracts the minute from a DateTime.

  • Returns
    Minute as int. 0 <= minute < 60

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |               t1
   1 | 2023-05-15T10:00:00
   2 | 2023-05-15T10:00:23
   3 | 2023-05-15T10:13:00
   4 | 2023-05-15T10:13:23
'''
)
table_with_datetime = table.select(t1=table.t1.dt.strptime("%Y-%m-%dT%H:%M:%S"))
table_with_minutes = table_with_datetime.select(
    minute=table_with_datetime.t1.dt.minute()
)
pw.debug.compute_and_print(table_with_minutes, include_id=False)
minute
0
0
13
13

minutes()

The total number of whole minutes in a Duration, truncated toward zero.

  • Returns
    Minutes as int.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |         t1          |         t2
   0 | 2023-05-15T10:00:00 | 2023-05-15T10:13:23
   1 | 2023-05-15T10:00:00 | 2023-05-15T10:00:00
   2 | 2023-05-15T10:00:23 | 2023-05-15T10:00:00
   3 | 2023-05-15T10:13:00 | 2023-05-15T10:00:00
   4 | 2023-05-15T10:13:23 | 2023-05-15T10:00:00
   5 | 2023-05-16T10:13:23 | 2023-05-15T10:00:00
'''
)
fmt = "%Y-%m-%dT%H:%M:%S"
table_with_datetimes = table.select(
    t1=pw.this.t1.dt.strptime(fmt=fmt), t2=pw.this.t2.dt.strptime(fmt=fmt)
)
table_with_diff = table_with_datetimes.select(diff=pw.this.t1 - pw.this.t2)
table_with_minutes = table_with_diff.select(minutes=pw.this["diff"].dt.minutes())
pw.debug.compute_and_print(table_with_minutes, include_id=False)
minutes
-13
0
0
13
13
1453

month()

Extracts the month from a DateTime.

  • Returns
    Month as int. 1 <= month <= 12

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |               t1
   1 | 1974-03-12T00:00:00
   2 | 2023-03-25T12:00:00
   3 | 2023-05-15T14:13:00
'''
)
table_with_datetime = table.select(t1=table.t1.dt.strptime("%Y-%m-%dT%H:%M:%S"))
table_with_months = table_with_datetime.select(month=table_with_datetime.t1.dt.month())
pw.debug.compute_and_print(table_with_months, include_id=False)
month
3
3
5

nanosecond()

Extracts the fractional part of the second from a DateTime, expressed in nanoseconds.

  • Returns
    Nanosecond as int. 0 <= nanosecond < 1_000_000_000

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |               t1
   1 | 2023-05-15T10:13:00.000000000
   2 | 2023-05-15T10:13:00.000000012
   3 | 2023-05-15T10:13:00.123456789
   4 | 2023-05-15T10:13:23.123456789
'''
)
table_with_datetime = table.select(t1=table.t1.dt.strptime("%Y-%m-%dT%H:%M:%S.%f"))
table_with_nanoseconds = table_with_datetime.select(
    nanosecond=table_with_datetime.t1.dt.nanosecond()
)
pw.debug.compute_and_print(table_with_nanoseconds, include_id=False)
nanosecond
0
12
123456789
123456789
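
millisecond(), microsecond() and nanosecond() return the same fraction of a second at different resolutions, and the coarser ones truncate rather than round (123456789 ns gives 123456 µs, not 123457, in the examples above). A small plain-Python illustration of that relationship; the helper subsecond_components is hypothetical:

```python
from __future__ import annotations


def subsecond_components(nanos_of_second: int) -> dict[str, int]:
    """Hypothetical helper: view the fractional second (given in ns) at three resolutions."""
    if not 0 <= nanos_of_second < 1_000_000_000:
        raise ValueError("expected a value in [0, 1_000_000_000)")
    return {
        "millisecond": nanos_of_second // 1_000_000,  # truncated, not rounded
        "microsecond": nanos_of_second // 1_000,      # truncated, not rounded
        "nanosecond": nanos_of_second,
    }


print(subsecond_components(123_456_789))
# {'millisecond': 123, 'microsecond': 123456, 'nanosecond': 123456789}
```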

nanoseconds()

The total number of nanoseconds in a Duration.

  • Returns
    Nanoseconds as int.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |               t1              |               t2
   0 | 2023-05-15T10:13:00.000000000 | 2023-05-15T10:13:23.123456789
   1 | 2023-05-15T10:13:00.000000000 | 2023-05-15T10:13:00.000000000
   2 | 2023-05-15T10:13:00.000000012 | 2023-05-15T10:13:00.000000000
   3 | 2023-05-15T10:13:00.123456789 | 2023-05-15T10:13:00.000000000
   4 | 2023-05-15T10:13:23.123456789 | 2023-05-15T10:13:00.000000000
   5 | 2023-05-16T10:13:23.123456789 | 2023-05-15T10:13:00.000000000
'''
)
fmt = "%Y-%m-%dT%H:%M:%S.%f"
table_with_datetimes = table.select(
    t1=pw.this.t1.dt.strptime(fmt=fmt), t2=pw.this.t2.dt.strptime(fmt=fmt)
)
table_with_diff = table_with_datetimes.select(diff=pw.this.t1 - pw.this.t2)
table_with_nanoseconds = table_with_diff.select(
    nanoseconds=pw.this["diff"].dt.nanoseconds()
)
pw.debug.compute_and_print(table_with_nanoseconds, include_id=False)
nanoseconds
-23123456789
0
12
123456789
23123456789
86423123456789

round(duration)

Rounds a DateTime to the precision specified by the duration argument.

  • Parameters
    duration (ColumnExpression | Timedelta | str) – rounding precision

NOTE: The duration can also be given as a string. In that case, only Pandas aliases that represent a fixed duration are accepted, so e.g. “M” is not accepted. For frequencies without a fixed length, use other methods, e.g. column.dt.month() instead of column.dt.round("1M").

  • Returns
    DateTimeNaive or DateTimeUtc, depending on the type of the object the method is called on

Examples:

import pathway as pw
import datetime
t1 = pw.debug.table_from_markdown(
    '''
     |         date
   1 | 2023-05-15T12:23:12
   2 | 2023-05-15T12:33:21
   3 | 2023-05-15T13:20:35
   4 | 2023-05-15T13:51:41
'''
)
fmt = "%Y-%m-%dT%H:%M:%S"
t2 = t1.select(date=pw.this.date.dt.strptime(fmt=fmt))
res = t2.with_columns(
    rounded_to_hours=pw.this.date.dt.round(datetime.timedelta(hours=1)),
    rounded_to_10_min=pw.this.date.dt.round(datetime.timedelta(minutes=10)),
    rounded_to_15_s=pw.this.date.dt.round(datetime.timedelta(seconds=15)),
)
pw.debug.compute_and_print(res, include_id=False)
date                | rounded_to_hours    | rounded_to_10_min   | rounded_to_15_s
2023-05-15 12:23:12 | 2023-05-15 12:00:00 | 2023-05-15 12:20:00 | 2023-05-15 12:23:15
2023-05-15 12:33:21 | 2023-05-15 13:00:00 | 2023-05-15 12:30:00 | 2023-05-15 12:33:15
2023-05-15 13:20:35 | 2023-05-15 13:00:00 | 2023-05-15 13:20:00 | 2023-05-15 13:20:30
2023-05-15 13:51:41 | 2023-05-15 14:00:00 | 2023-05-15 13:50:00 | 2023-05-15 13:51:45
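
Rounding works like flooring, except that the result moves up to the next multiple when the remainder is at least half the precision. A plain-Python sketch under an assumed 1970-01-01 alignment; how Pathway breaks exact ties is not stated here, so the round-half-up choice below is an assumption:

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumed alignment origin


def round_datetime(dt: datetime, step: timedelta) -> datetime:
    """Round dt to the nearest multiple of step, counted from EPOCH."""
    remainder = (dt - EPOCH) % step
    floored = dt - remainder
    # Assumption: an exact tie (remainder == step / 2) rounds up.
    return floored + step if remainder * 2 >= step else floored


print(round_datetime(datetime(2023, 5, 15, 12, 33, 21), timedelta(hours=1)))    # 2023-05-15 13:00:00
print(round_datetime(datetime(2023, 5, 15, 12, 23, 12), timedelta(seconds=15)))  # 2023-05-15 12:23:15
```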

second()

Extracts the second (of the minute) from a DateTime, discarding any fractional part.

  • Returns
    Second as int. 0 <= second < 60

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |               t1
   1 | 2023-05-15T10:13:00.000000000
   2 | 2023-05-15T10:13:00.123456789
   3 | 2023-05-15T10:13:23.000000000
   4 | 2023-05-15T10:13:23.123456789
'''
)
table_with_datetime = table.select(t1=table.t1.dt.strptime("%Y-%m-%dT%H:%M:%S.%f"))
table_with_seconds = table_with_datetime.select(
    second=table_with_datetime.t1.dt.second()
)
pw.debug.compute_and_print(table_with_seconds, include_id=False)
second
0
0
23
23

seconds()

The total number of whole seconds in a Duration, truncated toward zero.

  • Returns
    Seconds as int.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |               t1              |               t2
   0 | 2023-05-15T10:13:00.000000000 | 2023-05-15T10:13:23.123456789
   1 | 2023-05-15T10:13:00.000000000 | 2023-05-15T10:13:00.000000000
   2 | 2023-05-15T10:13:00.123456789 | 2023-05-15T10:13:00.000000000
   3 | 2023-05-15T10:13:23.000000000 | 2023-05-15T10:13:00.000000000
   4 | 2023-05-15T10:13:23.123456789 | 2023-05-15T10:13:00.000000000
   5 | 2023-05-16T10:13:23.123456789 | 2023-05-15T10:13:00.000000000
'''
)
fmt = "%Y-%m-%dT%H:%M:%S.%f"
table_with_datetimes = table.select(
    t1=pw.this.t1.dt.strptime(fmt=fmt), t2=pw.this.t2.dt.strptime(fmt=fmt)
)
table_with_diff = table_with_datetimes.select(diff=pw.this.t1 - pw.this.t2)
table_with_seconds = table_with_diff.select(seconds=pw.this["diff"].dt.seconds())
pw.debug.compute_and_print(table_with_seconds, include_id=False)
seconds
-23
0
0
23
23
86423

strftime(fmt)

Converts a DateTime to a string.

  • Parameters
    fmt (ColumnExpression | str) – Format string. We use the specifiers of the chrono library. In most cases they are identical to the standard Python specifiers in strftime.
  • Returns
    str

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |               t1
   1 | 1970-02-03T10:13:00
   2 | 2023-03-25T10:13:00
   3 | 2023-03-26T12:13:00
   4 | 2023-05-15T14:13:23
'''
)
fmt = "%Y-%m-%dT%H:%M:%S"
table_with_datetime = table.select(t1=pw.this.t1.dt.strptime(fmt=fmt))
table_formatted = table_with_datetime.select(
    date=pw.this.t1.dt.strftime("%d.%m.%Y"),
    full_date=pw.this.t1.dt.strftime("%B %d, %Y"),
    time_24=pw.this.t1.dt.strftime("%H:%M:%S"),
    time_12=pw.this.t1.dt.strftime("%I:%M:%S %p"),
)
pw.debug.compute_and_print(table_formatted, include_id=False)
date       | full_date         | time_24  | time_12
03.02.1970 | February 03, 1970 | 10:13:00 | 10:13:00 AM
15.05.2023 | May 15, 2023      | 14:13:23 | 02:13:23 PM
25.03.2023 | March 25, 2023    | 10:13:00 | 10:13:00 AM
26.03.2023 | March 26, 2023    | 12:13:00 | 12:13:00 PM
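
Since the chrono specifiers mostly coincide with Python's, the standard datetime.strftime is a quick way to sanity-check a format string before using it in a pipeline. This is a plain-Python check, not a guarantee that every chrono specifier behaves identically; %B and %p are locale-dependent in Python, and the commented strings assume an English locale:

```python
from datetime import datetime

# Same format strings as in the example above, applied with Python's strftime.
dt = datetime(1970, 2, 3, 10, 13, 0)
print(dt.strftime("%d.%m.%Y"))     # 03.02.1970
print(dt.strftime("%B %d, %Y"))    # February 03, 1970 (English locale)
print(dt.strftime("%H:%M:%S"))     # 10:13:00
print(dt.strftime("%I:%M:%S %p"))  # 10:13:00 AM (English locale)
```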

strptime(fmt, contains_timezone=None)

Converts a string to a DateTime. If the string contains a timezone and the %z specifier is used, a timezone-aware DateTime is created and converted to UTC (see examples). If the string contains no timezone, a naive (timezone-unaware) DateTime is created.

  • Parameters
    • fmt (ColumnExpression | str) – Format string. We use the specifiers of the chrono library. In most cases they are identical to the standard Python specifiers in strptime.
    • contains_timezone – If fmt is not a single string (the same for all rows) but a ColumnExpression, you need to set this parameter so that the function can determine whether the return type is DateTimeNaive (contains_timezone = False) or DateTimeUtc (contains_timezone = True).
  • Returns
    DateTimeNaive or DateTimeUtc

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |               t1
   1 | 1970-02-03T10:13:00.000000000
   2 | 2023-03-25T10:13:00.000000012
   3 | 2023-03-26T12:13:00.123456789
   4 | 2023-05-15T14:13:23.123456789
'''
)
fmt = "%Y-%m-%dT%H:%M:%S.%f"
table_with_datetime = table.select(t1=table.t1.dt.strptime(fmt=fmt))
pw.debug.compute_and_print(table_with_datetime, include_id=False)
t1
1970-02-03 10:13:00
2023-03-25 10:13:00.000000012
2023-03-26 12:13:00.123456789
2023-05-15 14:13:23.123456789

table = pw.debug.table_from_markdown(
    '''
     |               t1
   1 | 03.02.1970T10:13:00.000000000
   2 | 25.03.2023T10:13:00.000000012
   3 | 26.03.2023T12:13:00.123456789
   4 | 15.05.2023T14:13:23.123456789
'''
)
fmt = "%d.%m.%YT%H:%M:%S.%f"
table_with_datetime = table.select(t1=table.t1.dt.strptime(fmt=fmt))
pw.debug.compute_and_print(table_with_datetime, include_id=False)
t1
1970-02-03 10:13:00
2023-03-25 10:13:00.000000012
2023-03-26 12:13:00.123456789
2023-05-15 14:13:23.123456789

table = pw.debug.table_from_markdown(
    '''
     |               t1
   1 | 1970-02-03T10:13:00-02:00
   2 | 2023-03-25T10:13:00+00:00
   3 | 2023-03-26T12:13:00-01:00
   4 | 2023-05-15T14:13:23+00:30
'''
)
fmt = "%Y-%m-%dT%H:%M:%S%z"
table_with_datetime = table.select(t1=table.t1.dt.strptime(fmt=fmt))
pw.debug.compute_and_print(table_with_datetime, include_id=False)
t1
1970-02-03 12:13:00+00:00
2023-03-25 10:13:00+00:00
2023-03-26 13:13:00+00:00
2023-05-15 13:43:23+00:00
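
The conversion of offset-bearing strings to UTC can be cross-checked with Python's standard library. Python's %z accepts the "+HH:MM" form from Python 3.7 on; note also that Python's %f accepts at most six fractional digits, unlike the nine-digit inputs in the first two examples:

```python
from datetime import datetime, timezone

fmt = "%Y-%m-%dT%H:%M:%S%z"
for text in ["1970-02-03T10:13:00-02:00", "2023-05-15T14:13:23+00:30"]:
    aware = datetime.strptime(text, fmt)   # keeps the parsed UTC offset
    print(aware.astimezone(timezone.utc))  # same instant, expressed in UTC
# 1970-02-03 12:13:00+00:00
# 2023-05-15 13:43:23+00:00
```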

subtract_date_time_in_timezone(date_time, timezone)

Subtracts two DateTimeNaive values, taking the time zone into account: the result is the actual time elapsed between the two wall-clock times in the given time zone.

  • Parameters
    • date_time (ColumnExpression | Timestamp) – DateTimeNaive to be subtracted from self.
    • timezone (ColumnExpression | str) – The time zone to perform subtraction in.
  • Returns
    Duration

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
     |        date1        |        date2
   1 | 2023-03-26T03:20:00 | 2023-03-26T01:20:00
   2 | 2023-03-27T03:20:00 | 2023-03-27T01:20:00
   3 | 2023-10-29T03:20:00 | 2023-10-29T01:20:00
   4 | 2023-10-30T03:20:00 | 2023-10-30T01:20:00
'''
)
fmt = "%Y-%m-%dT%H:%M:%S"
t2 = t1.select(
    date1=pw.this.date1.dt.strptime(fmt=fmt), date2=pw.this.date2.dt.strptime(fmt=fmt)
)
t3 = t2.with_columns(
    diff=pw.this.date1.dt.subtract_date_time_in_timezone(
        pw.this.date2, timezone="Europe/Warsaw"
    ),
)
pw.debug.compute_and_print(t3, include_id=False)
date1               | date2               | diff
2023-03-26 03:20:00 | 2023-03-26 01:20:00 | 0 days 01:00:00
2023-03-27 03:20:00 | 2023-03-27 01:20:00 | 0 days 02:00:00
2023-10-29 03:20:00 | 2023-10-29 01:20:00 | 0 days 03:00:00
2023-10-30 03:20:00 | 2023-10-30 01:20:00 | 0 days 02:00:00
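
The same DST-aware difference can be reproduced in plain Python with zoneinfo (Python 3.9+, requires the system time zone database). The helper name is hypothetical. The key step is converting to UTC before subtracting: Python ignores the UTC offsets when both operands share the same tzinfo and returns the plain wall-clock difference.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo


def subtract_in_timezone(later: datetime, earlier: datetime, tz: str) -> timedelta:
    """Elapsed time between two naive wall-clock times interpreted in zone tz."""
    zone = ZoneInfo(tz)
    # Convert both to UTC first; same-tzinfo subtraction would ignore the DST change.
    later_utc = later.replace(tzinfo=zone).astimezone(timezone.utc)
    earlier_utc = earlier.replace(tzinfo=zone).astimezone(timezone.utc)
    return later_utc - earlier_utc


spring = subtract_in_timezone(
    datetime(2023, 3, 26, 3, 20), datetime(2023, 3, 26, 1, 20), "Europe/Warsaw"
)
autumn = subtract_in_timezone(
    datetime(2023, 10, 29, 3, 20), datetime(2023, 10, 29, 1, 20), "Europe/Warsaw"
)
print(spring, autumn)  # 1:00:00 3:00:00
```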

subtract_duration_in_timezone(duration, timezone)

Subtracts a Duration from a DateTimeNaive, taking the time zone into account.

  • Parameters
    • duration (ColumnExpression | Timedelta) – Duration to be subtracted from DateTime.
    • timezone (ColumnExpression | str) – The time zone to perform subtraction in.
  • Returns
    DateTimeNaive

Example:

import pathway as pw
import datetime
t1 = pw.debug.table_from_markdown(
    '''
     |        date
   1 | 2023-03-26T03:23:00
   2 | 2023-03-27T03:23:00
   3 | 2023-10-29T03:23:00
   4 | 2023-10-30T03:23:00
'''
)
fmt = "%Y-%m-%dT%H:%M:%S"
t2 = t1.select(date=pw.this.date.dt.strptime(fmt=fmt))
t3 = t2.with_columns(
    new_date=pw.this.date.dt.subtract_duration_in_timezone(
        datetime.timedelta(hours=2), timezone="Europe/Warsaw"
    ),
)
pw.debug.compute_and_print(t3, include_id=False)
date                | new_date
2023-03-26 03:23:00 | 2023-03-26 00:23:00
2023-03-27 03:23:00 | 2023-03-27 01:23:00
2023-10-29 03:23:00 | 2023-10-29 02:23:00
2023-10-30 03:23:00 | 2023-10-30 01:23:00
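
Conceptually, the subtraction is done on the real timeline and the result is read back on the local wall clock. A plain-Python sketch with zoneinfo (Python 3.9+); subtract_duration_in_tz is a hypothetical helper, and it does not attempt any special handling of wall-clock times that are skipped or repeated by a DST change:

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo


def subtract_duration_in_tz(local: datetime, duration: timedelta, tz: str) -> datetime:
    """Subtract an exact duration from a naive wall-clock time in zone tz."""
    zone = ZoneInfo(tz)
    instant = local.replace(tzinfo=zone).astimezone(timezone.utc) - duration
    # Back to the zone's wall clock, then drop tzinfo to get a naive result again.
    return instant.astimezone(zone).replace(tzinfo=None)


print(subtract_duration_in_tz(datetime(2023, 3, 26, 3, 23), timedelta(hours=2), "Europe/Warsaw"))
# 2023-03-26 00:23:00
```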

timestamp(unit=None)

Returns the number of seconds, milliseconds, microseconds, or nanoseconds (depending on unit) elapsed since 1970-01-01 for a naive DateTime, or since 1970-01-01 UTC for a timezone-aware DateTime.

  • Parameters
    unit (str | None) – unit of a timestamp. It has to be one of ‘s’, ‘ms’, ‘us’, ‘ns’. It can also be None, in which case it defaults to ‘ns’; this is deprecated.
  • Returns
    Timestamp as float. Timestamp as int if unit=None (deprecated).

Examples:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |               t1
   0 | 1969-01-01T00:00:00.000000000
   1 | 1970-01-01T00:00:00.000000000
   2 | 2023-01-01T00:00:00.000000000
   3 | 2023-03-25T00:00:00.000000000
   4 | 2023-03-25T13:45:26.000000000
   5 | 2023-03-25T13:45:26.987654321
'''
)
table_with_datetime = table.select(t1=table.t1.dt.strptime("%Y-%m-%dT%H:%M:%S.%f"))
table_with_timestamp = table_with_datetime.select(
    timestamp_ns=table_with_datetime.t1.dt.timestamp(unit="ns"),
    timestamp_s=table_with_datetime.t1.dt.timestamp(unit="s"),
)
pw.debug.compute_and_print(table_with_timestamp, include_id=False)
timestamp_ns           | timestamp_s
-3.1536e+16            | -31536000.0
0.0                    | 0.0
1.6725312e+18          | 1672531200.0
1.6797024e+18          | 1679702400.0
1.679751926e+18        | 1679751926.0
1.6797519269876544e+18 | 1679751926.9876544

table = pw.debug.table_from_markdown(
    '''
     |               t1
   1 | 1969-01-01T00:00:00.000000000+00:00
   2 | 1970-01-01T00:00:00.000000000+00:00
   3 | 1970-01-01T00:00:00.000000000+02:00
   4 | 1970-01-01T00:00:00.000000000-03:00
   5 | 2023-01-01T00:00:00.000000000+01:00
   6 | 2023-03-25T00:00:00.000000000+01:00
   7 | 2023-03-25T13:45:26.000000000+01:00
   8 | 2023-03-25T13:45:26.987654321+01:00
'''
)
table_with_datetime = table.select(t1=table.t1.dt.strptime("%Y-%m-%dT%H:%M:%S.%f%z"))
table_with_timestamp = table_with_datetime.select(
    timestamp_ns=table_with_datetime.t1.dt.timestamp(unit="ns"),
    timestamp_s=table_with_datetime.t1.dt.timestamp(unit="s"),
)
pw.debug.compute_and_print(table_with_timestamp, include_id=False)
timestamp_ns           | timestamp_s
-3.1536e+16            | -31536000.0
-7200000000000.0       | -7200.0
0.0                    | 0.0
10800000000000.0       | 10800.0
1.6725276e+18          | 1672527600.0
1.6796988e+18          | 1679698800.0
1.679748326e+18        | 1679748326.0
1.6797483269876544e+18 | 1679748326.9876544
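
A plain-Python sketch of the same computation (the helper timestamp_in is hypothetical). It measures naive values from a naive 1970-01-01 epoch, which matters because Python's own datetime.timestamp() interprets a naive datetime in the machine's local time zone. Python's datetime stores at most microseconds, so the nanosecond fraction in the last rows cannot be represented:

```python
from datetime import datetime, timedelta, timezone

NAIVE_EPOCH = datetime(1970, 1, 1)
UTC_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
UNITS_PER_SECOND = {"s": 1, "ms": 1_000, "us": 1_000_000, "ns": 1_000_000_000}


def timestamp_in(dt: datetime, unit: str) -> float:
    """Time elapsed since the epoch in `unit`, as a float (KeyError on an unknown unit)."""
    # Naive values use the naive epoch; aware values are compared to 1970-01-01 UTC.
    epoch = NAIVE_EPOCH if dt.tzinfo is None else UTC_EPOCH
    return (dt - epoch) / timedelta(seconds=1) * UNITS_PER_SECOND[unit]


print(timestamp_in(datetime(2023, 3, 25, 13, 45, 26), "s"))  # 1679751926.0
print(timestamp_in(datetime(1970, 1, 1, tzinfo=timezone(timedelta(hours=2))), "s"))  # -7200.0
```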

to_naive_in_timezone(timezone)

Converts a DateTimeUtc to a DateTimeNaive representing the local time in the time zone given by the timezone argument.

  • Parameters
    timezone (ColumnExpression | str) – The time zone to convert to.
  • Returns
    DateTimeNaive

Examples:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |        date_utc
   1 | 2023-03-26T00:59:00+00:00
   2 | 2023-03-26T01:00:00+00:00
   3 | 2023-03-27T00:59:00+00:00
   4 | 2023-03-27T01:00:00+00:00
   5 | 2023-10-28T23:59:00+00:00
   6 | 2023-10-29T00:00:00+00:00
   7 | 2023-10-29T00:30:00+00:00
   8 | 2023-10-29T01:00:00+00:00
   9 | 2023-10-29T01:30:00+00:00
'''
)
fmt = "%Y-%m-%dT%H:%M:%S%z"
table_utc = table.select(date_utc=pw.this.date_utc.dt.strptime(fmt=fmt))
table_local = table_utc.with_columns(
    date=pw.this.date_utc.dt.to_naive_in_timezone(timezone="Europe/Warsaw"),
)
pw.debug.compute_and_print(table_local, include_id=False)
date_utc                  | date
2023-03-26 00:59:00+00:00 | 2023-03-26 01:59:00
2023-03-26 01:00:00+00:00 | 2023-03-26 03:00:00
2023-03-27 00:59:00+00:00 | 2023-03-27 02:59:00
2023-03-27 01:00:00+00:00 | 2023-03-27 03:00:00
2023-10-28 23:59:00+00:00 | 2023-10-29 01:59:00
2023-10-29 00:00:00+00:00 | 2023-10-29 02:00:00
2023-10-29 00:30:00+00:00 | 2023-10-29 02:30:00
2023-10-29 01:00:00+00:00 | 2023-10-29 02:00:00
2023-10-29 01:30:00+00:00 | 2023-10-29 02:30:00

table = pw.debug.table_from_markdown(
    '''
     |        date_utc
   1 | 2023-03-12T09:59:00+00:00
   2 | 2023-03-12T10:00:00+00:00
   3 | 2023-03-13T09:59:00+00:00
   4 | 2023-03-13T10:00:00+00:00
   5 | 2023-11-05T07:59:00+00:00
   6 | 2023-11-05T08:00:00+00:00
   7 | 2023-11-05T08:30:00+00:00
   8 | 2023-11-05T09:00:00+00:00
   9 | 2023-11-05T09:30:00+00:00
'''
)
fmt = "%Y-%m-%dT%H:%M:%S%z"
table_utc = table.select(date_utc=pw.this.date_utc.dt.strptime(fmt=fmt))
table_local = table_utc.with_columns(
    date=pw.this.date_utc.dt.to_naive_in_timezone(timezone="America/Los_Angeles"),
)
pw.debug.compute_and_print(table_local, include_id=False)
date_utc                  | date
2023-03-12 09:59:00+00:00 | 2023-03-12 01:59:00
2023-03-12 10:00:00+00:00 | 2023-03-12 03:00:00
2023-03-13 09:59:00+00:00 | 2023-03-13 02:59:00
2023-03-13 10:00:00+00:00 | 2023-03-13 03:00:00
2023-11-05 07:59:00+00:00 | 2023-11-05 00:59:00
2023-11-05 08:00:00+00:00 | 2023-11-05 01:00:00
2023-11-05 08:30:00+00:00 | 2023-11-05 01:30:00
2023-11-05 09:00:00+00:00 | 2023-11-05 01:00:00
2023-11-05 09:30:00+00:00 | 2023-11-05 01:30:00
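
In plain Python the conversion is an astimezone call followed by dropping the tzinfo (zoneinfo, Python 3.9+; the helper name is hypothetical). The sketch also makes visible what the tables above show: when clocks are turned back, two different UTC instants map to the same naive local time, so that information is lost in the result.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def naive_in_timezone(utc_dt: datetime, tz: str) -> datetime:
    """Wall-clock time in zone tz for an aware datetime, with tzinfo dropped."""
    return utc_dt.astimezone(ZoneInfo(tz)).replace(tzinfo=None)


# Two different instants at the end of DST map to the same naive wall-clock time.
first = datetime(2023, 10, 29, 0, 0, tzinfo=timezone.utc)
second = datetime(2023, 10, 29, 1, 0, tzinfo=timezone.utc)
print(naive_in_timezone(first, "Europe/Warsaw"))   # 2023-10-29 02:00:00
print(naive_in_timezone(second, "Europe/Warsaw"))  # 2023-10-29 02:00:00
```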

to_utc(from_timezone)

Converts a DateTimeNaive, interpreted as local time in the time zone given by the from_timezone argument, to UTC. If the given DateTime does not exist in that time zone (e.g. it falls into the gap when clocks are moved forward), it is mapped to the first existing DateTime after it. If it corresponds to more than one moment (e.g. when clocks are moved back), it is mapped to the later moment.

  • Parameters
    from_timezone (ColumnExpression | str) – The time zone to convert from.
  • Returns
    DateTimeUtc

Examples:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |         date
   1 | 2023-03-26T01:59:00
   2 | 2023-03-26T02:30:00
   3 | 2023-03-26T03:00:00
   4 | 2023-03-27T01:59:00
   5 | 2023-03-27T02:30:00
   6 | 2023-03-27T03:00:00
   7 | 2023-10-29T01:59:00
   8 | 2023-10-29T02:00:00
'''
)
fmt = "%Y-%m-%dT%H:%M:%S"
table_local = table.select(date=pw.this.date.dt.strptime(fmt=fmt))
table_utc = table_local.with_columns(
    date_utc=pw.this.date.dt.to_utc(from_timezone="Europe/Warsaw"),
)
pw.debug.compute_and_print(table_utc, include_id=False)
date                | date_utc
2023-03-26 01:59:00 | 2023-03-26 00:59:00+00:00
2023-03-26 02:30:00 | 2023-03-26 01:00:00+00:00
2023-03-26 03:00:00 | 2023-03-26 01:00:00+00:00
2023-03-27 01:59:00 | 2023-03-26 23:59:00+00:00
2023-03-27 02:30:00 | 2023-03-27 00:30:00+00:00
2023-03-27 03:00:00 | 2023-03-27 01:00:00+00:00
2023-10-29 01:59:00 | 2023-10-28 23:59:00+00:00
2023-10-29 02:00:00 | 2023-10-29 01:00:00+00:00

table = pw.debug.table_from_markdown(
    '''
     |         date
   1 | 2023-03-12T01:59:00
   2 | 2023-03-12T02:30:00
   3 | 2023-03-12T03:00:00
   4 | 2023-03-13T01:59:00
   5 | 2023-03-13T02:30:00
   6 | 2023-03-13T03:00:00
   7 | 2023-11-05T00:59:00
   8 | 2023-11-05T01:00:00
'''
)
fmt = "%Y-%m-%dT%H:%M:%S"
table_local = table.select(date=pw.this.date.dt.strptime(fmt=fmt))
table_utc = table_local.with_columns(
    date_utc=pw.this.date.dt.to_utc(from_timezone="America/Los_Angeles"),
)
pw.debug.compute_and_print(table_utc, include_id=False)
date                | date_utc
2023-03-12 01:59:00 | 2023-03-12 09:59:00+00:00
2023-03-12 02:30:00 | 2023-03-12 10:00:00+00:00
2023-03-12 03:00:00 | 2023-03-12 10:00:00+00:00
2023-03-13 01:59:00 | 2023-03-13 08:59:00+00:00
2023-03-13 02:30:00 | 2023-03-13 09:30:00+00:00
2023-03-13 03:00:00 | 2023-03-13 10:00:00+00:00
2023-11-05 00:59:00 | 2023-11-05 07:59:00+00:00
2023-11-05 01:00:00 | 2023-11-05 09:00:00+00:00
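
For ambiguous wall-clock times, the "later moment" rule corresponds to Python's fold=1 attribute (PEP 495). The plain-Python sketch below (zoneinfo, Python 3.9+, hypothetical helper name) covers only ordinary and ambiguous times. fold does not implement the "first existing DateTime after it" rule, so for times skipped by a DST jump (such as 02:30 on 2023-03-26 in Europe/Warsaw) its result differs from the table above.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def to_utc_prefer_later(local: datetime, tz: str) -> datetime:
    """Interpret a naive wall-clock time in zone tz and return it in UTC.

    fold=1 selects the later of the two instants when the wall-clock time is
    ambiguous. Not valid for wall-clock times skipped by a DST jump.
    """
    return local.replace(tzinfo=ZoneInfo(tz), fold=1).astimezone(timezone.utc)


print(to_utc_prefer_later(datetime(2023, 10, 29, 2, 0), "Europe/Warsaw"))
# 2023-10-29 01:00:00+00:00
```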

utc_from_timestamp(unit)

Converts a timestamp represented as an int or float to a DateTimeUtc.

  • Parameters
    unit (str) – unit of a timestamp. It has to be one of ‘s’, ‘ms’, ‘us’, ‘ns’.
  • Returns
    DateTimeUtc

Example:

import pathway as pw
timestamps_1 = pw.debug.table_from_markdown(
    '''
  | timestamp
1 |    10
2 | 1685969950
'''
)
datetimes_1 = timestamps_1.select(date=pw.this.timestamp.dt.utc_from_timestamp(unit="s"))
pw.debug.compute_and_print(datetimes_1, include_id=False)
date
1970-01-01 00:00:10+00:00
2023-06-05 12:59:10+00:00

timestamps_2 = pw.debug.table_from_markdown(
    '''
  |   timestamp
1 |    10.123
2 | 1685969950.4567
'''
)
datetimes_2 = timestamps_2.select(date=pw.this.timestamp.dt.utc_from_timestamp(unit="s"))
pw.debug.compute_and_print(datetimes_2, include_id=False)
date
1970-01-01 00:00:10.123000+00:00
2023-06-05 12:59:10.456700160+00:00

weekday()

Converts a DateTime to an int representing its day of the week, where 0 denotes Monday and 6 denotes Sunday.

  • Returns
    int

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |               t1
   1 | 1970-02-03T10:13:00
   2 | 2023-03-25T10:13:00
   3 | 2023-03-26T12:13:00
   4 | 2023-05-15T14:13:23
'''
)
fmt = "%Y-%m-%dT%H:%M:%S"
table_with_datetime = table.select(t1=pw.this.t1.dt.strptime(fmt=fmt))
table_with_dayofweek = table_with_datetime.with_columns(weekday=pw.this.t1.dt.weekday())
pw.debug.compute_and_print(table_with_dayofweek, include_id=False)
t1                  | weekday
1970-02-03 10:13:00 | 1
2023-03-25 10:13:00 | 5
2023-03-26 12:13:00 | 6
2023-05-15 14:13:23 | 0
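
Python's date.weekday() follows the same convention (Monday is 0, Sunday is 6), whereas date.isoweekday() numbers Monday as 1. A quick plain-Python cross-check against the dates used above:

```python
from datetime import date

days = [date(1970, 2, 3), date(2023, 3, 25), date(2023, 3, 26), date(2023, 5, 15)]
for d in days:
    print(d, d.weekday())  # Monday == 0 ... Sunday == 6
# 1970-02-03 1
# 2023-03-25 5
# 2023-03-26 6
# 2023-05-15 0
```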

weeks()

The total number of whole weeks in a Duration, truncated toward zero.

  • Returns
    Weeks as int.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |         t1          |         t2
   0 | 2023-03-15T00:00:00 | 2023-05-15T10:13:23
   1 | 2023-04-15T00:00:00 | 2023-05-15T10:00:00
   2 | 2023-05-01T10:00:00 | 2023-05-15T10:00:00
   3 | 2023-05-15T10:00:00 | 2023-05-15T09:00:00
   4 | 2023-05-15T10:00:00 | 2023-05-15T11:00:00
   5 | 2023-05-16T12:13:00 | 2023-05-15T10:00:00
   6 | 2024-05-15T14:13:23 | 2023-05-15T10:00:00
'''
)
fmt = "%Y-%m-%dT%H:%M:%S"
table_with_datetimes = table.select(
    t1=pw.this.t1.dt.strptime(fmt=fmt), t2=pw.this.t2.dt.strptime(fmt=fmt)
)
table_with_diff = table_with_datetimes.select(diff=pw.this.t1 - pw.this.t2)
table_with_weeks = table_with_diff.select(weeks=pw.this["diff"].dt.weeks())
pw.debug.compute_and_print(table_with_weeks, include_id=False)
weeks
-8
-4
-2
0
0
0
52

year()

Extracts the year from a DateTime.

  • Returns
    Year as int.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     |               t1
   1 | 1974-03-12T00:00:00
   2 | 2023-03-25T12:00:00
   3 | 2023-05-15T14:13:00
'''
)
table_with_datetime = table.select(t1=table.t1.dt.strptime("%Y-%m-%dT%H:%M:%S"))
table_with_years = table_with_datetime.select(year=table_with_datetime.t1.dt.year())
pw.debug.compute_and_print(table_with_years, include_id=False)
year
1974
2023
2023

class pw.NumericalNamespace(expression)

A module containing methods related to numbers. They can be called using the num attribute of an expression.

Typical use:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | v
   1 | -1
'''
)
table_abs = table.select(v_abs=table.v.num.abs())

abs()

Returns the absolute value of a numerical value.

  • Returns
    Absolute value as float

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | v
   1 | 1
   2 | -1
   3 | 2.5
   4 | -2.5
'''
)
table_abs = table.select(v_abs=table.v.num.abs())
pw.debug.compute_and_print(table_abs, include_id=False)
v_abs
1.0
1.0
2.5
2.5

fill_na(default_value)

Fills the missing values (None or NaN) in a column with the specified default value.

  • Parameters
    default_value (float) – The value to fill in for the missing values.
  • Returns
    A new column with the missing values filled with the specified default value.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | v
   1 | 1
   2 | 2.0
   3 | None
   4 | 3.5
'''
)
table_fill_na = table.select(v_filled=table.v.num.fill_na(0))
pw.debug.compute_and_print(table_fill_na, include_id=False)
v_filled
0.0
1.0
2.0
3.5
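
Per row, both None and NaN count as missing. A plain-Python sketch of that rule on a list of values; fill_missing is a hypothetical stand-in, not Pathway's code:

```python
from __future__ import annotations

import math
from typing import Optional


def fill_missing(values: list[Optional[float]], default: float) -> list[float]:
    """Replace None and NaN with default, keeping every other value unchanged."""
    return [default if v is None or math.isnan(v) else v for v in values]


print(fill_missing([1.0, 2.0, None, float("nan"), 3.5], 0.0))
# [1.0, 2.0, 0.0, 0.0, 3.5]
```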

round(decimals=0)

Rounds the values in a column to the specified number of decimal places.

  • Parameters
    decimals (ColumnExpression | int) – The number of decimal places to round to. It can be either an integer or a reference to another column. Defaults to 0.
  • Returns
    A new column with the values rounded to the specified number of decimals.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | v
   1 | -2.18
   2 | -1.11
   3 | 1
   4 | 2.1
   5 | 3.14
   6 | 4.17
'''
)
table_round = table.select(v_round=table.v.num.round(1))
pw.debug.compute_and_print(table_round, include_id=False)
v_round
-2.2
-1.1
1.0
2.1
3.1
4.2

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | v      | precision
   1 | 3      | 0
   2 | 3.1    | 1
   3 | 3.14   | 1
   4 | 3.141  | 2
   5 | 3.1415 | 2
'''
)
table_round = table.select(v_round=table.v.num.round(pw.this.precision))
pw.debug.compute_and_print(table_round, include_id=False)
v_round
3.0
3.1
3.1
3.14
3.14

class pw.StringNamespace(expression)

A module containing methods related to strings. They can be called using the str attribute of an expression.

Typical use:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | name
   1 | ALICE
'''
)
table += table.select(name_lower=table.name.str.lower())
pw.debug.compute_and_print(table, include_id=False)
name  | name_lower
ALICE | alice

count(sub, start=None, end=None)

Returns the number of non-overlapping occurrences of substring sub in the range [start, end). The optional arguments start and end are interpreted as in slice notation.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | name
   1 | Alice
   2 | Hello
   3 | World
   4 | Zoo
'''
)
table += table.select(count=table.name.str.count("o"))
pw.debug.compute_and_print(table, include_id=False)
name  | count
Alice | 0
Hello | 1
World | 1
Zoo   | 2
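
The contract described above is the same as that of Python's built-in str.count, so the non-overlapping rule and the start/end window can be checked in plain Python. This assumes Pathway follows the same semantics; the snippet does not call Pathway.

```python
# Same inputs as the Pathway example above.
examples = ["Alice", "Hello", "World", "Zoo"]
print([s.count("o") for s in examples])  # [0, 1, 1, 2]

# Non-overlapping: "aaaa" holds two disjoint "aa" matches, not three overlapping ones.
print("aaaa".count("aa"))  # 2

# start/end restrict the search like s[start:end]; "Hello World"[0:5] == "Hello".
print("Hello World".count("o", 0, 5))  # 1
```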

endswith(suffix)

Returns True if the string ends with suffix.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | name
   1 | Alice
   2 | Bob
   3 | CAROLE
   4 | david
'''
)
table += table.select(ends_with_e=table.name.str.endswith("e"))
pw.debug.compute_and_print(table, include_id=False)
name   | ends_with_e
Alice  | True
Bob    | False
CAROLE | False
david  | False

find(sub, start=None, end=None)

Returns the lowest index in the string where substring sub is found within the slice s[start:end]. The optional arguments start and end are interpreted as in slice notation. Returns -1 if sub is not found.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | name
   1 | Alice
   2 | Hello
   3 | World
   4 | Zoo
'''
)
table += table.select(pos=table.name.str.find("o"))
pw.debug.compute_and_print(table, include_id=False)
name  | pos
Alice | -1
Hello | 4
World | 1
Zoo   | 1
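
The optional start and end arguments are not used in the example above. Python's str.find has the same documented contract, so its behaviour illustrates them; this assumes Pathway's find matches Python for these inputs.

```python
s = "Hello World"
print(s.find("o"))        # 4: first "o", inside "Hello"
print(s.find("o", 5))     # 7: the search starts at index 5
print(s.find("o", 0, 4))  # -1: s[0:4] == "Hell" contains no "o"
```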

len()

Returns the length of the string.

  • Returns
    Length of the string

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | name
   1 | Alice
   2 | Bob
   3 | CAROLE
   4 | david
'''
)
table += table.select(length=table.name.str.len())
pw.debug.compute_and_print(table, include_id=False)
name   | length
Alice  | 5
Bob    | 3
CAROLE | 6
david  | 5

lower()

Returns a lowercase copy of the string.

  • Returns
    Lowercase string

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | name
   1 | Alice
   2 | Bob
   3 | CAROLE
   4 | david
'''
)
table += table.select(name_lower=table.name.str.lower())
pw.debug.compute_and_print(table, include_id=False)
name   | name_lower
Alice  | alice
Bob    | bob
CAROLE | carole
david  | david

parse_bool(true_values=['on', 'true', 'yes', '1'], false_values=['off', 'false', 'no', '0'], optional=False)

Parses the string to bool by checking whether it is in true_values or in false_values. The string and all entries of true_values and false_values are lowercased before comparison, so parsing is case-insensitive.

When the true_values and false_values arguments are not provided, the strings “True”, “On”, “1”, and “Yes” are interpreted as True, and “False”, “Off”, “0”, and “No” are interpreted as False.

If true_values or false_values is provided, these values are mapped to True and False respectively, while any other string either raises an exception or yields None, depending on the optional argument.

If optional is set to True, the return type is Optional[bool], and None is returned for any string that cannot be parsed.

Example:

import pathway as pw
import pandas as pd
df = pd.DataFrame({"a": ["0", "TRUE", "on"]}, dtype=str)
table = pw.debug.table_from_pandas(df)
table.typehints()
mappingproxy({'a': <class 'str'>})
pw.debug.compute_and_print(table, include_id=False)
a
0
TRUE
on
table = table.select(a=table.a.str.parse_bool())
table.typehints()
mappingproxy({'a': <class 'bool'>})
pw.debug.compute_and_print(table, include_id=False)
a
False
True
True
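
The parsing rules above can be summarised as a small plain-Python function. This is a sketch of the documented behaviour, not Pathway's code: parse_bool_sketch is hypothetical, and raising ValueError for unparseable input is an assumption, since the exception type Pathway raises is not stated here.

```python
from collections.abc import Sequence
from typing import Optional

# Defaults copied from the parse_bool signature above.
DEFAULT_TRUE = ("on", "true", "yes", "1")
DEFAULT_FALSE = ("off", "false", "no", "0")


def parse_bool_sketch(
    value: str,
    true_values: Sequence[str] = DEFAULT_TRUE,
    false_values: Sequence[str] = DEFAULT_FALSE,
    optional: bool = False,
) -> Optional[bool]:
    """Hypothetical plain-Python model of str.parse_bool; not Pathway's implementation."""
    # The input and every candidate are lowercased, so matching is case-insensitive.
    v = value.lower()
    if v in {t.lower() for t in true_values}:
        return True
    if v in {f.lower() for f in false_values}:
        return False
    if optional:
        return None  # optional=True: unparseable strings become None
    # Assumption: Pathway's actual error type may differ.
    raise ValueError(f"cannot parse {value!r} as bool")


print([parse_bool_sketch(s) for s in ["0", "TRUE", "on"]])  # [False, True, True]
print(parse_bool_sketch("maybe", optional=True))  # None
print(parse_bool_sketch("Si", true_values=["si"]))  # True
```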

parse_float(optional=False)

Parses the string to float. If optional is set to True, the return type is Optional[float], and None is returned for any string that cannot be parsed.

Example:

import pathway as pw
import pandas as pd
df = pd.DataFrame({"a": ["-5", "0.1", "200.999"]}, dtype=str)
table = pw.debug.table_from_pandas(df)
table.typehints()
mappingproxy({'a': <class 'str'>})
table = table.select(a=table.a.str.parse_float())
table.typehints()
mappingproxy({'a': <class 'float'>})
pw.debug.compute_and_print(table, include_id=False)
a
-5.0
0.1
200.999
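
The effect of optional can be modelled in plain Python. In this hypothetical sketch, Python's float() stands in for Pathway's parser, which is an assumption: the exact set of accepted number formats may differ.

```python
from typing import Optional


def parse_float_sketch(value: str, optional: bool = False) -> Optional[float]:
    """Hypothetical model of str.parse_float(optional=...); not Pathway's implementation."""
    try:
        # Assumption: Python's float() approximates Pathway's accepted formats.
        return float(value)
    except ValueError:
        if optional:
            return None  # optional=True: unparseable strings become None
        raise  # optional=False: the failure propagates


print([parse_float_sketch(s) for s in ["-5", "0.1", "200.999"]])  # [-5.0, 0.1, 200.999]
print(parse_float_sketch("abc", optional=True))  # None
```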

parse_int(optional=False)

Parses the string to int. If optional is set to True, the return type is Optional[int], and None is returned for any string that cannot be parsed.

Example:

import pathway as pw
import pandas as pd
df = pd.DataFrame({"a": ["-5", "0", "200"]}, dtype=str)
table = pw.debug.table_from_pandas(df)
table.typehints()
mappingproxy({'a': <class 'str'>})
table = table.select(a=table.a.str.parse_int())
table.typehints()
mappingproxy({'a': <class 'int'>})
pw.debug.compute_and_print(table, include_id=False)
a
-5
0
200
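
A string that is a valid float literal is not necessarily a valid integer literal. The hypothetical sketch below shows this with Python's int() standing in for Pathway's parser, which is an assumption about the accepted formats.

```python
from typing import Optional


def parse_int_sketch(value: str, optional: bool = False) -> Optional[int]:
    """Hypothetical model of str.parse_int(optional=...); not Pathway's implementation."""
    try:
        # Assumption: Python's int() approximates Pathway's accepted formats.
        return int(value)
    except ValueError:
        if optional:
            return None  # optional=True: unparseable strings become None
        raise


print([parse_int_sketch(s) for s in ["-5", "0", "200"]])  # [-5, 0, 200]
# "1.5" parses as a float but is not an integer literal.
print(parse_int_sketch("1.5", optional=True))  # None
```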

removeprefix(prefix, /)

If the string starts with prefix, returns a copy of the string without the prefix. Otherwise returns the original string.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | name
   1 | Alice
   2 | Bob
   3 | CAROLE
   4 | david
'''
)
table += table.select(without_da=table.name.str.removeprefix("da"))
pw.debug.compute_and_print(table, include_id=False)
name   | without_da
Alice  | Alice
Bob    | Bob
CAROLE | CAROLE
david  | vid

table = pw.debug.table_from_markdown(
    '''
     | note | prefix
   1 | AAA  | A
   2 | BB   | B
'''
)
table = table.select(
   pw.this.note,
   new_note=pw.this.note.str.removeprefix(pw.this.prefix)
)
pw.debug.compute_and_print(table, include_id=False)
note | new_note
AAA  | AA
BB   | B
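
The prefix is removed at most once, which is what distinguishes it from stripping characters. Python's str.removeprefix (Python 3.9+) has the same contract as described above, so it illustrates the difference from lstrip; this assumes Pathway matches Python here.

```python
print("david".removeprefix("da"))  # vid
print("Alice".removeprefix("da"))  # Alice: unchanged when the prefix is absent
print("AAA".removeprefix("A"))     # AA: the prefix is removed only once
print(repr("AAA".lstrip("A")))     # '': lstrip treats "A" as a character set and strips repeatedly
```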

removesuffix(suffix, /)

If the string ends with suffix, returns a copy of the string without the suffix. Otherwise returns the original string.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | name
   1 | Alice
   2 | Bob
   3 | CAROLE
   4 | david
'''
)
table += table.select(without_LE=table.name.str.removesuffix("LE"))
pw.debug.compute_and_print(table, include_id=False)
name   | without_LE
Alice  | Alice
Bob    | Bob
CAROLE | CARO
david  | david

table = pw.debug.table_from_markdown(
    '''
     | fruit  | suffix
   1 | bamboo | o
   2 | banana | na
'''
)
table = table.select(
   pw.this.fruit,
   fruit_cropped=pw.this.fruit.str.removesuffix(pw.this.suffix)
)
pw.debug.compute_and_print(table, include_id=False)
fruit  | fruit_cropped
bamboo | bambo
banana | bana
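
The per-row example above can be reproduced with Python's str.removesuffix, assuming Pathway's semantics match it. The contrast with rstrip shows why removesuffix is the right tool when the suffix is a literal string.

```python
rows = [("bamboo", "o"), ("banana", "na")]
print([fruit.removesuffix(suffix) for fruit, suffix in rows])  # ['bambo', 'bana']

# rstrip treats its argument as a character set and removes every trailing match.
print("bamboo".rstrip("o"))  # bamb
```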

replace(old_value, new_value, count=-1, /)

Returns a string in which occurrences of the old_value substring are replaced by the new_value substring.

  • Parameters
    count (ColumnExpression | int) – Maximum number of occurrences to replace. When set to -1, replaces all occurrences. Defaults to -1.
  • Returns
    The new string where old_value is replaced by new_value

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | name
   1 | Alice
   2 | Bob
   3 | CAROLE
   4 | david
   5 | Edward
'''
)
table += table.select(name_replace=table.name.str.replace("d","Z"))
pw.debug.compute_and_print(table, include_id=False)
name   | name_replace
Alice  | Alice
Bob    | Bob
CAROLE | CAROLE
Edward | EZwarZ
david  | ZaviZ

table = pw.debug.table_from_markdown(
    '''
     | value      | old | new | count
   1 | Scaciscics | c   | t   | 3
   2 | yelliwwiid | i   | o   | 2
'''
)
table = table.select(
   pw.this.value,
   value_replace=pw.this.value.str.replace(
      pw.this.old, pw.this.new, pw.this.count
   )
)
pw.debug.compute_and_print(table, include_id=False)
value      | value_replace
Scaciscics | Statistics
yelliwwiid | yellowwoid
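
The count argument limits replacements to the first count occurrences, counted from the left. Python's str.replace(old, new, count) follows the same rule, including -1 meaning "replace all", so the example above can be checked in plain Python under the assumption that Pathway matches it.

```python
rows = [("Scaciscics", "c", "t", 3), ("yelliwwiid", "i", "o", 2)]
print([value.replace(old, new, count) for value, old, new, count in rows])
# ['Statistics', 'yellowwoid']

print("david".replace("d", "Z", -1))  # ZaviZ: count=-1 replaces every occurrence
```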

reversed()

Returns a reversed copy of the string.

  • Returns
    Reversed string

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | name
   1 | Alice
   2 | Bob
   3 | CAROLE
   4 | david
'''
)
table += table.select(name_reverse=table.name.str.reversed())
pw.debug.compute_and_print(table, include_id=False)
name   | name_reverse
Alice  | ecilA
Bob    | boB
CAROLE | ELORAC
david  | divad

rfind(sub, start=None, end=None)

Returns the highest index in the string where substring sub is found within the slice s[start:end]. The optional arguments start and end are interpreted as in slice notation. Returns -1 if sub is not found.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | name
   1 | Alice
   2 | Hello
   3 | World
   4 | Zoo
'''
)
table += table.select(pos=table.name.str.rfind("o"))
pw.debug.compute_and_print(table, include_id=False)
name  | pos
Alice | -1
Hello | 4
World | 1
Zoo   | 2
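
find and rfind differ only in which match they report. Python's str.find and str.rfind illustrate this, including a restricted window, assuming Pathway follows the same slice semantics.

```python
s = "Zoo"
print(s.find("o"), s.rfind("o"))  # 1 2: lowest vs. highest match index

t = "Hello World"
print(t.rfind("o"))        # 7
print(t.rfind("o", 0, 5))  # 4: only t[0:5] == "Hello" is searched
```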

slice(start, end, /)

Returns the slice of the string from index start (inclusive) to index end (exclusive), as in s[start:end].

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | name
   1 | Alice
   2 | Bob
   3 | CAROLE
   4 | david
'''
)
table += table.select(slice=table.name.str.slice(1,4))
pw.debug.compute_and_print(table, include_id=False)
name   | slice
Alice  | lic
Bob    | ob
CAROLE | ARO
david  | avi
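
The example above matches Python slicing s[1:4], including the "Bob" row where end lies past the end of the string and is clipped. This plain-Python check assumes that equivalence; the behaviour for negative indices is not shown in the example and is not covered here.

```python
names = ["Alice", "Bob", "CAROLE", "david"]
print([name[1:4] for name in names])  # ['lic', 'ob', 'ARO', 'avi']
```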

startswith(prefix)

Returns True if the string starts with prefix.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | name
   1 | Alice
   2 | Bob
   3 | CAROLE
   4 | david
'''
)
table += table.select(starts_with_A=table.name.str.startswith("A"))
pw.debug.compute_and_print(table, include_id=False)
name   | starts_with_A
Alice  | True
Bob    | False
CAROLE | False
david  | False

strip(chars=None)

Returns a copy of the string with the specified leading and trailing characters removed. The chars argument is treated as a set of characters, not as a prefix or suffix. If chars is not given, leading and trailing whitespace is removed.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | name
   1 | Alice
   2 | Bob
   3 | CAROLE
   4 | david
'''
)
table += table.select(name_strip=table.name.str.strip("Aod"))
pw.debug.compute_and_print(table, include_id=False)
name   | name_strip
Alice  | lice
Bob    | Bob
CAROLE | CAROLE
david  | avi
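
Python's str.strip uses the same character-set rule, so it reproduces the example above and the whitespace default. This assumes Pathway's strip matches Python's for these inputs.

```python
names = ["Alice", "Bob", "CAROLE", "david"]
# "Aod" means the set {"A", "o", "d"}; stripping stops at the first character outside it.
print([name.strip("Aod") for name in names])  # ['lice', 'Bob', 'CAROLE', 'avi']

print(repr("  padded  ".strip()))  # 'padded': no argument strips whitespace
```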

swapcase()

Returns a copy of the string where the case is inverted.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | name
   1 | Alice
   2 | Bob
   3 | CAROLE
   4 | david
'''
)
table += table.select(name_swap=table.name.str.swapcase())
pw.debug.compute_and_print(table, include_id=False)
name   | name_swap
Alice  | aLICE
Bob    | bOB
CAROLE | carole
david  | DAVID

title()

Returns a copy of the string in which each word starts with an uppercase character and the remaining characters are lowercase.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | col
   1 | title
'''
)
table = table.select(col_title=table["col"].str.title())
pw.debug.compute_and_print(table, include_id=False)
col_title
Title
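
The single-word example above does not show what happens to the remaining characters of each word. Python's str.title lowercases them, as the description states. This plain-Python illustration assumes Pathway splits words the same way, which is not documented here.

```python
print("title".title())        # Title
print("hello wORLD".title())  # Hello World: letters after the first are lowercased
```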

upper()

Returns an uppercase copy of the string.

  • Returns
    Uppercase string

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
     | name
   1 | Alice
   2 | Bob
   3 | CAROLE
   4 | david
'''
)
table += table.select(name_upper=table.name.str.upper())
pw.debug.compute_and_print(table, include_id=False)
name   | name_upper
Alice  | ALICE
Bob    | BOB
CAROLE | CAROLE
david  | DAVID