Basic operations

Must-read for first-timers and veterans alike, this guide gathers the most commonly used basic elements of Pathway.

While the Pathway programming framework comes with advanced functionalities such as classifiers or fuzzy-joins, it is essential to master the basic operations at the core of the framework. This guide walks through the following topics: streaming and static modes, starting with data, select and notations, manipulating the table, working with multiple tables, updating, and operations.

If you want more information you can review our complete API docs or some of our tutorials.

Streaming and static modes

The first thing to keep in mind is that Pathway is made for streaming data. In this streaming mode, Pathway assumes unbounded incoming updates. To process the incoming data, Pathway maintains a dataflow. This mode requires input connectors listening to streaming data sources. The computation runs indefinitely until the process is killed: the computation starts when pw.run is called and everything afterwards is unreachable code.

However, the streaming mode may not be the most convenient when testing or debugging. For that purpose, Pathway provides a static mode in which static data may be attached to the connectors. In that mode, finite and static data can be loaded, e.g. from a table written in a static CSV file or from a markdown table.

When the computation is run in the static mode, all the data is loaded and processed at once. While the static mode does not fully benefit from the dataflow, it allows checking whether the graph is correctly built. To ease debugging, Pathway provides a function called compute_and_print. When calling pw.debug.compute_and_print(t), Pathway builds the whole graph, ingests all the available static data, prints the obtained table t, and then discards the data. Calling compute_and_print twice will result in ingesting the whole data twice. In the streaming mode, the graph is built and the data is ingested only once, when pw.run() is called, and the data is maintained in the graph until the computation is over: results cannot be printed but should be accessed using output connectors.

The processing pipeline should be designed in the same way no matter what mode is used in the end. The only difference is how the data is considered.
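
As a rough illustration of the difference between the two modes, here is a plain-Python sketch. It is not Pathway code, and the function names static_total and streaming_totals are hypothetical. The first function sees all the data at once, as in the static mode. The second keeps a running state and refreshes its result on every incoming update, which is the idea behind the streaming mode.

```python
from typing import Iterable, Iterator


def static_total(values: Iterable[int]) -> int:
    """Static-mode analogue: all the data is available, compute once."""
    return sum(values)


def streaming_totals(updates: Iterable[int]) -> Iterator[int]:
    """Streaming-mode analogue: keep state, emit a refreshed result per update."""
    total = 0
    for value in updates:
        total += value
        yield total


ages = [25, 32, 28]
print(static_total(ages))            # 85
print(list(streaming_totals(ages)))  # [25, 57, 85]
```

In Pathway itself you do not write two versions: the same pipeline definition serves both modes, and only the way the data arrives changes.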

To learn more about both modes, you can read our article about it.

In most of our examples, we use the static mode since it avoids setting up external streaming data sources such as Kafka.

Starting with data

Be sure to always import Pathway.

import pathway as pw

Now you need tables to manipulate. The way tables are obtained depends on whether you are in streaming or static mode. In this article, you will be using the static mode to be able to show the behavior of each processing step, but keep in mind that Pathway is made for the streaming mode.

Static mode

In the static mode, you can manually enter a table using a markdown connector. Here are the (static) tables you will be using:

t_name = pw.debug.table_from_markdown(
    """
    | name
 1  | Alice
 2  | Bob
 3  | Carole
 """
)
t_age = pw.debug.table_from_markdown(
    """
    | age
 1  | 25
 2  | 32
 3  | 28
 """
)
t_david = pw.debug.table_from_markdown(
    """
    | name  | age
 4  | David | 25
 """
)

You can display a snapshot of a table (for debugging purposes) using pw.debug.compute_and_print:

pw.debug.compute_and_print(t_name)
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | name
^YYY4HAB... | Alice
^Z3QWT29... | Bob
^3CZ78B4... | Carole

In the following, let's omit the pw.debug.compute_and_print() for clarity reasons but keep in mind that it is required to print the actual static data at a given time.

Streaming mode

In the streaming mode, those tables would be obtained using one of the connectors provided by Pathway. For example, we could use pw.io.kafka.read to obtain the table from Kafka. The results should be taken out of Pathway using an output connector: we could send the data to PostgreSQL using pw.io.postgres.write for instance. Otherwise, the way the data is manipulated is exactly the same in both modes.

Select and notations

The main way to manipulate a table in Pathway is by using the select operation.

  • The dot notation: you can use select to select a particular column and you can use the dot notation to specify the name of the column. For example, you can access the column "name" of your t_david table:
t_david.select(t_david.name)
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | name
^3HN31E1... | David
  • The bracket notation: you can also use a string to access the column with the bracket notation. The previous example is equivalent to t_david.select(t_david["name"]).
  • The this notation: to refer to the table currently manipulated you can use pw.this. Our example becomes t_david.select(pw.this.name). This notation works for all standard transformers.

    It can be used to refer to the table, even if it has not been given a name, for example in successive operations:

t_new_age = t_david.select(new_age=pw.this.age).select(
    new_age_plus_7=pw.this.new_age + 7
)
t_new_age
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | new_age_plus_7
^3HN31E1... | 32

In this example, it would be impossible to refer to the table obtained after the first select (with the column new_age) without using pw.this as t_david still refers to the initial and unmodified table.

  • left and right notations: similarly to the this notation, pw.left and pw.right can be used to manipulate the different tables used in a join (.join).

    left_table.join(right_table, pw.left.C1==pw.right.C2).select(pw.left.C3, pw.right.C4)

For more information about the join and the use of pw.left and pw.right, you can see the dedicated section and manual.

  • The star * notation: this notation is used to select all the columns of the manipulated table. table.select(*pw.this) will return the full table. It can be combined with .without to remove the unwanted columns:

    In our example, instead of selecting the "name" column, you could want to select all the columns except the "age" one. This is obtained as follows:

t_david.without("age")
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | name
^3HN31E1... | David

⚠️ Note on column names: column names are case sensitive and any string can be used as a column name with the bracket notation. This is not the case for the dot notation, which is restricted by standard Python syntax. For instance, the dot notation does not allow names containing spaces: t.select(t["my name"]) works while t.select(t.my name) produces an error. By convention, we recommend following the Python variable naming convention: use only alphanumeric characters and the special character "_", and do not start with a number.
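
To check in advance whether a column name can be written with the dot notation, a small standard-library test is enough. This is a sketch covering Python syntax only: the helper name usable_with_dot_notation is hypothetical, and Pathway may reserve additional names (for instance names that clash with table attributes), which this check does not detect.

```python
import keyword


def usable_with_dot_notation(column_name: str) -> bool:
    """Return True if the name is a valid, non-reserved Python identifier."""
    return column_name.isidentifier() and not keyword.iskeyword(column_name)


for name in ["name", "new_age", "my name", "2nd_column", "class"]:
    print(f"{name!r}: {usable_with_dot_notation(name)}")
```

Names for which the check returns False can still be used with the bracket notation.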

Manipulating the table

In addition to select, Pathway provides more operators to manipulate and index the tables.

  • Filtering: you can use filter to keep only the rows that satisfy a given condition.
t_adult = pw.debug.table_from_markdown(
    """
    | is_adult
 5  | True
 6  | False
 7  | True
 """
)
t_adult.filter(pw.this.is_adult)
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | is_adult
^6A0QZMJ... | True
^3S2X6B2... | True

You can use standard Python operators in the condition, including arithmetic operators (+, -, *, /, //) and comparison operators such as <, > or >=:

t_age.filter(pw.this.age>30)
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | age
^Z3QWT29... | 32

You can also use the logical operations AND (&), OR (|) and NOT (~):

t_adult.filter(~pw.this.is_adult)
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | is_adult
^A984WV0... | False
  • Reindexing: you can change the ids (accessible by table.id) by using .with_id_from(). You need a table with new ids:
t_new_ids = pw.debug.table_from_markdown(
    """
    | new_id_source
 1  | 4
 2  | 5
 3  | 6
 """
)
t_name.with_id_from(t_new_ids.new_id_source)
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | name
^3HN31E1... | Alice
^3S2X6B2... | Bob
^A984WV0... | Carole
  • ix_ref: uses a column's values as indexes. As an example, if you have a table containing indexes pointing to another table, you can use ix_ref to obtain the corresponding rows:
t_selected_ids = pw.debug.table_from_markdown(
    """
      | selected_id
 100  | 1
 200  | 3
 """
)
t_selected_ids.select(selected=t_name.ix_ref(t_selected_ids.selected_id).name)
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | selected
^1BNYXQH... | Alice
^PTQENR4... | Carole
  • Group-by: we can use groupby to aggregate data sharing a common property and then use a reducer to compute an aggregated value.
t_spending = pw.debug.table_from_markdown(
    """
    | name  | amount
 1  | Bob   | 100
 2  | Alice | 50
 3  | Alice | 125
 4  | Bob   | 200
 """
)
t_spending.groupby(pw.this.name).reduce(pw.this.name, sum=pw.reducers.sum(pw.this.amount))
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | name  | sum
^GBSDEEW... | Alice | 175
^EDPSSB1... | Bob   | 300

You can do groupbys on multiple columns at once (e.g. .groupby(t.colA, t.colB)). The list of all the available reducers can be found here.

If you want to find out more about the groupby and reduce operations, you can read our article about it.
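
To make the semantics of the groupby and reduce example above concrete, here is a plain-Python model of the same aggregation. It is only an illustration of what the result contains, not how Pathway computes it; the function name groupby_sum is hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def groupby_sum(rows: Iterable[Tuple[str, int]]) -> Dict[str, int]:
    """Group rows by name and sum the amounts of each group."""
    totals: Dict[str, int] = defaultdict(int)
    for name, amount in rows:
        totals[name] += amount
    return dict(totals)


# Same data as t_spending.
spending = [("Bob", 100), ("Alice", 50), ("Alice", 125), ("Bob", 200)]
print(groupby_sum(spending))  # {'Bob': 300, 'Alice': 175}
```

Each distinct value of the grouping column yields exactly one output row, which is why the result has two rows for four input rows.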

Working with multiple tables: union, concatenation, join

  • Union: we can use the operator + or += to compute the union of two tables sharing the same ids.
t_union = t_name + t_age
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | name   | age
^YYY4HAB... | Alice  | 25
^Z3QWT29... | Bob    | 32
^3CZ78B4... | Carole | 28
  • Concatenation: you can use concat to concatenate two tables, but they need to have disjoint ids.
pw.universes.promise_are_pairwise_disjoint(t_union, t_david)
pw.Table.concat(t_union,t_david)
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | name   | age
^YYY4HAB... | Alice  | 25
^Z3QWT29... | Bob    | 32
^3CZ78B4... | Carole | 28
^3HN31E1... | David  | 25

As you can see, Pathway keeps the index. If you are concatenating tables with overlapping indexes, you can use concat_reindex which will concat the tables and produce a new index.

Info for Databricks Delta users: Concatenation is highly similar to the SQL MERGE INTO.

  • Join: you can do all usual types of joins in Pathway (inner, outer, left, right). The example below presents an inner join:
t_age.join(t_name, t_age.id==t_name.id).select(t_age.age, t_name.name)
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | age | name
^04VMVCM... | 25  | Alice
^TT1GT7R... | 28  | Carole
^GFBMT58... | 32  | Bob

Note that in the equality t_age.id==t_name.id the left part must be a column of the table on which the join is done, namely t_age in our example. Doing t_name.id==t_age.id would throw an error.

For better readability, the pw.left and pw.right notations can be used:

t_age.join(t_name, pw.left.id == pw.right.id).select(pw.left.age, pw.right.name)
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | age | name
^04VMVCM... | 25  | Alice
^TT1GT7R... | 28  | Carole
^GFBMT58... | 32  | Bob

If you want more info about joins, we have an entire manual about it!
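
The id requirements of union and concatenation can be summarized with a plain-Python model in which a table is a dictionary from id to row. This is a sketch, not Pathway code: the Table alias and the functions union_columns and concat_rows are hypothetical names used only for illustration.

```python
from typing import Any, Dict

Table = Dict[int, Dict[str, Any]]  # id -> {column name -> value}


def union_columns(left: Table, right: Table) -> Table:
    """Model of `+`: same ids, columns of both tables side by side."""
    if left.keys() != right.keys():
        raise ValueError("both tables must share the same ids")
    return {i: {**left[i], **right[i]} for i in left}


def concat_rows(top: Table, bottom: Table) -> Table:
    """Model of concat: disjoint ids, rows of both tables stacked."""
    overlap = top.keys() & bottom.keys()
    if overlap:
        raise ValueError(f"ids must be disjoint, found overlap: {sorted(overlap)}")
    return {**top, **bottom}


t_name = {1: {"name": "Alice"}, 2: {"name": "Bob"}, 3: {"name": "Carole"}}
t_age = {1: {"age": 25}, 2: {"age": 32}, 3: {"age": 28}}
t_david = {4: {"name": "David", "age": 25}}

t_union = union_columns(t_name, t_age)
t_all = concat_rows(t_union, t_david)
print(t_all[4])  # {'name': 'David', 'age': 25}
```

In this model, a union adds columns while keeping the set of ids, whereas a concatenation adds rows while keeping the set of columns.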

Updating

  • Adding a new column with a default value with select:
t_age.select(*pw.this, adult=True)
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | age | adult
^YYY4HAB... | 25  | True
^3CZ78B4... | 28  | True
^Z3QWT29... | 32  | True

The value can be a basic operation on the columns:

t_age.select(*pw.this, thirties=pw.this.age>=30)
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | age | thirties
^YYY4HAB... | 25  | False
^3CZ78B4... | 28  | False
^Z3QWT29... | 32  | True
  • Renaming with select:
t_name.select(surname=pw.this.name)
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | surname
^YYY4HAB... | Alice
^Z3QWT29... | Bob
^3CZ78B4... | Carole
  • Renaming with rename:
t_name.rename(surname=pw.this.name)
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | surname
^YYY4HAB... | Alice
^Z3QWT29... | Bob
^3CZ78B4... | Carole
  • Updating cells: you can update the values of cells using update_cells, which can also be done using the binary operator <<. The ids and column names should be the same.
t_updated_names = pw.debug.table_from_markdown(
    """
    | name
 1  | Alicia
 2  | Bobby
 3  | Caro
 """
)
t_name.update_cells(t_updated_names)
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | name
^YYY4HAB... | Alicia
^Z3QWT29... | Bobby
^3CZ78B4... | Caro
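
The effect of the update above can be modelled in plain Python, with a table represented as a dictionary from id to row. This is a sketch under the assumption that every id and column of the update must already exist in the original table; the name update_cells_model is hypothetical and this is not how Pathway implements the operation.

```python
from typing import Any, Dict

Table = Dict[int, Dict[str, Any]]  # id -> {column name -> value}


def update_cells_model(table: Table, updates: Table) -> Table:
    """Return a copy of `table` whose cells are overridden by `updates`."""
    result = {row_id: dict(row) for row_id, row in table.items()}
    for row_id, new_values in updates.items():
        if row_id not in result:
            raise KeyError(f"unknown id: {row_id}")
        unknown = new_values.keys() - result[row_id].keys()
        if unknown:
            raise KeyError(f"unknown columns: {sorted(unknown)}")
        result[row_id].update(new_values)
    return result


names = {1: {"name": "Alice"}, 2: {"name": "Bob"}, 3: {"name": "Carole"}}
new_names = {1: {"name": "Alicia"}, 2: {"name": "Bobby"}, 3: {"name": "Caro"}}
print(update_cells_model(names, new_names)[1])  # {'name': 'Alicia'}
```

The original table is left untouched: as with the other operations in this guide, the update produces a new table.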

Operations

  • Row-centered operations with pw.apply: you can apply a function to each value of a column (or more) by using pw.apply in a select.
t_age.select(thirties=pw.apply(lambda x: x>30, pw.this.age))
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | thirties
^YYY4HAB... | False
^3CZ78B4... | False
^Z3QWT29... | True

Operations on multiple values of a single row can be easily done this way:

t_multiples_values = pw.debug.table_from_markdown(
    """
    | valA    | valB
 1  | 1       | 10
 2  | 100     | 1000
 """
)
t_multiples_values.select(sum=pw.apply(lambda x,y: x+y, pw.this.valA, pw.this.valB))
[2024-04-19T19:48:50]:INFO:Preparing Pathway computation


            | sum
^YYY4HAB... | 11
^Z3QWT29... | 1100
  • Other operations with transformer classes: Pathway enables complex computation on data streams by using transformer classes. It is a bit advanced for this first-steps guide but you can find all the information about transformer classes in our tutorial.
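
Coming back to pw.apply: its row-centered behavior can be pictured with a plain-Python model in which each column is a list and the function receives one value per column for each row. This is only a sketch of the semantics; apply_rowwise is a hypothetical helper, not part of Pathway.

```python
from typing import Any, Callable, List, Sequence


def apply_rowwise(func: Callable[..., Any], *columns: Sequence[Any]) -> List[Any]:
    """Call `func` once per row, passing one value from each column."""
    return [func(*values) for values in zip(*columns)]


ages = [25, 28, 32]
print(apply_rowwise(lambda x: x > 30, ages))  # [False, False, True]

val_a = [1, 100]
val_b = [10, 1000]
print(apply_rowwise(lambda x, y: x + y, val_a, val_b))  # [11, 1100]
```

The function never sees a whole column, only the values of the row being processed, which is why aggregations across rows need groupby and reducers instead.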