Uncovering hidden user relationships in crypto exchanges with Fuzzy Join on streaming data
Przemek Uznański
In this article, we analyze a stream of transactions on a crypto exchange. We want to find all pairs of users A and B such that A sells ETH to B and, in a separate transaction, buys BTC from B.
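To pin down the pattern, here is a minimal exact-match reference in plain Python. It assumes each transaction carries the sender, receiver, currency and usd_estimate fields used later in the tutorial. The Transaction type and the find_swapping_pairs helper are hypothetical and only illustrate the definition. The Pathway pipeline works on streaming data and uses a fuzzy join instead of an exact set intersection.

```python
from typing import NamedTuple


class Transaction(NamedTuple):
    # Field names mirror the columns used in the tutorial's Pathway code.
    sender: str
    receiver: str
    currency: str
    usd_estimate: float


def find_swapping_pairs(transactions: list[Transaction]) -> set[tuple[str, str]]:
    """Return every (A, B) such that A sent ETH to B and B sent BTC to A."""
    # (A, B) in eth_edges means "A sent ETH to B".
    eth_edges = {(t.sender, t.receiver) for t in transactions if t.currency == "ETH"}
    # BTC edges are stored reversed, so (A, B) means "B sent BTC to A".
    btc_edges_reversed = {
        (t.receiver, t.sender) for t in transactions if t.currency == "BTC"
    }
    return eth_edges & btc_edges_reversed


# Made-up example log.
log = [
    Transaction("alice", "bob", "ETH", 1800.0),
    Transaction("bob", "alice", "BTC", 1750.0),
    Transaction("carol", "dave", "ETH", 900.0),
    Transaction("carol", "dave", "BTC", 880.0),  # same direction: not a swap
]
print(find_swapping_pairs(log))  # {('alice', 'bob')}
```

Storing the BTC edges reversed is what lets a single intersection express "A sends one coin, B sends the other back"; the Pathway code below uses the same trick when it swaps user_A and user_B for BTC.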
First, we import Pathway, read the transaction log from Kafka, and split it into ETH and BTC transactions.
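import pathway as pw

# Schema of the incoming transactions; the column names are the ones used below.
class TransactionSchema(pw.Schema):
    sender: str
    receiver: str
    currency: str
    usd_estimate: float

transactions = pw.io.kafka.read(
    rdkafka_settings={
        "group.id": "$GROUP_NAME",
        "bootstrap.servers": "clean-panther-8776-eu1-kafka.upstash.io:9092",
        "session.timeout.ms": "6000",
    },
    topic="eth_transactions",
    format="json",
    schema=TransactionSchema,
)
eth_transactions = transactions.filter(pw.this.currency == "ETH")
btc_transactions = transactions.filter(pw.this.currency == "BTC")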
Next, we aggregate the transactions of each currency per pair of users, orienting the BTC totals in the opposite direction to the ETH ones, and use fuzzy_match_tables to extract the matching pairs.
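The aggregation step can be sketched in plain Python. This is a hypothetical helper (movement_totals) on made-up rows, not Pathway code: it sums usd_estimate per user pair, and flipping the key for BTC puts both tables in the same (user_A, user_B) orientation, so a swapping pair shows up under the same key in both.

```python
from collections import defaultdict

# Made-up rows: (sender, receiver, currency, usd_estimate).
rows = [
    ("alice", "bob", "ETH", 1000.0),
    ("alice", "bob", "ETH", 800.0),
    ("bob", "alice", "BTC", 1750.0),
]


def movement_totals(rows, currency, reverse=False):
    """Sum usd_estimate per (user_A, user_B) for one currency.

    With reverse=False, user_A is the sender (the ETH direction);
    with reverse=True, user_A is the receiver (the BTC direction).
    """
    totals = defaultdict(float)
    for sender, receiver, cur, usd in rows:
        if cur != currency:
            continue
        key = (receiver, sender) if reverse else (sender, receiver)
        totals[key] += usd
    return dict(totals)


eth_totals = movement_totals(rows, "ETH")
btc_totals = movement_totals(rows, "BTC", reverse=True)
print(eth_totals)  # {('alice', 'bob'): 1800.0}
print(btc_totals)  # {('alice', 'bob'): 1750.0}
```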
# ETH movements one way: user_A sent ETH to user_B
eth_movement_totals = eth_transactions.groupby(pw.this.sender, pw.this.receiver).reduce(
    user_A=pw.this.sender,
    user_B=pw.this.receiver,
    usd_total_estimate=pw.reducers.sum(pw.this.usd_estimate),
)
# BTC movements the other way: user_B sent BTC to user_A
btc_movement_totals = btc_transactions.groupby(pw.this.sender, pw.this.receiver).reduce(
    user_A=pw.this.receiver,
    user_B=pw.this.sender,
    usd_total_estimate=pw.reducers.sum(pw.this.usd_estimate),
)
# We run a fuzzy join on the two aggregated transaction tables.
# We project user_A and user_B into different spaces so that the join does not
# catch a user who sends both ETH and BTC to the same counterpart.
left_projection = {"user_A": "C1", "user_B": "C2"}
right_projection = {"user_A": "C1", "user_B": "C2"}
matches = pw.ml.smart_table_ops.fuzzy_match_tables(
    eth_movement_totals,
    btc_movement_totals,
    left_projection=left_projection,
    right_projection=right_projection,
)
# For each match, look up both users in the aggregated tables
# and keep the match weight as a confidence score.
matched_users = matches.select(
    btc_sender=btc_movement_totals.ix(matches.right).user_B,
    btc_receiver=btc_movement_totals.ix(matches.right).user_A,
    eth_sender=eth_movement_totals.ix(matches.left).user_A,
    eth_receiver=eth_movement_totals.ix(matches.left).user_B,
    confidence=matches.weight,
)
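The role of the projections can be illustrated with a toy, exact-equality analogy. It is not Pathway's fuzzy matching algorithm, and the helper names are hypothetical; it only reflects our reading that columns mapped to the same space are compared with each other. Putting all user columns in one space roughly amounts to comparing the set of users in a row, so carol, who sent both ETH and BTC to dave, is wrongly matched. Keeping user_A and user_B in separate spaces compares them role by role and keeps only the genuine swap.

```python
# Aggregated pairs in the (user_A, user_B) orientation used by the tutorial.
# ETH rows: user_A sent ETH to user_B.
eth_pairs = {("alice", "bob"), ("carol", "dave")}
# BTC rows are (receiver, sender):
# ("alice", "bob"): bob sent BTC to alice, a genuine swap.
# ("dave", "carol"): carol sent BTC to dave, the same direction as her ETH.
btc_pairs = {("alice", "bob"), ("dave", "carol")}


def match_single_space(left, right):
    """Analogy for one shared space: only the set of users is compared."""
    return {
        (left_row, right_row)
        for left_row in left
        for right_row in right
        if set(left_row) == set(right_row)
    }


def match_separate_spaces(left, right):
    """Analogy for separate spaces: user_A vs user_A, user_B vs user_B."""
    return {
        (left_row, right_row)
        for left_row in left
        for right_row in right
        if left_row[0] == right_row[0] and left_row[1] == right_row[1]
    }


print(sorted(match_single_space(eth_pairs, btc_pairs)))
# [(('alice', 'bob'), ('alice', 'bob')), (('carol', 'dave'), ('dave', 'carol'))]
print(sorted(match_separate_spaces(eth_pairs, btc_pairs)))
# [(('alice', 'bob'), ('alice', 'bob'))]
```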
We can now store the resulting table in PostgreSQL, or in any other database supported by Pathway.
pw.io.postgres.write(
    matched_users,
    postgres_settings={
        "host": "localhost",
        "port": "5432",
        "dbname": "transactions",
        "user": "pathway",
        "password": "my_password",
    },
    table_name="matched_users_btc_eth_swapping",
)
# Start the computation; the pipeline defined above does not run until pw.run() is called.
pw.run()
Would you like to find only users whose transactions match within a given time window? Take a look at the recipes on group-by with a tumbling window.
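The idea behind a tumbling window can be sketched in plain Python: each transaction is assigned to a fixed-size, non-overlapping time bucket, and the swap pattern is searched for within each bucket separately. The timestamp field, the one-hour WINDOW_SECONDS value and the helper names are hypothetical assumptions for this sketch, not part of the tutorial's pipeline.

```python
from collections import defaultdict

WINDOW_SECONDS = 3600  # hypothetical window length: one hour


def window_start(timestamp: int, window: int = WINDOW_SECONDS) -> int:
    # Tumbling windows are fixed-size and non-overlapping, so each event
    # belongs to exactly one window, identified here by its start time.
    return timestamp - timestamp % window


def swaps_per_window(transactions):
    """transactions: iterable of (timestamp, sender, receiver, currency)."""
    eth = defaultdict(set)
    btc_reversed = defaultdict(set)
    for ts, sender, receiver, currency in transactions:
        w = window_start(ts)
        if currency == "ETH":
            eth[w].add((sender, receiver))
        elif currency == "BTC":
            # Reversed, so (A, B) means "B sent BTC to A".
            btc_reversed[w].add((receiver, sender))
    result = {}
    for w, eth_pairs in eth.items():
        pairs = eth_pairs & btc_reversed.get(w, set())
        if pairs:
            result[w] = pairs
    return result


log = [
    (100, "alice", "bob", "ETH"),
    (200, "bob", "alice", "BTC"),    # same window as the ETH transfer
    (3700, "carol", "dave", "ETH"),
    (7300, "dave", "carol", "BTC"),  # next window: not paired
]
print(swaps_per_window(log))  # {0: {('alice', 'bob')}}
```

A pair whose two transfers fall on different sides of a window boundary (carol and dave above) is not reported; a sliding window would catch such pairs at the cost of overlapping buckets.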
