Quick Overview of Pathway Live Data Framework
This page summarizes what you need to know to start programming with the Pathway Live Data Framework. To learn more about the core concepts of the framework, read the dedicated article.
Import Pathway
You can quickly install Pathway with a simple pip command: pip install pathway
Then, import Pathway like any other Python library:
import pathway as pw
Define your Data Schema
Schemas in Pathway define the structure of your data tables. They describe the data types and names of the columns, ensuring that your data is well-organized and consistent.
For instance, when reading a data source, you specify a schema to map the incoming data:
class InputSchema(pw.Schema):
    colA: int
    colB: float
    colC: str
Here, InputSchema specifies three columns: colA (an integer), colB (a float), and colC (a string). Declaring column types up front ensures type safety and helps optimize runtime performance.
The Pathway Live Data Framework supports the following basic data types: bool, str, bytes, int, and float.
The framework also supports more complex data types, such as the Optional data type or temporal data types (datetime.datetime).
Tables
Tables are the Pathway objects that store the data. Each table is composed of columns, and each column holds data of a single type, just like in relational databases.
Connectors
In the Pathway Live Data Framework, you use a connector to create a table from a data source. Connectors read and ingest data from your chosen data sources in real time.
Here's an example of a connector that uses InputSchema to read CSV files from the ./data/ directory and outputs a table:
input_table = pw.io.csv.read('./data/', schema=InputSchema)
Here is a small sample of Pathway Live Data Framework input connectors:
| Input Connectors | Example |
|---|---|
| CSV connector | pw.io.csv.read('./data/', schema=InputSchema) |
| Kafka connector | pw.io.kafka.read(rdkafka_settings, topic="example", schema=InputSchema, format="csv") |
| SQLite connector | pw.io.sqlite.read('./data_path/', table_name, schema=InputSchema) |
| Google Drive connector | pw.io.gdrive.read(object_id='***', service_user_credentials_file="credentials.json") |
The Pathway Live Data Framework comes with many more connectors, including an Airbyte connector that allows you to connect to 300+ sources. You can find the list of available connectors on our connector page.
Transformations
Once your input data is specified, you can define your data pipeline using Pathway Live Data Framework transformations. Under the hood, these transformations are executed by an engine written in Rust, which makes them efficient.
Here is an example of a simple transformation composed of filtering and summing by groups:
filtered_table = input_table.filter(input_table.colA > 0)
result_table = (
    filtered_table
    .groupby(filtered_table.colC)
    .reduce(sum_val=pw.reducers.sum(pw.this.colB))
)
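To make the result of a filter followed by a grouped sum concrete, here is a minimal plain-Python sketch of the same computation on a static batch of rows. It does not use Pathway and says nothing about how the framework computes the result internally. The sample rows and the helper name `filter_and_sum` are hypothetical, and the sketch groups by colC and sums colB because a sum needs a numeric column.

```python
from collections import defaultdict

# Hypothetical sample rows shaped like InputSchema
# (colA: int, colB: float, colC: str).
rows = [
    {"colA": 1, "colB": 2.5, "colC": "x"},
    {"colA": -3, "colB": 4.0, "colC": "x"},
    {"colA": 2, "colB": 1.5, "colC": "y"},
    {"colA": 5, "colB": 0.5, "colC": "x"},
]


def filter_and_sum(rows):
    """Keep rows with colA > 0, then sum colB within each colC group."""
    sums = defaultdict(float)
    for row in rows:
        if row["colA"] > 0:
            sums[row["colC"]] += row["colB"]
    return dict(sums)


result = filter_and_sum(rows)
print(result)
```

The row with colA equal to -3 is removed by the filter, so it contributes nothing to the total of group "x".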
Here is a small sample of the operations you can do in the Pathway Live Data Framework:
| Category | Operations | Example |
|---|---|---|
| Arithmetic operations | +, -, *, /, //, %, ** | t.select(new_col = t.colA + t.colB) |
| Comparison operations | ==, !=, <, <=, >, >= | t.select(new_col = t.colA <= t.colB) |
| Boolean operations | & (AND), \| (OR), ~ (NOT), ^ (XOR) | t.select(new_col = t.colA & (t.colB < 3)) |
| Filtering | filter | t.filter(pw.this.column > value) |
| Applying a function | pw.apply | t.select(new_col=pw.apply(func, pw.this.colA)) |
| Performing a SQL command | pw.sql | pw.sql(query, tab=t) |
The Pathway Live Data Framework comes with more advanced transformations such as Group-by and Aggregation and Join Operations. You can find the list of basic operations in our guide.
Temporal transformations
As a data stream processing framework, the Pathway Live Data Framework also provides temporal operations:
| Category | Operations | Example |
|---|---|---|
| Windowing operations | windowby (sliding, tumbling, session) reduce | t.windowby(t.time, window=pw.temporal.tumbling(duration=...),...).reduce(...) |
| ASOF join | asof_join | t1.asof_join(t2, t1.t, t2.t, t1.name==t2.name, how=..., direction=...).select(...) |
| Interval join | interval_join (outer, left, right) | t1.interval_join(t2, t1.t, t2.t, pw.temporal.interval(...), t1.col==t2.col).select(...) |
| Window join | window_join (outer, left, right) | t1.window_join(t2, t1.t, t2.t, pw.temporal.sliding(...), t1.col==t2.col).select(...) |
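As an illustration of what an interval join matches, the plain-Python sketch below pairs rows that share a key and whose timestamps fall within a given interval of each other. It assumes the common definition, in which a right row matches when its time lies between the left time plus a lower bound and the left time plus an upper bound, both inclusive. The helper name and the sample data are hypothetical, and the code is not Pathway's implementation.

```python
def interval_join(left, right, lower, upper):
    """Pair rows with equal keys whose times satisfy
    l_time + lower <= r_time <= l_time + upper (assumed semantics)."""
    pairs = []
    for l_key, l_time in left:
        for r_key, r_time in right:
            if l_key == r_key and l_time + lower <= r_time <= l_time + upper:
                pairs.append((l_key, l_time, r_time))
    return pairs


# Hypothetical (key, time) rows.
left = [("a", 10), ("b", 20)]
right = [("a", 9), ("a", 12), ("a", 15), ("b", 21)]

matches = interval_join(left, right, lower=-2, upper=2)
print(matches)
```

Here the right row ("a", 15) is left out because it falls more than 2 time units after the left row ("a", 10).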
The behavior of temporal operations determines the tradeoff between accuracy, latency, and memory consumption. You can control the behavior of temporal operations to adapt the tradeoff to your application.
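The tradeoff can be seen in a small plain-Python sketch of a tumbling-window sum. Without a cutoff, every window must stay open indefinitely in case late data arrives. With a cutoff, a window can be closed and its state discarded, at the price of ignoring events that arrive too late. The function name, the `cutoff` rule, and the sample events are hypothetical simplifications, not the framework's actual behavior settings.

```python
from collections import defaultdict


def tumbling_sums(events, duration, cutoff=None):
    """Sum values per tumbling window [start, start + duration).

    events: (time, value) pairs in ARRIVAL order, which may differ
            from time order.
    cutoff: if set, an event is ignored once the latest time seen so far
            is at least window_end + cutoff (hypothetical simplification).
    """
    sums = defaultdict(int)
    latest = None
    for time, value in events:
        latest = time if latest is None else max(latest, time)
        start = (time // duration) * duration
        window_end = start + duration
        if cutoff is not None and latest >= window_end + cutoff:
            continue  # too late: this window is considered closed
        sums[start] += value
    return dict(sums)


# The event at time 3 arrives after the event at time 14.
events = [(1, 10), (12, 5), (14, 1), (3, 7)]

exact = tumbling_sums(events, duration=10)
bounded = tumbling_sums(events, duration=10, cutoff=2)
print(exact, bounded)
```

With no cutoff, the late event at time 3 is still added to the window starting at 0. With a cutoff of 2, that window is already closed when the event arrives, so its sum stays at 10.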
Configure the output
Now that your data ingestion and processing are ready, you need to define what to do with the results. The Pathway Live Data Framework provides output connectors to send the data out of the framework. For example, you can send the results to a CSV file:
pw.io.csv.write(result_table, './output/')
Here are some of the output connectors available in the Pathway Live Data Framework:
| Output Connectors | Example |
|---|---|
| CSV connector | pw.io.csv.write(table, './output/') |
| Kafka output connector | pw.io.kafka.write(table, rdkafka_settings, topic_name="example", format="json") |
| PostgreSQL connector | pw.io.postgres.write(table, output_postgres_settings, "sum_table") |
| Google PubSub connector | pw.io.pubsub.write(table, publisher, project_id, topic_id) |
You can find the list of available connectors on our connector page.
Running the pipeline
Once your pipeline is ready, with both connectors and transformations, you can run the computation with the command run:
pw.run()
The Pathway Live Data Framework then listens to the data sources for new updates and keeps the computation running until the process is terminated. This is the normal behavior of the framework.
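Conceptually, a pipeline that keeps listening produces an updated result each time a new row arrives, rather than a single final answer. The plain-Python sketch below mimics this with a generator over a finite list. The function name and the input rows are hypothetical, and a real data source would not end on its own.

```python
def running_totals(stream):
    """Yield the updated per-key total after every incoming (key, value) row."""
    totals = {}
    for key, value in stream:
        totals[key] = totals.get(key, 0) + value
        yield key, totals[key]


# A finite stand-in for a live data source.
incoming = iter([("x", 1), ("y", 4), ("x", 2)])

updates = list(running_totals(incoming))
print(updates)
```

Each element of `updates` is the new total for one key right after a row was received, so the key "x" appears twice with two successive totals.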
LLM tooling
The Pathway Live Data Framework comes with an LLM xpack that provides all the tools you need to use Large Language Models in the framework. If you are interested, you can learn more here.
Going further
Try our starting examples or learn more about the core concepts of the framework.