pw.io

In Pathway, data is accessed using connectors. This page provides their API documentation. See the connector articles for an overview of their architecture.

class CsvParserSettings(delimiter=',', quote='"', escape=None, enable_double_quote_escapes=True, enable_quoting=True, comment_character=None)

[source]

Class representing settings for the CSV parser.

  • Parameters
    • delimiter – Field delimiter to use when parsing CSV.
    • quote – Quote character to use when parsing CSV.
    • escape – What character to use for escaping fields in CSV.
    • enable_double_quote_escapes – Enable escapes of double quotes.
    • enable_quoting – Enable quoting for the fields.
    • comment_character – If specified, lines starting with this character will be treated as comments and ignored by the parser.
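
For illustration, here is a minimal sketch that builds custom parser settings and passes them to the CSV input connector (the csv_settings parameter of pw.io.csv.read, the schema, and the "data/" directory are assumptions for this example):

import pathway as pw

class InputSchema(pw.Schema):
    name: str
    age: int

# Sketch: parse semicolon-delimited files where '#' starts a comment line.
settings = pw.io.CsvParserSettings(
    delimiter=";",
    comment_character="#",
)

table = pw.io.csv.read("data/", schema=InputSchema, csv_settings=settings)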

class OnChangeCallback(*args, **kwargs)

[source]

The callback to be called on every change in the table. It is required to be callable and to accept four parameters: the key, the row changed, the time of the change in milliseconds, and a flag stating whether the change was an addition of the row.
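
For example, a function with the following signature conforms to this callback protocol (a sketch; the parameter names match those expected by pw.io.subscribe below):

import pathway as pw

def log_change(key: pw.Pointer, row: dict, time: int, is_addition: bool) -> None:
    action = "added" if is_addition else "removed"
    print(f"[{time}] row {key} {action}: {row}")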

class OnFinishCallback(*args, **kwargs)

[source]

The callback function to be called when the stream of changes ends. It will be called on each engine worker separately.
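
A conforming callback takes no arguments; a minimal sketch:

def on_stream_end() -> None:
    # Called once per engine worker when its stream of changes ends.
    print("Stream finished on this worker.")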

register_input_synchronization_group(*columns, max_difference, name='default')

[source]

Creates a synchronization group for a specified set of columns. The set must consist of at least two columns, each belonging to a different table. These tables must be read using one of the input connectors (they have to be input tables). Transformed tables cannot be used.

The synchronization group ensures that the engine reads data into the specified tables in such a way that the difference between the maximum read values from each column does not exceed max_difference.

All columns must have the same data type to allow for comparison, and max_difference must be of the type that results from subtracting values of two such columns.

The logic of the synchronization group is the following:

  • If a data source lags behind, the engine will read more data from it to align its values with the others, and will continue reading from the other sources only after the lagging one has caught up.
  • If a data source is too fast compared to the others, the engine will delay its reading until the slower sources (i.e., those with lower values in their specified columns) catch up.

Limitations:

  • This mechanism currently works only in runs that use a single Pathway process. Multi-processing support will be added soon.

  • Currently, only int fields are supported. Support for DateTimeNaive and DateTimeUtc will be added soon.
  • Parameters
    • columns (ColumnReference) – A list of columns that will be monitored and synchronized. Each column must belong to a different table read from an input connector.
    • max_difference (Union[None, int, float, str, bytes, bool, Pointer, datetime, timedelta, ndarray, Json, dict[str, Any], tuple[Any, ...], Error, Pending]) – The maximum allowed difference between the highest values of the tracked columns at any given time. Must be of the type obtained by subtracting values of two of the specified columns.
    • name (str) – The name of the synchronization group, used for logging and debugging purposes.
  • Returns
    None

Example:

Suppose you have two data sources:

  • login_events, a table read from the Kafka topic "logins".
  • transactions, a table read from the Kafka topic "transactions".

Each table contains a timestamp field that represents the number of seconds since the UNIX Epoch. You want to ensure that these tables are read in sync, with no more than a 10-minute (600-second) difference between their maximum timestamp values.

First, you need to define the table schema:

import pathway as pw
class InputSchema(pw.Schema):
    event_id: str
    unix_timestamp: int
    data: pw.Json
    # Other relevant fields can be added here

Next, you read both tables from Kafka. Assuming the Kafka server runs on host "kafka" and port 8082:

login_events = pw.io.kafka.simple_read("kafka:8082", "logins", format="json", schema=InputSchema)
transactions = pw.io.kafka.simple_read("kafka:8082", "transactions", format="json", schema=InputSchema)

Finally, you can synchronize these two tables by creating a synchronization group:

pw.io.register_input_synchronization_group(
    login_events.unix_timestamp,
    transactions.unix_timestamp,
    max_difference=600,
)

This ensures that both topics are read in such a way that the difference between the maximum timestamp values at any moment does not exceed 600 seconds (10 minutes).

Note:

If all data sources have a gap larger than max_difference, the synchronization group will wait until data from all sources arrives. Once all sources move past the gap, the synchronization group will allow reading to proceed further.

Example scenario: Consider a synchronization group with two data sources, both tracking a timestamp column, and max_difference set to 600 seconds (10 minutes).

  • Initially, both sources send a record with timestamp T.
  • Later, the first source sends a record with T + 1h. This record is not yet forwarded for processing because it exceeds `max_difference`.
  • If the second source then sends a record with T + 1h, the system detects a 1-hour gap. Since both sources have moved beyond `T`, the synchronization group accepts `T + 1h` as the new baseline and continues processing from there.
  • However, if the second source instead sends a record with T + 5m, this record is processed normally. The system will continue waiting for the first source to catch up before advancing further.

This behavior ensures that data gaps do not cause deadlocks but are properly detected and handled.

subscribe(table, on_change, on_end=lambda: ..., on_time_end=lambda time: ..., *, name=None, sort_by=None)

[source]

Calls a callback function on_change on every change happening in table.

  • Parameters
    • table – the table to subscribe to.
    • on_change (OnChangeCallback) – the callback to be called on every change in the table. The function is required to accept four parameters: the key, the row changed, the time of the change in milliseconds, and a flag stating whether the change was an addition of the row. These parameters of the callback are expected to be named key, row, time, and is_addition, respectively.
    • on_end (OnFinishCallback) – the callback to be called when the stream of changes ends.
    • on_time_end (OnTimeEndCallback) – the callback function to be called on each closed time of computation.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
  • Returns
    None

Example:

import pathway as pw

table = pw.debug.table_from_markdown('''
     | pet  | owner   | age | __time__ | __diff__
   1 | dog  | Alice   | 10  | 0        | 1
   2 | cat  | Alice   | 8   | 2        | 1
   3 | dog  | Bob     | 7   | 4        | 1
   2 | cat  | Alice   | 8   | 6        | -1
''')

def on_change(key: pw.Pointer, row: dict, time: int, is_addition: bool):
    print(f"{row}, {time}, {is_addition}")

def on_end():
    print("End of stream.")

pw.io.subscribe(table, on_change, on_end)
pw.run(monitoring_level=pw.MonitoringLevel.NONE)
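
If you also need deterministic ordering within each minibatch, you can pass sort_by. A sketch reusing the table above (the choice of the age column is illustrative):

# Rows emitted within the same minibatch are sorted by age (ascending).
pw.io.subscribe(table, on_change, on_end, sort_by=[table.age])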