pw.io.rabbitmq

This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.

read(uri, stream_name, *, schema=None, format='raw', mode='streaming', autocommit_duration_ms=1500, json_field_paths=None, with_metadata=False, start_from='beginning', start_from_timestamp_ms=None, name=None, max_backlog_size=None, tls_settings=None, debug_data=None, **kwargs)

Reads data from a RabbitMQ stream.

This connector supports plain RabbitMQ Streams. Super Streams (partitioned streams) are not supported in the current version.

Three formats are supported: "plaintext", "raw", and "json".

For the "raw" format, the payload is read as raw bytes and added directly to the table. In the "plaintext" format, the payload is decoded from UTF-8 and stored as plain text. In both cases, the table will have a "data" column representing the payload.

If "json" is chosen, the connector parses the message payload as JSON and creates table columns based on the schema provided in the schema parameter.

Application properties (headers). When with_metadata=True, the _metadata column includes application_properties — a dict of all AMQP 1.0 application properties set by the writer. This is consistent with how pw.io.kafka.read() exposes Kafka headers in _metadata.

Persistence. When persistence is enabled, the connector saves the current stream offset (a single integer) to the snapshot. On restart, it resumes from the saved offset, so already-processed messages are not re-read.

  • Parameters
    • uri (str) – The URI of the RabbitMQ server with Streams enabled, e.g. "rabbitmq-stream://guest:guest@localhost:5552".
    • stream_name (str) – The name of the RabbitMQ stream to read data from. The stream must already exist on the server.
    • schema (type[Schema] | None) – The table schema, used only when the format is set to "json".
    • format (Literal['plaintext', 'raw', 'json']) – The input data format, which can be "raw", "plaintext", or "json".
    • mode (Literal['streaming', 'static']) – The reading mode, which can be "streaming" or "static". In "streaming" mode, the connector reads messages continuously. In "static" mode, it reads all existing messages and then stops.
    • autocommit_duration_ms (int | None) – The time interval (in milliseconds) between commits. After this time, the updates received by the connector are committed and added to Pathway’s computation graph.
    • json_field_paths (dict[str, str] | None) – For the "json" format, this allows mapping field names to paths within the JSON structure. Use the format <field_name>: <path> where the path follows the JSON Pointer (RFC 6901).
    • with_metadata (bool) – If True, adds a _metadata column containing a JSON dict with offset, stream_name, AMQP 1.0 message properties when available (message_id, correlation_id, content_type, content_encoding, subject, reply_to, priority, durable), and application_properties — a dict of string key-value pairs containing the AMQP application properties set by the writer. Values produced by write() are JSON-encoded strings (see headers parameter of write()), so they can be parsed back with json.loads.
    • start_from (Literal['beginning', 'end', 'timestamp']) – Where to start reading from. "beginning" starts from the first message in the stream. "end" skips all existing messages and only reads new ones arriving after the reader starts. "timestamp" starts from messages at or after the time given in start_from_timestamp_ms.
    • start_from_timestamp_ms (int | None) – Timestamp in milliseconds since epoch. Required when start_from="timestamp", must not be set otherwise.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. Additionally, if persistence is enabled, it will be used as the name for the snapshot that stores the connector’s progress.
    • max_backlog_size (int | None) – Limit on the number of entries read from the input source and kept in processing at any moment.
    • tls_settings (TLSSettings | None) – TLS connection settings. Use TLSSettings to configure root certificates, client certificates, and verification mode.
    • debug_data – Static data that replaces the data read from the stream when debug mode is active.
  • Returns
    Table – The table read.

Example:

Read messages in raw format (the default). The table will have key and data columns:

import pathway as pw
table = pw.io.rabbitmq.read(
    "rabbitmq-stream://guest:guest@localhost:5552",
    "events",
)

Read messages as UTF-8 plaintext:

table = pw.io.rabbitmq.read(
    "rabbitmq-stream://guest:guest@localhost:5552",
    "events",
    format="plaintext",
)

Read and parse JSON messages with a schema:

class InputSchema(pw.Schema):
    owner: str
    pet: str

table = pw.io.rabbitmq.read(
    "rabbitmq-stream://guest:guest@localhost:5552",
    "events",
    format="json",
    schema=InputSchema,
)

Extract nested JSON fields using JSON Pointer paths:

class InputSchema(pw.Schema):
    name: str
    age: int

table = pw.io.rabbitmq.read(
    "rabbitmq-stream://guest:guest@localhost:5552",
    "events",
    format="json",
    schema=InputSchema,
    json_field_paths={"name": "/user/name", "age": "/user/age"},
)
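To make the path syntax concrete, here is a minimal standalone sketch of how a JSON Pointer (RFC 6901) such as "/user/name" resolves against a parsed payload. It is an illustration only, not the connector's implementation: the helper name resolve_pointer and the sample payload are hypothetical.

```python
import json


def resolve_pointer(document, pointer):
    """Resolve an RFC 6901 JSON Pointer against a parsed JSON document."""
    if pointer == "":
        return document  # the empty pointer refers to the whole document
    if not pointer.startswith("/"):
        raise ValueError(f"invalid JSON Pointer: {pointer!r}")
    current = document
    for token in pointer[1:].split("/"):
        # Unescape in the order RFC 6901 requires: "~1" first, then "~0".
        token = token.replace("~1", "/").replace("~0", "~")
        if isinstance(current, list):
            current = current[int(token)]  # array elements are addressed by index
        else:
            current = current[token]
    return current


# Hypothetical message payload, as it might appear in the stream.
payload = json.loads('{"user": {"name": "Alice", "age": 30}}')

json_field_paths = {"name": "/user/name", "age": "/user/age"}
row = {
    field: resolve_pointer(payload, path)
    for field, path in json_field_paths.items()
}
print(row)  # {'name': 'Alice', 'age': 30}
```

Each key of json_field_paths names a schema column, and each value locates that column's data inside the nested JSON object.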

Read in static mode (bounded snapshot):

table = pw.io.rabbitmq.read(
    "rabbitmq-stream://guest:guest@localhost:5552",
    "events",
    mode="static",
)

Read only new messages, ignoring all existing data in the stream:

table = pw.io.rabbitmq.read(
    "rabbitmq-stream://guest:guest@localhost:5552",
    "events",
    start_from="end",
)
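The third option, start_from="timestamp", needs a value in milliseconds since the epoch. The sketch below shows one way to compute it from a timezone-aware datetime; the cutoff date is an arbitrary example, and the commented-out call uses only the parameters documented above.

```python
from datetime import datetime, timezone

# Arbitrary example cutoff; a timezone-aware datetime avoids local-time surprises.
cutoff = datetime(2024, 1, 1, tzinfo=timezone.utc)
start_from_timestamp_ms = int(cutoff.timestamp() * 1000)
print(start_from_timestamp_ms)  # 1704067200000

# The value would then be passed together with start_from="timestamp":
# table = pw.io.rabbitmq.read(
#     "rabbitmq-stream://guest:guest@localhost:5552",
#     "events",
#     start_from="timestamp",
#     start_from_timestamp_ms=start_from_timestamp_ms,
# )
```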

Read with persistence enabled, so progress is saved across restarts:

table = pw.io.rabbitmq.read(
    "rabbitmq-stream://guest:guest@localhost:5552",
    "events",
    format="json",
    schema=InputSchema,
    name="my-rabbitmq-source",
)
# Then run with persistence:
# pw.run(persistence_config=pw.persistence.Config(
#     pw.persistence.Backend.filesystem("./PStorage")
# ))

Read with metadata column:

table = pw.io.rabbitmq.read(
    "rabbitmq-stream://guest:guest@localhost:5552",
    "events",
    with_metadata=True,
)
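As a rough illustration of what a consumer might do with one _metadata value, the sketch below decodes the application properties with json.loads, as suggested in the with_metadata description. The metadata dict is a made-up sample shaped after the documented fields; real messages may carry more or fewer keys.

```python
import json

# Hypothetical contents of a single _metadata value.
metadata = {
    "offset": 42,
    "stream_name": "events",
    "application_properties": {
        "age": "10",
        "pet": "\"dog\"",
    },
}

# Header values written by write() are JSON-encoded strings,
# so json.loads restores the original Python values.
decoded = {
    key: json.loads(value)
    for key, value in metadata["application_properties"].items()
}
print(decoded)  # {'age': 10, 'pet': 'dog'}
```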

write(table, uri, stream_name, *, format='json', value=None, headers=None, name=None, sort_by=None, tls_settings=None)

Writes data into the specified RabbitMQ stream.

Each produced message consists of a payload: the values of a table row, serialized according to the chosen format. Two AMQP 1.0 application properties are always added: pathway_time (processing time) and pathway_diff (either 1 or -1). If the headers parameter is used, additional properties are added to the message.
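A downstream consumer can use pathway_diff to rebuild the current state of the table. The sketch below assumes the usual Pathway convention that 1 marks an inserted row and -1 a retracted one, and that the consumer has already extracted the payload and both properties from each message. The events list is hypothetical.

```python
from collections import Counter

# Hypothetical decoded messages: (payload, pathway_time, pathway_diff).
events = [
    ('{"owner": "Alice", "pet": "dog"}', 2, 1),
    ('{"owner": "Bob", "pet": "cat"}', 2, 1),
    ('{"owner": "Alice", "pet": "dog"}', 4, -1),  # retracts the first row
    ('{"owner": "Alice", "pet": "cat"}', 4, 1),
]

state = Counter()
for payload, _time, diff in events:
    state[payload] += diff
    if state[payload] == 0:
        del state[payload]  # row fully retracted, drop it from the state

current_rows = sorted(state)
print(current_rows)
```

After all four events are applied, only Bob's row and Alice's updated row remain.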

Three serialization formats are supported: "json", "plaintext", and "raw".

If the selected format is either "plaintext" or "raw", you also need to specify which column of the table corresponds to the payload of the produced message. It can be done by providing the value parameter.

  • Parameters
    • table (Table) – The table for output.
    • uri (str) – The URI of the RabbitMQ server with Streams enabled, e.g. "rabbitmq-stream://guest:guest@localhost:5552".
    • stream_name (str | ColumnReference) – The RabbitMQ stream where data will be written. The stream must already exist on the server. Can be a column reference for dynamic routing — each row will be written to the stream named by that column’s value. All target streams must be pre-created.
    • format (Literal['json', 'plaintext', 'raw']) – Format in which the data is put into RabbitMQ. Currently "json", "plaintext" and "raw" are supported.
    • value (ColumnReference | None) – Reference to the column that should be used as a payload in the produced message in "plaintext" or "raw" format.
    • headers (Optional[Iterable[ColumnReference]]) – References to the table fields that must be provided as AMQP 1.0 application properties (analogous to Kafka headers). Values are serialized as AMQP strings using their JSON representation, following the same encoding as pw.io.jsonlines.write() (e.g. 42 for an int, "hello" including the surrounding quotes for a string, null for None, a base64-encoded string for bytes). RabbitMQ Streams does not reliably confirm messages with non-string application property values, so all types are JSON-encoded. On the reader side, header values are available in _metadata.application_properties (when with_metadata=True).
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch.
    • tls_settings (TLSSettings | None) – TLS connection settings. Use TLSSettings to configure root certificates, client certificates, and verification mode.

Examples:

Consider a RabbitMQ server with Streams enabled running locally on port 5552. First, create a Pathway table:

import pathway as pw
table = pw.debug.table_from_markdown('''
age | owner | pet
10  | Alice | dog
9   | Bob   | cat
8   | Alice | cat
''')

Write the table in JSON format. Each row is serialized as a JSON object:

pw.io.rabbitmq.write(
    table,
    "rabbitmq-stream://guest:guest@localhost:5552",
    "events",
    format="json",
)

Use the "plaintext" format to send a single column as the message payload. When the table has more than one column, you must specify which column to use via the value parameter. Additional columns can be forwarded as AMQP application properties using the headers parameter:

pw.io.rabbitmq.write(
    table,
    "rabbitmq-stream://guest:guest@localhost:5552",
    "events",
    format="plaintext",
    value=table.owner,
    headers=[table.age, table.pet],
)
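For the first row of the table, the age and pet headers would reach the reader as JSON-encoded strings. The sketch below approximates the encoding described for the headers parameter. It is not Pathway's implementation: encode_header is a hypothetical helper, and using the standard base64 alphabet for bytes is an assumption.

```python
import base64
import json


def encode_header(value):
    """Approximate the documented header encoding (hypothetical helper)."""
    if isinstance(value, bytes):
        # Assumption: bytes become a JSON string holding standard base64 text.
        value = base64.b64encode(value).decode("ascii")
    return json.dumps(value)


examples = {
    "age": encode_header(10),        # '10'
    "pet": encode_header("dog"),     # '"dog"'
    "missing": encode_header(None),  # 'null'
    "blob": encode_header(b"hi"),    # '"aGk="'
}
print(examples)
```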

Write each row to a different stream based on a column value (dynamic routing). All target streams must already exist on the server:

table_with_targets = pw.debug.table_from_markdown('''
value | target_stream
hello | stream-a
world | stream-b
''')
pw.io.rabbitmq.write(
    table_with_targets,
    "rabbitmq-stream://guest:guest@localhost:5552",
    table_with_targets.target_stream,
    format="json",
)
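Because every target stream must exist before writing, it can help to list the distinct stream names up front. The sketch below does this in plain Python over the same rows as the table above. Creating the streams is left to your RabbitMQ tooling and is not shown.

```python
# Same rows as table_with_targets, written as plain dictionaries.
rows = [
    {"value": "hello", "target_stream": "stream-a"},
    {"value": "world", "target_stream": "stream-b"},
]

# Distinct stream names that must be created before the pipeline runs.
required_streams = sorted({row["target_stream"] for row in rows})
print(required_streams)  # ['stream-a', 'stream-b']
```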