Detecting suspicious user activity with Tumbling Window group-by

Our task is to detect suspicious user login attempts during some period of time. The main ingredient used is grouping over a tumbling window.

We have an input data table with following columns:

  • username,
  • whether the login was successful,
  • time of a login attempt,
  • ip_address of a login.

First we ingest the data.

import datetimeimport pathway as pwlogins = pw.csv.read("logins.csv")

We then filter attempts and keep only successful ones.

processed = logins.filter(logins.successful == False)

We now group remaining attempts by login time and ip_address (ignoring seconds in time of login).

by_minutes = processed.select(    processed.ip_address,    time=pw.apply(pw.utils.bucketing.truncate_to_minutes, processed.time),)grouped_by_minutes = by_minutes.groupby(by_minutes.time, by_minutes.ip_address)

Next step is to count the logins...

logins_counted = grouped_by_minutes.reduce(    by_minutes.time, by_minutes.ip_address, count=pw.reducers.count(by_minutes.id))

...and to keep only incidents where number of failed logins exceeded the threshold.

suspicious_logins = logins_counted.filter(logins_counted.count >= 5)pw.csv.write(suspicious_logins, "suspicious_logins.csv")

Przemek Uznański

Data Structures and Optimization Expert