# Artificial Data Streams with the `demo` Module
With Pathway's `demo` module, you can create custom data streams from scratch or from a CSV file.
This feature lets you effectively test and debug your Pathway implementation using realtime data.
Pathway offers a seamless transition from static to streaming data.
However, obtaining a real data stream for testing your application can be challenging in specific scenarios.
This is where the `demo` module comes in handy: it lets you generate your own artificial data streams.
With a collection of functions that simplify creating custom data streams, the module allows you to test and experiment with realtime data.
This article will teach you how to generate data streams with the `demo` module.
## Using real data streams is challenging
While it is not advisable to test directly in a production environment, accessing the same data as in production can be troublesome.
Let's consider an example where you are developing an IoT health monitoring system ⛑️.
You need to analyze data from various health sensors a patient wears, such as a glucose monitor 📈 or a pulse sensor 🫀.
Sharing this sensitive data poses privacy concerns, and merely having a snapshot of the data wouldn't suffice.
You would need to test your system in a live environment to truly evaluate it.
However, conducting a full-scale test with real volunteers wearing all the sensors and sharing their data and time for several hours would be costly and impractical.
In general, accessing realtime data streams during the development phase can be challenging due to the following reasons:
- Data Availability: Data sources might require special permissions, API integrations, or data-sharing agreements, making it challenging to obtain the necessary data for debugging.
- Data Privacy and Security: Data might contain sensitive or private information that needs to be handled carefully. Data privacy regulations and security concerns may restrict the availability of realtime data for debugging purposes.
- Production Data Constraints: Streaming applications often process large volumes of realtime data in production environments. Directly accessing and replicating such data for local debugging purposes can be resource-intensive and impractical. The scale and complexity of realtime data streams may require specialized infrastructure and tools that are not feasible to replicate in a local debugging setup.
- Data Consistency: Realtime data streams continuously evolve, making reproducing specific debugging scenarios challenging. In a debugging environment, it is crucial to have consistent and reproducible data to identify and fix issues effectively. Realtime data streams can introduce variability, making it difficult to isolate specific events or situations for debugging purposes.
- Testing Environment Constraints: Debugging streaming applications often requires a controlled testing environment. Multiple components and dependencies interact in a production setting to generate realtime data. Isolating and replicating these dependencies in a testing environment while maintaining the fidelity of the data can be complex and time-consuming.
- Realtime Dependencies: Streaming applications rely on external systems and services for data ingestion, processing, and storage. Debugging such applications may involve interactions with these external dependencies, further complicating the debugging process. Coordinating and synchronizing the availability of these dependencies for debugging purposes can be challenging.
Given these challenges, having the ability to generate artificial data streams using the `demo` module becomes invaluable.
By generating your own data streams, you can define a controlled and reproducible environment for testing and debugging.
This allows you to iterate quickly, identify issues, and refine your code without relying on realtime external data sources.
## The `demo` module
The `demo` module offers several functions that facilitate the generation of artificial data streams. These functions are valuable tools when real data streams are not readily available. Let's take a closer look at the functions provided by the module:

- `range_stream`: The hello world of data streams. This function generates a simple data stream with a single column, 'value', where the values range from a specified offset to the number of rows plus the offset. It is useful when you need a straightforward data stream to verify your application is running.
- `noisy_linear_stream`: This function generates a data stream with two columns, 'x' and 'y', where the 'x' values range from 0 to a specified number of rows, and the 'y' values are calculated based on 'x' with added random noise. It was made to experiment with linear regression algorithms.
- `generate_custom_stream`: This function allows you to create a custom data stream by specifying value generators for each column. You can define the schema of the resulting table and choose the number of rows to generate. Additionally, you can control the input rate of the stream. It is beneficial when you want to simulate a specific data stream for testing or demonstration purposes.
- `replay_csv`: This function allows you to replay a static CSV file as a data stream. You simply have to specify the path to the file and the schema of the resulting table. It is beneficial when you have a static CSV file and want to treat it as a data stream for testing or analysis purposes.
- `replay_csv_with_time`: Similarly to `replay_csv`, this function allows you to replay a static CSV file as a data stream. The time of the updates is based on the timestamps contained in a given column of the CSV file. The timestamps should be sorted.
## Examples
### Generating a single-column data stream with `range_stream`
This function generates a simple data stream with a single column, 'value', whose values range from 0 to `nb_rows`:
```python
import pathway as pw

table = pw.demo.range_stream(nb_rows=50)
```
```
value
0
1
2
3
...
```
It can be used to verify that your application is responsive. You can write the table with a CSV output connector to check that the stream is generated correctly.
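Here is a minimal sketch of doing so (the output file name `output.csv` is an arbitrary choice):

```python
import pathway as pw

table = pw.demo.range_stream(nb_rows=50)
# Dump each update of the stream to a CSV file to inspect it.
pw.io.csv.write(table, 'output.csv')
# Launch the computation.
pw.run()
```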
This function generates the data stream used in the sum example of the first realtime app guide:
```python
import pathway as pw

table = pw.demo.range_stream(nb_rows=50)
table = table.reduce(sum=pw.reducers.sum(pw.this.value))
```
```
sum
0
1
3
6
...
```
You can specify `offset` to change the starting value:
```python
import pathway as pw

table = pw.demo.range_stream(nb_rows=50, offset=10)
```
```
value
10
11
12
13
...
```
If you set `nb_rows` to `None`, the stream will be generated indefinitely.
You can also set the input rate with the `input_rate` parameter, which defines the number of insertions per second. It defaults to one.
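For example, here is a minimal sketch of an unbounded stream (the rate of 10 insertions per second is an arbitrary choice):

```python
import pathway as pw

# An endless stream producing 10 new values per second.
table = pw.demo.range_stream(nb_rows=None, input_rate=10)
```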
### Linear regression with `noisy_linear_stream`
This function generates an artificial data stream ideal for linear regression tutorials: it produces two columns, 'x' and 'y', where the 'x' values range from 0 to a specified number of rows, and the 'y' values are calculated based on 'x' with added random noise:
```python
import pathway as pw

table = pw.demo.noisy_linear_stream(nb_rows=100)
```
```
x,y
0,0.06888437030500963
1,1.0515908805880605
2,1.984114316166169
3,2.9517833500585926
4,4.002254944273722
5,4.980986827490083
6,6.056759717806955
7,6.9606625452157855
8,7.995319390830471
9,9.016676407891007
...
```
This function generates the data stream for our linear regression from Kafka example.
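To give an idea of what you can do with this stream, here is a minimal sketch of a streaming least-squares fit relying only on reducers; it is a simplified stand-in for that example, not its actual implementation:

```python
import pathway as pw

table = pw.demo.noisy_linear_stream(nb_rows=100)

# Maintain the sufficient statistics of ordinary least squares.
stats = table.reduce(
    count=pw.reducers.count(),
    sum_x=pw.reducers.sum(pw.this.x),
    sum_y=pw.reducers.sum(pw.this.y),
    sum_xy=pw.reducers.sum(pw.this.x * pw.this.y),
    sum_xx=pw.reducers.sum(pw.this.x * pw.this.x),
)

# Avoid a division by zero before at least two points have arrived.
stats = stats.filter(pw.this.count >= 2)

# Closed-form slope and intercept, refreshed whenever a new point arrives.
coefficients = stats.select(
    slope=(pw.this.count * pw.this.sum_xy - pw.this.sum_x * pw.this.sum_y)
    / (pw.this.count * pw.this.sum_xx - pw.this.sum_x * pw.this.sum_x),
    intercept=(pw.this.sum_y * pw.this.sum_xx - pw.this.sum_x * pw.this.sum_xy)
    / (pw.this.count * pw.this.sum_xx - pw.this.sum_x * pw.this.sum_x),
)
```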
As with `range_stream`, you can also set the input rate with the `input_rate` parameter, which defines the number of insertions per second. It defaults to one.
### Custom data streams with `generate_custom_stream`
`generate_custom_stream` is a generalization of `range_stream` and `noisy_linear_stream`.
It generates rows with an index ranging from 0 to `nb_rows`.
The content of the table is determined by a dictionary `value_functions` mapping column names to value generators: for each row with index `i`, the value of the column `col` is `value_functions[col](i)`.
The schema of the resulting table should also be provided.
```python
import pathway as pw

value_functions = {
    'number': lambda x: x + 1,
    'name': lambda x: f'Person {x}',
    'age': lambda x: 20 + x,
}

class InputSchema(pw.Schema):
    number: int
    name: str
    age: int

table = pw.demo.generate_custom_stream(value_functions, schema=InputSchema, nb_rows=10)
```
In this example, the `generate_custom_stream` function creates a data stream with 10 rows.
The stream has three columns: 'number', 'name', and 'age'.
The values of the 'number' column are the row index incremented by 1, the 'name' column contains names formatted with the row index, and the 'age' column contains values starting at 20 and incremented by the row index:
```
number,name,age
1,"Person 0",20
2,"Person 1",21
3,"Person 2",22
...
```
If you set `nb_rows` to `None`, the stream will be generated indefinitely.
You can also set `autocommit_duration_ms`, which defines the maximum time between two commits.
Finally, you can select the input rate with the `input_rate` parameter, specifying the number of insertions per second. It defaults to one.
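For example, here is a minimal sketch of an unbounded custom stream, reusing the `value_functions` and `InputSchema` defined above (the commit interval and rate are arbitrary choices):

```python
# An endless stream committing at least every 500 ms, at 5 rows per second.
table = pw.demo.generate_custom_stream(
    value_functions,
    schema=InputSchema,
    nb_rows=None,
    autocommit_duration_ms=500,
    input_rate=5,
)
```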
### Replaying static CSV files with `replay_csv` and `replay_csv_with_time`
These functions let you replay a static CSV file as a data stream. You can specify the path to the file, choose which columns to extract, and define the schema of the resulting table. They are beneficial when you have a static CSV file and want to treat it as a data stream for testing or analysis purposes.
```python
import pathway as pw

class InputSchema(pw.Schema):
    column1: str
    column2: int

table = pw.demo.replay_csv(path='data.csv', schema=InputSchema, input_rate=1.5)
```
In this example, we use the `replay_csv` function to replay a static CSV file `data.csv` as a data stream.
We specify the path to the CSV file and the schema of the resulting table.
The data stream is generated with an input rate of 1.5 rows per second.
If you already have timestamps in your file, you can replay the file while respecting the time between the updates by using `replay_csv_with_time`.
You simply need to specify the column in which the timestamps are stored with `time_column`, along with their unit: only seconds, milliseconds, microseconds, and nanoseconds are supported.
The file is replayed using the first row as the starting point, and the delays between successive updates are based on the timestamps in `time_column`.
Note that the timestamps should be ordered.
```python
table = pw.demo.replay_csv_with_time(path='data.csv', schema=InputSchema, time_column='column2', unit='ms')
```
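To make this concrete, here is a minimal sketch; the file content below is hypothetical:

```python
import pathway as pw

# Hypothetical content of `data.csv`, with millisecond timestamps in 'column2':
#
#   column1,column2
#   a,1700000000000
#   b,1700000000500
#   c,1700000002000

class InputSchema(pw.Schema):
    column1: str
    column2: int

# The second update arrives ~0.5 s after the first one, and the third
# ~1.5 s after the second, following the timestamps in 'column2'.
table = pw.demo.replay_csv_with_time(
    path='data.csv', schema=InputSchema, time_column='column2', unit='ms'
)
pw.io.csv.write(table, 'output.csv')
pw.run()
```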
## Conclusion
Obtaining real data streams can be challenging, particularly for debugging purposes.
Fortunately, Pathway's `demo` module comes to the rescue, offering a seamless way to create custom data streams from scratch or from existing CSV files.
You can now test and debug your Pathway implementation with realtime data, gaining invaluable insights and making your application more robust.