Pathway API

Reference for all the Pathway classes and functions.

See Table API for the main Table class.

class pw.AsyncTransformer(input_table)

Allows performing asynchronous transformations on a table.

invoke() will be called asynchronously for each row of an input_table.

The output table can be accessed via result.

Example:

import pathway as pw
import asyncio
from typing import Any, Dict
class OutputSchema(pw.Schema):
   ret: int

class AsyncIncrementTransformer(pw.AsyncTransformer, output_schema=OutputSchema):
    async def invoke(self, value) -> Dict[str, Any]:
        await asyncio.sleep(0.1)
        return {"ret": value + 1 }

input = pw.debug.parse_to_table('''
  | value
1 | 42
2 | 44
''')
result = AsyncIncrementTransformer(input_table=input).result
pw.debug.compute_and_print(result, include_id=False)
ret
43
45

close()

Called once at the end. Proper place for cleanup.

abstract async invoke(*args, **kwargs)

Called for every row of input_table. The arguments will correspond to the columns in the input table.

Should return dict of values matching output_schema.

open()

Called before the actual work. Suitable for one-time setup.

with_options(capacity=None, retry_strategy=None, cache_strategy=None)

Sets async options.
  • Parameters
    • capacity (Optional[int]) – maximum number of concurrent operations.
    • retry_strategy (Optional[AsyncRetryStrategy]) – defines how failures will be handled.
  • Returns
    self

property result: Table

Resulting table.

class pw.ClassArg(ref: RowReference, ptr: Pointer)

Base class to inherit from when writing inner classes for class transformers.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()
        @pw.output_attribute
        def ret(self) -> int:
            return self.arg + 1
t1 = pw.debug.parse_to_table('''
age
10
9
8
7
''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7   | 8
8   | 9
9   | 10
10  | 11

pointer_from(*args, optional=False)

Pseudo-random hash of its argument. Produces pointer types. Applied value-wise.

class pw.ColumnExpression()

get(index, default=None)

Extracts the element at index from an object. The object has to be a Tuple. If no element is present at index, it returns the value specified by the default parameter.
  • Parameters
    • index (Union[ColumnExpression, int]) – Position to extract element at.
    • default (Union[ColumnExpression, None, int, float, str, bytes, bool, BasePointer, datetime, timedelta, ndarray, Tuple[Union[None, int, float, str, bytes, bool, BasePointer, datetime, timedelta, ndarray, Tuple[Value, ...]], ...]]) – Value returned when no element is at position index.

Example:

import pathway as pw
t1 = pw.debug.table_from_markdown(
    '''
  | a | b | c
1 | 3 | 2 | 2
2 | 4 | 1 | 0
3 | 7 | 3 | 1
'''
)
t2 = t1.with_columns(tup=pw.make_tuple(pw.this.a, pw.this.b))
t3 = t2.select(
    x=pw.this.tup.get(1),
    y=pw.this.tup.get(3),
    z=pw.this.tup.get(pw.this.c),
    t=pw.this.tup.get(pw.this.c, default=100),
)
pw.debug.compute_and_print(t3, include_id=False)
x | y | z | t
1 |   | 4 | 4
2 |   |   | 100
3 |   | 3 | 3

to_string()

Changes the values to strings.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
val
1
2
3
4''')
t1.schema.as_dict()
{'val': INT}
pw.debug.compute_and_print(t1, include_id=False)
val
1
2
3
4
t2 = t1.select(val = pw.this.val.to_string())
t2.schema.as_dict()
{'val': STR}
pw.debug.compute_and_print(t2.select(val=pw.this.val + "a"), include_id=False)
val
1a
2a
3a
4a

class pw.ColumnReference(*, column, table, name)

Reference to the column.

Inherits from ColumnExpression.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
isinstance(t1.age, pw.ColumnReference)
True
isinstance(t1["owner"], pw.ColumnReference)
True

property name()

Name of the referred column.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
t1.age.name
'age'

property table()

Table to which the referred column belongs.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
t1.age.table is t1
True

