Streams of updates

How do Pathway connectors handle streams of updates?

Pathway works with dynamic data tables. New data entries can arrive through input connectors in a streaming manner, so input tables keep being updated, and existing entries may also be changed or deleted. These changes then propagate to output tables. A stream of updates is the natural way to represent all of this.

However, even a single input change can result in changes to multiple output rows. Moreover, some input changes only make sense when treated together. To read changes from input connectors and present them to output connectors in a compact and efficient way, we group them into stream transactions.

Format of a single stream element

Each element of a change stream, at input or at output, represents either an insertion or a deletion, and consists of the following items:

  1. The data row that was inserted or deleted.
  2. The type of operation, represented as a number: 1 if the row was inserted, or -1 if the row was deleted.
  3. The number (identifier) of the stream transaction in which the stream element occurred.

Note that row update operations are represented as two elements: one for deletion of the old row value, and one for insertion of the new one. Typically, these two elements will appear as part of a single stream transaction.
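To make the element format concrete, here is a minimal sketch in plain Python. The class `StreamElement`, the helper `update`, and the field names `row`, `diff` and `time` are chosen for illustration only and are not a Pathway API. It shows an update of a row expressed as a deletion of the old value followed by an insertion of the new one, both in the same transaction.

```python
from dataclasses import dataclass
from typing import Any


# Hypothetical representation of one change-stream element (not a Pathway API).
@dataclass(frozen=True)
class StreamElement:
    row: tuple[Any, ...]  # the data row that was inserted or deleted
    diff: int             # 1 for an insertion, -1 for a deletion
    time: int             # identifier of the stream transaction


def update(old_row: tuple, new_row: tuple, time: int) -> list[StreamElement]:
    """Express a row update as a deletion plus an insertion in one transaction."""
    return [
        StreamElement(old_row, -1, time),  # retract the old value
        StreamElement(new_row, 1, time),   # insert the new value
    ]


changes = update(("alice", 10), ("alice", 12), time=4)
for change in changes:
    print(change)
```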

Transactional changes

In Pathway, input connector stream elements may be grouped into transactions (atomic changes). These serve to ensure intermediate consistency, output consistency, and computational efficiency. In general, this grouping into transactions is controlled by whoever designs the logic of the Pathway program. The number of the corresponding stream transaction is reported as part of each stream element that belongs to it.

The number of the stream transaction may be treated only as an internal identifier within Pathway, or it may serve as part of the logic of applications downstream of Pathway's output connectors. This matters because Pathway's output tables can subsequently be streamed into message queues, where the order of elements is not always guaranteed.

The numbering of stream transactions helps to resolve this ordering problem: the Pathway framework guarantees that, within one stream transaction, there will be no more than one insertion or deletion entry concerning any given row of an output table.
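As an illustration of how a downstream consumer might use the transaction number, the sketch below assumes elements arrive from a message queue as `(row, diff, time)` tuples in arbitrary order. The function `states_per_transaction` is hypothetical: it buckets elements by transaction and rebuilds the table state at the end of each transaction, in transaction order, which is only possible because every element carries its transaction number.

```python
from collections import Counter, defaultdict

# (row, diff, time) elements received out of order from a hypothetical queue.
received = [
    (("alice", 12), 1, 5),
    (("bob", 3), 1, 4),
    (("alice", 10), -1, 5),
    (("alice", 10), 1, 4),
]


def states_per_transaction(elements):
    """Yield (time, rows present) after each complete transaction, in time order."""
    batches = defaultdict(list)
    for row, diff, time in elements:
        batches[time].append((row, diff))
    state = Counter()  # row -> multiplicity
    for time in sorted(batches):
        # A whole batch is applied at once, so the order of elements
        # inside it does not affect the state reported for this time.
        for row, diff in batches[time]:
            state[row] += diff
        yield time, sorted(row for row, count in state.items() if count > 0)


for time, rows in states_per_transaction(received):
    print(time, rows)
```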

For efficiency, upstream systems often group changes at input into mini-batches to reduce the number of computations involved. To guarantee that Pathway treats all changes from a mini-batch together, they should be logically placed within the same transaction. This also means that "intermediate" states of computation may be lost: Pathway is only expected and required to compute the output state of the system at the end of each transaction, and to send the changes between successive stream transactions to output connectors.
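The loss of intermediate states can be pictured as collapsing a transaction into its net effect. The sketch below is a simplified model, not Pathway's internal algorithm: the hypothetical `consolidate` function sums the changes per row within one mini-batch, so a row that is inserted and deleted again inside the same transaction never reaches the output.

```python
from collections import Counter


def consolidate(batch):
    """Collapse one transaction's (row, diff) changes into their net effect."""
    net = Counter()
    for row, diff in batch:
        net[row] += diff
    # Rows whose changes cancel out (inserted, then deleted again) are
    # dropped: that intermediate state is never emitted.
    return sorted((row, d) for row, d in net.items() if d != 0)


mini_batch = [
    (("bob", 3), 1),
    (("carol", 7), 1),
    (("carol", 7), -1),  # carol only existed in an intermediate state
    (("dave", 1), 1),
]
print(consolidate(mini_batch))
```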

Transactions also play a role in failure recovery when Pathway persists data. For a particular time (transaction), several lines may have to be output. If the program fails before all of the lines for that time have been sent to the sink, the results for that time may appear again when the program restarts. This gives an "at least once" delivery scenario. Note, however, that in case of successful termination the data will be committed in full; hence, on a restart after a successful termination, exactly-once delivery is ensured.
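The "at least once" behavior can be reproduced with a toy model. In the sketch below, the function `run` and its commit bookkeeping are hypothetical and only mimic the rule described above: output is considered committed per transaction, so a crash in the middle of a transaction causes that whole transaction to be re-sent on restart, and the lines already delivered before the crash show up twice in the sink.

```python
def run(outputs_by_time, committed, sink, crash_after=None):
    """Emit every uncommitted transaction; optionally crash mid-transaction.

    `committed` holds the times whose output was fully written to the sink;
    only those are skipped after a restart (a simplified, hypothetical model).
    """
    emitted = 0
    for time in sorted(outputs_by_time):
        if time in committed:
            continue  # fully delivered before the failure
        for line in outputs_by_time[time]:
            if crash_after is not None and emitted == crash_after:
                raise RuntimeError("simulated crash")
            sink.append((time, line))
            emitted += 1
        committed.add(time)  # the whole transaction reached the sink


outputs = {1: ["a", "b"], 2: ["c", "d", "e"]}
committed, sink = set(), []
try:
    run(outputs, committed, sink, crash_after=3)  # fails inside time 2
except RuntimeError:
    pass
run(outputs, committed, sink)  # restart: time 2 is sent again in full
print(sink)
```

Transaction 1 is delivered exactly once, while line `c` of transaction 2 appears twice, because transaction 2 had not been fully committed when the crash happened.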

Outputting live snapshots

A live snapshot is a compacted representation of the way a Pathway data table changes over time, which can be provided by Pathway connectors at output. Just like a stream of updates, elements of a live snapshot consist of:

  1. The data row;
  2. The type of operation;
  3. The number (identifier) of the stream transaction in which the stream element has occurred (a column named time).

and, on top of this, one additional field:

  1. The identifier of the last element within the given stream transaction which affects the given row (a column named diff).

Live snapshots organize the data sent to downstream systems by replacing data entries according to a primary key specified to Pathway's output connector. By contrast, regular streams of updates simply append the change information to the output data sink.
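The difference between appending and replacing by primary key can be sketched in a few lines. Everything below is illustrative: rows are `(name, score)` pairs, the first column is assumed to be the primary key, and the auxiliary `time` and `diff` fields of a real snapshot are left out for brevity. Here `append_log` keeps every change, while `live_snapshot` keeps only the current row for each key.

```python
# (row, diff, time) elements; rows are (name, score), keyed by name.
stream = [
    (("alice", 10), 1, 1),
    (("bob", 3), 1, 1),
    (("alice", 10), -1, 2),
    (("alice", 12), 1, 2),
    (("bob", 3), -1, 3),
]


def append_log(elements):
    """Regular stream of updates: every change is appended to the sink."""
    return list(elements)


def live_snapshot(elements, key=lambda row: row[0]):
    """Live snapshot: one entry per primary key, replaced in place."""
    snapshot = {}
    for row, diff, _time in elements:
        k = key(row)
        if diff == 1:
            snapshot[k] = row  # insert, or replace the entry with this key
        elif snapshot.get(k) == row:
            del snapshot[k]    # delete only the value being retracted
    return snapshot


print(len(append_log(stream)))  # all five changes are kept
print(live_snapshot(stream))    # only the current state remains
```

This sketch assumes elements arrive in transaction order; the auxiliary fields discussed below exist precisely because that assumption may not hold.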

Live snapshots can only be used with certain downstream systems to which Pathway can send update streams. Their main purpose is to be used with database storage that supports data updates inexpensively.

Notably, we support live snapshots with the PostgreSQL connector. By contrast, live snapshots are not feasible when working with CSV files: compacting a stream over a CSV file would require Pathway to reopen and read/rewrite parts of the file, which would be lengthy and resource-consuming.
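To show why a database is a good fit, the sketch below applies the same kind of changes to a keyed table in place. PostgreSQL is not available in a self-contained example, so Python's built-in `sqlite3` module stands in for it; the table `scores` and the helper `apply_changes` are hypothetical and do not reflect the schema Pathway's PostgreSQL connector uses. Each change touches a single row by primary key, with no need to rewrite the rest of the data as a CSV file would require.

```python
import sqlite3


def apply_changes(conn, elements):
    """Apply (row, diff, time) changes to a keyed table in place."""
    for (name, score), diff, _time in elements:
        if diff == 1:
            # Insert, or replace the existing row with the same primary key.
            conn.execute(
                "INSERT OR REPLACE INTO scores (name, score) VALUES (?, ?)",
                (name, score),
            )
        else:
            # Delete only if the stored value is the one being retracted.
            conn.execute(
                "DELETE FROM scores WHERE name = ? AND score = ?", (name, score)
            )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scores (name TEXT PRIMARY KEY, score INTEGER)")
apply_changes(conn, [
    (("alice", 10), 1, 1),
    (("bob", 3), 1, 1),
    (("alice", 10), -1, 2),
    (("alice", 12), 1, 2),
    (("bob", 3), -1, 3),
])
rows = conn.execute("SELECT name, score FROM scores ORDER BY name").fetchall()
print(rows)
```

`INSERT OR REPLACE` is SQLite syntax; in PostgreSQL the corresponding upsert is written as `INSERT ... ON CONFLICT ... DO UPDATE`.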

Why do we need the auxiliary fields in a live snapshot?

As described above, a live snapshot entry consists of the row itself plus two extra fields, which represent the transaction in which the row changed and the identifier of the last operation in that transaction which touches this row. The last operation can be either an addition (if the row was added to the table or changed) or a deletion. This additional information is kept to avoid corrupting the live snapshot data downstream when computation is distributed or message queues are used, since either may fail to preserve the order of events passed into the stream.

In particular, if the events output by stream transactions were reordered, an insertion event from a later transaction could arrive first and a deletion event from an earlier transaction afterwards. Processing all events sequentially, without storing additional information on their origin, could then lead the downstream storage to apply an outdated deletion, which would override the actual insertion (or update) of the row. Keeping this extra information ensures that the latest piece of information is never overwritten by an earlier one, so the live snapshot stays consistent.
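The failure mode and its fix can be compared side by side. The sketch below is a simplified model, not Pathway's downstream logic: `naive_apply` processes events in arrival order and deletes by primary key, while `guarded_apply` stores the transaction number and last operation for each key and ignores events from earlier transactions. Two rules are assumptions of this sketch: deleted rows are kept as entries with `diff == -1` so that a late, older insertion cannot bring them back, and within one transaction an insertion (the row's new value) wins over a deletion.

```python
# Intended history: t1 inserts alice and bob; t2 deletes alice;
# t3 re-inserts alice; t4 updates bob from 3 to 5. Arrival order is shuffled.
arrivals = [
    (("alice", 10), 1, 1),
    (("bob", 3), 1, 1),
    (("alice", 10), 1, 3),   # transaction 3 arrives before transaction 2
    (("bob", 5), 1, 4),      # within transaction 4, the insertion arrives first
    (("alice", 10), -1, 2),
    (("bob", 3), -1, 4),
]


def naive_apply(elements, key=lambda row: row[0]):
    """Apply events in arrival order, deleting by primary key."""
    table = {}
    for row, diff, _time in elements:
        if diff == 1:
            table[key(row)] = row
        else:
            table.pop(key(row), None)
    return table


def guarded_apply(elements, key=lambda row: row[0]):
    """Keep (row, time, diff) per key and skip events older than the entry."""
    entries = {}
    for row, diff, time in elements:
        k = key(row)
        if k in entries:
            cur_time = entries[k][1]
            # Skip events from an earlier transaction; within the same
            # transaction an insertion wins over a deletion.
            if time < cur_time or (time == cur_time and diff == -1):
                continue
        entries[k] = (row, time, diff)  # deletions stay as diff == -1 entries
    return {k: row for k, (row, _t, d) in entries.items() if d == 1}


print(naive_apply(arrivals))    # both rows are wrongly deleted
print(guarded_apply(arrivals))  # the state after transaction 4
```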

Sergey Kulik

Lead Software Research Engineer and Solutions Architect