pw.demo
Pathway demo module
This module lets you create custom data streams, either from scratch or by replaying a CSV file, so that you can test and debug your Pathway pipeline on real-time data.
Typical use:

```python
import pathway as pw

class InputSchema(pw.Schema):
    name: str
    age: int

table = pw.demo.replay_csv("./input_stream.csv", schema=InputSchema)
```
generate_custom_stream(value_generators, *, schema, nb_rows=None, autocommit_duration_ms=1000, input_rate=1.0, persistent_id=None)
Generates a data stream.
The generator creates a table and periodically streams rows into it. If nb_rows is provided, exactly nb_rows rows are generated in total; otherwise, the generator streams indefinitely.
The rows are generated one by one, each with an associated index x starting from 0. The value of each column is obtained by calling that column's function in value_generators on x.
- Parameters
  - value_generators (dict[str, Any]) – Dictionary mapping column names to functions that generate the values of each column from the row index x.
  - schema (type[Schema]) – Schema of the resulting table.
  - nb_rows (int | None) – The number of rows to generate. Defaults to None, in which case the generator streams indefinitely.
  - types – Dictionary mapping the columns to the data types (pw.Type) of their values. This parameter is optional; if it is not provided, the default type is pw.Type.ANY.
  - autocommit_duration_ms (int) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph. Defaults to 1000.
  - input_rate (float) – The number of rows generated per second. Defaults to 1.0.
- Returns
Table – The generated table.
Example:

```python
import pathway as pw

value_functions = {
    'number': lambda x: x + 1,
    'name': lambda x: f'Person {x}',
    'age': lambda x: 20 + x,
}

class InputSchema(pw.Schema):
    number: int
    name: str
    age: int

table = pw.demo.generate_custom_stream(value_functions, schema=InputSchema, nb_rows=10)
```
In the above example, a data stream of 10 rows is generated, where each row has the columns ‘number’, ‘name’, and ‘age’. The ‘number’ column contains the values 1 to 10, the ‘name’ column contains ‘Person’ followed by the row index, and the ‘age’ column contains 20 plus the row index.
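The row-generation rule can be checked without running Pathway at all. The following is a minimal pure-Python sketch, assuming only what is described above: row x (starting from 0) gets each column's value by calling the matching function in value_generators with x. The helper name model_rows is hypothetical and not part of pw.demo; it models only the row contents and ignores timing (input_rate, autocommit_duration_ms) entirely.

```python
from itertools import count, islice
from typing import Any, Callable, Dict, Iterator, Optional


def model_rows(
    value_generators: Dict[str, Callable[[int], Any]],
    nb_rows: Optional[int] = None,
) -> Iterator[Dict[str, Any]]:
    """Hypothetical model of the rows produced by generate_custom_stream.

    Yields one dict per row; row x is built by calling every generator with x.
    With nb_rows=None the iterator never ends, mirroring the indefinite stream.
    """
    indices = range(nb_rows) if nb_rows is not None else count()
    for x in indices:
        yield {column: generate(x) for column, generate in value_generators.items()}


# Same generators as in the example above.
value_functions = {
    "number": lambda x: x + 1,
    "name": lambda x: f"Person {x}",
    "age": lambda x: 20 + x,
}

rows = list(model_rows(value_functions, nb_rows=10))
print(rows[0])   # {'number': 1, 'name': 'Person 0', 'age': 20}
print(rows[-1])  # {'number': 10, 'name': 'Person 9', 'age': 29}

# With nb_rows=None, take a finite prefix instead of calling list().
first_three = list(islice(model_rows(value_functions), 3))
```

Using a generator keeps the finite and indefinite cases in one function: only the index source changes between range and itertools.count.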
noisy_linear_stream(nb_rows=10, input_rate=1.0)
Generates an artificial data stream for the linear regression tutorial.
- Parameters
  - nb_rows (int, optional) – The number of rows to generate in the data stream. Defaults to 10.
  - input_rate (float, optional) – The number of rows generated per second. Defaults to 1.0.
- Returns
pw.Table – A table containing the generated data stream.
Example:

```python
import pathway as pw

table = pw.demo.noisy_linear_stream(nb_rows=100, input_rate=2.0)
```
In the above example, an artificial data stream is generated with 100 rows. Each row has two columns, ‘x’ and ‘y’. The ‘x’ values range from 0 to 99, and the ‘y’ values are equal to ‘x’ plus some random noise.
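To see the shape of this data without starting a stream, here is a pure-Python sketch of the rows described above, followed by an ordinary least-squares fit showing that the slope comes out close to 1. The noise model is an assumption: the sketch draws noise uniformly from [-0.1, 0.1], which may differ from what noisy_linear_stream actually uses. The helper names model_noisy_linear and fit_line are hypothetical.

```python
import random
from typing import List, Tuple


def model_noisy_linear(
    nb_rows: int = 10, noise: float = 0.1, seed: int = 0
) -> List[Tuple[float, float]]:
    """Hypothetical model: x runs over 0..nb_rows-1, y = x + uniform noise.

    The uniform noise in [-noise, noise] is an assumption made for this sketch.
    """
    rng = random.Random(seed)  # seeded so the sketch is reproducible
    return [(float(x), x + rng.uniform(-noise, noise)) for x in range(nb_rows)]


def fit_line(points: List[Tuple[float, float]]) -> Tuple[float, float]:
    """Ordinary least squares: returns (slope, intercept)."""
    n = len(points)
    mean_x = sum(x for x, _ in points) / n
    mean_y = sum(y for _, y in points) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in points)
    var = sum((x - mean_x) ** 2 for x, _ in points)
    slope = cov / var
    return slope, mean_y - slope * mean_x


points = model_noisy_linear(nb_rows=100)
slope, intercept = fit_line(points)
print(points[0][0], points[-1][0])  # 0.0 99.0
print(round(slope, 2))              # 1.0
```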
range_stream(nb_rows=30, offset=0, input_rate=1.0, autocommit_duration_ms=1000)
Generates a simple artificial data stream, used to compute the sum in our examples.
- Parameters
  - nb_rows (int, optional) – The number of rows to generate in the data stream. Defaults to 30.
  - offset (int, optional) – The offset value added to the generated ‘value’ column. Defaults to 0.
  - input_rate (float, optional) – The number of rows generated per second. Defaults to 1.0.
  - autocommit_duration_ms (int) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph. Defaults to 1000.
- Returns
pw.Table – A table containing the generated data stream.
Example:

```python
import pathway as pw

table = pw.demo.range_stream(nb_rows=50, offset=10, input_rate=2.5)
```
In the above example, an artificial data stream is generated with a single column ‘value’ and 50 rows. The ‘value’ column contains the row index plus ‘offset’, so the values range from ‘offset’ (10 in this case) up to, but not including, ‘nb_rows’ + ‘offset’ (60).
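The values and the approximate pace of this example can be worked out by hand. The sketch below is a pure-Python model, assuming value = x + offset for the row index x = 0, 1, ..., nb_rows - 1 and a constant generation rate; the helper names are hypothetical and scheduling jitter is ignored. Since this stream is used to compute a sum, the expected final total is included too.

```python
from typing import List


def model_range_values(nb_rows: int = 30, offset: int = 0) -> List[int]:
    """Hypothetical model of the 'value' column: x + offset for x in 0..nb_rows-1."""
    return [x + offset for x in range(nb_rows)]


def approx_duration_s(nb_rows: int, input_rate: float) -> float:
    """Rough time needed to emit all rows at input_rate rows per second."""
    return nb_rows / input_rate


def approx_rows_per_commit(input_rate: float, autocommit_duration_ms: int = 1000) -> float:
    """Average number of rows pushed per commit window (ignores jitter)."""
    return input_rate * autocommit_duration_ms / 1000


values = model_range_values(nb_rows=50, offset=10)
print(values[0], values[-1], len(values))  # 10 59 50
print(sum(values))                         # 1725
print(approx_duration_s(50, 2.5))          # 20.0
print(approx_rows_per_commit(2.5))         # 2.5
```

In other words, with the default autocommit_duration_ms of 1000, this example should take roughly 20 seconds to stream, with about two or three new rows per commit.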
replay_csv(path, *, schema, input_rate=1.0)
Replays a static CSV file as a data stream.
- Parameters
  - path (str | PathLike) – Path to the file to stream.
  - schema (type[Schema]) – Schema of the resulting table.
  - autocommit_duration_ms – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
  - input_rate (float, optional) – The number of rows read per second. Defaults to 1.0.
- Returns
Table – The table read.
Note: the CSV file must follow standard CSV settings: the separator is ‘,’, the quote character is ‘"’, and there is no escape character.
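To try replay_csv (for instance, with the Typical use snippet at the top of this page), you need a CSV file whose header matches the schema. The sketch below writes one with Python's standard csv module, using the settings from the note: ‘,’ as separator, ‘"’ as quote character, and no escape character. The rows are made up for illustration and deliberately contain no quote characters.

```python
import csv

# Illustrative rows matching a schema with columns name: str and age: int.
rows = [
    {"name": "Alice", "age": 30},
    {"name": "Bob, Jr.", "age": 25},  # contains the separator, so it gets quoted
]

with open("input_stream.csv", "w", newline="") as f:
    writer = csv.DictWriter(
        f,
        fieldnames=["name", "age"],
        delimiter=",",        # separator expected by replay_csv
        quotechar='"',        # quote character expected by replay_csv
        escapechar=None,      # no escape character
        quoting=csv.QUOTE_MINIMAL,
        lineterminator="\n",
    )
    writer.writeheader()
    writer.writerows(rows)

with open("input_stream.csv", newline="") as f:
    print(f.read())
```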
replay_csv_with_time(path, *, schema, time_column, unit='s', autocommit_ms=100, speedup=1)
Replays a static CSV file as a data stream, preserving the time between updates based on a timestamp column. The timestamps in the file should be positive integers, sorted in increasing order.
- Parameters
  - path (str) – Path to the file to stream.
  - schema (type[Schema]) – Schema of the resulting table.
  - time_column (str) – Column containing the timestamps.
  - unit (str) – Unit of the timestamps. Only ‘s’, ‘ms’, ‘us’, and ‘ns’ are supported. Defaults to ‘s’.
  - autocommit_ms – The maximum time between two commits, in milliseconds. Every autocommit_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph. Defaults to 100.
  - speedup (float) – Speed-up factor: the stream is produced speedup times faster than the timestamps in time_column dictate. Defaults to 1.
- Returns
Table – The table read.
Note: the CSV file must follow standard CSV settings: the separator is ‘,’, the quote character is ‘"’, and there is no escape character.
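The pacing rule can be illustrated with plain arithmetic. The pure-Python sketch below assumes that the wait before each row equals the timestamp difference to the previous row, converted to seconds according to unit and divided by speedup; it also checks the ordering requirement stated above. The helper name replay_delays_s is hypothetical, and the sketch ignores autocommit_ms, which also affects when rows actually reach the computation graph.

```python
from typing import List

# Seconds per timestamp unit, for the units replay_csv_with_time accepts.
UNIT_TO_SECONDS = {"s": 1.0, "ms": 1e-3, "us": 1e-6, "ns": 1e-9}


def replay_delays_s(timestamps: List[int], unit: str = "s", speedup: float = 1) -> List[float]:
    """Hypothetical model: waits (in seconds) between consecutive rows."""
    if unit not in UNIT_TO_SECONDS:
        raise ValueError(f"unsupported unit: {unit!r}")
    if any(t <= 0 for t in timestamps):
        raise ValueError("timestamps must be positive integers")
    if any(b < a for a, b in zip(timestamps, timestamps[1:])):
        raise ValueError("timestamps must be sorted in increasing order")
    scale = UNIT_TO_SECONDS[unit] / speedup
    return [(b - a) * scale for a, b in zip(timestamps, timestamps[1:])]


# Rows stamped at 1 s, 2 s and 4 s, replayed twice as fast.
print(replay_delays_s([1, 2, 4], unit="s", speedup=2))  # [0.5, 1.0]

# Millisecond timestamps 500 ms apart, replayed at normal speed.
print(replay_delays_s([1000, 1500], unit="ms"))
```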