November 2, 2022TUTORIAL · MACHINE-LEARNING

Run In Colab  View in Github

Detecting suspicious user activity with Tumbling Window group-by

Your task is to detect suspicious user login attempts during some period of time. The main ingredient used is grouping over a tumbling window.

You have an input data table with following columns:

  • username,
  • whether the login was successful,
  • time of a login attempt,
  • ip_address of a login.

First ingest the data.

# Uncomment to download the required files.
# %%capture --no-display
# !wget https://public-pathway-releases.s3.eu-central-1.amazonaws.com/data/suspicious_users_tutorial_logins.csv -O logins.csv
from datetime import datetime
import pathway as pw
logins = pw.io.csv.read(
"logins.csv",
value_columns=["username", "successful", "time", "ip_address"],
mode="static",
)
logins = logins.update_columns(successful=(pw.this.successful == "True"))

Then filter attempts and keep only the unsuccessful ones.

processed = logins.filter(~pw.this.successful)

Now, group remaining attempts by login time and ip_address (ignoring seconds in time of login).

by_minutes = processed.select(
pw.this.ip_address,
time=pw.apply(
lambda timestamp_str: (datetime.fromtimestamp(int(timestamp_str)).isoformat())[
:-2
]
+ "00",
pw.this.time,
),
)
grouped_by_minutes = by_minutes.groupby(pw.this.time, pw.this.ip_address)

The next step is to count the logins...

logins_counted = grouped_by_minutes.reduce(
by_minutes.time, by_minutes.ip_address, count=pw.reducers.count(by_minutes.id)
)

...and to keep only incidents where the number of failed logins exceeded the threshold.

suspicious_logins = logins_counted.filter(pw.this.count >= 5)
pw.debug.compute_and_print(suspicious_logins)
| time | ip_address | count ^QNS6146... | 2018-12-25T10:30:00 | 50.37.169.241 | 7

Przemek Uznański

Data Structures and Optimization Expert