class pw.MonitoringLevel(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Specifies the verbosity of the Pathway monitoring mechanism.

ALL( = 4 )

Monitor input connectors and latency for each operator in the execution graph. The latency is measured as the difference between the time when the operator processed the data and the time when pathway acquired the data.

AUTO( = 0 )

Automatically sets IN_OUT in an interactive terminal and jupyter notebook. Sets NONE otherwise.

AUTO_ALL( = 1 )

Automatically sets ALL in an interactive terminal and jupyter notebook. Sets NONE otherwise.

IN_OUT( = 3 )

Monitor input connectors and input and output latency. The latency is measured as the difference between the time when the operator processed the data and the time when pathway acquired the data.

NONE( = 2 )

No monitoring.

class pw.Pointer()

Pointer to row type.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.select(col=t1.id)
t2.schema['col']
Pointer

class pw.Schema()

Base class to inherit from when creating schemas. All schemas should be subclasses of this one.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
t1.schema.as_dict()
{'age': INT, 'owner': STR, 'pet': STR}
issubclass(t1.schema, pw.Schema)
True
class NewSchema(pw.Schema):
  foo: int
SchemaSum = NewSchema | t1.schema
SchemaSum.as_dict()
{'age': INT, 'owner': STR, 'pet': STR, 'foo': INT}

class pw.SchemaProperties(append_only=False)

class pw.TableSlice(mapping, table)

Collection of references to Table columns. Created by Table.slice method, or automatically by using left/right/this constructs. Supports basic column manipulation methods.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t1.slice.without("age").with_suffix("_col")
TableSlice({'owner_col': <table1>.owner, 'pet_col': <table1>.pet})

class pw.iterate_universe(table)

class pw.left(*args, **kwargs)

Object for generating column references without holding the actual table in hand. Needs to be evaluated in the proper context. For Table.join() and JoinResult.select(), refers to the left input table. For all other situations, you need the pw.this object.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.parse_to_table('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join(t2, pw.left.pet == pw.right.pet, pw.left.owner == pw.right.owner).select(
         age=pw.left.age, owner_name=pw.right.owner, size=pw.this.size
     )
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L

class pw.right(*args, **kwargs)

Object for generating column references without holding the actual table in hand. Needs to be evaluated in the proper context. For Table.join() and JoinResult.select(), refers to the right input table. For all other situations, you need the pw.this object.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age  | owner  | pet
 10  | Alice  | 1
  9  | Bob    | 1
  8  | Alice  | 2
''')
t2 = pw.debug.parse_to_table('''
age  | owner  | pet | size
 10  | Alice  | 3   | M
 9   | Bob    | 1   | L
 8   | Tom    | 1   | XL
''')
t3 = t1.join(t2, pw.left.pet == pw.right.pet, pw.left.owner == pw.right.owner).select(
         age=pw.left.age, owner_name=pw.right.owner, size=pw.this.size
     )
pw.debug.compute_and_print(t3, include_id=False)
age | owner_name | size
9   | Bob        | L

class pw.this(*args, **kwargs)

Object for generating column references without holding the actual table in hand. Needs to be evaluated in the proper context. For most of the Table methods, it refers to self. For JoinResult, it refers to the left input table.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | 1
9   | Bob   | 1
8   | Alice | 2
''')
t2 = t1.select(pw.this.owner, pw.this.age)
pw.debug.compute_and_print(t2, include_id=False)
owner | age
Alice | 8
Alice | 10
Bob   | 9

Functions

pw.apply(fun, *args, **kwargs)

Applies a function to column expressions, column-wise. The output column type is deduced from the type annotations of the function.

Example:

import pathway as pw
def concat(left: str, right: str) -> str:
  return left+right
t1 = pw.debug.parse_to_table('''
age  owner  pet
 10  Alice  dog
  9    Bob  dog
  8  Alice  cat
  7    Bob  dog''')
t2 = t1.select(col = pw.apply(concat, t1.owner, t1.pet))
pw.debug.compute_and_print(t2, include_id=False)
col
Alicecat
Alicedog
Bobdog
Bobdog

pw.apply_async(fun, *args, **kwargs)

Applies a function asynchronously to column expressions, column-wise. The output column type is deduced from the type annotations of the function. Either a regular or an async function can be passed.

Example:

import pathway as pw
import asyncio
async def concat(left: str, right: str) -> str:
  await asyncio.sleep(0.1)
  return left+right
t1 = pw.debug.parse_to_table('''
age  owner  pet
 10  Alice  dog
  9    Bob  dog
  8  Alice  cat
  7    Bob  dog''')
