pw.io

In Pathway, data is accessed through connectors. This page provides their API documentation. See the connector articles for an overview of their architecture.

class pw.io.CsvParserSettings(delimiter=',', quote='"', escape=None, enable_double_quote_escapes=True, enable_quoting=True, comment_character=None)

Class representing settings for the CSV parser.
  • Parameters
    • delimiter – Field delimiter to use when parsing CSV.
    • quote – Quote character to use when parsing CSV.
    • escape – What character to use for escaping fields in CSV.
    • enable_double_quote_escapes – Enable escapes of double quotes.
    • enable_quoting – Enable quoting for the fields.
    • comment_character – If specified, lines starting with the comment character are treated as comments and ignored by the parser.
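
To make the effect of these settings concrete, the sketch below parses a few lines with Python's standard csv module rather than Pathway's own parser. The helper parse_csv_lines and the mapping onto csv.reader arguments are assumptions made for illustration only; Pathway's parser may differ in details.

```python
import csv

def parse_csv_lines(lines, delimiter=",", quote='"', comment_character=None):
    """Hypothetical helper mimicking a few CsvParserSettings options."""
    if comment_character is not None:
        # Drop lines that start with the comment character before parsing.
        lines = [ln for ln in lines if not ln.startswith(comment_character)]
    # doublequote=True plays the role of enable_double_quote_escapes here.
    reader = csv.reader(lines, delimiter=delimiter, quotechar=quote, doublequote=True)
    return list(reader)

rows = parse_csv_lines(
    [
        "# pets export",
        "pet;owner",
        'dog;"Smith; Alice"',
        'cat;"Bob ""Bobby"" Jones"',
    ],
    delimiter=";",
    comment_character="#",
)
print(rows)
```

The comment line is skipped, the quoted field keeps its embedded delimiter, and the doubled quote inside a quoted field is read as a single quote character.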

class pw.io.OnChangeCallback(*args, **kwargs)

The callback to be called on every change in the table. It must be callable and accept four parameters: the key, the changed row, the time of the change in milliseconds, and a flag stating whether the change was an addition of the row.
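
As a minimal sketch of the four-parameter contract described above, the callback below keeps an in-memory copy of the table by applying additions and removals. The dictionary current_rows and the string keys are hypothetical stand-ins (real keys would be Pathway pointers), and the calls at the bottom only simulate what the engine would do.

```python
from typing import Any, Dict

# Hypothetical in-memory view of the table, indexed by row key.
current_rows: Dict[Any, dict] = {}

def on_change(key: Any, row: dict, time: int, is_addition: bool) -> None:
    """Apply one change: store the row on addition, drop it on removal."""
    if is_addition:
        current_rows[key] = row
    else:
        current_rows.pop(key, None)

# Simulated changes; in a real pipeline the engine would make these calls.
on_change("k1", {"pet": "dog", "owner": "Alice"}, 0, True)
on_change("k2", {"pet": "cat", "owner": "Alice"}, 2, True)
on_change("k2", {"pet": "cat", "owner": "Alice"}, 6, False)
print(current_rows)
```

After the removal at time 6, only the row stored under the first key remains.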

class pw.io.OnFinishCallback(*args, **kwargs)

The callback function to be called when the stream of changes ends. It will be called on each engine worker separately.

pw.io.subscribe(table, on_change, on_end=<function <lambda>>, on_time_end=<function <lambda>>)

Calls a callback function on_change on every change happening in table.

  • Parameters
    • table – the table to subscribe to.
    • on_change (OnChangeCallback) – the callback to be called on every change in the table. The function is required to accept four parameters: the key, the changed row, the time of the change in milliseconds, and a flag stating whether the change was an addition of the row. These parameters of the callback are expected to have names key, row, time and is_addition respectively.
    • on_end (OnFinishCallback) – the callback to be called when the stream of changes ends.
    • on_time_end (OnTimeEndCallback) – the callback function to be called on each closed time of computation.
  • Returns
    None

Example:

import pathway as pw

table = pw.debug.table_from_markdown('''
     | pet  | owner   | age | __time__ | __diff__
   1 | dog  | Alice   | 10  | 0        | 1
   2 | cat  | Alice   | 8   | 2        | 1
   3 | dog  | Bob     | 7   | 4        | 1
   2 | cat  | Alice   | 8   | 6        | -1
''')

def on_change(key: pw.Pointer, row: dict, time: int, is_addition: bool):
    print(f"{row}, {time}, {is_addition}")

def on_end():
    print("End of stream.")

pw.io.subscribe(table, on_change, on_end)
pw.run(monitoring_level=pw.MonitoringLevel.NONE)

{'pet': 'dog', 'owner': 'Alice', 'age': 10}, 0, True
{'pet': 'cat', 'owner': 'Alice', 'age': 8}, 2, True
{'pet': 'dog', 'owner': 'Bob', 'age': 7}, 4, True
{'pet': 'cat', 'owner': 'Alice', 'age': 8}, 6, False
End of stream.
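
The on_time_end callback is not exercised in the example above. The following sketch shows one way it might be combined with on_change to group changes by closed time. It assumes on_time_end receives the closed time as its only argument, uses made-up string keys, and simulates the engine's calls in plain Python instead of running Pathway.

```python
pending = []   # changes received since the last closed time
batches = {}   # closed time -> changes that arrived during it
finished = []  # records that the end-of-stream callback fired

def on_change(key, row, time, is_addition):
    # Buffer the change until its time is closed.
    pending.append((row["pet"], is_addition))

def on_time_end(time):
    # Assumption: the callback receives the closed time as its only argument.
    batches[time] = list(pending)
    pending.clear()

def on_end():
    finished.append(True)

# Simulated engine calls, mirroring the events from the example above.
events = [
    ("k1", {"pet": "dog", "owner": "Alice", "age": 10}, 0, True),
    ("k2", {"pet": "cat", "owner": "Alice", "age": 8}, 2, True),
    ("k3", {"pet": "dog", "owner": "Bob", "age": 7}, 4, True),
    ("k2", {"pet": "cat", "owner": "Alice", "age": 8}, 6, False),
]
for key, row, time, is_addition in events:
    on_change(key, row, time, is_addition)
    on_time_end(time)  # each simulated time holds exactly one change
on_end()
print(batches)
```

With Pathway installed, the three functions could be passed as pw.io.subscribe(table, on_change, on_end, on_time_end), following the signature documented above.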