Your task is to detect suspicious user login attempts within a given period of time. The main ingredient is grouping over a tumbling window, that is, over consecutive, non-overlapping time intervals of fixed length (here, one minute).
You have an input data table with the following columns: username, successful, time of a login attempt, and ip_address of a login.
First, ingest the data.
# Uncomment to download the required files.
# %%capture --no-display
# !wget https://public-pathway-releases.s3.eu-central-1.amazonaws.com/data/suspicious_users_tutorial_logins.csv -O logins.csv
from datetime import datetime

import pathway as pw

logins = pw.io.csv.read(
    "logins.csv",
    value_columns=["username", "successful", "time", "ip_address"],
    mode="static",
)
logins = logins.update_columns(successful=(pw.this.successful == "True"))
Then filter attempts and keep only the unsuccessful ones.
processed = logins.filter(~pw.this.successful)
Now, group remaining attempts by login time and ip_address (ignoring seconds in time of login).
by_minutes = processed.select(
    pw.this.ip_address,
    time=pw.apply(
        lambda timestamp_str: (
            datetime.fromtimestamp(int(timestamp_str)).isoformat()[:-2] + "00"
        ),
        pw.this.time,
    ),
)
grouped_by_minutes = by_minutes.groupby(pw.this.time, pw.this.ip_address)
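To see what "ignoring seconds" does to a single value, here is a minimal plain-Python sketch of the same idea, independent of Pathway. The helper name truncate_to_minute is hypothetical, and the sketch assumes UTC so that the result does not depend on the machine's time zone (the tutorial code above uses local time). Instead of slicing the ISO string, it rounds the Unix timestamp down to the start of its one-minute tumbling window.

```python
from datetime import datetime, timezone


def truncate_to_minute(timestamp_str: str) -> str:
    """Return the ISO timestamp of the one-minute window containing the login.

    Hypothetical helper for illustration; assumes the input is a Unix
    timestamp in whole seconds, given as a string, and interprets it in UTC.
    """
    seconds = int(timestamp_str)
    # Start of the tumbling window: round down to a multiple of 60 seconds.
    window_start = seconds - seconds % 60
    # Drop the tzinfo so the string has no "+00:00" suffix.
    utc_time = datetime.fromtimestamp(window_start, tz=timezone.utc)
    return utc_time.replace(tzinfo=None).isoformat()


# Two attempts in the same minute share one key; the next minute does not.
print(truncate_to_minute("1545733805"))
print(truncate_to_minute("1545733845"))
print(truncate_to_minute("1545733865"))
```

Because every timestamp maps to exactly one window start, the windows never overlap, which is what makes them tumbling rather than sliding.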
The next step is to count the logins...
logins_counted = grouped_by_minutes.reduce(
    by_minutes.time,
    by_minutes.ip_address,
    count=pw.reducers.count(by_minutes.id),
)
...and to keep only incidents where the number of failed logins reached the threshold (here, at least 5).
suspicious_logins = logins_counted.filter(pw.this.count >= 5)
pw.debug.compute_and_print(suspicious_logins)
            | time                | ip_address    | count
^QNS6146... | 2018-12-25T10:30:00 | 50.37.169.241 | 7
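As a cross-check of the logic, the same four steps (filter failed attempts, assign each to a one-minute window, count per window and IP address, apply the threshold) can be sketched in plain Python without Pathway. Everything here is an illustrative assumption: the sample rows, the function name suspicious_windows, and the use of raw window-start timestamps as keys are not part of the tutorial's dataset or of the Pathway API.

```python
from collections import Counter

# Hypothetical sample rows: (username, successful, time, ip_address).
# All values are strings, as they would be after reading a CSV file.
SAMPLE_LOGINS = (
    [("alice", "False", str(1545733800 + i), "192.0.2.1") for i in range(5)]
    + [("alice", "True", "1545733850", "192.0.2.1")]
    + [("bob", "False", str(1545733800 + i), "198.51.100.7") for i in range(4)]
    + [("alice", "False", "1545733860", "192.0.2.1")]
)


def suspicious_windows(rows, threshold=5, window_seconds=60):
    """Count failed logins per (window start, IP) and keep counts >= threshold."""
    counts = Counter()
    for _username, successful, time_str, ip_address in rows:
        if successful == "True":
            continue  # keep only the unsuccessful attempts
        seconds = int(time_str)
        # Tumbling window: round down to the start of the window.
        window_start = seconds - seconds % window_seconds
        counts[(window_start, ip_address)] += 1
    return {key: n for key, n in counts.items() if n >= threshold}


print(suspicious_windows(SAMPLE_LOGINS))
```

In this sample, only the first address reaches five failed attempts within a single window; the second address stops at four, and the late attempt from the first address falls into the next window.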