Controlling temporal behavior of interval join

This article explores concepts related to the temporal behavior of Pathway's interval join, like filtering out late records, freeing memory that is no longer needed, or decreasing the frequency of updates.

Interval join is a temporal join that joins events within a specified time interval. You can read more about it in the Performing Interval Joins article.

Let's consider a scenario in which you track customers ordering products. Each order is placed at some specific time (order_time) and reaches the tracking system written in Pathway at time __time__. In a real-world scenario, you don't have perfect control over the time between the moment the order is placed and the moment it reaches the data processing engine. Packets can take different routes over the network, or, in rare cases, a sneaky Georgian woman can cut off internet access for a whole country. As such, orders can arrive in the processing system out of order (with respect to order_time), very late, or even not at all.

To deliver streaming data in a production system, you can use a connector like Kafka or Redpanda (you can read more about them in the Kafka connector and Redpanda connector articles). Below, pw.debug.table_from_markdown with a __time__ column simulates a connector that delivers data out of order, which makes it possible to demonstrate the configurable temporal behavior of the interval join.

import pathway as pw

orders = pw.debug.table_from_markdown(
    """
    customer | product  | order_time | __time__
    Austin   | mouse    |     120    |    122
    Brenda   | keyboard |     120    |    122
    Carl     | mouse    |     124    |    124
    Evelyn   | cable    |     128    |    128
    Frank    | mouse    |     120    |    130
    Min      | mouse    |     124    |    130
    Nicole   | cable    |     130    |    132
    Sam      | keyboard |     128    |    134
    Theresa  | keyboard |     134    |    136
    Yichen   | cable    |     136    |    138
"""
)

discounts = pw.debug.table_from_markdown(
    """
    price | product  | start_time | __time__
      42  | mouse    |    120     |    122
     128  | keyboard |    118     |    122
       8  | cable    |    128     |    134
     135  | keyboard |    132     |    140
      10  | cable    |    122     |    150
"""
)

The store offers discounts on some products, and each discount lasts six time units starting at start_time (start_time included). Customers place orders with the guarantee that an order is executed only if the product is discounted at the time the order is placed.

Figure: customers' orders with their event and processing times.

To get the price paid by a customer, you can perform an interval join between the orders and discounts streams. To be associated with a discount, an order has to be placed no earlier than the discount starts and no later than it ends. So, it has to satisfy $\text{start\_time} \le \text{order\_time} \le \text{start\_time} + 5$. This can be expressed as an interval join:

result = discounts.interval_join(
    orders,
    discounts.start_time,
    orders.order_time,
    pw.temporal.interval(0, 5),
    discounts.product == orders.product,
).select(
    orders.customer,
    orders.product,
    discounts.price,
    orders.order_time,
    discounts.start_time,
)
pw.debug.compute_and_print_update_stream(result)
[2024-03-01T18:28:17]:INFO:Preparing Pathway computation


            | customer | product  | price | order_time | start_time | __time__ | __diff__
^MKZX8ZW... | Austin   | mouse    | 42    | 120        | 120        | 122      | 1
^04VMVCM... | Brenda   | keyboard | 128   | 120        | 118        | 122      | 1
^Z3GFZNM... | Carl     | mouse    | 42    | 124        | 120        | 124      | 1
^04JRWV4... | Frank    | mouse    | 42    | 120        | 120        | 130      | 1
^5CVX0JZ... | Min      | mouse    | 42    | 124        | 120        | 130      | 1
^S5QY939... | Evelyn   | cable    | 8     | 128        | 128        | 134      | 1
^M2BCK8N... | Nicole   | cable    | 8     | 130        | 128        | 134      | 1
^33FPTJF... | Theresa  | keyboard | 135   | 134        | 132        | 140      | 1

As time progresses, new orders and discounts arrive, and you get new information about the prices paid for the products. Note that two customers were not able to purchase a product:

  • Sam wanted to buy a keyboard at time 128, but it was not discounted then. It was discounted only at times [118, 123] and [132, 137].
  • Yichen wanted to buy a cable at time 136, but it wasn't discounted then either.

As such, their orders are not present in the result table. If you want to include all orders (including those that can't be executed), you can use interval_join_right.
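To make the matching condition concrete, here is a plain-Python toy model of a right interval join over the same data. It does not use Pathway. It ignores __time__, so it mirrors only the final state of the result and not the update stream. The helper name right_interval_join is hypothetical. Every order is kept, and orders without a discount whose window covers order_time get a price of None. That is roughly what interval_join_right adds compared to interval_join.

```python
# Toy model of a right interval join over static data (NOT Pathway's implementation).
# An order matches a discount for the same product when
#   start_time + lower <= order_time <= start_time + upper.
# Every order is kept; orders without a match get price None.

ORDERS = [  # (customer, product, order_time)
    ("Austin", "mouse", 120), ("Brenda", "keyboard", 120),
    ("Carl", "mouse", 124), ("Evelyn", "cable", 128),
    ("Frank", "mouse", 120), ("Min", "mouse", 124),
    ("Nicole", "cable", 130), ("Sam", "keyboard", 128),
    ("Theresa", "keyboard", 134), ("Yichen", "cable", 136),
]
DISCOUNTS = [  # (price, product, start_time)
    (42, "mouse", 120), (128, "keyboard", 118), (8, "cable", 128),
    (135, "keyboard", 132), (10, "cable", 122),
]


def right_interval_join(discounts, orders, lower=0, upper=5):
    rows = []
    for customer, product, order_time in orders:
        prices = [
            price
            for price, d_product, start_time in discounts
            if d_product == product
            and start_time + lower <= order_time <= start_time + upper
        ]
        # Unmatched orders still produce one row, padded with None.
        rows.extend((customer, p) for p in prices or [None])
    return rows


result = right_interval_join(DISCOUNTS, ORDERS)
for customer, price in result:
    print(f"{customer:8} {price}")
```

In this model, Sam and Yichen appear with a price of None, and the eight other orders keep the prices shown in the Pathway output.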

Forgetting old records

As mentioned before, you don't have any control over the time between the event creation (an example event can be a store registering that a product X is available at price Y, starting from time Z) and the event arrival at the processing engine. In our scenario, an example of such an event is the cable entry in the discounts table that was discounted at time 122 and only reached Pathway at time 150.

In principle, you never know whether more old records will arrive in the future. As such, to guarantee that a join on such possibly late data returns correct answers, the processing engine needs to store all the records in its memory.

In practice, keeping all the old records just to handle a few very overdue orders may be a price you are not willing to pay. It is often better to ignore such orders and free the memory occupied by old entries.

To make that trade-off possible, Pathway provides the behavior parameter for interval_join, which defines its temporal behavior. Roughly speaking, it allows you to tell Pathway to ignore records that are too late. Subsequently, that allows Pathway to forget records that you know won't be joined with any new incoming records in the future. To be more precise: if you set the behavior to, e.g., pw.temporal.common_behavior(cutoff=6), Pathway will ignore all records with times less than or equal to the maximal already seen time minus 6. Note that the maximal already seen time is tracked separately for each side of the join, which makes joins with historical data slightly easier to express.

result = discounts.interval_join(
    orders,
    discounts.start_time,
    orders.order_time,
    pw.temporal.interval(0, 5),
    discounts.product == orders.product,
    behavior=pw.temporal.common_behavior(cutoff=6),
).select(
    orders.customer,
    orders.product,
    discounts.price,
    orders.order_time,
    discounts.start_time,
)
pw.debug.compute_and_print_update_stream(result)
[2024-03-01T18:28:17]:INFO:Preparing Pathway computation


            | customer | product  | price | order_time | start_time | __time__ | __diff__
^MKZX8ZW... | Austin   | mouse    | 42    | 120        | 120        | 122      | 1
^04VMVCM... | Brenda   | keyboard | 128   | 120        | 118        | 122      | 1
^Z3GFZNM... | Carl     | mouse    | 42    | 124        | 120        | 124      | 1
^5CVX0JZ... | Min      | mouse    | 42    | 124        | 120        | 130      | 1
^S5QY939... | Evelyn   | cable    | 8     | 128        | 128        | 134      | 1
^M2BCK8N... | Nicole   | cable    | 8     | 130        | 128        | 134      | 1
^33FPTJF... | Theresa  | keyboard | 135   | 134        | 132        | 140      | 1

Let's see what happens in this case. The final result doesn't contain Frank's order. His order was placed at time 120 and could be joined with the mouse discounted at time 120. However, when Frank's order arrived, the maximal seen time in the orders stream was 128 (the order_time of Evelyn's order). As a result, all new records with order_time less than or equal to 128 - 6 = 122 had to be ignored. Note that Min's order came to Pathway at the same time, but its order_time was 124, so it was not ignored.
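The cutoff rule can be reproduced with a small plain-Python toy model. It is not Pathway's implementation, and apply_cutoff is a hypothetical helper. The model processes each stream batch by batch in __time__ order and keeps a separate maximal event time per stream. It assumes that records in one batch are compared only with the maximum from earlier batches. For this data set, that choice makes no difference.

```python
from itertools import groupby

# Toy model of the cutoff rule (NOT Pathway's internals).
# A stream is a list of (__time__, event_time, label); each side of the join
# is filtered on its own, with its own maximal seen event time.
# ASSUMPTION: records in one batch are compared with the maximum from earlier batches.


def apply_cutoff(stream, cutoff):
    kept, ignored = [], []
    max_seen = None
    for _, batch in groupby(sorted(stream), key=lambda rec: rec[0]):
        batch = list(batch)
        for _, event_time, label in batch:
            if max_seen is not None and event_time <= max_seen - cutoff:
                ignored.append(label)  # too late: never reaches the join
            else:
                kept.append(label)
        batch_max = max(event_time for _, event_time, _ in batch)
        max_seen = batch_max if max_seen is None else max(max_seen, batch_max)
    return kept, ignored


orders = [  # (__time__, order_time, customer)
    (122, 120, "Austin"), (122, 120, "Brenda"), (124, 124, "Carl"),
    (128, 128, "Evelyn"), (130, 120, "Frank"), (130, 124, "Min"),
    (132, 130, "Nicole"), (134, 128, "Sam"), (136, 134, "Theresa"),
    (138, 136, "Yichen"),
]
discounts = [  # (__time__, start_time, product@start_time)
    (122, 120, "mouse@120"), (122, 118, "keyboard@118"),
    (134, 128, "cable@128"), (140, 132, "keyboard@132"),
    (150, 122, "cable@122"),
]

print("orders ignored:", apply_cutoff(orders, cutoff=6)[1])
print("discounts ignored:", apply_cutoff(discounts, cutoff=6)[1])
```

With cutoff=6, the model ignores Frank's order. On the discounts side, it ignores the late cable discount with start_time 122, which arrives when that stream's maximum is already 132. That discount would not have matched any order anyway.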

The cutoff threshold doesn't have to be that tight. Setting a higher cutoff allows you to process more late records, but the memory footprint of the interval join may grow, since it stores all records with times above the maximal seen time minus cutoff.
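The trade-off can be made tangible with a toy model of the orders stream. It is not Pathway code, and cutoff_tradeoff is a hypothetical helper. It assumes that a record is ignored if its time is at most the maximum from earlier batches minus cutoff. Following the sentence above, it also assumes that only records above the final maximal seen time minus cutoff stay stored. The stored count is a rough proxy for memory, not a measurement.

```python
# Toy model of the cutoff trade-off on the orders stream (NOT Pathway code).
# ASSUMPTIONS: a record is ignored if its time is <= (max time seen in earlier
# batches) - cutoff; at the end, only records above (final max) - cutoff are stored.

ORDERS = [  # (__time__, order_time, customer), in arrival order
    (122, 120, "Austin"), (122, 120, "Brenda"), (124, 124, "Carl"),
    (128, 128, "Evelyn"), (130, 120, "Frank"), (130, 124, "Min"),
    (132, 130, "Nicole"), (134, 128, "Sam"), (136, 134, "Theresa"),
    (138, 136, "Yichen"),
]


def cutoff_tradeoff(stream, cutoff):
    arrivals = sorted({arrival for arrival, _, _ in stream})
    max_before, running_max = {}, None
    for arrival in arrivals:
        max_before[arrival] = running_max  # max from strictly earlier batches
        batch_max = max(t for a, t, _ in stream if a == arrival)
        running_max = batch_max if running_max is None else max(running_max, batch_max)
    ignored, stored = [], []
    for arrival, t, name in stream:
        previous = max_before[arrival]
        if previous is not None and t <= previous - cutoff:
            ignored.append(name)  # too late: dropped
        elif t > running_max - cutoff:
            stored.append(name)  # still inside the retention window
    return ignored, stored


for cutoff in (6, 8, 9, 20):
    ignored, stored = cutoff_tradeoff(ORDERS, cutoff)
    print(f"cutoff={cutoff:2}: ignored={ignored}, stored at the end={len(stored)}")
```

In this model, a cutoff of at least 9 is needed to keep Frank's order, since 120 > 128 - 9. The number of orders still stored at the end grows from 2 (cutoff=6) to 3 (cutoff=8), 5 (cutoff=9), and all 10 (cutoff=20).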

Keeping only the most up-to-date data

Imagine you want to create a dashboard with the most recent orders. You don't want to display (or even store) old orders. Don't worry! Pathway can solve this problem too. It is enough to set the keep_results parameter of common_behavior to False. Then, all records with event time no larger than the maximal seen time minus cutoff will be removed from the output. Let's have a look at how it works:

result = discounts.interval_join(
    orders,
    discounts.start_time,
    orders.order_time,
    pw.temporal.interval(0, 5),
    discounts.product == orders.product,
    behavior=pw.temporal.common_behavior(cutoff=8, keep_results=False),
).select(
    orders.customer,
    orders.product,
    discounts.price,
    orders.order_time,
    discounts.start_time,
)
pw.debug.compute_and_print_update_stream(result)
[2024-03-01T18:28:18]:INFO:Preparing Pathway computation


            | customer | product  | price | order_time | start_time | __time__ | __diff__
^MKZX8ZW... | Austin   | mouse    | 42    | 120        | 120        | 122      | 1
^04VMVCM... | Brenda   | keyboard | 128   | 120        | 118        | 122      | 1
^Z3GFZNM... | Carl     | mouse    | 42    | 124        | 120        | 124      | 1
^MKZX8ZW... | Austin   | mouse    | 42    | 120        | 120        | 130      | -1
^04VMVCM... | Brenda   | keyboard | 128   | 120        | 118        | 130      | -1
^5CVX0JZ... | Min      | mouse    | 42    | 124        | 120        | 130      | 1
^S5QY939... | Evelyn   | cable    | 8     | 128        | 128        | 134      | 1
^M2BCK8N... | Nicole   | cable    | 8     | 130        | 128        | 134      | 1
^Z3GFZNM... | Carl     | mouse    | 42    | 124        | 120        | 138      | -1
^5CVX0JZ... | Min      | mouse    | 42    | 124        | 120        | 138      | -1
^33FPTJF... | Theresa  | keyboard | 135   | 134        | 132        | 140      | 1

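In the end, the maximal seen time in the orders stream is 136 (Yichen's order), which gives a forgetting threshold of 136 - 8 = 128. You can see the rule at work in the update stream above. Austin's and Brenda's rows (order_time 120) are removed at time 130, when the maximal seen time is 128. Carl's and Min's rows (order_time 124) are removed at time 138.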
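A toy snapshot of this rule reproduces the state right after time 130. It is plain Python, not Pathway code, and visible_rows is a hypothetical helper. At that point, Evelyn's order has raised the maximal order_time to 128, so with cutoff=8 only rows with order_time above 120 stay visible. The sketch does not model exactly when Pathway emits later retractions.

```python
# Toy snapshot of keep_results=False (NOT Pathway code): a joined row stays
# visible only while its order_time is above max_seen - cutoff.


def visible_rows(rows, max_seen, cutoff):
    return [(customer, t) for customer, t in rows if t > max_seen - cutoff]


# Joined rows produced up to __time__ 130, as (customer, order_time);
# Frank's order was ignored by the cutoff, so it never joined.
rows_at_130 = [("Austin", 120), ("Brenda", 120), ("Carl", 124), ("Min", 124)]

# Evelyn's order (order_time 128) had already raised the orders maximum to 128.
print(visible_rows(rows_at_130, max_seen=128, cutoff=8))
# → [('Carl', 124), ('Min', 124)]
```

This matches the two retractions (Austin and Brenda) at time 130 in the output above.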

Stabilizing the stream

Another feature of temporal behavior is the ability to delay producing results. This can be useful if the input stream is unstable, with several possible updates to the input records, and the output should not contain every intermediate result. Let's return to our shop scenario and consider a situation in which product prices are updated. For instance, a store employee might enter an incorrect price first and fix it later.

In this example, a special column __diff__ tells Pathway whether a record should be added (1) or removed (-1). Remember that pw.debug.table_from_markdown is used here to simulate streaming behavior. In a real system, the way of deleting entries depends on the input connector you plan to use.
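To see how __diff__ rows fold into a table state, here is a toy plain-Python replay of the same update stream as the next code block. It is not Pathway code, and state_at is a hypothetical helper. Within one __time__, the replay applies retractions before insertions. This is an ordering choice of the sketch, made so that a -1/+1 pair for the same id acts as an in-place update.

```python
# Toy replay of an update stream with a __diff__ column (NOT Pathway code).
# +1 inserts a row under its id, -1 retracts it; the state at time T applies
# every update with __time__ <= T.

UPDATES = [  # (id, price, product, start_time, __time__, __diff__)
    (1, 42, "mouse", 120, 122, 1),
    (2, 128, "keyboard", 118, 122, 1),
    (1, 42, "mouse", 120, 124, -1),
    (1, 43, "mouse", 120, 124, 1),
    (3, 8, "cable", 128, 134, 1),
    (3, 8, "cable", 128, 138, -1),
    (3, 10, "cable", 128, 138, 1),
    (4, 135, "keyboard", 132, 140, 1),
    (5, 10, "cable", 122, 150, 1),
]


def state_at(updates, until):
    state = {}
    # Within one __time__, apply retractions (-1) before insertions (+1).
    for row_id, price, product, start, when, diff in sorted(
        updates, key=lambda u: (u[4], u[5])
    ):
        if when > until:
            break
        row = (price, product, start)
        if diff < 0:
            if state.get(row_id) != row:
                raise ValueError(f"retraction of {row} does not match id {row_id}")
            del state[row_id]
        else:
            state[row_id] = row
    return state


print(state_at(UPDATES, 122)[1])  # mouse discount before the correction
print(state_at(UPDATES, 124)[1])  # mouse discount after the correction
```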

discounts_with_updates = pw.debug.table_from_markdown(
    """
    id | price | product  | start_time | __time__ | __diff__
     1 |   42  | mouse    |     120    |    122   |     1
     2 |  128  | keyboard |     118    |    122   |     1
     1 |   42  | mouse    |     120    |    124   |    -1
     1 |   43  | mouse    |     120    |    124   |     1
     3 |    8  | cable    |     128    |    134   |     1
     3 |    8  | cable    |     128    |    138   |    -1
     3 |   10  | cable    |     128    |    138   |     1
     4 |  135  | keyboard |     132    |    140   |     1
     5 |   10  | cable    |     122    |    150   |     1
"""
)

result = discounts_with_updates.interval_join(
    orders,
    discounts_with_updates.start_time,
    orders.order_time,
    pw.temporal.interval(0, 5),
    discounts_with_updates.product == orders.product,
).select(
    orders.customer,
    orders.product,
    discounts_with_updates.price,
    orders.order_time,
    discounts_with_updates.start_time,
)
pw.debug.compute_and_print_update_stream(result)
[2024-03-01T18:28:18]:INFO:Preparing Pathway computation


            | customer | product  | price | order_time | start_time | __time__ | __diff__
^G5A47FZ... | Austin   | mouse    | 42    | 120        | 120        | 122      | 1
^CRYASKY... | Brenda   | keyboard | 128   | 120        | 118        | 122      | 1
^G5A47FZ... | Austin   | mouse    | 42    | 120        | 120        | 124      | -1
^G5A47FZ... | Austin   | mouse    | 43    | 120        | 120        | 124      | 1
^PSJ822X... | Carl     | mouse    | 43    | 124        | 120        | 124      | 1
^3MFB1C1... | Frank    | mouse    | 43    | 120        | 120        | 130      | 1
^51VRDBC... | Min      | mouse    | 43    | 124        | 120        | 130      | 1
^TT1GT7R... | Evelyn   | cable    | 8     | 128        | 128        | 134      | 1
^8V184A9... | Nicole   | cable    | 8     | 130        | 128        | 134      | 1
^TT1GT7R... | Evelyn   | cable    | 8     | 128        | 128        | 138      | -1
^8V184A9... | Nicole   | cable    | 8     | 130        | 128        | 138      | -1
^TT1GT7R... | Evelyn   | cable    | 10    | 128        | 128        | 138      | 1
^8V184A9... | Nicole   | cable    | 10    | 130        | 128        | 138      | 1
^65SNT84... | Theresa  | keyboard | 135   | 134        | 132        | 140      | 1

The above script runs without any delay. Because of that, the price Austin paid for a mouse is updated in the output from 42 at time 122 to 43 at time 124. A similar situation happens to Evelyn: her price is 8 at times 134 and 136, and 10 from time 138 on. If you are willing to wait until the results stabilize, you can use the delay parameter of temporal behavior to buffer the results before producing them. Let's set it to 4 and see what happens.

result = discounts_with_updates.interval_join(
    orders,
    discounts_with_updates.start_time,
    orders.order_time,
    pw.temporal.interval(0, 5),
    discounts_with_updates.product == orders.product,
    behavior=pw.temporal.common_behavior(delay=4),
).select(
    orders.customer,
    orders.product,
    discounts_with_updates.price,
    orders.order_time,
    discounts_with_updates.start_time,
)
pw.debug.compute_and_print_update_stream(result)
[2024-03-01T18:28:18]:INFO:Preparing Pathway computation


            | customer | product  | price | order_time | start_time | __time__             | __diff__
^G5A47FZ... | Austin   | mouse    | 43    | 120        | 120        | 138                  | 1
^CRYASKY... | Brenda   | keyboard | 128   | 120        | 118        | 138                  | 1
^PSJ822X... | Carl     | mouse    | 43    | 124        | 120        | 138                  | 1
^3MFB1C1... | Frank    | mouse    | 43    | 120        | 120        | 138                  | 1
^51VRDBC... | Min      | mouse    | 43    | 124        | 120        | 138                  | 1
^TT1GT7R... | Evelyn   | cable    | 10    | 128        | 128        | 150                  | 1
^8V184A9... | Nicole   | cable    | 10    | 130        | 128        | 150                  | 1
^65SNT84... | Theresa  | keyboard | 135   | 134        | 132        | 18446744073709551614 | 1

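Now, records wait for the maximal seen time to become at least record_time + 4 before being joined. Here, record_time is order_time for the orders stream and start_time for the discounts stream. Thanks to the delay, the stream can stabilize, and there are no price fluctuations in the output.

Theresa's row is a special case. The discounts stream never reaches time 132 + 4 = 136, so her row is only produced when the input ends. The debug output shows this as the very large __time__ value 18446744073709551614.

Of course, you should choose the value of the delay parameter depending on the times in your application.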
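A toy plain-Python model of the delay rule can reproduce the __time__ values in the table above. It is not Pathway's implementation, and release_times, emit_time, and END are hypothetical names. Each side of the join is handled separately. A record is released at the first batch of its own stream, at or after its arrival, whose *earlier* batches have already reached record_time + delay. Records that never qualify are released when the input ends. This one-batch lag is our assumption, chosen because it matches the printed output, and is not a documented description of Pathway's internals. The matching pairs are copied from the join output.

```python
# Toy model of the delay rule (NOT Pathway's implementation).
# A stream is a list of (__time__, event_time, label).
# ASSUMPTION: a record is released at the first batch of its own stream, at or
# after its arrival, whose *earlier* batches already reached event_time + delay.
# Records that never qualify are released when the input ends (END).
END = "end of input"


def release_times(stream, delay):
    arrivals = sorted({arrival for arrival, _, _ in stream})
    max_before, running_max = {}, None
    for arrival in arrivals:
        max_before[arrival] = running_max
        batch_max = max(t for a, t, _ in stream if a == arrival)
        running_max = batch_max if running_max is None else max(running_max, batch_max)
    released = {}
    for arrival, t, label in stream:  # a later update of the same label overwrites it
        released[label] = next(
            (a for a in arrivals
             if a >= arrival and max_before[a] is not None
             and max_before[a] >= t + delay),
            END,
        )
    return released


def emit_time(*times):
    # A joined row appears once both of its input records are released.
    return END if END in times else max(times)


orders = [  # (__time__, order_time, customer)
    (122, 120, "Austin"), (122, 120, "Brenda"), (124, 124, "Carl"),
    (128, 128, "Evelyn"), (130, 120, "Frank"), (130, 124, "Min"),
    (132, 130, "Nicole"), (134, 128, "Sam"), (136, 134, "Theresa"),
    (138, 136, "Yichen"),
]
discounts = [  # discounts_with_updates: one entry per insertion
    (122, 120, "mouse@120"), (122, 118, "keyboard@118"),
    (124, 120, "mouse@120"), (134, 128, "cable@128"),
    (138, 128, "cable@128"), (140, 132, "keyboard@132"),
    (150, 122, "cable@122"),
]
pairs = {  # matches taken from the join output
    "Austin": "mouse@120", "Brenda": "keyboard@118", "Carl": "mouse@120",
    "Frank": "mouse@120", "Min": "mouse@120", "Evelyn": "cable@128",
    "Nicole": "cable@128", "Theresa": "keyboard@132",
}
order_rel = release_times(orders, delay=4)
discount_rel = release_times(discounts, delay=4)
emitted = {c: emit_time(order_rel[c], discount_rel[d]) for c, d in pairs.items()}
print(emitted)
```

In this model, Austin's order is released on its own side already at time 128. The joined row waits until the mouse discount is released at 138, which matches the table.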

You can also combine delay and cutoff to both stabilize the stream and forget old records.
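In Pathway, combining both means passing both arguments, for example behavior=pw.temporal.common_behavior(delay=4, cutoff=8). The toy function below (plain Python; classify is a hypothetical helper) composes the two rules as this article states them, for a newly arrived record and the maximal time already seen on its side. It also shows why cutoff should be larger than delay, at least in this simplified model. Otherwise, a record is discarded no later than the moment it would be released.

```python
# Toy composition of the cutoff and delay rules as stated in this article
# (NOT Pathway's implementation), for a newly arrived record with event time t.


def classify(t, max_seen, delay, cutoff):
    if t <= max_seen - cutoff:
        return "ignored"   # too late: the cutoff drops it
    if max_seen >= t + delay:
        return "released"  # old enough: joined right away
    return "buffered"      # waits until max_seen reaches t + delay


for t in (120, 124, 128):
    print(t, classify(t, max_seen=130, delay=4, cutoff=8))

# With cutoff <= delay, a record hits the cutoff no later than its release point.
print(126, classify(126, max_seen=130, delay=4, cutoff=4))
```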

Thank you for following this tutorial. I hope you now have a better understanding of the ways to control the temporal behavior of the interval join.
