pw.io.rabbitmq
This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.
read(uri, stream_name, *, schema=None, format='raw', mode='streaming', autocommit_duration_ms=1500, json_field_paths=None, with_metadata=False, start_from='beginning', start_from_timestamp_ms=None, name=None, max_backlog_size=None, tls_settings=None, debug_data=None, **kwargs)
Reads data from a RabbitMQ stream.
This connector supports plain RabbitMQ Streams. Super Streams (partitioned streams) are not supported in the current version.
There are three formats supported: "plaintext", "raw", and "json".
For the "raw" format, the payload is read as raw bytes and added directly to the
table. In the "plaintext" format, the payload is decoded from UTF-8 and stored as
plain text. In both cases, the table will have a "data" column representing
the payload.
If "json" is chosen, the connector parses the message payload as JSON
and creates table columns based on the schema provided in the schema parameter.
Application properties (headers). When with_metadata=True, the
_metadata column includes application_properties — a dict of all
AMQP 1.0 application properties set by the writer. This is consistent with
how pw.io.kafka.read() exposes Kafka headers in _metadata.
Persistence. When persistence is enabled, the connector saves the current stream offset (a single integer) to the snapshot. On restart, it resumes from the saved offset, so already-processed messages are not re-read.
- Parameters
  - uri (str) – The URI of the RabbitMQ server with Streams enabled, e.g. "rabbitmq-stream://guest:guest@localhost:5552".
  - stream_name (str) – The name of the RabbitMQ stream to read data from. The stream must already exist on the server.
  - schema (type[Schema] | None) – The table schema, used only when the format is set to "json".
  - format (Literal['plaintext', 'raw', 'json']) – The input data format, which can be "raw", "plaintext", or "json".
  - mode (Literal['streaming', 'static']) – The reading mode, which can be "streaming" or "static". In "streaming" mode, the connector reads messages continuously. In "static" mode, it reads all existing messages and then stops.
  - autocommit_duration_ms (int | None) – The time interval (in milliseconds) between commits. After this time, the updates received by the connector are committed and added to Pathway's computation graph.
  - json_field_paths (dict[str, str] | None) – For the "json" format, this allows mapping field names to paths within the JSON structure. Use the format <field_name>: <path>, where the path follows JSON Pointer (RFC 6901).
  - with_metadata (bool) – If True, adds a _metadata column containing a JSON dict with offset, stream_name, AMQP 1.0 message properties when available (message_id, correlation_id, content_type, content_encoding, subject, reply_to, priority, durable), and application_properties, a dict of string key-value pairs containing the AMQP application properties set by the writer. Values produced by write() are JSON-encoded strings (see the headers parameter of write()), so they can be parsed back with json.loads.
  - start_from (Literal['beginning', 'end', 'timestamp']) – Where to start reading from. "beginning" starts from the first message in the stream. "end" skips all existing messages and only reads new ones arriving after the reader starts. "timestamp" starts from messages at or after the time given in start_from_timestamp_ms.
  - start_from_timestamp_ms (int | None) – Timestamp in milliseconds since the Unix epoch. Required when start_from="timestamp"; must not be set otherwise.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. Additionally, if persistence is enabled, it will be used as the name for the snapshot that stores the connector's progress.
  - max_backlog_size (int | None) – Limit on the number of entries read from the input source and kept in processing at any moment.
  - tls_settings (TLSSettings | None) – TLS connection settings. Use TLSSettings to configure root certificates, client certificates, and verification mode.
  - debug_data – Static data replacing the original one when the debug mode is active.
- Returns
Table – The table read.
Example:
Read messages in raw format (the default). The table will have a data
column containing the payload:
import pathway as pw
table = pw.io.rabbitmq.read(
"rabbitmq-stream://guest:guest@localhost:5552",
"events",
)
Read messages as UTF-8 plaintext:
table = pw.io.rabbitmq.read(
"rabbitmq-stream://guest:guest@localhost:5552",
"events",
format="plaintext",
)
Read and parse JSON messages with a schema:
class InputSchema(pw.Schema):
owner: str
pet: str
table = pw.io.rabbitmq.read(
"rabbitmq-stream://guest:guest@localhost:5552",
"events",
format="json",
schema=InputSchema,
)
Extract nested JSON fields using JSON Pointer paths:
class InputSchema(pw.Schema):
name: str
age: int
table = pw.io.rabbitmq.read(
"rabbitmq-stream://guest:guest@localhost:5552",
"events",
format="json",
schema=InputSchema,
json_field_paths={"name": "/user/name", "age": "/user/age"},
)
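The paths in json_field_paths follow JSON Pointer (RFC 6901). As a conceptual sketch in plain Python (not the connector's actual implementation), resolving "/user/name" against a message payload works like this:

```python
import json

def resolve_pointer(doc, pointer):
    # Minimal JSON Pointer (RFC 6901) resolver: split the pointer on "/",
    # unescape ~1 -> "/" and ~0 -> "~", then walk the parsed document.
    result = doc
    for token in pointer.lstrip("/").split("/"):
        token = token.replace("~1", "/").replace("~0", "~")
        result = result[int(token)] if isinstance(result, list) else result[token]
    return result

payload = json.loads('{"user": {"name": "Alice", "age": 3}}')
print(resolve_pointer(payload, "/user/name"))  # Alice
print(resolve_pointer(payload, "/user/age"))   # 3
```

With the schema and json_field_paths above, the connector fills the name and age columns from these nested locations.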
Read in static mode (bounded snapshot):
table = pw.io.rabbitmq.read(
"rabbitmq-stream://guest:guest@localhost:5552",
"events",
mode="static",
)
Read only new messages, ignoring all existing data in the stream:
table = pw.io.rabbitmq.read(
"rabbitmq-stream://guest:guest@localhost:5552",
"events",
start_from="end",
)
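When using start_from="timestamp", the start_from_timestamp_ms value is expressed in milliseconds since the Unix epoch. A simple way to compute it, e.g. for one hour in the past (the read call itself is shown as a comment, since it needs a live server):

```python
import time

# Milliseconds since the Unix epoch, one hour in the past.
one_hour_ago_ms = int(time.time() * 1000) - 60 * 60 * 1000

# table = pw.io.rabbitmq.read(
#     "rabbitmq-stream://guest:guest@localhost:5552",
#     "events",
#     start_from="timestamp",
#     start_from_timestamp_ms=one_hour_ago_ms,
# )
```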
Read with persistence enabled, so progress is saved across restarts:
table = pw.io.rabbitmq.read(
"rabbitmq-stream://guest:guest@localhost:5552",
"events",
format="json",
schema=InputSchema,
name="my-rabbitmq-source",
)
# Then run with persistence:
# pw.run(persistence_config=pw.persistence.Config(
# pw.persistence.Backend.filesystem("./PStorage")
# ))
Read with metadata column:
table = pw.io.rabbitmq.read(
"rabbitmq-stream://guest:guest@localhost:5552",
"events",
with_metadata=True,
)
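Header values written by write() are JSON-encoded strings (see the headers parameter of write()), so they can be decoded with json.loads. A plain-Python sketch of the decoding step, assuming application_properties has already been extracted from the _metadata column:

```python
import json

# Example application_properties as produced by write() with
# headers=[table.age, table.pet]: each value is a JSON-encoded string.
application_properties = {"age": "10", "pet": '"dog"'}

# Decode every value back to its original Python type.
decoded = {key: json.loads(value) for key, value in application_properties.items()}
print(decoded)  # {'age': 10, 'pet': 'dog'}
```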
write(table, uri, stream_name, *, format='json', value=None, headers=None, name=None, sort_by=None, tls_settings=None)
Writes data into the specified RabbitMQ stream.
The produced messages consist of the payload, corresponding to the values of the
table serialized according to the chosen format. Two AMQP 1.0 application
properties are always added: pathway_time (processing time) and pathway_diff
(either 1 or -1). If the headers parameter is used, additional properties
can be added to the message.
There are several serialization formats supported: "json",
"plaintext" and "raw".
If the selected format is either "plaintext" or "raw", you also need to
specify which column of the table corresponds to the payload of the produced
message, by providing the value parameter.
- Parameters
  - table (Table) – The table for output.
  - uri (str) – The URI of the RabbitMQ server with Streams enabled, e.g. "rabbitmq-stream://guest:guest@localhost:5552".
  - stream_name (str | ColumnReference) – The RabbitMQ stream where data will be written. The stream must already exist on the server. Can be a column reference for dynamic routing: each row will be written to the stream named by that column's value. All target streams must be pre-created.
  - format (Literal['json', 'plaintext', 'raw']) – Format in which the data is put into RabbitMQ. Currently "json", "plaintext", and "raw" are supported.
  - value (ColumnReference | None) – Reference to the column that should be used as the payload of the produced message in "plaintext" or "raw" format.
  - headers (Optional[Iterable[ColumnReference]]) – References to the table fields that must be provided as AMQP 1.0 application properties (analogous to Kafka headers). Values are serialized as AMQP strings using their JSON representation, following the same encoding as pw.io.jsonlines.write() (e.g. 42 for an int, "hello" with quotes for a string, null for None, a base64-encoded string for bytes). RabbitMQ Streams does not reliably confirm messages with non-string application property values, so all types are JSON-encoded. On the reader side, header values are available in _metadata.application_properties (when with_metadata=True).
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  - sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch.
  - tls_settings (TLSSettings | None) – TLS connection settings. Use TLSSettings to configure root certificates, client certificates, and verification mode.
Examples:
Consider a RabbitMQ server with Streams enabled running locally on port 5552.
First, create a Pathway table:
import pathway as pw
table = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | dog
9 | Bob | cat
8 | Alice | cat
''')
Write the table in JSON format. Each row is serialized as a JSON object:
pw.io.rabbitmq.write(
table,
"rabbitmq-stream://guest:guest@localhost:5552",
"events",
format="json",
)
Use the "plaintext" format to send a single column as the message payload.
When the table has more than one column, you must specify which column to use
via the value parameter. Additional columns can be forwarded as AMQP
application properties using the headers parameter:
pw.io.rabbitmq.write(
table,
"rabbitmq-stream://guest:guest@localhost:5552",
"events",
format="plaintext",
value=table.owner,
headers=[table.age, table.pet],
)
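The encoding applied to header values above can be illustrated in plain Python: each value is serialized using its JSON representation, with bytes base64-encoded to a string first. This is a sketch of the rules stated for the headers parameter, not the connector's internal code:

```python
import base64
import json

def encode_header_value(value):
    # JSON-encode the value; bytes are base64-encoded to a string first,
    # matching the encoding described for the headers parameter.
    if isinstance(value, bytes):
        value = base64.b64encode(value).decode("ascii")
    return json.dumps(value)

print(encode_header_value(42))       # 42
print(encode_header_value("hello"))  # "hello"
print(encode_header_value(None))     # null
print(encode_header_value(b"\x01"))  # "AQ=="
```

On the reader side, these strings appear in _metadata.application_properties and can be decoded back with json.loads.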
Write each row to a different stream based on a column value (dynamic topics). All target streams must already exist on the server:
table_with_targets = pw.debug.table_from_markdown('''
value | target_stream
hello | stream-a
world | stream-b
''')
pw.io.rabbitmq.write(
table_with_targets,
"rabbitmq-stream://guest:guest@localhost:5552",
table_with_targets.target_stream,
format="json",
)