Detecting suspicious user activity with Tumbling Window group-by

Our task is to detect suspicious user login attempts within a fixed period of time. The main ingredient is a group-by over a tumbling window, that is, a sequence of fixed-length, non-overlapping time intervals.

We have an input data table with the following columns:

  • username,
  • whether the login was successful,
  • time of a login attempt,
  • ip_address of a login.

First we ingest the data.

# Uncomment to download the required files.
# %%capture --no-display
# !wget https://public-pathway-releases.s3.eu-central-1.amazonaws.com/data/suspicious_users_tutorial_logins.csv -O logins.csv
from datetime import datetime

import pathway as pw

logins = pw.csv.read(
    "logins.csv",
    value_columns=["username", "successful", "time", "ip_address"],
)

# The CSV stores "successful" as text, so compare against "True" to get a Boolean.
logins = logins.select(
    *pw.this.without(pw.this.successful),
    successful=(pw.this.successful == "True"),
)
# Cast the result so that the column is explicitly typed as bool.
logins = logins.select(
    *pw.this.without(pw.this.successful),
    successful=pw.cast(bool, pw.this.successful),
)
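The comparison against the string "True" matters because the CSV delivers the flag as text, and Python's built-in bool() treats every non-empty string as true. The plain-Python sketch below illustrates the difference; it does not use Pathway, and parse_flag is a hypothetical helper name introduced only for this illustration.

```python
def parse_flag(text: str) -> bool:
    """Interpret the CSV text "True" as True and anything else as False."""
    return text == "True"


# bool() only checks for emptiness, so the text "False" still counts as true.
naive = bool("False")
parsed = parse_flag("False")

print(naive, parsed)
```

Here naive is True while parsed is False, which is why a plain bool() conversion of the raw text would mark every attempt as successful.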

We then filter attempts and keep only the unsuccessful ones.

processed = logins.filter(~pw.this.successful)

We now group the remaining attempts by login time and ip_address, truncating the login time to the minute (that is, ignoring seconds). Each one-minute bucket then acts as one tumbling window.

by_minutes = processed.select(
    pw.this.ip_address,
    # Replace the seconds with "00" so that all attempts within one minute share a key.
    time=pw.apply(
        lambda timestamp_str: (
            datetime.fromtimestamp(int(timestamp_str)).isoformat()
        )[:-2] + "00",
        pw.this.time,
    ),
)
grouped_by_minutes = by_minutes.groupby(pw.this.time, pw.this.ip_address)
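To make the windowing step concrete, here is a minimal plain-Python sketch of how a timestamp maps to the start of its tumbling window. It is not part of the Pathway pipeline: the function name window_key and the width_seconds parameter are assumptions introduced for illustration, and UTC is used so that the result does not depend on the machine's time zone (the snippet above uses local time).

```python
from datetime import datetime, timezone


def window_key(timestamp_str: str, width_seconds: int = 60) -> str:
    """Return the ISO-formatted start of the tumbling window containing the timestamp."""
    ts = int(timestamp_str)
    # Round down to the nearest multiple of the window width.
    window_start = ts - ts % width_seconds
    start = datetime.fromtimestamp(window_start, tz=timezone.utc)
    return start.strftime("%Y-%m-%dT%H:%M:%S")


# Two attempts in the same minute share a key; an attempt in the next minute does not.
print(window_key("5"), window_key("45"), window_key("65"))
```

With width_seconds=60 this should yield the same one-minute buckets as the string truncation above, up to the time zone in which the key is displayed. A different width gives longer or shorter windows.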

The next step is to count the logins...

logins_counted = grouped_by_minutes.reduce(
    by_minutes.time,
    by_minutes.ip_address,
    count=pw.reducers.count(by_minutes.id),
)

...and to keep only incidents where the number of failed logins reached the threshold (here, at least 5 within one minute).

suspicious_logins = logins_counted.filter(pw.this.count >= 5)
pw.debug.compute_and_print(suspicious_logins)
            | time                | ip_address    | count
^DKEYHS4... | 2018-12-25T10:30:00 | 50.37.169.241 | 7
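
As a cross-check of the logic, the same steps (filter, window, count, threshold) can be mirrored in plain Python with collections.Counter. This is only a sketch on made-up records: the user names, IP addresses and timestamps below are hypothetical and not taken from logins.csv, and suspicious_windows and its parameters are illustrative names, not Pathway API.

```python
from collections import Counter


def suspicious_windows(attempts, width_seconds=60, threshold=5):
    """Count failed attempts per (window start, ip_address) and keep counts >= threshold."""
    counts = Counter()
    for username, successful, timestamp, ip_address in attempts:
        if successful:
            continue  # keep only the unsuccessful attempts
        # Tumbling window: round the timestamp down to a multiple of the width.
        window_start = timestamp - timestamp % width_seconds
        counts[(window_start, ip_address)] += 1
    return {key: n for key, n in counts.items() if n >= threshold}


# Hypothetical data: five failures from one address within a minute, plus some noise.
attempts = [("alice", False, 120 + i, "10.0.0.1") for i in range(5)]
attempts += [
    ("bob", True, 125, "10.0.0.2"),
    ("bob", False, 130, "10.0.0.2"),
    ("alice", False, 185, "10.0.0.1"),
]

print(suspicious_windows(attempts))
```

Only the window starting at second 120 for address 10.0.0.1 reaches the threshold. The lone failures in other windows or from other addresses are dropped, just as in the filtered table above.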

Przemek Uznański

Data Structures and Optimization Expert