Detecting suspicious user activity with Tumbling Window group-by

Our task is to detect suspicious user login attempts during some period of time. The main ingredient used is grouping over a tumbling window.

We have an input data table with following columns:

  • username,
  • whether the login was successful,
  • time of a login attempt,
  • ip_address of a login.

First we ingest the data.

# Uncomment to download the required files.# %%capture --no-display# !wget -O logins2.csv
from datetime import datetimeimport pathway as pwlogins ="logins.csv", value_columns=["username", "successful", "time", "ip_address"])
logins =    *pw.this.without(pw.this.successful),    successful=(pw.this.successful=="True"))
logins =    *pw.this.without(pw.this.successful),    successful=pw.cast(bool, pw.this.successful),)

We then filter attempts and keep only the unsuccessful ones.

processed = logins.filter(~pw.this.successful)

We now group remaining attempts by login time and ip_address (ignoring seconds in time of login).

by_minutes =    pw.this.ip_address,    time=pw.apply(        lambda timestamp_str: (datetime.fromtimestamp(int(timestamp_str)).isoformat())[:-2]+"00",        pw.this.time))grouped_by_minutes = by_minutes.groupby(pw.this.time, pw.this.ip_address)

Next step is to count the logins...

logins_counted = grouped_by_minutes.reduce(    by_minutes.time, by_minutes.ip_address, count=pw.reducers.count(

...and to keep only incidents where number of failed logins exceeded the threshold.

suspicious_logins = logins_counted.filter(pw.this.count >= 5)pw.debug.compute_and_print(suspicious_logins)
            | time                | ip_address    | count
^DKEYHS4... | 2018-12-25T10:30:00 | | 7

Przemek Uznański

Data Structures and Optimization Expert