We are processing a stream of events on different topics. For each event, we want to compute how much time has passed since the previous event on the same topic. The catch is that the events do not arrive in the order of their timestamps! With Pathway, there is no need to worry about that.
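To pin down exactly what we want to compute, here is a plain-Python batch reference, written under the assumption that each event is an `(event_id, topic_id, timestamp)` tuple. The helper `time_since_previous` is hypothetical and is not part of Pathway. It groups events by topic, orders each group by timestamp (not by arrival), and takes consecutive differences.

```python
from collections import defaultdict


def time_since_previous(events):
    """Batch reference for the desired output (hypothetical helper).

    `events` is an iterable of (event_id, topic_id, timestamp) tuples in
    arbitrary arrival order. Returns {event_id: delta} for every event that
    has an earlier event on the same topic.
    """
    by_topic = defaultdict(list)
    for event_id, topic_id, timestamp in events:
        by_topic[topic_id].append((timestamp, event_id))

    deltas = {}
    for items in by_topic.values():
        items.sort()  # order by timestamp (ties broken by id), not by arrival
        for (prev_ts, _), (ts, event_id) in zip(items, items[1:]):
            deltas[event_id] = ts - prev_ts
    return deltas


# Events arrive out of timestamp order.
arrivals = [("e1", "a", 10), ("e2", "a", 4), ("e3", "b", 7), ("e4", "a", 6), ("e5", "b", 2)]
print(time_since_previous(arrivals))  # → {'e4': 2, 'e1': 4, 'e3': 5}
```

The first event of each topic (`e2` and `e5` here) has no predecessor, so it gets no entry.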
First, we use Debezium to synchronize the input data from a database table. Below we rely on two of its columns: `topic_id` (the topic of the event) and `timestamp` (when the event happened).
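```python
import pathway as pw

# Read the change stream that Debezium publishes to Kafka for the events table.
events = pw.io.debezium.read(
    rdkafka_settings={
        "group.id": "$GROUP_NAME",  # placeholder: set your consumer group name
        "bootstrap.servers": "clean-panther-8776-eu1-kafka.upstash.io:9092",
        "session.timeout.ms": "6000",
    },
    topics=["important_events"],
)
```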
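Conceptually, Debezium turns every insert, update and delete on the source table into a change event whose envelope carries `before`, `after` and an `op` code (`c` create, `u` update, `d` delete, `r` snapshot read). The sketch below is a simplified, hypothetical illustration of applying such JSON events to an in-memory table; it is not how Pathway parses them, and the primary-key column name `id` is an assumption.

```python
import json


def apply_debezium_message(table, raw_message, key_column="id"):
    """Apply one JSON Debezium change event to `table`, a dict keyed by primary key.

    Simplified sketch: accepts the envelope with or without the
    {"schema": ..., "payload": ...} wrapper; `key_column` is a hypothetical
    primary-key column name. Other ops (e.g. truncate) are ignored here.
    """
    message = json.loads(raw_message)
    payload = message.get("payload", message)
    op = payload["op"]
    if op in ("c", "r"):  # insert, or row read during the initial snapshot
        row = payload["after"]
        table[row[key_column]] = row
    elif op == "u":  # update; `before` may be null depending on DB settings
        before = payload.get("before")
        if before is not None:
            table.pop(before[key_column], None)
        after = payload["after"]
        table[after[key_column]] = after
    elif op == "d":  # delete: `before` identifies the removed row
        table.pop(payload["before"][key_column], None)
    return table


rows = {}
apply_debezium_message(rows, json.dumps({"payload": {
    "op": "c", "before": None,
    "after": {"id": 1, "topic_id": "a", "timestamp": 10}}}))
apply_debezium_message(rows, json.dumps({"payload": {
    "op": "u", "before": None,
    "after": {"id": 1, "topic_id": "a", "timestamp": 12}}}))
print(rows)  # → {1: {'id': 1, 'topic_id': 'a', 'timestamp': 12}}
```

The point is that the downstream table always mirrors the current state of the database table, including corrections to rows that were already seen.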
Then we build a sorted index of the events. Pathway provides `build_sorted_index` to sort a table according to its `key` column: in this case, we sort by the timestamps of the events. In addition, each topic is mapped to the `instance` field, so that each topic is sorted as a separate stream and all topics are handled simultaneously. Both columns are added to `events` with a `select` before the index is built.
We immediately use the index to sort the table, that is, to extract for each event the `prev` and `next` pointers to its neighbors in timestamp order within the same topic.
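```python
# Attach `key` (the event timestamp) and `instance` (the topic) columns,
# build a sorted index over them, and turn it into prev/next pointers.
sorted_events = pw.indexing.sort_from_index(
    **pw.indexing.build_sorted_index(
        events + events.select(key=events.timestamp, instance=events.topic_id)
    )
)
```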
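For intuition, here is a plain-Python analogue of what the sorting step produces: for every row, a pointer to the previous and next row in `key` order, computed separately within each `instance`. The helper `sort_with_pointers` and the dict-of-dicts row layout are hypothetical; this is not Pathway's API or its internal algorithm.

```python
from collections import defaultdict


def sort_with_pointers(rows, key, instance):
    """Return {row_id: {"prev": id or None, "next": id or None}}.

    `rows` maps row id -> row (a dict). Rows are ordered by row[key]
    separately within each row[instance]; ties are broken by row id.
    """
    groups = defaultdict(list)
    for row_id, row in rows.items():
        groups[row[instance]].append((row[key], row_id))

    pointers = {}
    for members in groups.values():
        members.sort()
        ids = [row_id for _, row_id in members]
        for i, row_id in enumerate(ids):
            pointers[row_id] = {
                "prev": ids[i - 1] if i > 0 else None,
                "next": ids[i + 1] if i + 1 < len(ids) else None,
            }
    return pointers


rows = {
    "e1": {"topic_id": "a", "timestamp": 10},
    "e2": {"topic_id": "a", "timestamp": 4},
    "e3": {"topic_id": "b", "timestamp": 7},
    "e4": {"topic_id": "a", "timestamp": 6},
}
pointers = sort_with_pointers(rows, key="timestamp", instance="topic_id")
print(pointers["e4"])  # → {'prev': 'e2', 'next': 'e1'}
```

Event `e3` is alone on topic `b`, so both of its pointers are `None`; events without a `prev` pointer are exactly the ones that get no time difference in the next step.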
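Finally, we keep the events that have a predecessor on their topic, compute the time elapsed since that previous event, and write the differences to the `events_processed` table in PostgreSQL.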
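```python
# Keep only events that have a previous event on the same topic, and compute
# delta = current event's timestamp - previous event's timestamp.
differences = events.having(sorted_events.prev).select(
    delta=events.timestamp - pw.this.timestamp
)

# Stream the differences into PostgreSQL.
pw.io.postgres.write(
    differences,
    postgres_settings={
        "host": "localhost",
        "port": "5432",
        "dbname": "transactions",
        "user": "pathway",
        "password": "my_password",
    },
    table_name="events_processed",
)
```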
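Why does arrival order not matter? When a late event arrives, it not only gets its own time difference: it also becomes the new predecessor of the next event on its topic, whose difference must be revised. The insert-only sketch below (the class `IncrementalDeltas` is hypothetical and is not Pathway's implementation) keeps a per-topic sorted list and reports, for each arrival, which output rows change.

```python
import bisect
from collections import defaultdict


class IncrementalDeltas:
    """Insert-only sketch: time since the previous event on the same topic,
    kept up to date while events arrive in arbitrary order."""

    def __init__(self):
        self.by_topic = defaultdict(list)  # topic -> [(timestamp, event_id)], sorted
        self.deltas = {}                   # event_id -> current delta

    def add(self, event_id, topic_id, timestamp):
        """Insert one event; return the output changes it causes as
        (event_id, old_delta, new_delta), with None meaning "no row yet"."""
        items = self.by_topic[topic_id]
        pos = bisect.bisect_left(items, (timestamp, event_id))
        items.insert(pos, (timestamp, event_id))
        changes = []
        if pos > 0:  # the new event has a predecessor on its topic
            delta = timestamp - items[pos - 1][0]
            self.deltas[event_id] = delta
            changes.append((event_id, None, delta))
        if pos + 1 < len(items):  # the next event's predecessor is now the new event
            next_ts, next_id = items[pos + 1]
            old = self.deltas.get(next_id)
            self.deltas[next_id] = next_ts - timestamp
            changes.append((next_id, old, next_ts - timestamp))
        return changes


stream = IncrementalDeltas()
print(stream.add("e1", "a", 10))  # → []
print(stream.add("e2", "a", 4))   # → [('e1', None, 6)]
print(stream.add("e4", "a", 6))   # → [('e4', None, 2), ('e1', 6, 4)]
```

The late event `e2` creates a row for `e1`, and the even later `e4` revises that row from 6 to 4. An incremental engine has to propagate exactly this kind of revision to the output instead of assuming events arrive sorted. A plain Python list makes each insertion O(n), which is fine for illustration but not for large streams.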