Out-of-Order Event Streams: Calculating Time Deltas with Grouping by Topic
Przemek Uznański
We are processing a stream of events on different topics. For each event, we want to compute how much time has passed since the previous event on the same topic. The catch is that events do not arrive in the order of their timestamps! With Pathway, there is no need to worry about that.
First, we use Debezium to synchronize the input data from a database table with the following columns:
- timestamp - logical date-time when the event happened
- topic_id - topic in question
- message - message content.
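import pathway as pw

# Schema of the synchronized table. The column types are assumptions:
# Debezium typically encodes TIMESTAMP columns as an integer count since the
# epoch (milli- or microseconds, depending on the column precision).
class EventSchema(pw.Schema):
    timestamp: int
    topic_id: str
    message: str

events = pw.io.debezium.read(
    rdkafka_settings={
        "group.id": "$GROUP_NAME",
        "bootstrap.servers": "clean-panther-8776-eu1-kafka.upstash.io:9092",
        "session.timeout.ms": "6000",
    },
    topic_name="important_events",
    schema=EventSchema,
)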
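Debezium publishes every insert, update and delete on the source table as a change event whose payload carries an op code ("c" create, "r" snapshot read, "u" update, "d" delete) together with the row images before and after the change. Pathway's connector consumes these events for you; the plain-Python sketch below only illustrates how such a stream keeps a replica of the table in sync. It is simplified: the schema and source sections of real messages are omitted, and the id field is a hypothetical primary key that the tutorial's table does not list.

```python
from typing import Any

# Simplified Debezium change-event payload. Real messages also carry a
# "schema" section and "source" metadata, omitted here. The "id" field is a
# hypothetical primary key; the tutorial's table lists only timestamp,
# topic_id and message.
ChangeEvent = dict[str, Any]


def apply_change(table: dict[int, dict[str, Any]], event: ChangeEvent) -> None:
    """Apply one change event to an in-memory replica keyed by "id"."""
    op = event["op"]
    if op in ("c", "r", "u"):  # create, snapshot read, update
        row = event["after"]
        table[row["id"]] = row  # an update overwrites the previous version
    elif op == "d":  # delete: only the "before" image is populated
        table.pop(event["before"]["id"], None)
    else:
        raise ValueError(f"unknown op {op!r}")


replica: dict[int, dict[str, Any]] = {}
changes: list[ChangeEvent] = [
    {"op": "c", "before": None,
     "after": {"id": 1, "timestamp": 200, "topic_id": "a", "message": "x"}},
    {"op": "c", "before": None,
     "after": {"id": 2, "timestamp": 100, "topic_id": "a", "message": "y"}},
    {"op": "u",
     "before": {"id": 1, "timestamp": 200, "topic_id": "a", "message": "x"},
     "after": {"id": 1, "timestamp": 250, "topic_id": "a", "message": "x"}},
    {"op": "d",
     "before": {"id": 2, "timestamp": 100, "topic_id": "a", "message": "y"},
     "after": None},
]
for change in changes:
    apply_change(replica, change)

print(replica)  # {1: {'id': 1, 'timestamp': 250, 'topic_id': 'a', 'message': 'x'}}
```

Note that the replica reflects the latest state of the source table, not the order in which rows were inserted: row 2 had the earlier timestamp but arrived second.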
Then we sort the events. Pathway provides a sort method that orders the rows of a table by the expression given as key; here, the timestamps of the events. Passing topic_id as instance sorts each topic separately, so the streams of all topics are handled simultaneously.
The result has the same row ids as events and two columns, prev and next, holding pointers to the neighboring events of the same topic in timestamp order (None for the first and the last event of a topic).
sorted_events = events.sort(key=events.timestamp, instance=events.topic_id)
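To make the prev and next columns concrete, here is a plain-Python model of what sorting with an instance computes: rows are grouped by topic_id, ordered by timestamp inside each group, and every row gets the ids of its neighbors. The row ids and sample values are made up, and ties are broken by row id purely so the sketch is deterministic; it does not describe how Pathway orders equal timestamps.

```python
from collections import defaultdict
from typing import Optional

Pointers = tuple[Optional[str], Optional[str]]


def sort_pointers(rows: dict[str, tuple[int, str]]) -> dict[str, Pointers]:
    """Map each row id to (prev_id, next_id) in timestamp order within its topic.

    `rows` maps a row id to (timestamp, topic_id).
    """
    # Group row ids by topic: each topic plays the role of one "instance".
    by_topic: dict[str, list[str]] = defaultdict(list)
    for row_id, (_, topic) in rows.items():
        by_topic[topic].append(row_id)

    pointers: dict[str, Pointers] = {}
    for ids in by_topic.values():
        # Order by timestamp, breaking ties by row id for determinism.
        ids.sort(key=lambda rid: (rows[rid][0], rid))
        for i, rid in enumerate(ids):
            prev_id = ids[i - 1] if i > 0 else None
            next_id = ids[i + 1] if i + 1 < len(ids) else None
            pointers[rid] = (prev_id, next_id)
    return pointers


rows = {
    "e1": (30, "a"),
    "e2": (10, "a"),
    "e3": (20, "b"),
    "e4": (20, "a"),
}
print(sort_pointers(rows))
# {'e2': (None, 'e4'), 'e4': ('e2', 'e1'), 'e1': ('e4', None), 'e3': (None, None)}
```

The input order of the rows plays no role: e1 appears first in the input but ends up last on topic a, and e3, the only event on topic b, has no neighbors at all.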
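Finally, for each event we look up the timestamp of its predecessor through the prev pointer and compute the difference. The first event of each topic has no predecessor, so it is filtered out. Because Pathway keeps the pointers up to date as new events arrive, a late event is placed in the right position and the affected deltas are recomputed.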
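# Attach the timestamp of the previous event on the same topic;
# the first event of each topic has no predecessor and is dropped.
events_with_prev = events.with_columns(
    prev_timestamp=events.ix(sorted_events.prev, optional=True).timestamp
).filter(pw.this.prev_timestamp.is_not_none())

# After the filter, unwrap() turns the Optional column into a plain one.
differences = events_with_prev.select(
    delta=pw.this.timestamp - pw.unwrap(pw.this.prev_timestamp)
)

# Stream the deltas into PostgreSQL.
pw.io.postgres.write(
    differences,
    postgres_settings={
        "host": "localhost",
        "port": "5432",
        "dbname": "transactions",
        "user": "pathway",
        "password": "my_password",
    },
    table_name="events_processed",
)

# Nothing is computed until the pipeline is started.
pw.run()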
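Why does arrival order not matter? A late event only changes two deltas: its own and that of the event that follows it on the same topic. The plain-Python sketch below keeps each topic's timestamps in a sorted list with bisect and reports exactly those changed deltas. It illustrates the idea under simplifying assumptions (integer timestamps, no deletions) and is not a description of Pathway's engine; the class name TopicDeltas is hypothetical.

```python
import bisect
from collections import defaultdict


class TopicDeltas:
    """Per-topic time deltas, maintained as events arrive in any order."""

    def __init__(self) -> None:
        # topic_id -> sorted list of event timestamps seen so far
        self.timestamps: dict[str, list[int]] = defaultdict(list)

    def add(self, topic: str, ts: int) -> list[tuple[int, int]]:
        """Insert one event; return (timestamp, delta) for every delta that changed."""
        series = self.timestamps[topic]
        i = bisect.bisect_right(series, ts)
        series.insert(i, ts)
        changed: list[tuple[int, int]] = []
        if i > 0:  # the new event has a predecessor, so it gets a delta
            changed.append((ts, ts - series[i - 1]))
        if i + 1 < len(series):  # the successor's predecessor is now the new event
            succ = series[i + 1]
            changed.append((succ, succ - ts))
        return changed

    def deltas(self, topic: str) -> list[int]:
        """Deltas in timestamp order; the first event of a topic has none."""
        series = self.timestamps.get(topic, [])
        return [b - a for a, b in zip(series, series[1:])]


arrivals = [("a", 10), ("a", 40), ("b", 5), ("a", 25)]  # ("a", 25) arrives late
tracker = TopicDeltas()
updates = [tracker.add(topic, ts) for topic, ts in arrivals]
print(updates[-1])  # [(25, 15), (40, 15)]: the late event and its successor
print(tracker.deltas("a"))  # [15, 15]
```

Feeding the same events in timestamp order yields the same final deltas; the only difference is that, with out-of-order input, the delta of the event at 40 is first reported as 30 and later corrected to 15.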