t2 = t1.select(col = pw.apply_async(concat, t1.owner, t1.pet))
pw.debug.compute_and_print(t2, include_id=False)
col
Alicecat
Alicedog
Bobdog
Bobdog

pw.apply_with_type(fun, ret_type, *args, **kwargs)

Applies function to column expressions, column-wise. Output column type is provided explicitly.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
   age  owner  pet
1   10  Alice  dog
2    9    Bob  dog
3    8  Alice  cat
4    7    Bob  dog''')
t2 = t1.select(col = pw.apply_with_type(lambda left, right: left+right, str, t1.owner, t1.pet))
pw.debug.compute_and_print(t2, include_id=False)
col
Alicecat
Alicedog
Bobdog
Bobdog

pw.assert_table_has_schema(table, schema, *, allow_superset=False, ignore_primary_keys=True)

Asserts that the schema of the table is equivalent to the schema given as an argument.
  • Parameters
    • table – Table for which we are asserting the schema.
    • schema – Schema which we assert that the Table has.
    • allow_superset – if True, the columns of the table can be a superset of the columns in schema. The default value is False.
    • ignore_primary_keys – if True, the assert won't check whether table and schema have the same primary keys. The default value is True.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
age | owner | pet
10  | Alice | dog
9   | Bob   | dog
8   | Alice | cat
7   | Bob   | dog
''')
t2 = t1.select(pw.this.owner, age = pw.cast(float, pw.this.age))
schema = pw.schema_builder(
    {"age": pw.column_definition(dtype=float), "owner": pw.column_definition(dtype=str)}
)
pw.assert_table_has_schema(t2, schema)

pw.attribute(func, **kwargs)

Decorator for creation of attributes.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()

        @pw.attribute
        def attr(self) -> float:
            return self.arg*2

        @pw.output_attribute
        def ret(self) -> float:
            return self.attr + 1

t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7   | 15
8   | 17
9   | 19
10  | 21

pw.cast(target_type, col)

Changes the type of the column to target_type and converts the data of this column.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
  val
1   10
2    9
3    8
4    7''')
t1.schema.as_dict()
{'val': INT}
pw.debug.compute_and_print(t1, include_id=False)
val
7
8
9
10
t2 = t1.select(val = pw.cast(float, t1.val))
t2.schema.as_dict()
{'val': FLOAT}
pw.debug.compute_and_print(t2, include_id=False)
val
7.0
8.0
9.0
10.0

pw.coalesce(*args)

For arguments list arg_1, arg_2, …, arg_n returns first not-None value.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
colA   colB
     |   10
   2 |
     |
   4 |    7''')
t2 = t1.select(t1.colA, t1.colB, col=pw.coalesce(t1.colA, t1.colB))
pw.debug.compute_and_print(t2, include_id=False)
colA | colB | col
     |      |
     | 10   | 10
2    |      | 2
4    | 7    | 4

pw.column_definition(*, primary_key=False, default_value=undefined, dtype=None, name=None)

Creates a column definition.
  • Parameters
    • primary_key (bool) – should column be a part of a primary key.
    • default_value (Optional[Any]) – default value replacing blank entries. The default value of the column must be specified explicitly, otherwise there will be no default value.
    • dtype (Optional[Any]) – data type. When used in schema class, will be deduced from the type annotation.
    • name (Optional[str]) – name of a column. When used in schema class, will be deduced from the attribute name.
  • Returns
    Column definition.

Example:

import pathway as pw
class NewSchema(pw.Schema):
  key: int = pw.column_definition(primary_key=True)
  timestamp: str = pw.column_definition(name="@timestamp")
  data: str
NewSchema.as_dict()
{'key': INT, '@timestamp': STR, 'data': STR}

pw.declare_type(target_type, col)

Used to change the type of a column to a particular type. Disclaimer: it only changes type in a schema, it does not affect values stored.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
   val
1   10
2    9.5
3    8
4    7''')
t1.schema.as_dict()
{'val': FLOAT}
t2 = t1.filter(t1.val == pw.cast(int, t1.val))
t2.schema.as_dict()
{'val': FLOAT}
t3 = t2.select(val = pw.declare_type(int, t2.val))
t3.schema.as_dict()
{'val': INT}

pw.if_else(if_clause, then_clause, else_clause)

Equivalent to:
if (if_clause):
    return (then_clause)
else:
    return (else_clause)

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
colA   colB
   1 |    0
   2 |    2
   6 |    3''')
