Running a Pathway Program in the AWS Cloud with Fargate
The Pathway framework enables you to define and run various data processing pipelines. You can find numerous tutorials that guide you through building systems like log monitoring, ETL pipelines with Kafka, or data preparation for Spark analytics.
Once you've developed and tested these pipelines locally, the next logical step is to deploy them in the cloud. Cloud deployment allows your code to run remotely, minimizing interruptions from local machine issues. This step is crucial for moving your code into a production-ready environment.
There are several ways to deploy your code to the cloud. You can deploy it on GCP or using Render, for example. In this tutorial, you will learn how to deploy your Pathway code on AWS Fargate using Pathway's tools and lightweight Cloud Marketplace options to simplify the process.
The tutorial is structured as follows:
- Description of the ETL example pipeline.
- Presentation of the Pathway CLI and the BYOL container.
- Step-by-step guide to setting up a deployment on AWS Fargate.
- Verification of the results.
- Conclusions.
Before you continue, please ensure your project meets these basic requirements:
- The project is hosted on a public GitHub repository.
- The requirements.txt file in the root directory lists all the Python dependencies for the project.
ETL Example Pipeline
Let's take the "Data Preparation for Spark Analytics" tutorial as an example. This tutorial walks you through building an ETL process that tracks GitHub commit history, removes sensitive data, and loads the results into a Delta Lake. For a detailed explanation, you can refer to the article that covers this task in depth.
The tutorial's code is available in a GitHub repository. A few changes have been made to simplify the process:
- The GitHub PAT (Personal Access Token) can now be read from an environment variable.
- Spark computations have been removed since they aren't necessary in a cloud-based container.
Additionally, the README file has been updated to offer more guidance on using Pathway CLI tools to run the project.
There's an important point to consider regarding the task's output. Originally, there were two possible output modes: storing data in a locally-based Delta Lake or in an S3-based Delta Lake. In cloud deployment, using a locally-based Delta Lake isn't practical because it only exists within the container on a remote cloud worker and isn't accessible to the user. Therefore, this tutorial uses an S3-based Delta Lake to store the results, as it provides easy access afterward. This approach requires additional environment variables for the container to access the S3 service, which will be discussed further.
Pathway CLI and the BYOL container
Pathway CLI
Pathway provides several tools that simplify both cloud deployment and development in general.
The first tool is the Pathway CLI. When you install Pathway, it comes with a command-line tool that helps you launch Pathway programs. Its spawn command lets you run code using multiple computational threads or processes. For example, pathway spawn python main.py runs your locally hosted main.py file using Pathway.
This tutorial highlights another feature: the ability to run code directly from a GitHub repository, even if it's not hosted locally.
Take the airbyte-to-deltalake example mentioned earlier. You can run it from the command line by setting two environment variables: GITHUB_PERSONAL_ACCESS_TOKEN for your GitHub PAT and PATHWAY_LICENSE_KEY for your Pathway license key. Then, simply call pathway spawn using --repository-url to define the GitHub repository to run.
This approach allows you to run remotely hosted code as follows:
GITHUB_PERSONAL_ACCESS_TOKEN=YOUR_GITHUB_PERSONAL_ACCESS_TOKEN \
PATHWAY_LICENSE_KEY=YOUR_PATHWAY_LICENSE_KEY \
pathway spawn --repository-url https://github.com/pathway-labs/airbyte-to-deltalake python main.py
When the --repository-url parameter is provided, the CLI automatically checks out the repository, installs any dependencies listed in the requirements.txt file within an isolated environment, and runs the specified file.
Additionally, you can pass the arguments of pathway spawn through the PATHWAY_SPAWN_ARGS environment variable and call pathway spawn-from-env instead. This allows you to run code from a GitHub repository with the following command:
GITHUB_PERSONAL_ACCESS_TOKEN=YOUR_GITHUB_PERSONAL_ACCESS_TOKEN \
PATHWAY_LICENSE_KEY=YOUR_PATHWAY_LICENSE_KEY \
PATHWAY_SPAWN_ARGS='--repository-url https://github.com/pathway-labs/airbyte-to-deltalake python main.py' \
pathway spawn-from-env
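If you would rather start the same run from a Python script, the environment can be assembled programmatically. The following is only a sketch under stated assumptions: the helper names build_spawn_args, build_spawn_env, and launch are hypothetical and not part of the Pathway CLI, the pathway executable is assumed to be on your PATH, and the value of PATHWAY_SPAWN_ARGS is assumed to be split the way a shell would split it.

```python
import os
import shlex
import subprocess


def build_spawn_args(repository_url: str, *command: str) -> str:
    """Build the value for PATHWAY_SPAWN_ARGS (hypothetical helper)."""
    # shlex.join quotes arguments where needed; this assumes the value is
    # later split shell-style, which is an assumption about the CLI.
    return shlex.join(["--repository-url", repository_url, *command])


def build_spawn_env(spawn_args: str, license_key: str, github_token: str) -> dict:
    """Copy the current environment and add the variables used in this tutorial."""
    env = dict(os.environ)
    env.update(
        {
            "PATHWAY_SPAWN_ARGS": spawn_args,
            "PATHWAY_LICENSE_KEY": license_key,
            "GITHUB_PERSONAL_ACCESS_TOKEN": github_token,
        }
    )
    return env


def launch(env: dict) -> None:
    """Run `pathway spawn-from-env` with the prepared environment."""
    subprocess.run(["pathway", "spawn-from-env"], env=env, check=True)
```

A call such as launch(build_spawn_env(build_spawn_args("https://github.com/pathway-labs/airbyte-to-deltalake", "python", "main.py"), license_key, github_token)) would then mirror the shell command above.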
Pathway BYOL
Another useful resource from Pathway is the BYOL (Bring Your Own License) Container available on AWS Marketplace. This listing offers a ready-to-use Docker image with Pathway and all its dependencies pre-installed within the AWS ecosystem. You can use the container without a license key, but entering one unlocks the full features of the framework. The listing is free to use, so there's no cost associated with accessing it on the marketplace.
The container runs the pathway spawn-from-env command, so you can execute your program by passing PATHWAY_SPAWN_ARGS and the other required environment variables into the container. This gets your code running in the cloud. The next section will guide you through setting up Pathway processes using AWS Fargate, the recommended AWS solution for the task.
Running the Example in AWS Fargate
First, you need to acquire the container from the AWS Marketplace listing:
- Open the listing and click the "Continue to Subscribe" button.
- Review the Pathway BYOL container offer and access the Terms and Conditions for its distribution (BSL 1.1 License).
- Once you're familiar with the terms and conditions, click "Continue to Configuration."
- Choose the fulfillment method as "Container image" for this tutorial, and select the software version (we recommend using the latest version available).
- Click "Continue to Launch."
Congratulations! You've now acquired the Pathway BYOL Docker image hosted on AWS infrastructure. This image is available for use in Amazon ECS and Amazon EKS services. You can find the image path in the CONTAINER_IMAGE variable under the "Container images" section on the page.
Next, there are several steps to make use of this container: you need to log in, configure AWS Fargate, and run your Pathway instance.
You can do all the steps in a single launcher script (called launch.py in our example) that you will run locally.
Step 1: Log In to the AWS CLI
The AWS Command Line Interface (CLI) is a powerful tool for managing AWS services. If you haven't installed it yet, follow the installation instructions in the AWS documentation. Once installed, log in using the CLI, for example by running aws configure.
You will also need both the access key and the secret access key. The AWS documentation on access keys describes how to obtain them.
After logging in, you can import the boto3 library and create clients for the ECS and ECR services. Keep in mind that the region name may vary depending on your setup.
import boto3
ecs_client = boto3.client("ecs", region_name="eu-central-1")
ecr_client = boto3.client("ecr", region_name="eu-central-1")
If the login is successful, you can also obtain an ECR authorization token for the current session.
auth_token = ecr_client.get_authorization_token()
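The call returns a dictionary rather than a bare token. As a hedged illustration, the helper below (decode_ecr_token is a hypothetical name) unpacks the first authorizationData entry of such a response into a username, a password, and the registry endpoint. The rest of this tutorial does not depend on it; it is only useful if you want to check the credentials or log in to the registry manually.

```python
import base64
from typing import Tuple


def decode_ecr_token(response: dict) -> Tuple[str, str, str]:
    """Return (username, password, registry endpoint) from the response of
    ecr_client.get_authorization_token()."""
    auth_data = response["authorizationData"][0]
    # The token is base64-encoded text of the form "username:password".
    decoded = base64.b64decode(auth_data["authorizationToken"]).decode("utf-8")
    username, _, password = decoded.partition(":")
    return username, password, auth_data["proxyEndpoint"]
```

With the variable from the snippet above, decode_ecr_token(auth_token) would give you the three values.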
Step 2: Register Task Definition In AWS
In AWS (Amazon Web Services), task definitions are crucial for managing containerized applications with Amazon ECS (Elastic Container Service). A task definition serves as a blueprint that outlines how Docker containers should run. It includes details like container images, CPU and memory requirements, networking settings, and environment variables. This ensures that tasks are executed consistently and can be deployed repeatedly across different environments. Task definitions also allow integration with other AWS services, such as load balancers and auto-scaling, to efficiently manage and scale applications.
The next step is to create a task definition for the container you plan to run. In this case, the task definition is defined as a dictionary, and it can be represented as follows:
task_definition = {
"family": "pathway-container-test",
"networkMode": "awsvpc",
"requiresCompatibilities": ["FARGATE"],
"cpu": "2048",
"memory": "8192",
"executionRoleArn": "arn:aws:iam::YOUR_ACCOUNT_ID:role/ecsTaskExecutionRole",
"containerDefinitions": [
{
"name": "pathway-container-test-definition",
"image": "PATH_TO_YOUR_IMAGE",
"essential": True,
}
],
}
Here's a brief field-by-field explanation of this configuration:
- family: Specifies the name of the task definition family. It groups multiple versions of a task definition, allowing you to track changes and roll back to previous versions if needed. In this example, "pathway-container-test" is the family name.
- networkMode: Defines the networking mode to be used by the containers in the task. "awsvpc" assigns each task its own elastic network interface, allowing for better isolation and security.
- requiresCompatibilities: Specifies the launch types that the task definition is compatible with. "FARGATE" indicates that the task will run on the AWS Fargate serverless compute engine mentioned above, so you don't need to provision or manage the underlying instances.
- cpu: Sets the number of CPU units allocated to the task. Here, "2048" represents 2 vCPUs (1024 units = 1 vCPU).
- memory: Allocates the amount of memory (in MiB) for the task. "8192" means the task is provided with 8 GiB of memory.
- executionRoleArn: Provides the ARN (Amazon Resource Name) of the IAM role that grants the ECS service the permissions to pull container images and publish logs. In this case, the ARN can be obtained by placing your account ID in the given pattern.
- containerDefinitions: Defines the containers that make up your task. It is an array of container definitions, where each container is specified with its settings. In this tutorial, the goal is to run a single container with the specified GitHub repository. Each definition contains the following fields:
  - name: The name of the container, which is unique within the task. In this example, "pathway-container-test-definition" is the container name.
  - image: Specifies the Docker image to be used for the container, including the image repository, path, and tag. Here, you can use the path you've got in the CONTAINER_IMAGE variable. It points to Amazon ECR (Elastic Container Registry).
  - essential: A boolean value that marks the container as essential to the task. If an essential container stops or fails, all other containers in the task are stopped. Since only one container runs within the task, it should be marked as essential.
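Fargate accepts only certain combinations of cpu and memory, so it can help to check the pair locally before registering the definition. The sketch below is not part of the original tutorial: the table reflects the Fargate task sizes as documented by AWS at the time of writing and should be treated as an assumption to verify against the current documentation, and check_fargate_size is a hypothetical helper name.

```python
# CPU units -> allowed memory values in MiB (assumed from the AWS Fargate
# task size documentation; verify before relying on it).
FARGATE_MEMORY_OPTIONS = {
    256: [512, 1024, 2048],
    512: list(range(1024, 4096 + 1, 1024)),
    1024: list(range(2048, 8192 + 1, 1024)),
    2048: list(range(4096, 16384 + 1, 1024)),
    4096: list(range(8192, 30720 + 1, 1024)),
    8192: list(range(16384, 61440 + 1, 4096)),
    16384: list(range(32768, 122880 + 1, 8192)),
}


def check_fargate_size(task_definition: dict) -> None:
    """Raise ValueError if the cpu/memory pair is not in the table above."""
    cpu = int(task_definition["cpu"])
    memory = int(task_definition["memory"])
    allowed = FARGATE_MEMORY_OPTIONS.get(cpu)
    if allowed is None:
        raise ValueError(f"Unsupported cpu value: {cpu}")
    if memory not in allowed:
        raise ValueError(
            f"memory={memory} MiB is not allowed with cpu={cpu}; "
            f"expected one of {allowed}"
        )
```

Calling check_fargate_size(task_definition) on the dictionary from this step passes silently, since 2048 CPU units with 8192 MiB is one of the listed combinations.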
Now this task definition can be submitted.
response = ecs_client.register_task_definition(**task_definition)
The ECS client returns the ARN of the registered task definition, which is its unique identifier. Be sure to save this ARN for use in the next steps.
task_definition_arn = response["taskDefinition"]["taskDefinitionArn"]
Step 3 (optional): Create a Cluster Where The Container Will Run
A cluster is a logical grouping of tasks or services that helps you manage and scale containerized applications. When using Fargate, a cluster acts as the environment where your tasks run, hiding the underlying infrastructure details. Clusters organize and isolate resources, allowing you to run multiple services or applications within the same AWS account while keeping them separate. They are crucial for defining network boundaries, associating IAM roles, and managing permissions across various services or environments within ECS.
If you already have an ECS cluster, you can use it to run your example. If not, you need to follow these simple steps to create one.
cluster_name = "pathway-test-cluster"
response = ecs_client.create_cluster(clusterName=cluster_name)
This command will create an ECS cluster with default settings, which are sufficient to run the tutorial.
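If you are not sure whether the cluster already exists, the two cases can be folded into one helper. This is a minimal sketch, assuming the describe_clusters and create_cluster calls of the boto3 ECS client; ensure_cluster is a hypothetical name, not something the tutorial's repository provides.

```python
def ensure_cluster(ecs_client, cluster_name: str) -> str:
    """Return the ARN of an active cluster with this name, creating it if needed."""
    described = ecs_client.describe_clusters(clusters=[cluster_name])
    for cluster in described.get("clusters", []):
        # Deleted clusters can still be reported with the status "INACTIVE".
        if cluster.get("status") == "ACTIVE":
            return cluster["clusterArn"]
    created = ecs_client.create_cluster(clusterName=cluster_name)
    return created["cluster"]["clusterArn"]
```

With the client from Step 1, ensure_cluster(ecs_client, "pathway-test-cluster") reuses an active cluster when one is found and creates it otherwise.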
Step 4: Launch The Container
Now that everything is set up, you're ready to run the task. As mentioned earlier, running the container with local output isn't ideal because the output will be stored inside the container and will be lost when the container is removed.
Instead, you should configure the environment variables to use S3 for storing the output in Delta Lake mode. Here's how to set it up.
s3_output_path = "YOUR_S3_OUTPUT_PATH" # Example: "s3://aws-demo/runs/16.08.2024/1/"
s3_bucket_name = "YOUR_S3_BUCKET_NAME" # Example: "aws-demo"
s3_region = "YOUR_S3_REGION" # Example: "eu-central-1"
s3_access_key = "YOUR_AWS_S3_ACCESS_KEY"
s3_secret_access_key = "YOUR_AWS_S3_SECRET_ACCESS_KEY"
environment_vars = [
{
"name": "AWS_S3_OUTPUT_PATH",
"value": s3_output_path,
},
{
"name": "AWS_S3_ACCESS_KEY",
"value": s3_access_key,
},
{
"name": "AWS_S3_SECRET_ACCESS_KEY",
"value": s3_secret_access_key,
},
{
"name": "AWS_BUCKET_NAME",
"value": s3_bucket_name,
},
{
"name": "AWS_REGION",
"value": s3_region,
},
{
"name": "PATHWAY_LICENSE_KEY",
"value": "YOUR_PATHWAY_LICENSE_KEY", # You can get it at https://pathway.com/features
},
{
"name": "GITHUB_PERSONAL_ACCESS_TOKEN",
"value": "YOUR_GITHUB_PERSONAL_ACCESS_TOKEN", # You can get it at https://github.com/settings/tokens
},
{
"name": "PATHWAY_SPAWN_ARGS",
"value": "--repository-url https://github.com/pathway-labs/airbyte-to-deltalake python main.py", # Doesn't need to be changed
},
]
Here are the essential environment variables you'll need:
- PATHWAY_LICENSE_KEY: A Pathway license key is required for Delta Lake features to work. You can get a free license at https://pathway.com/features.
- AWS_S3_OUTPUT_PATH: The full path in S3 where the output will be stored.
- AWS_S3_ACCESS_KEY: Your S3 access key.
- AWS_S3_SECRET_ACCESS_KEY: Your S3 secret access key.
- AWS_BUCKET_NAME: The name of your S3 bucket.
- AWS_REGION: The region where your S3 bucket is located.
- GITHUB_PERSONAL_ACCESS_TOKEN: Your GitHub Personal Access Token, which you can obtain from the "Personal access tokens" page.
- PATHWAY_SPAWN_ARGS: Arguments for the Pathway CLI. For this example, it specifies that the script main.py from the pathway-labs/airbyte-to-deltalake repository should be run.
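Writing the list of name/value dictionaries by hand is repetitive, and the bucket name has to agree with the output path. As a hedged alternative, the two hypothetical helpers below (bucket_from_s3_path and to_ecs_environment are illustrative names) derive the bucket from the path and convert a plain mapping into the structure ECS expects, rejecting empty values early.

```python
from urllib.parse import urlparse


def bucket_from_s3_path(s3_path: str) -> str:
    """Extract the bucket name from a path such as "s3://bucket/prefix/"."""
    parsed = urlparse(s3_path)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 path: {s3_path!r}")
    return parsed.netloc


def to_ecs_environment(variables: dict) -> list:
    """Convert {"NAME": "value"} into [{"name": "NAME", "value": "value"}]."""
    missing = [name for name, value in variables.items() if not value]
    if missing:
        raise ValueError(f"Empty environment variables: {missing}")
    return [{"name": name, "value": str(value)} for name, value in variables.items()]
```

For instance, bucket_from_s3_path("s3://aws-demo/runs/16.08.2024/1/") yields "aws-demo", matching the example values in the comments above. Reading the secrets from your local environment (for example via os.environ) instead of hardcoding them in launch.py is a design choice worth considering, since it keeps credentials out of the script.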
Then you can launch the container in the cloud. It can be done with a simple run_task command as follows:
task = {
"taskDefinition": task_definition_arn,
"cluster": cluster_name,
"launchType": "FARGATE",
"networkConfiguration": {
"awsvpcConfiguration": {
"subnets": ["REPLACE_WITH_YOUR_SUBNET_ID"],
"assignPublicIp": "ENABLED",
}
},
"count": 1,
"overrides": {
"containerOverrides": [
{
"name": "pathway-container-test-definition",
"environment": environment_vars,
}
]
}
}
ecs_client.run_task(**task)
Here's a parameter-by-parameter explanation of what's launched:
- taskDefinition: The ARN or family name of the task definition to use for this task. This is the definition that you registered in Step 2.
- cluster: The name or ARN (Amazon Resource Name) of the ECS cluster where the task will be run. This is the cluster you created in Step 3.
- launchType: Specifies the launch type for the task. "FARGATE" indicates that the task should run on AWS Fargate.
- networkConfiguration, awsvpcConfiguration: Provide the VPC (Virtual Private Cloud) settings for the task. Here is a closer look at them:
  - subnets: A list of subnet IDs within your VPC where the task will be launched. At least one subnet must be specified; you can use any from the Subnets page.
  - assignPublicIp: "ENABLED" means that the task will be assigned a public IP address when launched, allowing it to be accessed from the internet.
- count: Specifies the number of task instances to run. Only one instance of the task needs to be spawned, hence the value is 1.
- overrides, containerOverrides: Provide the overridden environment variables for the cloud launch. The container name must match the name used in the task definition.
When you run this code, the task will be created and started. You can then observe the status of its execution on your cluster's overview page. Eventually, the task finishes.
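Besides watching the overview page, the launcher script can wait for the task itself. The sketch below assumes that you keep the dictionary returned by run_task, and it uses the tasks_stopped waiter and the describe_tasks call of the boto3 ECS client; the helper names and the polling settings are illustrative choices, not part of the original example.

```python
def started_task_arn(run_task_response: dict) -> str:
    """Return the ARN of the started task, or raise if ECS reported failures."""
    failures = run_task_response.get("failures", [])
    if failures:
        reasons = ", ".join(f.get("reason", "unknown") for f in failures)
        raise RuntimeError(f"ECS could not start the task: {reasons}")
    return run_task_response["tasks"][0]["taskArn"]


def container_exit_codes(describe_tasks_response: dict) -> dict:
    """Map container name -> exit code (None if no exit code was reported)."""
    task = describe_tasks_response["tasks"][0]
    return {c["name"]: c.get("exitCode") for c in task["containers"]}


def wait_for_task(ecs_client, cluster_name: str, task_arn: str) -> dict:
    """Block until the task stops, then return its container exit codes."""
    waiter = ecs_client.get_waiter("tasks_stopped")
    # Poll every 15 seconds for up to 30 minutes (illustrative values).
    waiter.wait(
        cluster=cluster_name,
        tasks=[task_arn],
        WaiterConfig={"Delay": 15, "MaxAttempts": 120},
    )
    described = ecs_client.describe_tasks(cluster=cluster_name, tasks=[task_arn])
    return container_exit_codes(described)
```

In the launcher this could be used as task_arn = started_task_arn(ecs_client.run_task(**task)) followed by wait_for_task(ecs_client, cluster_name, task_arn). An exit code of 0 for the container suggests that the pipeline finished without an error.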
Accessing the Execution Results
After the execution is complete, you can verify that the results are in the S3-based Delta Lake using the delta-rs Python package.
from deltalake import DeltaTable
storage_options = {
"AWS_ACCESS_KEY_ID": s3_access_key,
"AWS_SECRET_ACCESS_KEY": s3_secret_access_key,
"AWS_REGION": s3_region,
"AWS_BUCKET_NAME": s3_bucket_name,
# Disabling DynamoDB sync since there are no parallel writes into this Delta Lake
"AWS_S3_ALLOW_UNSAFE_RENAME": "True",
}
delta_table = DeltaTable(
s3_output_path,
storage_options=storage_options,
)
pd_table_from_delta = delta_table.to_pandas()
pd_table_from_delta.shape[0]
664
You can also verify the count: there were indeed 664 commits in the pathwaycom/pathway repository as of the time this text was written.
Conclusions
Cloud deployment is a key part of developing advanced projects. It lets you deploy solutions that run reliably and predictably, while also allowing for flexible resource management, increased stability, and the ability to choose application availability zones.
However, it can be complex, especially for beginners who might face a system with containers, cloud services, virtual machines, and many other components.
This tutorial taught you how to simplify program deployment on AWS cloud using Pathway CLI and Pathway BYOL containers. You simply need to get a container with Pathway CLI from the AWS Marketplace, set the repository and launch parameters, and use Fargate.
Feel free to try it out and clone the example repository to develop your own data extraction solutions. We also welcome your feedback in our Discord community!