
Uncovering hidden user relationships in crypto exchanges with Fuzzy Join on streaming data

Przemek Uznański
Published January 9, 2023. Updated January 9, 2023.

In this article, we analyze a stream of transactions on a crypto exchange. We want to find every pair of users A and B such that A sells ETH to B and, in a separate transaction, buys BTC from B.
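The matching condition itself is easy to state on a static list. The following is a minimal sketch in plain Python (no Pathway), assuming each transaction carries the sender, receiver, currency and usd_estimate fields used later in the pipeline; the sample log is made up. It finds exact reciprocal pairs only, whereas the streaming pipeline below relies on fuzzy matching and updates as new transactions arrive.

```python
from collections import namedtuple

# Hypothetical in-memory record; field names mirror the columns used by the pipeline.
Transaction = namedtuple("Transaction", ["sender", "receiver", "currency", "usd_estimate"])


def find_eth_btc_swaps(transactions):
    """Return sorted (A, B) pairs where A sent ETH to B and B sent BTC to A (exact match)."""
    eth_pairs = {(t.sender, t.receiver) for t in transactions if t.currency == "ETH"}
    # Flip BTC edges so that (A, B) means "A received BTC from B".
    btc_pairs = {(t.receiver, t.sender) for t in transactions if t.currency == "BTC"}
    return sorted(eth_pairs & btc_pairs)


log = [
    Transaction("alice", "bob", "ETH", 1800.0),
    Transaction("bob", "alice", "BTC", 1750.0),
    # carol sends both currencies to dave: this is not a swap.
    Transaction("carol", "dave", "ETH", 900.0),
    Transaction("carol", "dave", "BTC", 950.0),
]
print(find_eth_btc_swaps(log))  # [('alice', 'bob')]
```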

First, we import Pathway, read the transaction log from Kafka, and split it into ETH and BTC transactions.

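import pathway as pw


# Assumed message layout: JSON objects with these four fields
# (the same columns the rest of the pipeline relies on).
class TransactionSchema(pw.Schema):
    sender: str
    receiver: str
    currency: str
    usd_estimate: float


transactions = pw.io.kafka.read(
    rdkafka_settings={
        "group.id": "$GROUP_NAME",
        "bootstrap.servers": "clean-panther-8776-eu1-kafka.upstash.io:9092",
        "session.timeout.ms": "6000",
    },
    topic="eth_transactions",
    format="json",
    schema=TransactionSchema,
)

# Split the single stream into one table per currency.
eth_transactions = transactions.filter(pw.this.currency == "ETH")
btc_transactions = transactions.filter(pw.this.currency == "BTC")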
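To make the filtering step concrete, here is a plain-Python sketch of what the two filters do to the incoming messages. It assumes, as in the schema above, that each Kafka message is a JSON object with sender, receiver, currency and usd_estimate fields; the payloads are made-up examples, and the function name split_by_currency is hypothetical.

```python
import json

# Hypothetical raw Kafka payloads (JSON bytes); field names are assumed.
raw_messages = [
    b'{"sender": "alice", "receiver": "bob", "currency": "ETH", "usd_estimate": 1800.0}',
    b'{"sender": "bob", "receiver": "alice", "currency": "BTC", "usd_estimate": 1750.0}',
    b'{"sender": "erin", "receiver": "frank", "currency": "USDT", "usd_estimate": 50.0}',
]


def split_by_currency(messages):
    """Decode JSON payloads and keep only ETH and BTC transactions, like the two filters."""
    eth, btc = [], []
    for payload in messages:
        tx = json.loads(payload)
        if tx["currency"] == "ETH":
            eth.append(tx)
        elif tx["currency"] == "BTC":
            btc.append(tx)
        # Any other currency is dropped, since neither filter keeps it.
    return eth, btc


eth_transactions, btc_transactions = split_by_currency(raw_messages)
print(len(eth_transactions), len(btc_transactions))  # 1 1
```

In Pathway, the filters produce two tables that keep updating as new messages arrive; the sketch only shows the effect on a fixed batch.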

Next, we aggregate the ETH transfers for each (sender, receiver) pair and the BTC transfers in the opposite direction, and then use fuzzy_match_tables from pw.ml.smart_table_ops to match rows of the two aggregated tables.
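In plain Python, the two groupby/reduce steps amount to summing usd_estimate per (user_A, user_B) key, with the roles swapped for BTC so that both tables share the same orientation. A minimal sketch over an in-memory list of dicts (the function name movement_totals and the sample rows are made up for illustration):

```python
from collections import defaultdict


def movement_totals(transactions, flip=False):
    """Sum usd_estimate per (user_A, user_B); flip=True swaps the roles, as for BTC."""
    totals = defaultdict(float)
    for tx in transactions:
        a, b = tx["sender"], tx["receiver"]
        if flip:
            a, b = b, a  # user_A becomes the receiver, user_B the sender
        totals[(a, b)] += tx["usd_estimate"]
    return dict(totals)


eth = [
    {"sender": "alice", "receiver": "bob", "usd_estimate": 1000.0},
    {"sender": "alice", "receiver": "bob", "usd_estimate": 800.0},
]
btc = [{"sender": "bob", "receiver": "alice", "usd_estimate": 1750.0}]

print(movement_totals(eth))             # {('alice', 'bob'): 1800.0}
print(movement_totals(btc, flip=True))  # {('alice', 'bob'): 1750.0}
```

Because the BTC rows are flipped, a genuine swap shows up under the same (user_A, user_B) key in both results, which is what makes the two tables comparable column by column.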

# ETH movements one way
eth_movement_totals = eth_transactions.groupby(pw.this.sender, pw.this.receiver).reduce(
    user_A=pw.this.sender,
    user_B=pw.this.receiver,
    usd_total_estimate=pw.reducers.sum(pw.this.usd_estimate),
)
# BTC movements the other way
btc_movement_totals = btc_transactions.groupby(pw.this.sender, pw.this.receiver).reduce(
    user_A=pw.this.receiver,
    user_B=pw.this.sender,
    usd_total_estimate=pw.reducers.sum(pw.this.usd_estimate),
)
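# We run a fuzzy join on the two aggregated transaction tables.
# Each user column gets its own projection (C1, C2), so user_A is compared
# only with user_A and user_B only with user_B. This keeps a user who sends
# both ETH and BTC to the same counterparty from being reported as a swap.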
left_projection = {"user_A": "C1", "user_B": "C2"}
right_projection = {"user_A": "C1", "user_B": "C2"}
matches = pw.ml.smart_table_ops.fuzzy_match_tables(
    eth_movement_totals,
    btc_movement_totals,
    left_projection=left_projection,
    right_projection=right_projection,
)

# Resolve each matched pair of rows back to user names, keeping the match weight as a confidence score.
matched_users = matches.select(
    btc_sender=btc_movement_totals.ix(matches.right).user_B,
    btc_receiver=btc_movement_totals.ix(matches.right).user_A,
    eth_sender=eth_movement_totals.ix(matches.left).user_A,
    eth_receiver=eth_movement_totals.ix(matches.left).user_B,
    confidence=matches.weight,
)
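Why the separate projections matter can be seen on a tiny example. The sketch below is not how fuzzy_match_tables computes its weights; it uses exact equality as a stand-in for the fuzzy comparison, and the function names are hypothetical. It contrasts a role-agnostic comparison with a role-aware one (the C1/C2 idea) on a user who sends both currencies to the same counterparty.

```python
def role_agnostic_match(left, right):
    """Naive: match whenever both rows involve the same set of users, ignoring roles."""
    return {left["user_A"], left["user_B"]} == {right["user_A"], right["user_B"]}


def role_aware_match(left, right):
    """Compare user_A only with user_A and user_B only with user_B."""
    return left["user_A"] == right["user_A"] and left["user_B"] == right["user_B"]


# carol sends both ETH and BTC to dave, which is not a swap.
eth_row = {"user_A": "carol", "user_B": "dave"}  # ETH: carol -> dave
btc_row = {"user_A": "dave", "user_B": "carol"}  # BTC: carol -> dave, roles flipped

print(role_agnostic_match(eth_row, btc_row))  # True (false positive)
print(role_aware_match(eth_row, btc_row))     # False
```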

We can now store the resulting table in PostgreSQL, or in any other database supported by Pathway.

pw.io.postgres.write(
    matched_users,
    postgres_settings={
        "host": "localhost",
        "port": "5432",
        "dbname": "transactions",
        "user": "pathway",
        "password": "my_password",
    },
    table_name="matched_users_btc_eth_swapping",
)

# Nothing is computed until the pipeline is started.
pw.run()

Would you like to find only users whose transactions match within a given time window? Take a look at our recipes on group-by with a tumbling window.
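The idea behind a tumbling window is to cut time into fixed, non-overlapping intervals and match only transactions that fall into the same interval. A plain-Python sketch, assuming a hypothetical integer Unix timestamp field on each transaction (it is not part of the schema above) and a hypothetical one-hour window; this is not Pathway's windowing API:

```python
from collections import defaultdict

WINDOW_SECONDS = 3600  # hypothetical window length: one hour


def window_start(timestamp, duration=WINDOW_SECONDS):
    """Start of the tumbling window that contains an integer Unix timestamp."""
    return timestamp - timestamp % duration


def swaps_per_window(transactions, duration=WINDOW_SECONDS):
    """Bucket transactions into tumbling windows, then find ETH/BTC swaps inside each."""
    windows = defaultdict(list)
    for tx in transactions:
        windows[window_start(tx["timestamp"], duration)].append(tx)
    result = {}
    for start, txs in windows.items():
        eth = {(t["sender"], t["receiver"]) for t in txs if t["currency"] == "ETH"}
        btc = {(t["receiver"], t["sender"]) for t in txs if t["currency"] == "BTC"}
        pairs = sorted(eth & btc)
        if pairs:
            result[start] = pairs
    return result


log = [
    {"timestamp": 7200, "sender": "alice", "receiver": "bob", "currency": "ETH"},
    {"timestamp": 7500, "sender": "bob", "receiver": "alice", "currency": "BTC"},
    {"timestamp": 3000, "sender": "carol", "receiver": "dave", "currency": "ETH"},
    {"timestamp": 9000, "sender": "dave", "receiver": "carol", "currency": "BTC"},
]
print(swaps_per_window(log))  # {7200: [('alice', 'bob')]}
```

Here carol's ETH transfer (window starting at 0) and dave's BTC transfer (window starting at 7200) land in different windows, so that pair is not reported.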


Przemek Uznański

Data Structures and Optimization Expert