t2 = t1.select(res = pw.if_else(t1.colB != 0, t1.colA // t1.colB, 0))
pw.debug.compute_and_print(t2, include_id=False)
res
0
1
2

pw.input_attribute(type=<class 'float'>)

Returns new input_attribute. To be used inside class transformers.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()

        @pw.output_attribute
        def ret(self) -> float:
            return self.arg + 1

t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7   | 8
8   | 9
9   | 10
10  | 11

pw.input_method(type=<class 'float'>)

Decorator for defining input methods in class transformers.

Example:

import pathway as pw
@pw.transformer
class first_transformer:
    class table(pw.ClassArg):
        a: float = pw.input_attribute()

        @pw.method
        def fun(self, arg) -> int:
            return self.a * arg

@pw.transformer
class second_transformer:
    class table(pw.ClassArg):
        m = pw.input_method(int)

        @pw.output_attribute
        def val(self):
            return self.m(2)

t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = first_transformer(table=t1.select(a=t1.age)).table
t2.schema.as_dict()
{'fun': Callable(..., INT)}
t3 = second_transformer(table=t2.select(m=t2.fun)).table
pw.debug.compute_and_print(t1 + t3, include_id=False)
age | val
7   | 14
8   | 16
9   | 18
10  | 20

pw.iterate(func, iteration_limit=None, **kwargs)

Iterate function until fixed point. Function has to take only named arguments, Tables, and return a dict of Tables. Initial arguments to function are passed through kwargs.

Example:

import pathway as pw
def collatz_transformer(iterated):
    def collatz_step(x: int) -> int:
        if x == 1:
            return 1
        elif x % 2 == 0:
            return x / 2
        else:
            return 3 * x + 1
    new_iterated = iterated.select(val=pw.apply(collatz_step, iterated.val))
    return dict(iterated=new_iterated)
tab = pw.debug.parse_to_table('''
val
  1
  2
  3
  4
  5
  6
  7
  8''')
ret = pw.iterate(collatz_transformer, iterated=tab).iterated
pw.debug.compute_and_print(ret, include_id=False)
val
1
1
1
1
1
1
1
1

pw.make_tuple(*args)

Creates a tuple from the provided expressions.
  • Parameters
    args (Union[ColumnExpression, None, int, float, str, bytes, bool, BasePointer, datetime, timedelta, ndarray, Tuple[Union[None, int, float, str, bytes, bool, BasePointer, datetime, timedelta, ndarray, Tuple[Value, ...]], ...]]) – a list of expressions to be put in a tuple
  • Returns
    tuple

NOTE:

  • Each cell in the output column will be a tuple containing the corresponding values from the input columns.
  • The order of values in each tuple will match the order of the input columns.
  • If any of the input columns have missing values, the resulting tuples will contain None for those positions.

Example:

import pathway as pw
table = pw.debug.table_from_markdown(
    '''
A | B  | C
1 | 10 | a
2 | 20 |
3 | 30 | c
'''
)
table_with_tuple = table.select(res=pw.make_tuple(pw.this.A, pw.this.B, pw.this.C))
pw.debug.compute_and_print(table_with_tuple, include_id=False)
res
(1, 10, 'a')
(2, 20, None)
(3, 30, 'c')

pw.method(func, **kwargs)

Decorator for creating methods in class transformers.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        a: float = pw.input_attribute()

        @pw.output_attribute
        def b(self) -> float:
            return self.fun(self.a)

        @pw.method
        def fun(self, arg) -> float:
            return self.a * arg

t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(a=t1.age)).table
t2.schema.as_dict()
{'b': FLOAT, 'fun': Callable(..., FLOAT)}
pw.debug.compute_and_print(t1 + t2.select(t2.b), include_id=False)
age | b
7   | 49
8   | 64
9   | 81
10  | 100
pw.debug.compute_and_print(t1 + t2.select(out = t2.fun(t2.b)), include_id=False)
age | out
7   | 343
8   | 512
9   | 729
10  | 1000

pw.numba_apply(fun, numba_signature, *args, **kwargs)

Applies function to column expressions, column-wise. Function has to be numba compilable.

Currently only a few signatures are supported:

  • function has to be unary or binary
  • arguments and return type have to be either int64 or float64

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
   val
1    1
2    3
3    5
4    7''')
t2 = t1.select(col = pw.numba_apply(lambda x: x*x-2*x+1, "int64(int64,)", t1.val))
pw.debug.compute_and_print(t2, include_id=False)
col
0
4
16
36

pw.output_attribute(func, **kwargs)

Decorator for creation of output_attributes.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()

        @pw.output_attribute
        def ret(self) -> float:
            return self.arg + 1

t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7   | 8
8   | 9
9   | 10
10  | 11

pw.pandas_transformer(output_schema, output_universe=None)

Decorator that turns python function operating on pandas.DataFrame into pathway transformer.

Input universes are converted into input DataFrame indexes. The resulting index is treated as the output universe, so it must maintain uniqueness and be of integer type.

  • Parameters
    • output_schema (Type[Schema]) – Schema of a resulting table.
    • output_universe (Union[int, str, None]) – Index or name of an argument whose universe will be used in resulting table. Defaults to None.
  • Returns
    Transformer that can be applied on Pathway tables.

Example:

import pandas as pd
import pathway as pw
input = pw.debug.table_from_markdown(
    '''
    | foo  | bar
0   | 10   | 100
1   | 20   | 200
2   | 30   | 300
'''
)
class Output(pw.Schema):
    sum: int
@pw.pandas_transformer(output_schema=Output)
def sum_cols(t: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame(t.sum(axis=1))
output = sum_cols(input)
pw.debug.compute_and_print(output, include_id=False)
sum
110
220
330

pw.require(val, *deps)

Returns val iff every dep in deps is not-None. Returns None otherwise.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
colA   colB
     |   10
   2 |
     |
   4 |    7''')
