pw.io.kinesis
This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.
read(stream_name, *, schema=None, format='raw', autocommit_duration_ms=1500, json_field_paths=None, name=None, max_backlog_size=None, debug_data=None, **kwargs)
Reads a table from an AWS Kinesis stream. The connection settings are retrieved from the environment.
There are three supported formats: "plaintext", "raw", and "json".
For the "raw" format, the payload is read as raw bytes and added directly to the table. In the "plaintext" format, the payload is decoded from UTF-8 and stored as plain text. In both cases, the table will have an autogenerated primary key and a single "data" column representing the payload.
If you select the "json" format, the connector parses the message payload as JSON and creates table columns based on the schema provided in the schema parameter. The column values come from the corresponding JSON fields.
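As an illustration, here is a minimal sketch of the "json" case. It assumes a hypothetical stream "sensors" whose messages look like {"id": 1, "meta": {"temperature": 20.5}}; the stream name, schema, and field layout are invented for the example:
import pathway as pw

class SensorSchema(pw.Schema):
    id: int
    temperature: float

# "id" comes from the top-level field of the same name; "temperature"
# is remapped from the nested "/meta/temperature" JSON Pointer path.
table = pw.io.kinesis.read(
    "sensors",
    schema=SensorSchema,
    format="json",
    json_field_paths={"temperature": "/meta/temperature"},
)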
- Parameters
  - stream_name (str) – The name of the data stream to be read.
  - schema (type[Schema] | None) – The table schema, used only when the format is set to "json".
  - format (Literal['plaintext', 'raw', 'json']) – The input data format, which can be "raw", "plaintext", or "json".
  - autocommit_duration_ms (int) – The time interval (in milliseconds) between commits. After this time, the updates received by the connector are committed and added to Pathway’s computation graph. Note that this value must not be None for this connector.
  - json_field_paths (dict[str, str] | None) – For the "json" format, this allows mapping field names to paths within the JSON structure. Use the format <field_name>: <path>, where the path follows JSON Pointer (RFC 6901).
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. Additionally, if persistence is enabled, it will be used as the name for the snapshot that stores the connector’s progress.
  - max_backlog_size (int | None) – Limit on the number of entries read from the input source and kept in processing at any moment. Reading pauses when the limit is reached and resumes as the processing of some entries completes. Useful with large sources that emit an initial burst of data, to avoid memory spikes.
  - debug_data – Static data replacing the original data when debug mode is active.
- Returns
Table – The table read.
Example:
To test the connector locally, you need a way to run Kinesis on your machine. You can use the Docker image instructure/kinesalite to spawn it.
Start the container as follows:
docker pull instructure/kinesalite:latest
docker run -p 4567:4567 --name kinesis-local instructure/kinesalite:latest
The first command pulls the Kinesis image from Docker Hub. The second command starts a container and exposes port 4567, the standard port used for the connection.
Since Kinesis now runs locally and the settings are retrieved from the environment, configure the required variables so the connector can reach the local instance:
export AWS_ENDPOINT_URL=http://localhost:4567
export AWS_REGION=us-east-1
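If you prefer to configure this from Python rather than the shell, setting the same variables through os.environ before any connection is made is equivalent:
import os

os.environ["AWS_ENDPOINT_URL"] = "http://localhost:4567"
os.environ["AWS_REGION"] = "us-east-1"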
Now you can start testing. First, connect to the local Kinesis instance and create a client with boto3:
import boto3
client = boto3.client(
"kinesis",
region_name="us-east-1",
endpoint_url="http://localhost:4567",
)
Use the created client to create a new stream, for example "testing":
client.create_stream(StreamName="testing", ShardCount=1)
The stream is created asynchronously, so you need to wait until its status becomes "ACTIVE". To check the status, you can use the describe_stream method.
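For example, a simple polling loop based on describe_stream could look like this (the one-second delay is an arbitrary choice):
import time

while True:
    description = client.describe_stream(StreamName="testing")
    if description["StreamDescription"]["StreamStatus"] == "ACTIVE":
        break
    time.sleep(1)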
Once the stream is active, send a few records with put_record. Note that the payload must be bytes:
client.put_record(
StreamName="testing",
PartitionKey="123",
Data="Hello, world!".encode("utf-8"),
)
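To populate the stream with several records, you can wrap the same call in a loop; the payloads below are arbitrary samples:
for message in ["Hello, world!", "Hello, Kinesis!", "Goodbye!"]:
    client.put_record(
        StreamName="testing",
        PartitionKey="123",
        Data=message.encode("utf-8"),
    )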
Finally, you have a stream with data. You can now read it using the Pathway connector:
import pathway as pw
table = pw.io.kinesis.read("testing", format="plaintext")
Here you first import Pathway, then read the Kinesis stream.
The "plaintext"
format decodes UTF-8 so the text "Hello, world!"
can be
viewed as plain text.
Finally, write the row to a file using a Pathway output connector, for example JSONLines:
pw.io.jsonlines.write(table, "output.jsonl")
Do not forget to call pw.run() to start the pipeline. Once running, the connector continuously monitors the Kinesis stream and writes both existing and newly arriving messages to output.jsonl.
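Putting the pieces together, the complete reading pipeline is only a few lines:
import pathway as pw

table = pw.io.kinesis.read("testing", format="plaintext")
pw.io.jsonlines.write(table, "output.jsonl")
pw.run()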
write(table, stream_name, *, format='json', partition_key=None, data=None, name=None, sort_by=None)
Streams table into an AWS Kinesis stream. The connection settings are retrieved from the environment.
- Parameters
  - table (Table) – The table to write.
  - stream_name (str | ColumnReference) – The Kinesis stream where data will be written. This can be a specific stream name or a reference to a column whose values will be used as the stream for each message. If using a column reference, the column must contain string values.
  - format (Literal['raw', 'plaintext', 'json']) – Format in which the data is put into Kinesis. Currently "json", "plaintext", and "raw" are supported. If the "raw" format is selected, table must either contain exactly one binary column that will be dumped as-is into the Kinesis record, or the reference to the target binary column must be specified explicitly in the data parameter. Similarly, if "plaintext" is chosen, the table must consist of a single column of the string type, or the reference to the target string column must be specified explicitly in the data parameter.
  - partition_key (ColumnReference | None) – Reference to the column used as the partition key in the produced message. It can have any data type; if it is not a string, Pathway will obtain its string representation and use that value. Note that the maximum length of a partition key in Kinesis is 256 bytes. If the key is not specified, the internal row key is used.
  - data (ColumnReference | None) – Reference to the column that should be used as the data in the produced message in the "plaintext" or "raw" format. It can be deduced automatically if the table has exactly one column; otherwise, it must be specified directly. It also has to be specified explicitly if partition_key is set. The type of the column must correspond to the format used: str for the "plaintext" format and binary for the "raw" format. Note that the maximum length of one message payload in Kinesis is 1 MiB.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  - sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
- Returns
None
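For instance, assuming a table with a column key, a call that enforces a deterministic order within each minibatch could look as follows:
pw.io.kinesis.write(
    table,
    stream_name="testing",
    format="json",
    sort_by=[table.key],
)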
Example:
The local test setup is the same as for the case of the input connector: you need a way to run Kinesis on your machine. You can use the Docker image instructure/kinesalite to spawn it.
Start the container as follows:
docker pull instructure/kinesalite:latest
docker run -p 4567:4567 --name kinesis-local instructure/kinesalite:latest
The first command pulls the Kinesis image from Docker Hub. The second command starts a container and exposes port 4567, the standard port used for the connection.
Since Kinesis now runs locally and the settings are retrieved from the environment, configure the required variables so the connector can reach the local instance:
export AWS_ENDPOINT_URL=http://localhost:4567
export AWS_REGION=us-east-1
Now you can start testing. First, connect to the local Kinesis instance and create a client with boto3:
import boto3
client = boto3.client(
"kinesis",
region_name="us-east-1",
endpoint_url="http://localhost:4567",
)
Use the created client to create a new stream, for example "testing".
For easier testing, create the stream with a single shard as follows:
client.create_stream(StreamName="testing", ShardCount=1)
The stream is now ready and you can send messages to it with this connector. First, import Pathway and create a static table with sample data. The code would look like this:
import pathway as pw
table = pw.debug.table_from_markdown(
'''
| key | value
1 | 1 | one
2 | 2 | two
3 | 3 | three
'''
)
The table is created; the next step is to write it to the Kinesis stream. You can do so as follows:
pw.io.kinesis.write(
table,
stream_name="testing",
format="plaintext",
partition_key=table.key,
data=table.value,
)
This command streams the table as follows: the column key is used as the partition key. Because it is an integer, the connector converts it to a string. The column value is sent as the payload. Remember to call pw.run() to start the pipeline.
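That is, the script ends with:
pw.run()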
After the execution finishes, you can read the stream with the boto3 library. Start by listing all shards:
shards_response = client.list_shards(StreamName="testing")
Because the stream was created with one shard, the "Shards" list in the response contains only one item. You can retrieve its ID as shown below:
shard_id = shards_response["Shards"][0]["ShardId"]
To fetch messages, first obtain an iterator for this shard as follows:
iterator_response = client.get_shard_iterator(
StreamName="testing",
ShardId=shard_id,
ShardIteratorType="TRIM_HORIZON",
)
shard_iterator = iterator_response["ShardIterator"]
The parameter TRIM_HORIZON instructs Kinesis to read the shard from the beginning. If you rerun this example, the stream may contain more messages than expected because the connector does not clear streams. Also note that a shard iterator is valid for five minutes, so it must be refreshed if needed.
You can now request the records. Although the API is paginated, three test records can be retrieved in a single call as follows:
records_response = client.get_records(ShardIterator=shard_iterator, Limit=100)
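If the stream held more data than a single response can carry, you could follow the pagination using the NextShardIterator field returned by each call; a sketch of such a loop:
records = []
iterator = shard_iterator
while iterator:
    response = client.get_records(ShardIterator=iterator, Limit=100)
    records.extend(response["Records"])
    if not response["Records"]:
        # An open shard keeps returning new iterators, so stop
        # once a call comes back empty.
        break
    iterator = response.get("NextShardIterator")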
To verify the contents, you can print the received messages with this code:
for r in records_response["Records"]:
print(f"Partition key: {r['PartitionKey']}; Value: {r['Data']}")
You could also read these messages using the Pathway Kinesis input connector. Then, the reading code would look like this:
reread_table = pw.io.kinesis.read("testing", format="plaintext")
This table can then be written to a file or processed further.
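For example, mirroring the input-connector walkthrough above, you could dump it to a JSONLines file (the output file name here is illustrative):
pw.io.jsonlines.write(reread_table, "reread.jsonl")
pw.run()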
If the table contains more than two columns and you want to keep all data, use the JSON format for serialization:
pw.io.kinesis.write(
table,
stream_name="testing",
format="json",
)
If you need to split the table output across multiple streams, the column containing the stream name can be provided as the stream_name parameter.
For example, create a table with an additional column:
table = pw.debug.table_from_markdown(
'''
| key | value | stream
1 | 1 | one | testing
2 | 2 | two | other
3 | 3 | three | testing
'''
)
And then, write this table to multiple streams depending on the stream column as follows:
pw.io.kinesis.write(
table,
stream_name=table.stream,
format="json",
)
As a result, two messages with "value" equal to "one" and "three" are added to the "testing" stream. One message with "value" equal to "two" is added to the "other" stream, which must be created beforehand if you are testing this part locally.
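If you are following along with the local setup, the "other" stream can be created the same way as "testing" was:
client.create_stream(StreamName="other", ShardCount=1)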