Mining hidden user pair activity with Fuzzy Join

In this article, we analyze a stream of transactions on a crypto exchange. We find all pairs of users A and B such that A sells ETH to B and, in a separate transaction, buys BTC from B.
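To make the target pattern concrete, here is a minimal plain-Python sketch on toy data. It does not use Pathway, and the user names and transactions are made up for illustration. It treats every transfer as a directed edge and keeps the pairs where an ETH edge from A to B is mirrored by a BTC edge from B to A.

```python
# Toy transactions as (sender, receiver, currency); all values are made up.
transactions = [
    ("alice", "bob", "ETH"),
    ("bob", "alice", "BTC"),
    ("carol", "dave", "ETH"),
    ("carol", "dave", "BTC"),  # same direction in both currencies: not a swap
]

# Directed edges per currency.
eth_edges = {(s, r) for s, r, c in transactions if c == "ETH"}
btc_edges = {(s, r) for s, r, c in transactions if c == "BTC"}

# (A, B) matches when A sent ETH to B and B sent BTC to A.
swapping_pairs = {(a, b) for a, b in eth_edges if (b, a) in btc_edges}
print(swapping_pairs)
```

Only alice and bob trade in opposite directions across the two currencies, so carol and dave are not reported.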

First, we import Pathway and load the two transaction logs.

import pathway as pw

transactions =
    rdkafka_settings={
        "": "$GROUP_NAME",
        "bootstrap.servers": "",
        "": "6000",
    },
    topics=["eth_transactions"],
)

eth_transactions = transactions.filter(pw.this.currency == "ETH")
btc_transactions = transactions.filter(pw.this.currency == "BTC")

Now we just need to find all buyer/seller pairs in both sets of transactions and use our fuzzy_match_tables function to extract the matching pairs.

# ETH movements one way
eth_movement_totals = eth_transactions.groupby(
    pw.this.sender, pw.this.receiver
).reduce(
    user_A=pw.this.sender,
    user_B=pw.this.receiver,
    usd_total_estimate=pw.reducers.sum(pw.this.usd_estimate),
)

# BTC movements the other way
btc_movement_totals = btc_transactions.groupby(
    pw.this.sender, pw.this.receiver
).reduce(
    user_A=pw.this.receiver,
    user_B=pw.this.sender,
    usd_total_estimate=pw.reducers.sum(pw.this.usd_estimate),
)

# We run the fuzzy join on the two aggregate transaction tables.
# We project users into different spaces to avoid catching
# a user sending both ETH and BTC to the same user.
left_projection = {"user_A": "C1", "user_B": "C2"}
right_projection = {"user_A": "C1", "user_B": "C2"}
matches =
    eth_movement_totals, btc_movement_totals,
    left_projection=left_projection,
    right_projection=right_projection,
)

# Collect the matched user pairs into the table that will be written to Postgres.
matched_users =
    btc_sender=btc_movement_totals.ix[matches.right].user_B,
    btc_receiver=btc_movement_totals.ix[matches.right].user_A,
    eth_sender=eth_movement_totals.ix[matches.left].user_A,
    eth_receiver=eth_movement_totals.ix[matches.left].user_B,
    confidence=matches.weight,
)
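The aggregation and orientation step can be illustrated without Pathway. The sketch below is a plain-Python stand-in under stated assumptions: the helper movement_totals is hypothetical, the records are made up, and the final step is an exact join on (user_A, user_B) rather than the fuzzy match used in the pipeline, so it produces no confidence weight. It reuses the column names from the code above (sender, receiver, currency, usd_estimate).

```python
from collections import defaultdict

# Toy records with the pipeline's column names; all values are made up.
transactions = [
    {"sender": "alice", "receiver": "bob", "currency": "ETH", "usd_estimate": 100.0},
    {"sender": "alice", "receiver": "bob", "currency": "ETH", "usd_estimate": 50.0},
    {"sender": "bob", "receiver": "alice", "currency": "BTC", "usd_estimate": 140.0},
    {"sender": "carol", "receiver": "dave", "currency": "ETH", "usd_estimate": 10.0},
]


def movement_totals(rows, currency, flip):
    """Sum usd_estimate per (user_A, user_B) for one currency.

    With flip=False, user_A is the sender; with flip=True, user_A is the
    receiver, so both tables end up keyed in the same orientation.
    """
    totals = defaultdict(float)
    for row in rows:
        if row["currency"] != currency:
            continue
        pair = (row["sender"], row["receiver"])
        if flip:
            pair = (pair[1], pair[0])
        totals[pair] += row["usd_estimate"]
    return dict(totals)


eth_totals = movement_totals(transactions, "ETH", flip=False)
btc_totals = movement_totals(transactions, "BTC", flip=True)

# Exact join on the shared key: an illustrative stand-in for the fuzzy match.
matched = {
    pair: (eth_totals[pair], btc_totals[pair])
    for pair in eth_totals.keys() & btc_totals.keys()
}
print(matched)
```

Flipping the BTC table is what lets a single key comparison find opposite-direction flows: after the flip, user_A is the ETH sender and the BTC receiver in both tables.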

We can now store the resulting table in Postgres, or in any other database supported by Pathway.

pw.postgres.write(
    matched_users,
    postgres_settings={
        "host": "localhost",
        "port": "5432",
        "dbname": "transactions",
        "user": "pathway",
        "password": "my_password",
    },
    table_name="matched_users_btc_eth_swapping",
)

Would you like to find users that match within a given time window? Take a look at the recipes on group-by with a tumbling window.
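As a rough idea of what a tumbling window adds, the plain-Python sketch below buckets transfers into fixed, non-overlapping windows before summing them. It is not the Pathway recipe: the timestamp field, the one-hour window length, and the helper tumbling_window_start are assumptions introduced for illustration only.

```python
from collections import defaultdict

WINDOW_SECONDS = 3600  # hypothetical window length (one hour)


def tumbling_window_start(timestamp, window=WINDOW_SECONDS):
    """Return the start of the fixed-size window that contains timestamp."""
    return timestamp - timestamp % window


# Toy records; the "timestamp" field (in seconds) is an assumption.
rows = [
    {"sender": "alice", "receiver": "bob", "timestamp": 10, "usd_estimate": 100.0},
    {"sender": "alice", "receiver": "bob", "timestamp": 3599, "usd_estimate": 50.0},
    {"sender": "alice", "receiver": "bob", "timestamp": 3600, "usd_estimate": 25.0},
]

# Group by (window start, sender, receiver) instead of (sender, receiver).
totals = defaultdict(float)
for row in rows:
    key = (tumbling_window_start(row["timestamp"]), row["sender"], row["receiver"])
    totals[key] += row["usd_estimate"]

print(dict(totals))
```

Adding the window start to the grouping key means two users only match when their opposite-direction transfers fall into the same window.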

Przemek Uznański

Data Structures and Optimization Expert