Mining hidden user pair activity with Fuzzy Join

In this article, we want to analyze a stream of transactions in a crypto exchange. We find all the pairs of users A and B such as A sells B the ETH coin, and buys from B BTC in a separate transaction.

First we import Pathway and load the two transactions logs.

import pathway as pwtransactions = pw.kafka.read(    rdkafka_settings={        "group.id": "$GROUP_NAME",        "bootstrap.servers": "clean-panther-8776-eu1-kafka.upstash.io:9092",        "session.timeout.ms": "6000",    },    topics=["eth_transactions"],)eth_transactions = transactions.filter(transactions.currency == "ETH")btc_transactions = transactions.filter(transactions.currency == "BTC")

Now we just need to find all the pairs of buyers/sellers in both transactions and use our fuzzy_match_tables to extract the matching pairs.

# ETH movements one wayeth_movement_totals = eth_transactions.groupby(    eth_transactions.sender, eth_transactions.receiver).reduce(    user_A=eth_transactions.sender,    user_B=eth_transactions.receiver,    usd_total_estimate=pw.reducers.sum(eth_transactions.usd_estimate),)# BTC movements the other waybtc_movement_totals = btc_transactions.groupby(    btc_transactions.sender, btc_transactions.receiver).reduce(    user_A=btc_transactions.receiver,    user_B=btc_transactions.sender,    usd_total_estimate=pw.reducers.sum(btc_transactions.usd_estimate),)# We run fuzzy join on the two aggregate transaction tables.matches = pw.ml.smart_table_ops.fuzzy_match_tables(    eth_movement_totals, btc_movement_totals)# The matched user pairs are now output to an output postgres table.matched_users = matches.select(    btc_sender=btc_movement_totals.ix[matches.right].user_B,    btc_receiver=btc_movement_totals.ix[matches.right].user_A,    eth_sender=eth_movement_totals.ix[matches.left].user_A,    eth_receiver=eth_movement_totals.ix[matches.left].user_B,    confidence=matches.weight,)

We can now store the resulting table in postgres, or any other database supported by Pathway.

pw.postgres.write(    matched_users,    postgres_settings={        "host": "localhost",        "port": "5432",        "dbname": "transactions",        "user": "pathway",        "password": "my_password",    },    table_name="matched_users_btc_eth_swapping",)

Would you like to find users that match within given time-window? Take a look at recipes on group-by with a tumbling window.

Przemek Uznański

Data Structures and Optimization Expert