
Pathway: a survival guide

Must-read for both first-timers and veterans alike, this guide gathers the most commonly used basic elements of Pathway.

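While the Pathway programming framework comes with advanced functionalities such as classifiers or fuzzy-joins, it is essential to master the basic operations at the core of the framework. As part of this survival guide, we are going to walk through the following topics:
  • Streaming and static modes
  • Starting with data
  • Select and notations
  • Manipulating the table
  • Working with multiple tables: union, concatenation, join
  • Updating
  • Operations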

For more information, see our complete API docs or some of our tutorials.

Streaming and static modes

The first thing to keep in mind is that Pathway is made for streaming data. In this streaming mode, Pathway assumes unbounded incoming updates. To process the incoming data, Pathway maintains a computation graph. This mode requires input connectors listening to streaming data sources. The computation runs indefinitely until the process is killed: the computation starts with pw.run() and everything afterwards is unreachable code.

However, the streaming mode may not be the most convenient when testing or debugging. For that purpose, Pathway provides a static mode in which static data may be attached to the connectors. In that mode, finite and static data can be loaded, e.g. from a table written in a static csv file or from a markdown table.

When the computation is run in the static mode, all the data is loaded and processed at once. While the static mode does not fully benefit from the computation graph, it allows you to check whether the graph is correctly built. To ease debugging, Pathway provides a function called compute_and_print. When calling pw.debug.compute_and_print(t), Pathway builds the whole graph, ingests all the available static data, prints the obtained table t, and then discards the data. Calling compute_and_print twice will result in ingesting the whole data twice. In the streaming mode, the graph is built and the data is ingested only once, when pw.run() is called, and the data is maintained in the graph until the computation is over: results cannot be printed but should be accessed using output connectors.

The processing pipeline should be designed in the same way no matter which mode is used in the end. The only difference is how the data is considered: as a finite, static dataset or as an unbounded stream of updates.

To learn more about both modes, you can read our article about it.

In most of our examples, we use the static mode as it avoids setting up external streaming data sources such as Kafka.

Starting with data

Be sure to always import Pathway.

import pathway as pw

Now we need tables to manipulate. The way tables are obtained depends on whether we are in streaming or static mode. In this article, we use the static mode to show the behavior of each processing step, but keep in mind that Pathway is made for the streaming mode.

Static mode

In the static mode, we can manually enter a table using the markdown connector pw.debug.table_from_markdown. Here are the (static) tables we will be using:

t_name = pw.debug.table_from_markdown(
    """
    | name
 1  | Alice
 2  | Bob
 3  | Carole
 """
)
t_age = pw.debug.table_from_markdown(
    """
    | age
 1  | 25
 2  | 32
 3  | 28
 """
)
t_david = pw.debug.table_from_markdown(
    """
    | name  | age
 4  | David | 25
 """
)

We can display a snapshot of our table (for debugging purposes) using pw.debug.compute_and_print():

pw.debug.compute_and_print(t_name)
            | name
^2TMTFGY... | Alice
^YHZBTNY... | Bob
^SERVYWW... | Carole

In the following, we omit the calls to pw.debug.compute_and_print() for clarity, but keep in mind that they are required to print the actual static data at a given time.

Streaming mode

In the streaming mode, those tables would be obtained using one of the connectors provided by Pathway. For example, we could use t_name = pw.io.kafka.read(rdkafka_settings, topic=..., schema=..., format="json") to obtain the table from Kafka. The results should be taken out of Pathway using an output connector: we could send the data to PostgreSQL using pw.io.postgres.write() for instance. Otherwise, the way the data is manipulated is exactly the same in both modes.

Select and notations

The main way to manipulate a table in Pathway is by using the select operation.

  • The dot notation: we can use select to keep a particular column, referring to it with the dot notation (table.column_name). For example, we can access the column "name" of our t_david table:
t_david.select(t_david.name)
            | name
^8GR6BSX... | David
  • The bracket notation: we can also refer to a column by its name as a string, using the bracket notation. The previous example is equivalent to t_david.select(t_david["name"]).
  • The this notation: to refer to the table currently manipulated we can use pw.this. Our example becomes t_david.select(pw.this.name). This notation works for all standard transformers.

    It can be used to refer to a table even if it has not been given a name, for example in successive operations:

t_new_age = t_david.select(new_age=pw.this.age).select(
    new_age_plus_7=pw.this.new_age + 7
)
pw.debug.compute_and_print(t_new_age)
            | new_age_plus_7
^8GR6BSX... | 32

In this example, it would be impossible to refer to the table obtained after the first select (with the column new_age) without using pw.this, as t_david still refers to the initial, unmodified table.

  • left and right notations: similarly to the this notation, pw.left and pw.right can be used to manipulate the different tables used in a join.

    left_table.join(right_table, pw.left.C1==pw.right.C2).select(pw.left.C3, pw.right.C4)

For more information about the join and the use of pw.left and pw.right, you can see the dedicated section and manual.

  • The star * notation: this notation is used to select all the columns of the manipulated table. table.select(*pw.this) will return the full table. It can be combined with .without to remove the unwanted columns:

    In our example, instead of selecting the "name" column, we want to select all the columns except the "age" one. This is obtained as follows:

t_david.select(*pw.this.without(pw.this.age))
            | name
^8GR6BSX... | David

⚠️ Note on column names: column names are case sensitive and any string can be used as a column name with the bracket notation. This is not the case for the dot notation, which is restricted by standard Python syntax. For instance, the dot notation does not allow names with spaces: t.select(t["my name"]) works while t.select(t.my name) produces an error. By convention, we advocate following the Python variable naming convention: use only alphanumeric characters and the special character "_", and do not start with a number.

Manipulating the table

In addition to select, Pathway provides more operators to manipulate and index the tables.

  • Filtering: we can use filter to keep the rows satisfying a given condition.
t_adult = pw.debug.table_from_markdown(
    """
    | is_adult
 5  | True
 6  | False
 7  | True
 """
)
t_adult.filter(pw.this.is_adult)
            | is_adult
^19D0FQ9... | True
^76QPWK3... | True

You can use standard Python operators, including arithmetic operators such as +, -, *, /, // and comparison operators such as <, > or ==:

t_age.filter(pw.this.age>30)
            | age
^YHZBTNY... | 32

Logical negation is written with ~:

t_adult.filter(~pw.this.is_adult)
            | is_adult
^C4S6S48... | False
  • Reindexing: you can change the ids (accessible by table.id) by using .with_id_from(). We need a table with new ids:
t_new_ids = pw.debug.table_from_markdown(
    """
    | new_id_source
 1  | 4
 2  | 5
 3  | 6
 """
)
t_name.unsafe_promise_universe_is_subset_of(t_new_ids).with_id_from(t_new_ids.new_id_source)
            | name
^8GR6BSX... | Alice
^76QPWK3... | Bob
^C4S6S48... | Carole

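Here we need unsafe_promise_universe_is_subset_of: the new ids are read from another table, so we have to promise Pathway that every row of t_name has a matching row in t_new_ids. You can find more explanations in our article about Pathway's concepts.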

  • ix and ix_ref: use a column's values as indexes into another table. As an example, if we have a table containing indexes pointing to another table, we can use ix_ref to obtain the corresponding rows:
t_selected_ids = pw.debug.table_from_markdown(
    """
      | selected_id
 100  | 1
 200  | 3
 """
)
t_selected_ids.select(selected=t_name.ix_ref(t_selected_ids.selected_id).name)
            | selected
^M1T2QKJ... | Alice
^9WGHV46... | Carole
  • Group-by: we can use groupby to aggregate data sharing a common property and then use a reducer to compute an aggregated value.
t_spending = pw.debug.table_from_markdown(
    """
    | name  | amount
 1  | Bob   | 100
 2  | Alice | 50
 3  | Alice | 125
 4  | Bob   | 200
 """
)
t_spending.groupby(pw.this.name).reduce(pw.this.name, sum=pw.reducers.sum(pw.this.amount))
            | name  | sum
^TSP7EFT... | Alice | 175
^4PVZ777... | Bob   | 300

You can group by multiple columns at once (e.g. .groupby(t.colA, t.colB)). The list of all the available reducers can be found in our API docs.

If you want to find out more about the groupby and reduce operations, you can read our article about it.

Working with multiple tables: union, concatenation, join

  • Union: we can use the operator + or += to compute the union of two tables sharing the same ids; the result contains the columns of both tables.
t_age = t_age.unsafe_promise_same_universe_as(t_name)
t_union = t_name + t_age
            | name   | age
^2TMTFGY... | Alice  | 25
^YHZBTNY... | Bob    | 32
^SERVYWW... | Carole | 28
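  • Concatenation: we can use pw.Table.concat_reindex(t1, t2) to concatenate two tables, but they need to have the same columns. (pw.Table.concat also exists, but it keeps the original ids and therefore requires the ids of the two tables to be disjoint.)
pw.Table.concat_reindex(t_union, t_david)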
            | name   | age
^531BJZ8... | Alice  | 25
^9SVRC47... | Bob    | 32
^R5XMQ21... | Carole | 28
^C4VQQCA... | David  | 25

As you can see, Pathway may reindex the obtained tables.

Info for Databricks Delta users: Concatenation is highly similar to the SQL MERGE INTO operation.

  • Join: we can do all usual types of joins in Pathway (inner, outer, left, right). The example below presents an inner join:
t_age.join(t_name, t_age.id==t_name.id).select(t_age.age, t_name.name)
            | age | name
^VJ3K9DF... | 25  | Alice
^R0GE4WM... | 28  | Carole
^V1RPZW8... | 32  | Bob

Note that in the equality t_age.id==t_name.id, the left part must be a column of the table on which the join is called, namely t_age in our example. Writing t_name.id==t_age.id would throw an error.

For better readability, the pw.left and pw.right notations should be used:

t_age.join(t_name, pw.left.id == pw.right.id).select(pw.left.age, pw.right.name)
            | age | name
^VJ3K9DF... | 25  | Alice
^R0GE4WM... | 28  | Carole
^V1RPZW8... | 32  | Bob

If you want more info about joins, we have an entire manual about it!

Updating

  • Adding a new column with a default value with select:
t_age.select(*pw.this, adult=True)
            | age | adult
^2TMTFGY... | 25  | True
^SERVYWW... | 28  | True
^YHZBTNY... | 32  | True

The value can also be computed from other columns:

t_age.select(*pw.this, thirties=pw.this.age>=30)
            | age | thirties
^2TMTFGY... | 25  | False
^SERVYWW... | 28  | False
^YHZBTNY... | 32  | True
  • Renaming with select:
t_name.select(surname=pw.this.name)
            | surname
^2TMTFGY... | Alice
^YHZBTNY... | Bob
^SERVYWW... | Carole
  • Renaming with rename_columns:
t_name.rename_columns(surname=pw.this.name)
            | surname
^2TMTFGY... | Alice
^YHZBTNY... | Bob
^SERVYWW... | Carole
  • Updating cells: you can update the values of cells using update_cells, which can also be done with the binary operator <<. The ids and column names should be the same.
t_updated_names = pw.debug.table_from_markdown(
    """
    | name
 1  | Alicia
 2  | Bobby
 3  | Caro
 """
)
t_updated_names = t_updated_names.unsafe_promise_same_universe_as(t_name)
t_name.update_cells(t_updated_names)
            | name
^2TMTFGY... | Alicia
^YHZBTNY... | Bobby
^SERVYWW... | Caro

Operations

  • Row-centered operations with apply: you can apply a function to the values of one or more columns of each row by using pw.apply in a select.
t_age.select(thirties=pw.apply(lambda x: x > 30, pw.this.age))
            | thirties
^2TMTFGY... | False
^SERVYWW... | False
^YHZBTNY... | True

Operations on multiple values of a single row can be done the same way:

t_multiples_values = pw.debug.table_from_markdown(
    """
    | valA    | valB
 1  | 1       | 10
 2  | 100     | 1000
 """
)
t_multiples_values.select(sum=pw.apply(lambda x, y: x + y, pw.this.valA, pw.this.valB))
            | sum
^2TMTFGY... | 11
^YHZBTNY... | 1100
  • Other operations with transformer classes: Pathway enables complex computations on data streams by using transformer classes. This is a bit advanced for this survival guide, but you can find all the information about transformer classes in our tutorial.

Olivier Ruas

Algorithm and Data Processing Magician