pw.io.nats
pw.io.nats.read(uri, topic, *, schema=None, format='raw', autocommit_duration_ms=1500, json_field_paths=None, parallel_readers=None, persistent_id=None, debug_data=None)
Reads data from a specified NATS topic.
It supports three formats: "plaintext", "raw", and "json".
- For the "raw" format, the payload is read as raw bytes and added directly to the table.
- For the "plaintext" format, the payload is decoded from UTF-8 and stored as plain text.

In both cases, the table has an autogenerated primary key and a single "data" column containing the payload.
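As a rough illustration of the difference between the two formats, the plain-Python sketch below shows what would end up in the "data" column for the same message payload. It does not use Pathway, and the helper name data_column_value is hypothetical.

```python
def data_column_value(payload: bytes, fmt: str):
    """Hypothetical helper: the value stored in the "data" column for one message."""
    if fmt == "raw":
        # "raw": the payload bytes are kept unchanged
        return payload
    if fmt == "plaintext":
        # "plaintext": the payload is decoded from UTF-8 into a str
        # (invalid UTF-8 raises UnicodeDecodeError in this sketch)
        return payload.decode("utf-8")
    raise ValueError(f"not a single-column format: {fmt!r}")


message = "héllo".encode("utf-8")
print(data_column_value(message, "raw"))        # b'h\xc3\xa9llo'
print(data_column_value(message, "plaintext"))  # héllo
```

How the real connector reacts to payloads that are not valid UTF-8 in the "plaintext" format is not covered by this sketch.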
If you select the "json" format, the connector parses the message payload as JSON and creates table columns based on the schema provided in the schema parameter. The column values come from the corresponding JSON fields.
- Parameters
  - uri (str) – The URI of the NATS server.
  - topic (str) – The name of the NATS topic to read data from.
  - schema (type[Schema] | None) – The table schema, used only when the format is set to "json".
  - format (str) – The input data format, which can be "raw", "plaintext", or "json".
  - autocommit_duration_ms (int | None) – The time interval (in milliseconds) between commits. After this time, the updates received by the connector are committed and added to Pathway’s computation graph.
  - json_field_paths (dict[str, str] | None) – For the "json" format, this allows mapping field names to paths within the JSON structure. Use the format <field_name>: <path>, where the path follows the JSON Pointer syntax (RFC 6901).
  - parallel_readers (int | None) – The number of reader instances running in parallel. If not specified, it defaults to min(pathway_threads, total_partitions). It can’t exceed the number of Pathway engine threads and will be reduced if necessary.
  - persistent_id (str | None) – (unstable) An identifier under which the state of the table will be persisted, or None if there is no need to persist the state of this table. When a program restarts, it restores the state of all input tables according to what was saved for their persistent_id. This way, computations can resume from the point where they were last terminated.
  - debug_data – Static data that replaces the original data when debug mode is active.
- Returns
  Table – The table read.
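The json_field_paths parameter maps a column name to a JSON Pointer (RFC 6901). To show how a pointer such as "/user/name" selects a nested value, here is a minimal resolver written in plain Python. It is a sketch of the RFC 6901 lookup rules, not Pathway’s implementation, and the sample message and field names are made up for illustration.

```python
import json


def resolve_pointer(document, pointer: str):
    """Resolve an RFC 6901 JSON Pointer against a parsed JSON document."""
    if pointer == "":
        return document  # the empty pointer refers to the whole document
    if not pointer.startswith("/"):
        raise ValueError(f"invalid JSON Pointer: {pointer!r}")
    current = document
    for token in pointer[1:].split("/"):
        # Unescape in the order RFC 6901 requires: "~1" -> "/", then "~0" -> "~"
        token = token.replace("~1", "/").replace("~0", "~")
        if isinstance(current, list):
            current = current[int(token)]  # simplified: index syntax is not validated
        else:
            current = current[token]
    return current


# Hypothetical message and mapping in the <field_name>: <path> form
message = json.loads('{"id": 1, "user": {"name": "alice", "phones": ["555-0100"]}}')
field_paths = {"username": "/user/name", "phone": "/user/phones/0"}
row = {field: resolve_pointer(message, path) for field, path in field_paths.items()}
print(row)  # {'username': 'alice', 'phone': '555-0100'}
```

With the connector itself, the same mapping would presumably be passed as json_field_paths={"username": "/user/name", "phone": "/user/phones/0"} together with format="json" and a schema declaring those columns; treat this as an illustration of the parameter description above rather than a tested configuration.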
Example:
To run local tests, you can download the nats-server binary from the Releases page and start it. By default, it runs on port 4222 at localhost.
If your NATS server is running on localhost using the default port, you can stream the "data" topic to a Pathway table like this:
import pathway as pw
table = pw.io.nats.read("nats://127.0.0.1:4222", "data")
Keep in mind that NATS doesn’t store messages by default, so make sure to start your Pathway program before sending any messages: messages published before the connector subscribes are not delivered to it.
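The ordering matters because core NATS delivers a message only to the subscribers that exist when it is published. The toy broker below is a hypothetical in-memory model of that behavior, not the NATS client library: a message published before anyone subscribes is dropped rather than stored.

```python
from collections import defaultdict
from typing import Callable


class ToyBroker:
    """Hypothetical in-memory model of core NATS publish/subscribe (no persistence)."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable[[bytes], None]]] = defaultdict(list)

    def subscribe(self, subject: str, handler: Callable[[bytes], None]) -> None:
        self._subscribers[subject].append(handler)

    def publish(self, subject: str, payload: bytes) -> None:
        # Delivered only to the handlers registered right now; with no
        # subscribers the message is simply dropped, never stored.
        for handler in self._subscribers[subject]:
            handler(payload)


broker = ToyBroker()
received: list[bytes] = []
broker.publish("data", b"sent too early")    # no subscriber yet: dropped
broker.subscribe("data", received.append)    # e.g. the Pathway reader starts
broker.publish("data", b"sent after start")  # delivered
print(received)  # [b'sent after start']
```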
You can also decode messages as UTF-8 while reading by setting the format parameter to "plaintext". Here’s how the reading process would look:
table = pw.io.nats.read("nats://127.0.0.1:4222", "data", format="plaintext")
Alternatively, you can parse JSON messages into a table while reading by using the "json" format together with the schema parameter.
For example, if your data is in JSON format with three fields (an integer id, which you’d like to use as the primary key instead of an autogenerated one, and two string fields username and phone), you can define the schema like this:
class InputSchema(pw.Schema):
    id: int = pw.column_definition(primary_key=True)
    username: str
    phone: str
Now, you can use the format and schema parameters of the connector like this:
table = pw.io.nats.read(
    "nats://127.0.0.1:4222",
    "data",
    format="json",
    schema=InputSchema,
)
As a result, you will have a table with three columns: "id", "username", and "phone". The "id" column will also act as the primary key for the Pathway table.
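To make the mapping concrete, the sketch below mimics in plain Python what the schema describes: each JSON field fills the column of the same name, and the "id" value identifies the row. The SCHEMA dictionary and the parse_message helper are hypothetical stand-ins for InputSchema, and the type check is a simplification; this is not how Pathway parses messages internally.

```python
import json

# Hypothetical mirror of InputSchema: column name -> expected Python type
SCHEMA = {"id": int, "username": str, "phone": str}
PRIMARY_KEY = "id"


def parse_message(payload: bytes) -> tuple[int, dict]:
    """Parse one JSON payload into (primary key, row) following SCHEMA."""
    document = json.loads(payload)
    row = {}
    for column, expected_type in SCHEMA.items():
        value = document[column]  # each column comes from the same-named JSON field
        if not isinstance(value, expected_type):
            raise TypeError(f"{column}: expected {expected_type.__name__}")
        row[column] = value
    return row[PRIMARY_KEY], row


messages = [
    b'{"id": 1, "username": "alice", "phone": "555-0100"}',
    b'{"id": 2, "username": "bob", "phone": "555-0101"}',
]
table = dict(parse_message(m) for m in messages)  # rows keyed by "id"
print(table[2])  # {'id': 2, 'username': 'bob', 'phone': '555-0101'}
```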
pw.io.nats.write(table, uri, topic, *, format='json', delimiter=',', value=None, headers=None)
Writes data to the specified NATS topic.
The produced messages consist of a payload, which contains the values of a table entry serialized according to the chosen format, and two headers: pathway_time, corresponding to the processing time of the entry, and pathway_diff, which is either 1 or -1. Both header values are provided as UTF-8 encoded strings. If the headers parameter is used, additional headers can be added to the message.
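The header layout described above can be modeled in a few lines of plain Python. The build_headers helper below is hypothetical: it shows the two fixed headers (in Pathway, a diff of 1 marks an insertion and -1 a deletion) plus optional extra headers, each value encoded as a UTF-8 string. Python’s str() stands in for Pathway’s string representation of values, which may differ for some types, and the time value 2 is arbitrary.

```python
from typing import Optional


def build_headers(time: int, diff: int, extra: Optional[dict] = None) -> dict[str, bytes]:
    """Hypothetical helper modeling the message headers described above."""
    if diff not in (1, -1):
        raise ValueError("pathway_diff is either 1 (insertion) or -1 (deletion)")
    headers = {
        "pathway_time": str(time).encode("utf-8"),
        "pathway_diff": str(diff).encode("utf-8"),
    }
    # Additional headers: column name -> UTF-8 encoded string form of the value
    for name, value in (extra or {}).items():
        headers[name] = str(value).encode("utf-8")
    return headers


print(build_headers(2, 1, {"age": 10, "pet": "dog"}))
# {'pathway_time': b'2', 'pathway_diff': b'1', 'age': b'10', 'pet': b'dog'}
```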
Several serialization formats are supported: 'json', 'dsv', 'plaintext', and 'raw'. The format defines how the message is formed. For JSON and DSV (delimiter-separated values), the message is formed according to the respective data format.
If the selected format is either ‘plaintext’ or ‘raw’, you also need to specify which column of the table corresponds to the payload of the produced NATS message (unless the table has exactly one column). This is done with the value parameter. To output extra values from the table in these formats, use NATS headers: specify the column references in the headers parameter, and the extracted fields will be serialized into UTF-8 strings and passed as additional message headers.
- Parameters
  - table (Table) – The table for output.
  - uri (str) – The URI of the NATS server.
  - topic (str) – The name of the NATS topic to write data to.
  - format (str) – The format in which the data is put into NATS. Currently “json”, “plaintext”, “raw”, and “dsv” are supported. If the “raw” format is selected, table must either contain exactly one binary column that will be dumped as is into the message, or the reference to the target binary column must be specified explicitly in the value parameter. Similarly, if “plaintext” is chosen, the table should consist of a single column of the string type, or the target column must be specified in the value parameter.
  - delimiter (str) – The field delimiter to be used with the delimiter-separated values (“dsv”) format.
  - value (ColumnReference | None) – A reference to the column that should be used as the payload of the produced message in the ‘plaintext’ or ‘raw’ format. It can be deduced automatically if the table has exactly one column; otherwise, it must be specified explicitly.
  - headers (Optional[Iterable[ColumnReference]]) – References to the table fields that must be provided as message headers. These headers are named after the forwarded fields and contain the string representations of the respective values, encoded in UTF-8. Note that due to the constraints NATS imposes on headers, binary fields must also be valid UTF-8.
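The rules for choosing the payload column in the ‘plaintext’ and ‘raw’ formats can be summarized as a small check. The payload_column function below is a hypothetical sketch based on our reading of the format and value parameter descriptions (column types are modeled as Python types, with bytes standing for a binary column); it is not part of Pathway’s API.

```python
from typing import Optional


def payload_column(columns: dict[str, type], fmt: str, value: Optional[str] = None) -> str:
    """Hypothetical check mirroring the payload rules for 'raw'/'plaintext' output."""
    if fmt not in ("raw", "plaintext"):
        raise ValueError("only 'raw' and 'plaintext' use a payload column")
    if value is None:
        # value can only be deduced when the table has exactly one column
        if len(columns) != 1:
            raise ValueError("value must be given when the table has several columns")
        (value,) = columns  # the only column becomes the payload
    expected = bytes if fmt == "raw" else str  # binary for 'raw', string for 'plaintext'
    if columns[value] is not expected:
        raise TypeError(f"{fmt!r} needs a {expected.__name__} payload column")
    return value


pets = {"age": int, "owner": str, "pet": str}
print(payload_column(pets, "plaintext", value="owner"))  # owner
```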
Example:
Assume you have the NATS server running locally on the default port, 4222. Let’s explore a few ways to send the contents of a table to the topic test_topic on this server.
First, you’ll need to create a Pathway table. You can do this using the pw.debug.table_from_markdown function to set up a test table with information about pets and their owners.
import pathway as pw
table = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | dog
9 | Bob | cat
8 | Alice | cat
''')
To output the table’s contents in JSON format, use the connector like this:
pw.io.nats.write(
    table,
    "nats://127.0.0.1:4222",
    "test_topic",
    format="json",
)
In this case, the output will include the table’s rows in JSON format, with time and diff fields added to each JSON payload.
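For a rough picture of one such message body, the sketch below builds it with Python’s json module: the row’s fields plus time and diff. The json_payload helper is hypothetical, the time value 2 is arbitrary, and the key order and exact encoding Pathway produces may differ.

```python
import json


def json_payload(row: dict, time: int, diff: int) -> bytes:
    """Hypothetical model of a JSON message body: row fields plus time and diff."""
    return json.dumps({**row, "time": time, "diff": diff}).encode("utf-8")


print(json_payload({"age": 10, "owner": "Alice", "pet": "dog"}, time=2, diff=1))
# b'{"age": 10, "owner": "Alice", "pet": "dog", "time": 2, "diff": 1}'
```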
You can also use a single column from the table as the payload. For instance, to use the owner column as the NATS message payload, implement it as follows:
pw.io.nats.write(
    table,
    "nats://127.0.0.1:4222",
    "test_topic",
    format="plaintext",
    value=table.owner,
)
If needed, you can also send the remaining fields as headers. To do this, use the headers parameter, which should list all the required columns. Since owner is already sent as the message payload, you can add the age and pet columns to the headers. Here’s what the code would look like:
pw.io.nats.write(
    table,
    "nats://127.0.0.1:4222",
    "test_topic",
    format="plaintext",
    value=table.owner,
    headers=[table.age, table.pet],
)
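To visualize what the last configuration sends, the plain-Python sketch below models each row as a ‘plaintext’ message: the owner value becomes the UTF-8 payload, and age and pet become extra headers. The to_message helper is hypothetical. It leaves out the pathway_time and pathway_diff headers and uses str() as a stand-in for Pathway’s string representation of values.

```python
rows = [
    {"age": 10, "owner": "Alice", "pet": "dog"},
    {"age": 9, "owner": "Bob", "pet": "cat"},
    {"age": 8, "owner": "Alice", "pet": "cat"},
]


def to_message(row: dict, value: str, header_columns: list[str]) -> tuple[bytes, dict[str, bytes]]:
    """Hypothetical model of one 'plaintext' message: payload plus extra headers."""
    payload = str(row[value]).encode("utf-8")
    # Each header is named after its column and holds the UTF-8 encoded string value
    headers = {name: str(row[name]).encode("utf-8") for name in header_columns}
    return payload, headers


print(to_message(rows[0], "owner", ["age", "pet"]))
# (b'Alice', {'age': b'10', 'pet': b'dog'})
messages = [to_message(row, "owner", ["age", "pet"]) for row in rows]
```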