pw.io.mqtt
read(uri, topic, *, qos=2, schema=None, format='raw', autocommit_duration_ms=1500, json_field_paths=None, name=None, debug_data=None, **kwargs)
Reads data from a specified MQTT topic.
It supports three formats: "plaintext", "raw", and "json".
For the "raw" format, the payload is read as raw bytes and added directly to the table. In the "plaintext" format, the payload is decoded from UTF-8 and stored as plain text.
In both cases, the table will have an autogenerated primary key and a single "data" column representing the payload.
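To make the difference between the two formats concrete, here is a small standalone Python sketch of the value that ends up in the "data" column for each of them. The payload_to_data helper is hypothetical and only mirrors the description above; it is not part of Pathway. It simply raises on invalid UTF-8, which is an assumption of the sketch rather than a description of the connector.

```python
# Hypothetical helper illustrating the "raw" and "plaintext" formats;
# it is not part of Pathway's API.
def payload_to_data(payload: bytes, fmt: str):
    """Return the value that would be stored in the single "data" column."""
    if fmt == "raw":
        # Raw bytes are stored unchanged.
        return payload
    if fmt == "plaintext":
        # The payload is decoded from UTF-8 into a string
        # (raises UnicodeDecodeError on invalid UTF-8).
        return payload.decode("utf-8")
    raise ValueError(f"unsupported format: {fmt!r}")


message = "température=21".encode("utf-8")
print(payload_to_data(message, "raw"))        # b'temp\xc3\xa9rature=21'
print(payload_to_data(message, "plaintext"))  # température=21
```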
If you select the "json" format, the connector parses the message payload as JSON and creates table columns based on the schema provided in the schema parameter. The column values come from the corresponding JSON fields.
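The idea of mapping JSON fields to schema columns can be sketched in plain Python. Here a dictionary of column names to Python types stands in for a Pathway schema, and each column takes the value of the JSON field with the same name. SCHEMA and parse_json_row are hypothetical; in particular, raising on a missing field and ignoring extra fields are assumptions of this sketch, not documented connector behavior.

```python
import json

# Hypothetical stand-in for a Pathway schema: column name -> expected Python type.
SCHEMA = {"user_id": int, "username": str, "phone": str}


def parse_json_row(payload: bytes, schema: dict) -> dict:
    """Build one row from a JSON payload, keeping only the schema's columns."""
    document = json.loads(payload)
    row = {}
    for column, expected_type in schema.items():
        value = document[column]  # assumption: a missing field raises KeyError
        if not isinstance(value, expected_type):
            raise TypeError(f"{column}: expected {expected_type.__name__}")
        row[column] = value
    return row  # fields not listed in the schema are ignored


row = parse_json_row(
    b'{"user_id": 1, "username": "alice", "phone": "555-0100", "extra": true}',
    SCHEMA,
)
print(row)  # {'user_id': 1, 'username': 'alice', 'phone': '555-0100'}
```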
- Parameters
  - uri (str) – The connection string for the MQTT broker.
  - topic (str) – The name of the MQTT topic to read data from.
  - qos (int) – The QoS (Quality of Service) value for the connection. Note that the final QoS is determined by the broker as the lower of the writer’s and reader’s QoS levels.
  - schema (type[Schema] | None) – The table schema, used only when the format is set to "json".
  - format (str) – The input data format, which can be "raw", "plaintext", or "json".
  - autocommit_duration_ms (int | None) – The time interval (in milliseconds) between commits. After this time, the updates received by the connector are committed and added to Pathway’s computation graph.
  - json_field_paths (dict[str, str] | None) – For the "json" format, this allows mapping field names to paths within the JSON structure. Use the format <field_name>: <path>, where the path follows the JSON Pointer syntax (RFC 6901).
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. Additionally, if persistence is enabled, it will be used as the name for the snapshot that stores the connector’s progress.
  - debug_data – Static data that replaces the original data when debug mode is active.
- Returns
Table – The table read.
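The paths in json_field_paths follow JSON Pointer (RFC 6901). As a rough illustration of how such a path selects a value, here is a small standalone resolver; resolve_pointer and the field_paths mapping are hypothetical and only show the pointer syntax, not how the connector is implemented. Array indices are handled as plain integers, and the special "-" index is not supported.

```python
import json


def resolve_pointer(document, pointer: str):
    """Resolve an RFC 6901 JSON Pointer such as "/user/id" against a document."""
    if pointer == "":
        return document  # the empty pointer refers to the whole document
    if not pointer.startswith("/"):
        raise ValueError(f"invalid JSON Pointer: {pointer!r}")
    current = document
    for token in pointer[1:].split("/"):
        # Unescape per RFC 6901: "~1" -> "/" first, then "~0" -> "~".
        token = token.replace("~1", "/").replace("~0", "~")
        if isinstance(current, list):
            current = current[int(token)]
        else:
            current = current[token]
    return current


# Hypothetical mapping in the <field_name>: <path> shape described above.
field_paths = {"user_id": "/user/id", "phone": "/contacts/0"}
doc = json.loads('{"user": {"id": 42}, "contacts": ["555-0100", "555-0199"]}')
row = {field: resolve_pointer(doc, path) for field, path in field_paths.items()}
print(row)  # {'user_id': 42, 'phone': '555-0100'}
```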
Example:
To run local tests, you can either install an MQTT broker like Mosquitto on your machine or use a Docker image with the communication port exposed. MQTT brokers commonly listen on port 1883 by default.
If your MQTT broker is running on localhost using the default port, you can stream the "test/data" topic to a Pathway table like this:
import pathway as pw
table = pw.io.mqtt.read("mqtt://localhost:1883/?client_id=test", "test/data")
Keep in mind that MQTT does not guarantee message storage. In other words, you cannot assume that a message present in the queue will remain there. MQTT also lacks any concept of message offsets within a topic. As a result, when Pathway persistence is enabled, it saves the message stream without making assumptions about the topic’s state at the time of a restart. Therefore, we recommend designing your data flow to tolerate at-least-once or at-most-once delivery semantics depending on the configuration.
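One common way to tolerate at-least-once delivery is to make downstream processing idempotent. The plain-Python sketch below (a generic technique, not a Pathway feature) assumes that the producer embeds a unique event_id in every JSON payload; this is a hypothetical convention, since MQTT does not provide an application-level message ID. Any payload whose ID has already been seen is dropped.

```python
import json


class DeduplicatingConsumer:
    """Drops redelivered messages by remembering IDs that were already processed.

    Assumes every payload carries a unique "event_id" field (a hypothetical
    convention chosen by the producer).
    """

    def __init__(self):
        self.seen_ids = set()
        self.processed = []

    def handle(self, payload: bytes) -> bool:
        """Process the payload unless it is a duplicate; return True if processed."""
        event = json.loads(payload)
        event_id = event["event_id"]
        if event_id in self.seen_ids:
            return False  # duplicate delivery: ignore it
        self.seen_ids.add(event_id)
        self.processed.append(event)
        return True


consumer = DeduplicatingConsumer()
deliveries = [
    b'{"event_id": 1, "v": 10}',
    b'{"event_id": 1, "v": 10}',  # redelivered copy of the first message
    b'{"event_id": 2, "v": 7}',
]
for raw in deliveries:
    consumer.handle(raw)
print(len(consumer.processed))  # 2
```

In this sketch the set of seen IDs grows without bound; a long-running consumer would need some expiry policy for old IDs.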
You can also decode messages as UTF-8 text during reading by setting the "format" parameter to "plaintext".
Here’s how the reading process would look:
table = pw.io.mqtt.read(
"mqtt://localhost:1883/?client_id=test",
"test/data",
format="plaintext"
)
Alternatively, you can parse each message as JSON into table columns during the reading process by using the "json" format and the schema parameter.
For example, if your data is in JSON format with three fields, namely an integer user_id (which you’d like to use as the primary key instead of an autogenerated one) and two string fields username and phone, you can define the schema like this:
class InputSchema(pw.Schema):
    user_id: int = pw.column_definition(primary_key=True)
    username: str
    phone: str
Now, you can use the format and schema parameters of the connector like this:
table = pw.io.mqtt.read(
"mqtt://localhost:1883/?client_id=test",
"data",
format="json",
schema=InputSchema,
)
As a result, you will have a table with three columns: "user_id", "username", and "phone". The "user_id" column will also act as the primary key for the Pathway table.
write(table, uri, topic, *, qos=2, retain=False, format='json', delimiter=',', value=None, name=None, sort_by=None)
Writes data into the specified MQTT topic.
There are several serialization formats supported: "json", "dsv", "plaintext" and "raw". The format defines how the message is formed. In the case of JSON and DSV (delimiter-separated values), the message is formed in accordance with the respective data format.
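As a rough illustration of the "dsv" format, the hypothetical to_dsv_line helper below joins the values of one row with the configured delimiter ("," by default, matching the delimiter parameter). It deliberately ignores quoting and escaping of values that contain the delimiter, and it leaves out the time and diff values, since this sketch makes no assumption about where the connector places them.

```python
def to_dsv_line(values, delimiter=","):
    """Join one row's values into a delimiter-separated line (hypothetical helper)."""
    return delimiter.join(str(value) for value in values)


print(to_dsv_line([10, "Alice", "dog"]))                 # 10,Alice,dog
print(to_dsv_line([10, "Alice", "dog"], delimiter=";"))  # 10;Alice;dog
```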
The produced messages consist of the payload, corresponding to the values of the table that are serialized according to the chosen format. Please note that the time and diff values aren’t reported if the "plaintext" or "raw" format is used.
If the selected format is either "plaintext" or "raw", you also need to specify which column of the table corresponds to the payload of the produced MQTT message. This can be done by providing the value parameter. It can also be deduced automatically if the table consists of a single column.
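The rule for choosing the payload column can be summarized in a few lines of plain Python. The choose_payload_column helper is hypothetical: it works on column names instead of Pathway column references and only mirrors the rule stated above.

```python
def choose_payload_column(columns, value=None):
    """Pick the payload column for the "plaintext" or "raw" format.

    `columns` lists the table's column names; `value` is the explicitly
    chosen column name, if any. Hypothetical helper, not Pathway's API.
    """
    if value is not None:
        if value not in columns:
            raise ValueError(f"unknown column: {value!r}")
        return value
    if len(columns) == 1:
        return columns[0]  # deduced automatically for single-column tables
    raise ValueError("the table has several columns; specify `value` explicitly")


print(choose_payload_column(["owner"]))                         # owner
print(choose_payload_column(["age", "owner", "pet"], "owner"))  # owner
```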
Please note that MQTT v5-specific features, such as user-defined message headers, are not yet supported but will be added soon. In the meantime, the connector is compatible with both older MQTT versions and v5.
- Parameters
  - table (Table) – The table for output.
  - uri (str) – The URI of the MQTT broker.
  - topic (str | ColumnReference) – The MQTT topic where data will be written. This can be a specific topic name or a reference to a column whose values will be used as the topic for each message. If using a column reference, the column must contain string values.
  - qos (int) – The QoS (Quality of Service) value for the connection. Note that the final QoS is determined by the broker as the lower of the writer’s and reader’s QoS levels.
  - retain (bool) – If set to True, the MQTT broker will retain the last message published to the topic.
  - format (str) – The format in which the data is put into MQTT. Currently "json", "plaintext", "raw" and "dsv" are supported. If the "raw" format is selected, table must either contain exactly one binary column that will be dumped as it is into the message, or the reference to the target binary column must be specified explicitly in the value parameter. Similarly, if "plaintext" is chosen, the payload column must be of the string type: either the table consists of a single string column, or that column is specified in the value parameter.
  - delimiter (str) – The field delimiter to be used in the delimiter-separated values ("dsv") format.
  - value (ColumnReference | None) – A reference to the column that should be used as the payload of the produced message in the "plaintext" or "raw" format. It can be deduced automatically if the table has exactly one column; otherwise, it must be specified explicitly.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  - sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
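The lexicographic ordering used by sort_by can be illustrated with plain Python tuples. The sketch below sorts a hypothetical minibatch of the pets table used in the example that follows by (owner, age), which is the order that sort_by=[table.owner, table.age] describes; it only mimics the ordering and does not call Pathway.

```python
# Rows of one hypothetical minibatch, as (age, owner, pet) tuples.
minibatch = [
    (10, "Alice", "dog"),
    (9, "Bob", "cat"),
    (8, "Alice", "cat"),
]

# Sort by (owner, age): tuples compare lexicographically, so rows are ordered
# by owner first and by age only among rows with the same owner.
AGE, OWNER = 0, 1
ordered = sorted(minibatch, key=lambda row: (row[OWNER], row[AGE]))
print(ordered)
# [(8, 'Alice', 'cat'), (10, 'Alice', 'dog'), (9, 'Bob', 'cat')]
```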
Example:
Assume you have an MQTT server running locally on the default port, 1883. Let’s explore a few ways to send the contents of a table to the topic test/topic on this server.
First, you’ll need to create a Pathway table. You can do this using the pw.debug.table_from_markdown function to set up a test table with information about pets and their owners.
import pathway as pw
table = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | dog
9 | Bob | cat
8 | Alice | cat
''')
To output the table’s contents in JSON format, use the connector like this:
pw.io.mqtt.write(
table,
"mqtt://localhost:1883/?client_id=test",
topic="test/topic",
format="json",
)
In this case, the output will include the table’s rows in JSON format, with time and diff fields added to each JSON payload.
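To picture such a payload, the sketch below builds one by hand for the first row of the table. The to_json_payload helper is hypothetical, the time value 2 is an arbitrary placeholder, and the field order is not meant to match the connector’s output exactly; diff is 1 here, as for a row being added (a removed row would carry -1).

```python
import json


def to_json_payload(row: dict, time: int, diff: int) -> str:
    """Serialize a row together with time and diff fields (hypothetical helper)."""
    return json.dumps({**row, "time": time, "diff": diff})


payload = to_json_payload({"age": 10, "owner": "Alice", "pet": "dog"}, time=2, diff=1)
print(payload)
# {"age": 10, "owner": "Alice", "pet": "dog", "time": 2, "diff": 1}
```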
You can also use a single column from the table as the payload. For instance, to use the owner column as the MQTT message payload, implement it as follows:
pw.io.mqtt.write(
table,
"mqtt://localhost:1883/?client_id=test",
topic="test/topic",
format="plaintext",
value=table.owner,
)
Finally, if you’d like the topic to be dynamic and depend on the owner of the pet, you can pass a reference to this column as the topic:
pw.io.mqtt.write(
table,
"mqtt://localhost:1883/?client_id=test",
topic=table.owner,
format="json",
)
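Conceptually, passing topic=table.owner routes each row to the topic named by that row’s owner value, so the example table would produce messages on the topics Alice and Bob. The plain-Python sketch below only mimics this routing by grouping rows under their topic; it does not talk to a broker and is not how the connector is implemented.

```python
from collections import defaultdict

# The same rows as the example table, as plain dictionaries.
rows = [
    {"age": 10, "owner": "Alice", "pet": "dog"},
    {"age": 9, "owner": "Bob", "pet": "cat"},
    {"age": 8, "owner": "Alice", "pet": "cat"},
]

# Each row goes to the topic given by its "owner" value.
messages_by_topic = defaultdict(list)
for row in rows:
    messages_by_topic[row["owner"]].append(row)

for topic, messages in messages_by_topic.items():
    print(topic, [message["pet"] for message in messages])
# Alice ['dog', 'cat']
# Bob ['cat']
```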