t2 = t1.select(t1.colA, t1.colB, col=pw.require(t1.colA + t1.colB, t1.colA, t1.colB))
pw.debug.compute_and_print(t2, include_id=False)
colA | colB | col
     |      |
     | 10   |
2    |      |
4    | 7    | 11

pw.run(debug=False, monitoring_level=MonitoringLevel.AUTO, with_http_server=False, default_logging=True, persistence_config=None)

Runs the computation graph.
  • Parameters
    • debug (bool) – enable output out of table.debug() operators
    • monitoring_level (MonitoringLevel) – the verbosity of stats monitoring mechanism. One of pathway.MonitoringLevel.NONE, pathway.MonitoringLevel.IN_OUT, pathway.MonitoringLevel.ALL. If unset, pathway will choose between NONE and IN_OUT based on output interactivity.
    • with_http_server (bool) – whether to start an HTTP server with runtime metrics. Learn more in a tutorial.
    • default_logging (bool) – whether to allow pathway to set its own logging handler. Set it to False if you want to set your own logging handler.

pw.schema_builder(columns, *, name=None, properties=SchemaProperties(append_only=False))

Allows building a schema inline, from a dictionary of column definitions.
  • Parameters
    • columns (Dict[str, ColumnDefinition]) – dictionary of column definitions.
    • name (Optional[str]) – schema name.
    • properties (SchemaProperties) – schema properties.
  • Returns
    Schema

Example:

import pathway as pw
schema = pw.schema_builder(columns={
  'key': pw.column_definition(dtype=int, primary_key=True),
  'data': pw.column_definition(dtype=int, default_value=0)
}, name="my_schema")
schema.as_dict()
{'key': INT, 'data': INT}

pw.schema_from_types(_name=None, **kwargs)

Constructs schema from kwargs: field=type.

Example:

import pathway as pw
s = pw.schema_from_types(foo=int, bar=str)
s.as_dict()
{'foo': INT, 'bar': STR}
issubclass(s, pw.Schema)
True

pw.sql(query, **kwargs)

Run a SQL query on Pathway tables.
  • Parameters
    • query (str) – the SQL query to execute.
    • kwargs (Table) – the association name: table used for the execution of the SQL query. Each name:table pair links a Pathway table to a table name used in the SQL query.

Example:

import pathway as pw
t = pw.debug.table_from_markdown(
    """
      A  | B
      1  | 2
      4  | 3
      4  | 7
    """
)
ret = pw.sql("SELECT * FROM tab WHERE A<B", tab=t)
pw.debug.compute_and_print(ret, include_id=False)
A | B
1 | 2
4 | 7

