Setting up AsyncTransformer cache in Pathway

Some functions are expensive: they may take a long time to compute, or they may call APIs where you pay per request. In such cases, it is a good idea to cache their results.

In Pathway, you can do this with AsyncTransformer, which supports caching of its calls.
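In a nutshell, two ingredients are needed: an AsyncTransformer subclass whose asynchronous invoke method wraps the expensive call, and a persistence config with caching enabled. Here is a minimal sketch of the first ingredient (the schema and method body are placeholders; the full, runnable version is developed step by step below):

import pathway as pw


class OutputSchema(pw.Schema):
    value: str


class ExpensiveTransformer(pw.AsyncTransformer, output_schema=OutputSchema):
    async def invoke(self, value: str) -> dict:
        # The expensive call goes here; with caching enabled,
        # its result is stored and reused for repeated inputs.
        return {"value": value}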

Getting started

First of all, let's install Pathway:

%%capture --no-display
!pip install pathway

Then, let's settle on the task that needs to be solved. Suppose that you run some analytics on a user dataset.

In this dataset, you have user emails. You'd like to check whether these emails are disposable, that is, temporary addresses that are not reliable for communication. You'll use public tools to do this and create a table with a simple boolean flag for each email.

For the demonstration, let's use the publicly available API of Kickbox.
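The endpoint takes the email address in the URL path and returns a small JSON object with a disposable flag, which is exactly the field the transformer below extracts. You can try it from the command line first (a quick sanity check; the response should look like {"disposable": true}):

!curl https://open.kickbox.com/v1/disposable/jack@guerillamail.com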

Input preparation

Let's create the dataset. To keep things simple, you will store it on your computer in a file named users.csv. In a real production scenario, the source could be different: for instance, the data might be read from a Postgres table using CDC and the Debezium connector, as sketched below.
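For illustration, such a CDC setup might look roughly as follows. This is a hypothetical sketch: the broker address, consumer group, and topic name are placeholders, it reuses the UsersSchema defined later in this tutorial, and the exact parameters of pw.io.debezium.read should be checked against the Pathway connector documentation.

rdkafka_settings = {
    "bootstrap.servers": "localhost:9092",  # placeholder broker address
    "group.id": "pathway-users-reader",     # placeholder consumer group
}

users = pw.io.debezium.read(
    rdkafka_settings,
    topic_name="postgres.public.users",  # placeholder Debezium topic
    schema=UsersSchema,
)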

Also, for simplicity, there will be just two columns: the user's ID and their email; in real life, the table might contain more.

%%writefile users.csv
user_id,email
sergey,sergey@pathway.com
jack,jack@guerillamail.com
steven,steven@gmail.com
alice,alice@mailinator.com
rachel,rachel@yahoo.com
anna,anna@wordpress.com
Overwriting users.csv

Let's also clear any cache left over from previous runs: remove the local directory ./Cache, because it will be used to store the cached data.

!rm -rf ./Cache

Now, let's write the code that checks the addresses and builds a table with a flag indicating whether each email is disposable. You can do this with a class inherited from pw.AsyncTransformer, which can also hold a counter of the API requests made.

import requests
import sys
import pathway as pw

from typing import Any


class VerifiedEmailSchema(pw.Schema):
    user_id: str
    email: str
    is_email_disposable: bool


class EmailCheckTransformer(pw.AsyncTransformer, output_schema=VerifiedEmailSchema):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Counts the actual API requests; calls served from the cache
        # never reach invoke and so don't increase this counter.
        self.api_requests_made = 0

    async def invoke(self, user_id: str, email: str) -> dict[str, Any]:
        print(f"An API call has been made for the user {user_id}", file=sys.stderr)
        self.api_requests_made += 1
        # Query the Kickbox open API for the given email address.
        result = requests.get(
            f"https://open.kickbox.com/v1/disposable/{email}",
            timeout=1,
        )

        # If the request fails, keep the flag as None and log the error.
        is_email_disposable = None
        try:
            result.raise_for_status()
            is_email_disposable = result.json()["disposable"]
        except Exception as e:
            print(f"Failed to perform API request: {e}", file=sys.stderr)

        return {
            "user_id": user_id,
            "email": email,
            "is_email_disposable": is_email_disposable,
        }

Now, let's implement the Pathway program that reads the data, applies the transformation, and outputs it so that you can check the result.

The cache is configured via the persistence config. It is enabled by setting the persistence mode parameter to pw.PersistenceMode.PERSISTING or pw.PersistenceMode.UDF_CACHING. In the first case, Pathway also persists the internal state of the computation; in the second, only the results of the AsyncTransformer invocations are cached, which is computationally cheaper. With UDF_CACHING, repeated invocations with the same arguments are served from the cache and do not trigger new calls, as you will see below.
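For comparison, if you also wanted Pathway to persist the internal state of the whole computation (for instance, to resume the pipeline after a restart), the config would differ only in the mode parameter (a sketch; this tutorial sticks to UDF_CACHING):

persistence_config = pw.persistence.Config.simple_config(
    pw.persistence.Backend.filesystem("./Cache"),
    persistence_mode=pw.PersistenceMode.PERSISTING,
)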

class UsersSchema(pw.Schema):
    user_id: str
    email: str


users = pw.io.csv.read("users.csv", schema=UsersSchema, mode="static")
transformer = EmailCheckTransformer(input_table=users)
emails_verified = transformer.result
pw.io.csv.write(emails_verified, "emails_verified.csv")

persistence_backend = pw.persistence.Backend.filesystem("./Cache")
persistence_config = pw.persistence.Config.simple_config(
    persistence_backend,
    persistence_mode=pw.PersistenceMode.UDF_CACHING,
)

pw.run(
    monitoring_level=pw.MonitoringLevel.NONE,
    persistence_config=persistence_config,
)
WARNING:pathway_engine.connectors.monitoring:CsvFilesystemReader-0: Closing the data source
An API call has been made for the user jack
An API call has been made for the user alice
An API call has been made for the user steven
An API call has been made for the user rachel
An API call has been made for the user anna
An API call has been made for the user sergey
WARNING:pathway_engine.connectors.monitoring:PythonReader-1: Closing the data source

Let's check the output now:

!cat emails_verified.csv
user_id,email,is_email_disposable,time,diff
"jack","jack@guerillamail.com",True,1701266683974,1
"alice","alice@mailinator.com",True,1701266684174,1
"steven","steven@gmail.com",False,1701266684374,1
"rachel","rachel@yahoo.com",False,1701266684574,1
"anna","anna@wordpress.com",False,1701266684774,1
"sergey","sergey@pathway.com",False,1701266684974,1

As you can see, two emails have been recognized as disposable. Let's check how many API calls have been made.

transformer.api_requests_made
6

The number of calls is 6, as expected: six emails, each requiring an API call. Now, let's rerun the program and see how the number of calls changes.

Let's clear the graph so that the program starts fresh.

from pathway.internals.parse_graph import G
G.clear()

Now, let's run the program again, using the same pipeline: it reads from the same input and writes to the same output.

users = pw.io.csv.read("users.csv", schema=UsersSchema, mode="static")
transformer = EmailCheckTransformer(input_table=users)
emails_verified = transformer.result
pw.io.csv.write(emails_verified, "emails_verified.csv")

persistence_backend = pw.persistence.Backend.filesystem("./Cache")
persistence_config = pw.persistence.Config.simple_config(
    persistence_backend,
    persistence_mode=pw.PersistenceMode.UDF_CACHING,
)

pw.run(
    monitoring_level=pw.MonitoringLevel.NONE,
    persistence_config=persistence_config,
)
WARNING:pathway_engine.connectors.monitoring:CsvFilesystemReader-0: Closing the data source
WARNING:pathway_engine.connectors.monitoring:PythonReader-1: Closing the data source

Now you can check that the output has been produced. To make sure it is indeed new output, compare the time column below with the one from the first run.

!cat emails_verified.csv
user_id,email,is_email_disposable,time,diff
"jack","jack@guerillamail.com",True,1701266685306,1
"alice","alice@mailinator.com",True,1701266685306,1
"steven","steven@gmail.com",False,1701266685306,1
"rachel","rachel@yahoo.com",False,1701266685306,1
"anna","anna@wordpress.com",False,1701266685306,1
"sergey","sergey@pathway.com",False,1701266685306,1

As you can see, the output is correct: there are still two disposable email addresses and four regular ones. However, no requests were made to the API this time. Let's check the counter to confirm that no requests have been made.

transformer.api_requests_made
0

Now, let's change the data and see how the program behaves. Let's create a dataset where three users are new and the other three were present in the previous dataset.

%%writefile users.csv
user_id,email
sergey,sergey@pathway.com
steven,steven@gmail.com
rachel,rachel@yahoo.com
john,john@fakemail.fr
diana,diana@mail.com
alex,alex@gmail.com
Overwriting users.csv

Now, let's run the pipeline with cache and see what happens:

G.clear()
users = pw.io.csv.read("users.csv", schema=UsersSchema, mode="static")
transformer = EmailCheckTransformer(input_table=users)
emails_verified = transformer.result
pw.io.csv.write(emails_verified, "emails_verified.csv")

persistence_backend = pw.persistence.Backend.filesystem("./Cache")
persistence_config = pw.persistence.Config.simple_config(
    persistence_backend,
    persistence_mode=pw.PersistenceMode.UDF_CACHING,
)

pw.run(
    monitoring_level=pw.MonitoringLevel.NONE,
    persistence_config=persistence_config,
)
WARNING:pathway_engine.connectors.monitoring:CsvFilesystemReader-0: Closing the data source
An API call has been made for the user john
An API call has been made for the user alex
An API call has been made for the user diana
WARNING:pathway_engine.connectors.monitoring:PythonReader-1: Closing the data source

As you can see, the three log lines correspond to the new rows in the table: each new row requires a single API request, while the rows already seen are served from the cache. You can double-check with the counter in the transformer class:

transformer.api_requests_made
3

Finally, you can ensure that the output is indeed correct:

!cat emails_verified.csv
user_id,email,is_email_disposable,time,diff
"john","john@fakemail.fr",True,1701266685810,1
"alex","alex@gmail.com",False,1701266685912,1
"rachel","rachel@yahoo.com",False,1701266685912,1
"steven","steven@gmail.com",False,1701266685912,1
"sergey","sergey@pathway.com",False,1701266685912,1
"diana","diana@mail.com",False,1701266686110,1

Conclusion

You have learned how to cache the calls of heavy or costly functions. To recap, you need to perform the following steps:

  • Define the logic in AsyncTransformer;
  • Enable the cache by setting the persistence mode parameter to UDF_CACHING;
  • Complete your data pipeline and run the program (see the recap sketch below).
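Put together, the whole pattern fits in a few lines (a recap of the code used in this tutorial):

users = pw.io.csv.read("users.csv", schema=UsersSchema, mode="static")
emails_verified = EmailCheckTransformer(input_table=users).result
pw.io.csv.write(emails_verified, "emails_verified.csv")

pw.run(
    persistence_config=pw.persistence.Config.simple_config(
        pw.persistence.Backend.filesystem("./Cache"),
        persistence_mode=pw.PersistenceMode.UDF_CACHING,
    ),
)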