pw.Table

The Pathway programming framework is organized around working with data tables. This page contains the reference for the Pathway Table class.

class pw.Table()

Collection of named columns over identical universes.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
isinstance(t1, pw.Table)

property C: ColumnNamespace

Returns the namespace of all the columns of a joinable. Allows accessing columns whose names would otherwise clash with reserved method names.

import pathway as pw
tab = pw.debug.table_from_markdown('''
age | owner | pet | filter
10  | Alice | dog | True
9   | Bob   | dog | True
8   | Alice | cat | False
7   | Bob   | dog | True
''')
isinstance(tab.C.age, pw.ColumnReference)
pw.debug.compute_and_print(tab.filter(tab.C.filter), include_id=False)

cast_to_types(**kwargs)

Casts columns to types.

concat(*others)

Concatenates self with every other ∈ others.

Semantics:

  • result.columns == self.columns == other.columns
  • result.id == self.id ∪ other.id

If self.id and other.id collide, an exception is thrown.

Requires:

  • other.columns == self.columns
  • self.id disjoint with other.id
  • Parameters
    other – the other table.
  • Returns
    Table – The concatenated table. Ids of rows from the original tables are preserved.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''')
pw.universes.promise_are_pairwise_disjoint(t1, t2)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)
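
The semantics and requirements listed above can be modelled in plain Python. This is only a sketch and not Pathway code: a table is represented as a dict from row id to row, `concat_model` is a hypothetical helper name, and the `ValueError` stands in for whatever exception Pathway actually raises on colliding ids.

```python
def concat_model(*tables):
    """Union of id -> row dicts; raises if any id appears in two tables."""
    result = {}
    for table in tables:
        clashing = result.keys() & table.keys()
        if clashing:
            # Stand-in for the exception mentioned in the semantics above.
            raise ValueError(f"ids collide: {sorted(clashing)}")
        result.update(table)
    return result


t1 = {1: (10, "Alice", 1), 2: (9, "Bob", 1), 3: (8, "Alice", 2)}
t2 = {11: (11, "Alice", 30), 12: (12, "Tom", 40)}
t3 = concat_model(t1, t2)
print(sorted(t3))  # ids of the original rows are preserved
```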

concat_reindex(*tables)

Concatenate contents of several tables.

This is similar to PySpark union. All tables must have the same schema. Each row is reindexed.

  • Parameters
    tables (Table) – List of tables to concatenate. All tables must have the same schema.
  • Returns
    Table – The concatenated table. It will have new, synthetic ids.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.table_from_markdown('''
  | pet
1 | Manul
8 | Octopus
''')
t3 = t1.concat_reindex(t2)
pw.debug.compute_and_print(t3, include_id=False)

copy()

Returns a copy of a table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.copy()
pw.debug.compute_and_print(t2, include_id=False)
t1 is t2

deduplicate(*, value, instance=None, acceptor, persistent_id=None)

Deduplicates rows in self on the value column using the acceptor function.

It keeps rows which were accepted by the acceptor function. The acceptor takes two arguments: the current value and the previously accepted value.

  • Parameters
    • value (Union[ColumnExpression, None, int, float, str, bytes, bool, Pointer, datetime, timedelta, ndarray, Json, dict[str, Any], tuple[Any, ...]]) – column expression used for deduplication.
    • instance (ColumnExpression | None) – Grouping column. For rows with different values in this column, deduplication will be performed separately. Defaults to None.
    • acceptor (Callable[[TypeVar(T, bound= Union[None, int, float, str, bytes, bool, Pointer, datetime, timedelta, ndarray, Json, dict[str, Any], tuple[Any, ...]]), TypeVar(T, bound= Union[None, int, float, str, bytes, bool, Pointer, datetime, timedelta, ndarray, Json, dict[str, Any], tuple[Any, ...]])], bool]) – callback telling whether two values are different.
    • persistent_id (str | None) – (unstable) An identifier, under which the state of the table will be persisted or None, if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This way it’s possible to configure the start of computations from the moment they were terminated last time.
  • Returns
    Table – the result of deduplication.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
    val | __time__
     1  |     2
     2  |     4
     3  |     6
     4  |     8
'''
)

def acceptor(new_value, old_value) -> bool:
    return new_value >= old_value + 2


result = table.deduplicate(value=pw.this.val, acceptor=acceptor)
pw.debug.compute_and_print_update_stream(result, include_id=False)

table = pw.debug.table_from_markdown(
    '''
    val | instance | __time__
     1  |     1    |     2
     2  |     1    |     4
     3  |     2    |     6
     4  |     1    |     8
     4  |     2    |     8
     5  |     1    |    10
'''
)

def acceptor(new_value, old_value) -> bool:
    return new_value >= old_value + 2


result = table.deduplicate(
    value=pw.this.val, instance=pw.this.instance, acceptor=acceptor
)
pw.debug.compute_and_print_update_stream(result, include_id=False)
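
The acceptor logic can be traced without Pathway. The sketch below is a plain-Python model, not the library's implementation: `deduplicate_model` is a hypothetical helper, and it assumes that the first value is always accepted because there is no previously accepted value to compare it with.

```python
def deduplicate_model(values, acceptor):
    """Return the values that would be accepted, in arrival order."""
    accepted = []
    for value in values:
        # Assumption: the very first value is kept unconditionally.
        if not accepted or acceptor(value, accepted[-1]):
            accepted.append(value)
    return accepted


def acceptor(new_value, old_value) -> bool:
    return new_value >= old_value + 2


# Values of the first example above, in __time__ order.
kept = deduplicate_model([1, 2, 3, 4], acceptor)
print(kept)
```

With an instance column, the same loop would run independently for each distinct instance value.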

diff(timestamp, *values)

Compute the difference between the values in the values columns and the previous values according to the order defined by the column timestamp.

  • Parameters
    • timestamp (ColumnReference) – The column reference to the timestamp column on which the order is computed.
    • *values (ColumnReference) – Variable-length argument representing the column references to the values columns.
  • Returns
    Table – A new table where each column is replaced with a new column containing the difference and whose name is the concatenation of diff_ and the former name.
  • Raises
    ValueError – If the columns are not ColumnReference.

NOTE: The difference for the “first” row (the row with the lowest value in the timestamp column) is None.

Example:

import pathway as pw
table = pw.debug.table_from_markdown('''
timestamp | values
1         | 1
2         | 2
3         | 4
4         | 7
5         | 11
6         | 16
''')
table += table.diff(pw.this.timestamp, pw.this.values)
pw.debug.compute_and_print(table, include_id=False)
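
To make the ordering and the None for the first row concrete, here is a small plain-Python model of the computation. It is a sketch under the description above, not Pathway code; `diff_model` is a hypothetical name and timestamps are assumed to be unique.

```python
def diff_model(rows):
    """rows: list of (timestamp, value) pairs. Returns {timestamp: difference}."""
    ordered = sorted(rows)  # order defined by the timestamp
    diffs = {ordered[0][0]: None} if ordered else {}  # first row has no predecessor
    for (_, prev_value), (ts, value) in zip(ordered, ordered[1:]):
        diffs[ts] = value - prev_value
    return diffs


rows = [(1, 1), (2, 2), (3, 4), (4, 7), (5, 11), (6, 16)]
diffs = diff_model(rows)
print(diffs)
```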

difference(other)

Restrict self universe to keys not appearing in the other table.

  • Parameters
    other (Table) – table with ids to remove from self.
  • Returns
    Table – table with restricted universe, with the same set of columns

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age  | owner  | pet
1 | 10   | Alice  | 1
2 | 9    | Bob    | 1
3 | 8    | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
  | cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.difference(t2)
pw.debug.compute_and_print(t3, include_id=False)
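
Viewed as an operation on row ids only, difference is a key filter. The following plain-Python sketch models the example above with dicts keyed by row id; it illustrates the semantics and is not how Pathway evaluates the operation.

```python
t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {2: {"cost": 100}, 3: {"cost": 200}, 4: {"cost": 300}}

# Keep rows of t1 whose id does not appear in t2; columns are unchanged.
t3 = {row_id: row for row_id, row in t1.items() if row_id not in t2}
print(t3)
```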

empty(**kwargs)

Creates an empty table with a schema specified by kwargs.

  • Parameters
    kwargs (DType) – Dict whose keys are column names and values are column types.
  • Returns
    Table – Created empty table.

Example:

import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
pw.debug.compute_and_print(t1, include_id=False)

filter(filter_expression)

Filter a table according to filter_expression condition.

  • Parameters
    filter_expression (ColumnExpression) – ColumnExpression that specifies the filtering condition.
  • Returns
    Table – Result has the same schema as self and its ids are a subset of self.id.

Example:

import pathway as pw
vertices = pw.debug.table_from_markdown('''
label outdegree
    1         3
    7         0
''')
filtered = vertices.filter(vertices.outdegree == 0)
pw.debug.compute_and_print(filtered, include_id=False)

flatten(to_flatten, *, origin_id=None)

Performs a flatmap operation on a column or expression given as the first argument. The datatype of this column or expression has to be iterable or a Json array. Other columns of the table are duplicated as many times as the length of the iterable.

It is possible to get ids of source rows by passing origin_id argument, which is a new name of the column with the source ids.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | pet  |  age
1 | Dog  |   2
7 | Cat  |   5
''')
t2 = t1.flatten(t1.pet)
pw.debug.compute_and_print(t2, include_id=False)
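
Because a string is iterable, the example above yields one row per character. The sketch below models this in plain Python, including the optional origin_id column; `flatten_model` is a hypothetical helper, and real row ids are pointers rather than the small integers used here.

```python
def flatten_model(rows, column, origin_id=None):
    """rows: {row_id: row dict}. Emits one output row per element of row[column]."""
    out = []
    for row_id, row in rows.items():
        for element in row[column]:
            new_row = {**row, column: element}  # other columns are duplicated
            if origin_id is not None:
                new_row[origin_id] = row_id  # remember the source row
            out.append(new_row)
    return out


t1 = {1: {"pet": "Dog", "age": 2}, 7: {"pet": "Cat", "age": 5}}
t2 = flatten_model(t1, "pet", origin_id="origin")
for row in t2:
    print(row)
```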

from_columns(*args, **kwargs)

Build a table from columns.

All columns must have the same ids. Columns’ names must be pairwise distinct.

Example:

import pathway as pw
t1 = pw.Table.empty(age=float, pet=float)
t2 = pw.Table.empty(foo=float, bar=float).with_universe_of(t1)
t3 = pw.Table.from_columns(t1.pet, qux=t2.foo)
pw.debug.compute_and_print(t3, include_id=False)

groupby(*args, id=None, sort_by=None, instance=None)

Groups table by columns from args.

NOTE: Usually followed by .reduce() that aggregates the result and returns a table.

  • Parameters
    • args (ColumnReference) – columns to group by.
    • id (ColumnReference | None) – if provided, is the column used to set id’s of the rows of the result
    • sort_by (ColumnReference | None) – if provided, column values are used as sorting keys for particular reducers
    • instance (ColumnReference | None) – optional argument describing partitioning of the data into separate instances
  • Returns
    GroupedTable – Groupby object.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.groupby(t1.pet, t1.owner).reduce(t1.owner, t1.pet, ageagg=pw.reducers.sum(t1.age))
pw.debug.compute_and_print(t2, include_id=False)
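
For readers who think in terms of dictionaries, the groupby followed by reduce in this example corresponds to the accumulation below. This is a plain-Python analogue of the sum reducer, not Pathway code.

```python
from collections import defaultdict

rows = [
    (10, "Alice", "dog"),
    (9, "Bob", "dog"),
    (8, "Alice", "cat"),
    (7, "Bob", "dog"),
]

# One accumulator per (pet, owner) group, as in groupby(t1.pet, t1.owner).
ageagg = defaultdict(int)
for age, owner, pet in rows:
    ageagg[(pet, owner)] += age

print(dict(ageagg))
```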

having(*indexers)

Removes rows so that indexed.ix(indexer) is possible when some rows are missing, for each indexer in indexers.

property id: ColumnReference

Get reference to pseudocolumn containing id’s of a table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.select(ids = t1.id)
t2.typehints()['ids']
pw.debug.compute_and_print(t2.select(test=t2.id == t2.ids), include_id=False)

interpolate(timestamp, *values, mode=InterpolateMode.LINEAR)

Interpolates missing values in a column using the previous and next values based on a timestamps column.

  • Parameters
    • timestamp (ColumnReference) – Reference to the column containing timestamps.
    • *values (ColumnReference) – References to the columns containing values to be interpolated.
    • mode (InterpolateMode, optional) – The interpolation mode. Currently, only InterpolateMode.LINEAR is supported. Default is InterpolateMode.LINEAR.
  • Returns
    Table – A new table with the interpolated values.
  • Raises
    ValueError – If the columns are not ColumnReference or if the interpolation mode is not supported.

NOTE:

  • The interpolation is performed based on linear interpolation between the previous and next values.
  • If a value is missing at the beginning or end of the column, no interpolation is performed.

Example:

import pathway as pw
table = pw.debug.table_from_markdown('''
timestamp | values_a | values_b
1         | 1        | 10
2         |          |
3         | 3        |
4         |          |
5         |          |
6         | 6        | 60
''')
table = table.interpolate(pw.this.timestamp, pw.this.values_a, pw.this.values_b)
pw.debug.compute_and_print(table, include_id=False)
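
The linear rule can be checked by hand with a short plain-Python model. This is a sketch, not Pathway's implementation: `interpolate_model` is a hypothetical name, timestamps are assumed unique, and a gap at either end of the column is simply left as None here, which is an assumption about the edge case rather than documented output.

```python
def interpolate_model(timestamps, values):
    """Fill None entries linearly between the nearest known neighbours."""
    known = [(t, v) for t, v in zip(timestamps, values) if v is not None]
    result = []
    for t, v in zip(timestamps, values):
        if v is not None:
            result.append(v)
            continue
        before = [(kt, kv) for kt, kv in known if kt < t]
        after = [(kt, kv) for kt, kv in known if kt > t]
        if not before or not after:
            result.append(None)  # assumption: edge gaps stay missing in this model
            continue
        (t0, v0), (t1, v1) = max(before), min(after)
        result.append(v0 + (v1 - v0) * (t - t0) / (t1 - t0))
    return result


timestamps = [1, 2, 3, 4, 5, 6]
values_a = interpolate_model(timestamps, [1, None, 3, None, None, 6])
values_b = interpolate_model(timestamps, [10, None, None, None, None, 60])
print(values_a)
print(values_b)
```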

intersect(*tables)

Restrict self universe to keys appearing in all of the tables.

  • Parameters
    tables (Table) – tables whose keys are used to restrict the universe.
  • Returns
    Table – table with restricted universe, with the same set of columns

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age  | owner  | pet
1 | 10   | Alice  | 1
2 | 9    | Bob    | 1
3 | 8    | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
  | cost
2 | 100
3 | 200
4 | 300
''')
t3 = t1.intersect(t2)
pw.debug.compute_and_print(t3, include_id=False)
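
In terms of row ids, intersect keeps the rows of self whose id occurs in every other table. A plain-Python sketch of that rule follows, with dicts keyed by row id and a hypothetical helper name `intersect_model`; it models the semantics only.

```python
def intersect_model(table, *others):
    """Keep rows of `table` whose id is present in every table in `others`."""
    keys = set(table)
    for other in others:
        keys.intersection_update(other)  # iterating a dict yields its keys
    return {row_id: row for row_id, row in table.items() if row_id in keys}


t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {2: {"cost": 100}, 3: {"cost": 200}, 4: {"cost": 300}}
t3 = intersect_model(t1, t2)
print(t3)
```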

ix(expression, *, optional=False, context=None)

Reindexes the table using expression values as keys. Uses keys from context, or tries to infer proper context from the expression. If optional is True, then None in expression values results in None values in the result columns. Missing values in table keys result in RuntimeError.

Context can be anything that allows for select or reduce, or pathway.this construct (latter results in returning a delayed operation, and should be only used when using ix inside join().select() or groupby().reduce() sequence).

  • Returns
    Reindexed table with the same set of columns.

Example:

import pathway as pw
t_animals = pw.debug.table_from_markdown('''
  | epithet    | genus
1 | upupa      | epops
2 | acherontia | atropos
3 | bubo       | scandiacus
4 | dynastes   | hercules
''')
t_birds = pw.debug.table_from_markdown('''
  | desc
2 | hoopoe
4 | owl
''')
ret = t_birds.select(t_birds.desc, latin=t_animals.ix(t_birds.id).genus)
pw.debug.compute_and_print(ret, include_id=False)

ix_ref(*args, optional=False, context=None, instance=None)

Reindexes the table using expressions as primary keys. Uses keys from context, or tries to infer proper context from the expression. If optional is True, then None in expression values results in None values in the result columns. Missing values in table keys result in RuntimeError.

Context can be anything that allows for select or reduce, or pathway.this construct (latter results in returning a delayed operation, and should be only used when using ix inside join().select() or groupby().reduce() sequence).

  • Parameters
    args (Union[ColumnExpression, None, int, float, str, bytes, bool, Pointer, datetime, timedelta, ndarray, Json, dict[str, Any], tuple[Any, ...]]) – Column references.
  • Returns
    Row – indexed row.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
name   | pet
Alice  | dog
Bob    | cat
Carole | cat
David  | dog
''')
t2 = t1.with_id_from(pw.this.name)
t2 = t2.select(*pw.this, new_value=pw.this.ix_ref("Alice").pet)
pw.debug.compute_and_print(t2, include_id=False)

Tables obtained by a groupby/reduce scheme always have primary keys:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
name   | pet
Alice  | dog
Bob    | cat
Carole | cat
David  | cat
''')
t2 = t1.groupby(pw.this.pet).reduce(pw.this.pet, count=pw.reducers.count())
t3 = t1.select(*pw.this, new_value=t2.ix_ref(t1.pet).count)
pw.debug.compute_and_print(t3, include_id=False)

Single-row tables can be accessed via ix_ref():

import pathway as pw
t1 = pw.debug.table_from_markdown('''
name   | pet
Alice  | dog
Bob    | cat
Carole | cat
David  | cat
''')
t2 = t1.reduce(count=pw.reducers.count())
t3 = t1.select(*pw.this, new_value=t2.ix_ref(context=t1).count)
pw.debug.compute_and_print(t3, include_id=False)

join(other, *on, id=None, how=JoinMode.INNER, left_instance=None, right_instance=None)

Join self with other using the given join expression.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
    • how (JoinMode) – by default, inner join is performed. The possible values JoinMode.{INNER,LEFT,RIGHT,OUTER} correspond to inner, left, right and outer join respectively.
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join(
    t2, t1.pet == t2.pet, t1.owner == t2.owner, how=pw.JoinMode.INNER
).select(age=t1.age, owner_name=t2.owner, size=t2.size)
pw.debug.compute_and_print(t3, include_id = False)

join_inner(other, *on, id=None, left_instance=None, right_instance=None)

Inner-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • on (ColumnExpression) – a list of column expressions. Each must have == as the top level operation and be of the form LHS: ColumnReference == RHS: ColumnReference.
    • id (ColumnReference | None) – optional argument for id of result, can be only self.id or other.id
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.table_from_markdown('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join_inner(t2, t1.pet == t2.pet, t1.owner == t2.owner).select(
    age=t1.age, owner_name=t2.owner, size=t2.size
)
pw.debug.compute_and_print(t3, include_id = False)

join_left(other, *on, id=None, left_instance=None, right_instance=None)

Left-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2
    • id (ColumnReference | None) – optional id column of the result
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Remarks: args cannot contain id column from either of tables, as the result table has id column with auto-generated ids; it can be selected by assigning it to a column with defined name (passed in kwargs)

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • rows from the right side that were not matched with the left side are skipped
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(t1.join_left(t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(t1.b + t2.d, t2.id)),
include_id=False)
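
The behavior rules above can be reproduced on the same data in plain Python. This is a model of left-join semantics rather than Pathway code: `join_left_model` is a hypothetical helper, rows are plain dicts, and the `None` check stands in for the `pw.require(..., t2.id)` guard used in the example.

```python
def join_left_model(left, right, left_key, right_key):
    """Return (left_row, right_row_or_None) pairs for a left join."""
    pairs = []
    for l in left:
        matches = [r for r in right if r[right_key] == l[left_key]]
        if matches:
            pairs.extend((l, r) for r in matches)  # same as an inner join
        else:
            pairs.append((l, None))  # unmatched left row, right side missing
    return pairs


t1 = [{"a": 11, "b": 111}, {"a": 12, "b": 112}, {"a": 13, "b": 113}, {"a": 13, "b": 114}]
t2 = [{"c": 11, "d": 211}, {"c": 12, "d": 212}, {"c": 14, "d": 213}, {"c": 14, "d": 214}]

result = [
    (
        l["a"],
        r["c"] if r is not None else None,
        l["b"] + r["d"] if r is not None else None,
    )
    for l, r in join_left_model(t1, t2, "a", "c")
]
for row in result:
    print(row)
```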

join_outer(other, *on, id=None, left_instance=None, right_instance=None)

Outer-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2
    • id (ColumnReference | None) – optional id column of the result
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Remarks: args cannot contain id column from either of tables, as the result table has id column with auto-generated ids; it can be selected by assigning it to a column with defined name (passed in kwargs)

Behavior:

  • for rows from the left side that were not matched with the right side, missing values on the right are replaced with None
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(t1.join_outer(t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(t1.b + t2.d, t1.id, t2.id)),
include_id=False)
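
A plain-Python model of the outer-join rules, run on the same data, shows which side is filled with None in each case. `join_outer_model` is a hypothetical helper and rows are plain dicts; the sketch illustrates the semantics and is not how Pathway computes joins.

```python
def join_outer_model(left, right, left_key, right_key):
    """Return (left_row_or_None, right_row_or_None) pairs for an outer join."""
    pairs = []
    matched_right = set()
    for l in left:
        hits = [i for i, r in enumerate(right) if r[right_key] == l[left_key]]
        matched_right.update(hits)
        pairs.extend((l, right[i]) for i in hits)
        if not hits:
            pairs.append((l, None))  # unmatched left row
    # Unmatched right rows are kept too, with the left side missing.
    pairs.extend((None, r) for i, r in enumerate(right) if i not in matched_right)
    return pairs


t1 = [{"a": 11, "b": 111}, {"a": 12, "b": 112}, {"a": 13, "b": 113}, {"a": 13, "b": 114}]
t2 = [{"c": 11, "d": 211}, {"c": 12, "d": 212}, {"c": 14, "d": 213}, {"c": 14, "d": 214}]

result = [
    (
        l["a"] if l is not None else None,
        r["c"] if r is not None else None,
        l["b"] + r["d"] if l is not None and r is not None else None,
    )
    for l, r in join_outer_model(t1, t2, "a", "c")
]
for row in result:
    print(row)
```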

join_right(other, *on, id=None, left_instance=None, right_instance=None)

Right-joins two tables or join results.

  • Parameters
    • other (Joinable) – the right side of the join, Table or JoinResult.
    • *on (ColumnExpression) – Columns to join, syntax self.col1 == other.col2
    • id (ColumnReference | None) – optional id column of the result
    • left_instance/right_instance – optional arguments describing partitioning of the data into separate instances

Remarks: args cannot contain id column from either of tables, as the result table has id column with auto-generated ids; it can be selected by assigning it to a column with defined name (passed in kwargs)

Behavior:

  • rows from the left side that were not matched with the right side are skipped
  • for rows from the right side that were not matched with the left side, missing values on the left are replaced with None
  • for rows that were matched the behavior is the same as that of an inner join.
  • Returns
    JoinResult – an object on which .select() may be called to extract relevant columns from the result of the join.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
        | a  | b
      1 | 11 | 111
      2 | 12 | 112
      3 | 13 | 113
      4 | 13 | 114
    '''
)
t2 = pw.debug.table_from_markdown(
    '''
        | c  | d
      1 | 11 | 211
      2 | 12 | 212
      3 | 14 | 213
      4 | 14 | 214
    '''
)
pw.debug.compute_and_print(t1.join_right(t2, t1.a == t2.c
).select(t1.a, t2_c=t2.c, s=pw.require(pw.coalesce(t1.b,0) + t2.d,t1.id)),
include_id=False)

plot(plotting_function, sorting_col=None)

Allows for plotting the contents of the table visually, e.g. in Jupyter. If the table depends only on bounded data sources, the plot is generated right away. Otherwise (in a streaming scenario), the plot auto-updates after pw.run() is called.

  • Parameters
    • self (pw.Table) – a table serving as a source of data
    • plotting_function (Callable[[ColumnDataSource], Plot]) – function for creating plot from ColumnDataSource
  • Returns
    pn.Column – visualization which can be displayed immediately or passed as a dashboard widget

Example:

import pandas as pd
import pathway as pw
from bokeh.plotting import figure
def func(source):
    plot = figure(height=400, width=400, title="CPU usage over time")
    plot.scatter('a', 'b', source=source, line_width=3, line_alpha=0.6)
    return plot
viz = pw.debug.table_from_pandas(pd.DataFrame({"a":[1,2,3],"b":[3,1,2]})).plot(func)
type(viz)

pointer_from(*args, optional=False, instance=None)

Pseudo-random hash of its argument. Produces pointer types. Applied column-wise.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
g = t1.groupby(t1.owner).reduce(refcol = t1.pointer_from(t1.owner)) # g.id == g.refcol
pw.debug.compute_and_print(g.select(test = (g.id == g.refcol)), include_id=False)

promise_universe_is_equal_to(other)

Asserts to Pathway that the universe of self is equal to the universe of other.

Semantics: Used in situations where Pathway cannot deduce that the two universes are equal.

  • Returns
    self

NOTE: The assertion works in place.

Example:

import pathway as pw
import pytest
t1 = pw.debug.table_from_markdown(
    '''
  | age | owner | pet
1 | 8   | Alice | cat
2 | 9   | Bob   | dog
3 | 15  | Alice | tortoise
4 | 99  | Bob   | seahorse
'''
).filter(pw.this.age<30)
t2 = pw.debug.table_from_markdown(
    '''
  | age | owner
1 | 11  | Alice
2 | 12  | Tom
3 | 7   | Eve
'''
)
t3 = t2.filter(pw.this.age > 10)
with pytest.raises(ValueError):
    t1.update_cells(t3)
t1 = t1.promise_universe_is_equal_to(t2)
result = t1.update_cells(t3)
pw.debug.compute_and_print(result, include_id=False)

promise_universe_is_subset_of(other)

Asserts to Pathway that the universe of self is a subset of the universe of other.

Semantics: Used in situations where Pathway cannot deduce one universe being a subset of another.

  • Returns
    self

NOTE: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 30
''').promise_universe_is_subset_of(t1)
t3 = t1 << t2
pw.debug.compute_and_print(t3, include_id=False)

promise_universes_are_disjoint(other)

Asserts to Pathway that the universe of self is disjoint from the universe of other.

Semantics: Used in situations where Pathway cannot deduce universes are disjoint.

  • Returns
    self

NOTE: The assertion works in place.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
   | age | owner | pet
11 | 11  | Alice | 30
12 | 12  | Tom   | 40
''').promise_universes_are_disjoint(t1)
t3 = t1.concat(t2)
pw.debug.compute_and_print(t3, include_id=False)

reduce(*args, **kwargs)

Reduce a table to a single row.

Equivalent to self.groupby().reduce(*args, **kwargs).

  • Parameters
    • args (ColumnReference) – reducer to reduce the table with
    • kwargs (ColumnExpression) – reducer to reduce the table with. Its key is the new name of a column.
  • Returns
    Table – Reduced table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.reduce(ageagg=pw.reducers.argmin(t1.age))
pw.debug.compute_and_print(t2, include_id=False)
t3 = t2.select(t1.ix(t2.ageagg).age, t1.ix(t2.ageagg).pet)
pw.debug.compute_and_print(t3, include_id=False)

remove_errors()

Filters out rows that contain errors.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
    a | b
    3 | 3
    4 | 0
    5 | 5
    6 | 2
'''
)
t2 = t1.with_columns(x=pw.this.a // pw.this.b)
res = t2.remove_errors()
pw.debug.compute_and_print(res, include_id=False, terminate_on_error=False)
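
In this example the row with b equal to 0 cannot produce a value for x, so it is the one that gets filtered out. A plain-Python analogue is shown below; it assumes the failure surfaces as a ZeroDivisionError, whereas Pathway is assumed to record an error value in the cell instead of raising.

```python
rows = [(3, 3), (4, 0), (5, 5), (6, 2)]

kept = []
for a, b in rows:
    try:
        x = a // b
    except ZeroDivisionError:
        continue  # the row contains an error, so it is dropped
    kept.append((a, b, x))

print(kept)
```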

rename(names_mapping=None, **kwargs)

Rename columns according to either a dictionary or kwargs.

If a mapping is provided using a dictionary, rename_by_dict will be used. Otherwise, rename_columns will be used with kwargs. Columns not in keys(kwargs) are not changed. New name of a column must not be id.

  • Parameters
    • names_mapping (dict[str | ColumnReference, str] | None) – mapping from old column names to new names.
    • kwargs (ColumnExpression) – mapping from old column names to new names.
  • Returns
    Table – self with columns renamed.

rename_by_dict(names_mapping)

Rename columns according to a dictionary.

Columns not in keys(names_mapping) are not changed. New name of a column must not be id.

  • Parameters
    names_mapping (dict[str | ColumnReference, str]) – mapping from old column names to new names.
  • Returns
    Table – self with columns renamed.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.rename_by_dict({"age": "years_old", t1.pet: "animal"})
pw.debug.compute_and_print(t2, include_id=False)

rename_columns(**kwargs)

Rename columns according to kwargs.

Columns not in keys(kwargs) are not changed. New name of a column must not be id.

  • Parameters
    kwargs (str | ColumnReference) – mapping from old column names to new names.
  • Returns
    Table – self with columns renamed.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.rename_columns(years_old=t1.age, animal=t1.pet)
pw.debug.compute_and_print(t2, include_id=False)

restrict(other)

Restrict self universe to keys appearing in other.

  • Parameters
    other (TableLike) – table whose universe is used to restrict the universe of self.
  • Returns
    Table – table with restricted universe, with the same set of columns

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
  | age  | owner  | pet
1 | 10   | Alice  | 1
2 | 9    | Bob    | 1
3 | 8    | Alice  | 2
'''
)
t2 = pw.debug.table_from_markdown(
    '''
  | cost
2 | 100
3 | 200
'''
)
t2.promise_universe_is_subset_of(t1)
t3 = t1.restrict(t2)
pw.debug.compute_and_print(t3, include_id=False)

property schema: type[pathway.internals.schema.Schema]

Get schema of the table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t1.schema
t1.typehints()['age']

select(*args, **kwargs)

Build a new table with columns specified by kwargs.

Output columns’ names are keys(kwargs). values(kwargs) can be raw values, boxed values, columns. Assigning to id reindexes the table.

  • Parameters
    • args (ColumnReference) – Column references.
    • kwargs (Any) – Column expressions with their new assigned names.
  • Returns
    Table – Created table.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
pet
Dog
Cat
''')
t2 = t1.select(animal=t1.pet, desc="fluffy")
pw.debug.compute_and_print(t2, include_id=False)

show(*, snapshot=True, include_id=True, short_pointers=True, sorters=None)

Allows for displaying the table visually, e.g. in Jupyter. If the table depends only on bounded data sources, the table preview is generated right away. Otherwise (in a streaming scenario), the table auto-updates after pw.run() is called.

  • Parameters
    • self (pw.Table) – a table to be displayed
    • snapshot (bool, optional) – whether only current snapshot or all changes to the table should be displayed. Defaults to True.
    • include_id (bool, optional) – whether to show ids of rows. Defaults to True.
    • short_pointers (bool, optional) – whether to shorten printed ids. Defaults to True.
  • Returns
    pn.Column – visualization which can be displayed immediately or passed as a dashboard widget

Example:

import pandas as pd
import pathway as pw
table_viz = pw.debug.table_from_pandas(pd.DataFrame({"a":[1,2,3],"b":[3,1,2]})).show()
type(table_viz)

property slice: TableSlice

Creates a collection of references to self columns. Supports basic column manipulation methods.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t1.slice.without("age")

sort(key, instance=None)

Sorts a table by the specified keys.

  • Parameters
    • table (pw.Table) – The table to be sorted.
    • key (ColumnExpression[int | float | datetime | str | bytes]) – An expression to sort by.
    • instance (ColumnExpression | None) – An expression with the instance. Rows are sorted within an instance. The prev and next columns will only point to rows that have the same instance.
  • Returns
    pw.Table – The sorted table. Contains two columns: prev and next, containing the pointers to the previous and next rows.

Example:

import pathway as pw
table = pw.debug.table_from_markdown('''
name     | age | score
Alice    | 25  | 80
Bob      | 20  | 90
Charlie  | 30  | 80
''')
table = table.with_id_from(pw.this.name)
table += table.sort(key=pw.this.age)
pw.debug.compute_and_print(table, include_id=True)
table = pw.debug.table_from_markdown('''
name     | age | score
Alice    | 25  | 80
Bob      | 20  | 90
Charlie  | 30  | 80
David    | 35  | 90
Eve      | 15  | 80
''')
table = table.with_id_from(pw.this.name)
table += table.sort(key=pw.this.age, instance=pw.this.score)
pw.debug.compute_and_print(table, include_id=True)
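
The prev and next pointers described above can be modeled in plain Python. This is only a sketch of the documented semantics, not Pathway's implementation: sort_pointers is a hypothetical helper, rows are plain dictionaries keyed by id, and ties between equal keys follow Python's stable sort, which the reference above does not specify.

```python
from collections import defaultdict


def sort_pointers(rows, key, instance=None):
    """Return {row_id: (prev_id, next_id)}, ordering rows by `key` within each instance."""
    groups = defaultdict(list)
    for row_id, row in rows.items():
        # Without an instance column, all rows fall into one group.
        group = row[instance] if instance is not None else None
        groups[group].append(row_id)

    pointers = {}
    for ids in groups.values():
        ordered = sorted(ids, key=lambda row_id: rows[row_id][key])
        for pos, row_id in enumerate(ordered):
            prev_id = ordered[pos - 1] if pos > 0 else None
            next_id = ordered[pos + 1] if pos + 1 < len(ordered) else None
            pointers[row_id] = (prev_id, next_id)
    return pointers


rows = {
    "Alice": {"age": 25, "score": 80},
    "Bob": {"age": 20, "score": 90},
    "Charlie": {"age": 30, "score": 80},
    "David": {"age": 35, "score": 90},
    "Eve": {"age": 15, "score": 80},
}
by_age = sort_pointers(rows, key="age")
by_age_within_score = sort_pointers(rows, key="age", instance="score")
print(by_age["Alice"])               # ('Bob', 'Charlie')
print(by_age_within_score["Alice"])  # ('Eve', 'Charlie')
```

With an instance, Alice's neighbors are restricted to rows with the same score, so her previous row changes from Bob to Eve.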

split(split_expression)

Splits a table according to the split_expression condition.

  • Parameters
    split_expression (ColumnExpression) – ColumnExpression that specifies the split condition.
  • Returns
    positive_table, negative_table – tuple of tables, with the same schemas as self and with ids that are subsets of self.id, and provably disjoint.

Example:

import pathway as pw
vertices = pw.debug.table_from_markdown('''
label outdegree
    1         3
    7         0
''')
positive, negative = vertices.split(vertices.outdegree == 0)
pw.debug.compute_and_print(positive, include_id=False)
pw.debug.compute_and_print(negative, include_id=False)
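
The guarantee on the returned ids (two disjoint subsets of self.id) can be illustrated with a plain-Python model. This is a sketch under assumptions, not Pathway code: split_rows is a hypothetical helper, tables are dictionaries keyed by id, and the ids "a" and "b" are made up for the illustration.

```python
def split_rows(rows, predicate):
    """Partition rows into those that satisfy the predicate and those that do not."""
    positive = {row_id: row for row_id, row in rows.items() if predicate(row)}
    negative = {row_id: row for row_id, row in rows.items() if not predicate(row)}
    return positive, negative


vertices = {
    "a": {"label": 1, "outdegree": 3},
    "b": {"label": 7, "outdegree": 0},
}
positive, negative = split_rows(vertices, lambda row: row["outdegree"] == 0)

# Every id lands in exactly one of the two results.
print(positive.keys().isdisjoint(negative.keys()))  # True
```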

typehints()

Returns the types of the columns as a dictionary.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t1.typehints()

update_cells(other)

Updates cells of self, breaking ties in favor of the values in other.

Semantics:

  • result.columns == self.columns
  • result.id == self.id
  • conflicts are resolved preferring other’s values

Requires:

  • other.columns ⊆ self.columns
  • other.id ⊆ self.id
  • Parameters
    other (Table) – the other table.
  • Returns
    Table – self updated with cells from other.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 30
''')
pw.universes.promise_is_subset_of(t2, t1)
t3 = t1.update_cells(t2)
pw.debug.compute_and_print(t3, include_id=False)
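
The semantics and requirements listed above can be written out as a plain-Python model. This is a sketch, not Pathway's implementation: update_cells here is a hypothetical stand-in that works on dictionaries keyed by id, and raising ValueError on a violated requirement is an assumption made for the illustration.

```python
def update_cells(self_rows, other_rows):
    """Overwrite cells of self_rows with the cells present in other_rows."""
    extra_ids = other_rows.keys() - self_rows.keys()
    if extra_ids:
        # Requirement: other.id must be a subset of self.id.
        raise ValueError(f"ids not in self: {sorted(extra_ids)}")

    result = {}
    for row_id, row in self_rows.items():
        patch = other_rows.get(row_id, {})
        extra_columns = patch.keys() - row.keys()
        if extra_columns:
            # Requirement: other.columns must be a subset of self.columns.
            raise ValueError(f"columns not in self: {sorted(extra_columns)}")
        # Values from other win wherever both tables define a cell.
        result[row_id] = {**row, **patch}
    return result


t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {1: {"age": 10, "owner": "Alice", "pet": 30}}
t3 = update_cells(t1, t2)
print(t3[1]["pet"])  # 30
```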

update_rows(other)

Updates rows of self, breaking ties in favor of the rows in other.

Semantics:

  • result.columns == self.columns == other.columns
  • result.id == self.id ∪ other.id

Requires:

  • other.columns == self.columns
  • Parameters
    other (Table[TypeVar(TSchema, bound= Schema)]) – the other table.
  • Returns
    Table – self updated with rows from other.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
   | age | owner | pet
1  | 10  | Alice | 30
12 | 12  | Tom   | 40
''')
t3 = t1.update_rows(t2)
pw.debug.compute_and_print(t3, include_id=False)
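
Unlike update_cells, the result here covers the union of both id sets. A plain-Python model of the semantics above makes the difference visible. This is a sketch, not Pathway's implementation: update_rows here is a hypothetical stand-in on dictionaries keyed by id, and the ValueError for mismatched columns is an assumption.

```python
def update_rows(self_rows, other_rows):
    """Union of both tables; rows from other_rows replace rows with the same id."""
    column_sets = {frozenset(row) for row in (*self_rows.values(), *other_rows.values())}
    if len(column_sets) > 1:
        # Requirement: other.columns == self.columns.
        raise ValueError("self and other must have the same columns")
    return {**self_rows, **other_rows}


t1 = {
    1: {"age": 10, "owner": "Alice", "pet": 1},
    2: {"age": 9, "owner": "Bob", "pet": 1},
    3: {"age": 8, "owner": "Alice", "pet": 2},
}
t2 = {
    1: {"age": 10, "owner": "Alice", "pet": 30},
    12: {"age": 12, "owner": "Tom", "pet": 40},
}
t3 = update_rows(t1, t2)
print(sorted(t3))  # [1, 2, 3, 12]
```

Row 1 is replaced by the version from t2, and row 12 is added because its id does not exist in t1.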

update_types(**kwargs)

Updates types in the schema. Has no effect at runtime.

with_columns(*args, **kwargs)

Updates columns of self according to args and kwargs. See the table.select specification for the evaluation of args and kwargs.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
  | owner | pet | size
1 | Tom   | 1   | 10
2 | Bob   | 1   | 9
3 | Tom   | 2   | 8
''')
t3 = t1.with_columns(*t2)
pw.debug.compute_and_print(t3, include_id=False)

with_id(new_index)

Sets new ids based on another column containing id-typed values.

To generate ids based on arbitrary valued columns, use with_id_from.

Values assigned must be row-wise unique.

  • Parameters
    new_index – column to be used as the new index.
  • Returns
    Table – self with updated ids.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | age | owner | pet
1 | 10  | Alice | 1
2 | 9   | Bob   | 1
3 | 8   | Alice | 2
''')
t2 = pw.debug.table_from_markdown('''
  | new_id
1 | 2
2 | 3
3 | 4
''')
t3 = t1.promise_universe_is_subset_of(t2).with_id(t2.new_id)
pw.debug.compute_and_print(t3)

with_id_from(*args, instance=None)

Computes new ids based on values in columns. Ids computed from columns must be row-wise unique.

  • Parameters
    args – columns to be used as primary keys.
  • Returns
    Table – self updated with recomputed ids.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
   | age | owner  | pet
 1 | 10  | Alice  | 1
 2 | 9   | Bob    | 1
 3 | 8   | Alice  | 2
''')
t2 = t1 + t1.select(old_id=t1.id)
t3 = t2.with_id_from(t2.age)
pw.debug.compute_and_print(t3)
t4 = t3.select(t3.age, t3.owner, t3.pet, same_as_old=(t3.id == t3.old_id),
    same_as_new=(t3.id == t3.pointer_from(t3.age)))
pw.debug.compute_and_print(t4)

with_prefix(prefix)

Renames columns by adding a prefix to each column name.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.with_prefix("u_")
pw.debug.compute_and_print(t2, include_id=False)

with_suffix(suffix)

Renames columns by adding a suffix to each column name.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.with_suffix("_current")
pw.debug.compute_and_print(t2, include_id=False)

with_universe_of(other)

Returns a copy of self with exactly the same universe as other.

Semantics: requires the precondition self.universe == other.universe. Used in situations where Pathway cannot deduce the equality of universes, but they are equal, as verified at runtime.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
  | pet
1 | Dog
7 | Cat
''')
t2 = pw.debug.table_from_markdown('''
  | age
1 | 10
7 | 3
8 | 100
''')
t3 = t2.filter(pw.this.age < 30).with_universe_of(t1)
t4 = t1 + t3
pw.debug.compute_and_print(t4, include_id=False)
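
The precondition can be pictured with a plain-Python model in which a universe is simply the set of row ids. This is a sketch, not Pathway code: with_universe_of and add_columns are hypothetical helpers on dictionaries keyed by id, and signaling a mismatch with ValueError is an assumption about how a failed check might surface.

```python
def with_universe_of(self_rows, other_rows):
    """Return a copy of self_rows after checking that both tables share the same ids."""
    if self_rows.keys() != other_rows.keys():
        raise ValueError("universes differ")
    return dict(self_rows)


def add_columns(left, right):
    """Model of `left + right`: combine the columns of rows with matching ids."""
    return {row_id: {**left[row_id], **right[row_id]} for row_id in left}


t1 = {1: {"pet": "Dog"}, 7: {"pet": "Cat"}}
t2 = {1: {"age": 10}, 7: {"age": 3}, 8: {"age": 100}}

# After filtering, only ids 1 and 7 remain, which matches the ids of t1.
filtered = {row_id: row for row_id, row in t2.items() if row["age"] < 30}
t3 = with_universe_of(filtered, t1)
t4 = add_columns(t1, t3)

# The unfiltered table still contains id 8, so the check fails.
try:
    with_universe_of(t2, t1)
    mismatch_detected = False
except ValueError:
    mismatch_detected = True
```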

without(*columns)

Selects all columns except those given by the column references.

  • Parameters
    columns (str | ColumnReference) – columns to be dropped, given as column names or in table.column_name notation.
  • Returns
    Table – self without specified columns.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = t1.without(t1.age, pw.this.pet)
pw.debug.compute_and_print(t2, include_id=False)