Supported SQL keywords and operations: SELECT, WHERE, boolean expressions, arithmetic operations, GROUP BY, HAVING, AS (alias), UNION, INTERSECT, JOIN, and WITH.

Table and column names are case-sensitive.

Specificities of Pathway:

  • id is a reserved key word for columns, every Pathway table has a special column id. This column is not captured by * expressions in SQL.
  • Order of columns might not be preserved with respect to SELECT query.
  • Pathway reducers (pw.count, pw.sum, etc.) aggregate over None values, while SQL aggregation functions (COUNT, SUM, etc.) skip NULL values.
  • UNION requires matching column names.
  • INTERSECT requires matching column names.

Limited support:

  • Subqueries are supported but fragile – they depend on a set of query rewriting routines from the sqlglot library.
  • Additionally, using the id column in subqueries is fragile.
  • LIKE, ANY, ALL, EXISTS are not supported, or only supported in a very weak state.

Unsupported operations:

  • ordering operations: ORDER BY, LIMIT, SELECT TOP
  • INSERT INTO (Pathway tables are immutable)
  • Pathway does not support anonymous columns: they might work but we do not guarantee their behavior.
  • INTERSECT does not support INTERSECT ALL.
  • COALESCE, IFNULL are not supported.
  • FULL JOIN and NATURAL JOIN are not supported.
  • CAST is not supported.

pw.transformer(cls)

Decorator that wraps the outer class when defining class transformers.

Example:

import pathway as pw
@pw.transformer
class simple_transformer:
    class table(pw.ClassArg):
        arg = pw.input_attribute()

        @pw.output_attribute
        def ret(self) -> float:
            return self.arg + 1

t1 = pw.debug.parse_to_table('''
age
10
9
8
7''')
t2 = simple_transformer(table=t1.select(arg=t1.age)).table
pw.debug.compute_and_print(t1 + t2, include_id=False)
age | ret
7   | 8
8   | 9
9   | 10
10  | 11

pw.udf(fun)

Create a Python UDF (user-defined function) out of a callable.

The output type of the UDF is determined based on its type annotation.

Example:

import pathway as pw
@pw.udf
def concat(left: str, right: str) -> str:
    return left+right

t1 = pw.debug.parse_to_table('''
age  owner  pet
    10  Alice  dog
    9    Bob  dog
    8  Alice  cat
    7    Bob  dog''')
t2 = t1.select(col = concat(t1.owner, t1.pet))
pw.debug.compute_and_print(t2, include_id=False)
col
Alicecat
Alicedog
Bobdog
Bobdog

pw.udf_async(fun=None, *, capacity=None, retry_strategy=None, cache_strategy=None)

Create a Python asynchronous UDF (user-defined function) out of a callable.

The output column type is deduced from the type annotations of the function. Can be applied to a regular or an asynchronous function.

Example:

import pathway as pw
import asyncio
@pw.udf_async
async def concat(left: str, right: str) -> str:
  await asyncio.sleep(0.1)
  return left+right
t1 = pw.debug.parse_to_table('''
age  owner  pet
 10  Alice  dog
  9    Bob  dog
  8  Alice  cat
  7    Bob  dog''')
t2 = t1.select(col = concat(t1.owner, t1.pet))
pw.debug.compute_and_print(t2, include_id=False)
col
Alicecat
Alicedog
Bobdog
Bobdog

pw.unwrap(col)

Changes the type of the column from Optional[T] to T. If there is any None in the column, this operation will raise an exception.

Example:

import pathway as pw
t1 = pw.debug.parse_to_table('''
colA | colB
1    | 5
2    | 9
3    | None
4    | 15''')
t1.schema.as_dict()
{'colA': INT, 'colB': Optional(INT)}
pw.debug.compute_and_print(t1, include_id=False)
colA | colB
1    | 5
2    | 9
3    |
4    | 15
t2 = t1.filter(t1.colA < 3)
t2.schema.as_dict()
{'colA': INT, 'colB': Optional(INT)}
pw.debug.compute_and_print(t2, include_id=False)
colA | colB
1    | 5
2    | 9
t3 = t2.select(colB = pw.unwrap(t2.colB))
t3.schema.as_dict()
{'colB': INT}
pw.debug.compute_and_print(t3, include_id=False)
colB
5
9