Subscribing to changes with a Python function

In this article, you will learn how to subscribe to changes in a table using a plain Python function.

Let's say you would like to perform some action on every change in the table. It could be a simple print to the console or a custom API call. After this tutorial, you will know how to use the pw.io.subscribe function.

Assume you have a streaming input table obtained with pw.demo.range_stream:

import pathway as pw

table = pw.demo.range_stream()

Now you just need to define a Python callback with your custom logic.

import logging

def on_change(key: pw.Pointer, row: dict, time: int, is_addition: bool):
    # Log every change: row id, row contents, change time, and the addition/removal flag.
    logging.info(f"{key}, {row}, {time}, {is_addition}")

The callback is expected to take four parameters, named as follows:

  • key - the id of the row,
  • row - a dictionary mapping column names to column values,
  • time - the time of the change,
  • is_addition - whether the change is an addition. If False, the data was removed from the table.
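To make the role of is_addition more concrete, here is a minimal sketch of a callback that keeps a local mirror of the table. The names track_change and snapshot are illustrative and not part of Pathway. The key is annotated as Any so the snippet runs without Pathway installed; in a real pipeline it would be a pw.Pointer. The sketch assumes a removal carries the same row contents as the earlier addition it cancels.

```python
from typing import Any, Dict

# Local mirror of the table: row id -> latest row contents (illustrative name).
snapshot: Dict[Any, dict] = {}


def track_change(key: Any, row: dict, time: int, is_addition: bool) -> None:
    if is_addition:
        # The row was added to the table: remember it under its id.
        snapshot[key] = row
    elif snapshot.get(key) == row:
        # The row was removed. Drop it only if the mirror still holds exactly
        # this row, so the result does not depend on the order in which
        # same-key changes arrive.
        del snapshot[key]
```

The function keeps the four parameter names described above, so it can be passed to pw.io.subscribe in place of on_change.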

All that's left is to call pw.io.subscribe. Do not forget to run the graph.

pw.io.subscribe(table, on_change)

pw.run()

The on_change function will be called on every change in the input stream.
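The introduction mentions a custom API call as one possible action. The sketch below shows one way a change could be turned into a JSON payload before being sent anywhere. The helper change_to_payload and the payload layout are hypothetical, and the print call stands in for whatever client you would actually use.

```python
import json
from typing import Any


def change_to_payload(key: Any, row: dict, time: int, is_addition: bool) -> str:
    """Serialize one change as a JSON string (hypothetical payload layout)."""
    return json.dumps(
        {
            "key": str(key),  # stringify the row id so it can be embedded in JSON
            "row": row,
            "time": time,
            "is_addition": is_addition,
        },
        default=str,  # fall back to str() for values json cannot encode directly
    )


def on_change(key: Any, row: dict, time: int, is_addition: bool) -> None:
    payload = change_to_payload(key, row, time, is_addition)
    print(payload)  # placeholder: replace with your own API call
```

Keeping the serialization in a separate helper makes it easy to check the payload without running the graph.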

Optionally, you can pass a second function that will be called with no arguments when the stream of changes ends. It is called separately for each engine worker.

def on_end():
    logging.info("End of stream.")

pw.io.subscribe(table, on_change, on_end)
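When the two callbacks need to share state, one option is to group them in a small object. The following is a sketch only: the ChangeCounter class is an illustrative name, not something Pathway provides. It counts additions and removals and prints a summary when the stream ends.

```python
from typing import Any


class ChangeCounter:
    """Counts changes and reports a summary at the end of the stream."""

    def __init__(self) -> None:
        self.additions = 0
        self.removals = 0
        self.finished = False

    def on_change(self, key: Any, row: dict, time: int, is_addition: bool) -> None:
        # Update the counter that matches the kind of change.
        if is_addition:
            self.additions += 1
        else:
            self.removals += 1

    def on_end(self) -> None:
        # Called with no arguments once the stream of changes ends.
        self.finished = True
        print(f"End of stream: {self.additions} additions, {self.removals} removals.")


counter = ChangeCounter()
```

The bound methods can then be passed as pw.io.subscribe(table, counter.on_change, counter.on_end). Because on_end is called for each engine worker separately, the summary may be printed more than once when several workers are used.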