Using NATS connectors

Pathway provides a pw.io.nats module with connectors to read messages from and send messages to a NATS instance.

In Pathway, you can read messages from a NATS subject with pw.io.nats.read and send messages to a subject with pw.io.nats.write. NATS connectors work only in streaming mode.

Short version

Consider a simple scenario: messages are sent to the NATS subject connector_example, each message is a JSON object with a single field value, and you want to compute the sum of these values and send the resulting output stream to the same NATS instance on the subject sum. You can do it in Pathway as follows:

realtime_sum.py
import pathway as pw


# Define a schema for the table.
# It sets all the columns and their types.
class InputSchema(pw.Schema):
    value: int


# Create connection string for the local NATS instance.
connection_string = "nats://localhost:4222/"

# Use the connection string below if authorization is needed:
# connection_string = "nats://username:password@localhost:4222/"

# Use Pathway NATS connector to read data.
t = pw.io.nats.read(
    connection_string,
    topic="connector_example",
    schema=InputSchema,
    format="json",
)

# Compute the sum (this part is independent of the connectors).
t = t.reduce(sum=pw.reducers.sum(t.value))

# Use the NATS connector to send the resulting output stream containing the sum.
pw.io.nats.write(t, connection_string, topic="sum", format="json")

# Launch the computation.
pw.run()

Running NATS locally

If you run this tutorial locally, you'll need to set up a NATS instance on your computer. It can be done in two ways.

The first way is to download the nats-server binary from the Releases page and start it. By default, it runs on port 4222 at localhost.

The second way is to run a Docker image provided by NATS developers and expose port 4222 so that it is accessible by other programs on your machine:

  1. First, you need to pull the image from the repository. It can be done with docker pull nats.
  2. Then, you need to run this image in the background and expose the required port: docker run -d --name nats -p 4222:4222 nats. Now you have NATS running in Docker on your machine.

The second option is useful if you need to run several containers together with docker-compose.
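Whichever way you start the server, it can be handy to confirm that something is listening on port 4222 before launching the Pathway program. The sketch below uses only the Python standard library; the helper name nats_port_is_open is hypothetical, and the check only tests that a TCP connection can be opened, not that the listener is actually NATS.

```python
import socket


def nats_port_is_open(host: str = "localhost", port: int = 4222, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False


if __name__ == "__main__":
    print("Port 4222 reachable:", nats_port_is_open())
```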

⚠️ Note that NATS is built for high performance and, for that reason, does not store messages in any kind of buffer. Your program will therefore receive only the messages that were sent after it has started.
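To make this behavior concrete, here is a toy in-memory model of a broker without buffering. It is not NATS and does not use any NATS API; the ToyBroker class is purely illustrative and only mimics the property described above: a message published before anyone subscribes is lost.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for a broker that delivers only to current subscribers."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, subject):
        # Each subscriber gets its own inbox, filled from now on.
        inbox = []
        self._subscribers[subject].append(inbox)
        return inbox

    def publish(self, subject, payload):
        # No buffering: with no subscriber, the message is simply dropped.
        for inbox in self._subscribers[subject]:
            inbox.append(payload)


broker = ToyBroker()
broker.publish("connector_example", b'{"value": 0}')  # sent before anyone listens
inbox = broker.subscribe("connector_example")
broker.publish("connector_example", b'{"value": 1}')  # sent after subscribing
print(inbox)  # [b'{"value": 1}']
```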

Input connector

Data stream: Consider a stream in the form of NATS messages received on given subjects. An update is a set of messages: the update is triggered by a commit. Commits ensure the atomicity of each update and are generated periodically.
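The relation between messages, commits, and updates can be pictured with a small plain-Python model. This is only a sketch under simplifying assumptions: commits are taken to happen at fixed intervals, the interval value is made up, and the function group_into_commits is hypothetical rather than part of Pathway.

```python
from itertools import groupby


def group_into_commits(arrivals, commit_interval_ms):
    """Group (arrival_time_ms, payload) pairs into batches, one per commit window."""
    ordered = sorted(arrivals, key=lambda item: item[0])
    batches = []
    # Messages falling in the same commit window form one atomic update.
    for _, window in groupby(ordered, key=lambda item: item[0] // commit_interval_ms):
        batches.append([payload for _, payload in window])
    return batches


arrivals = [(10, {"value": 0}), (400, {"value": 1}), (1600, {"value": 2})]
print(group_into_commits(arrivals, commit_interval_ms=1500))
# [[{'value': 0}, {'value': 1}], [{'value': 2}]]
```

In this model, the first two messages are processed together as one update, and the third one forms the next update.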

Usage: the NATS input connector pw.io.nats.read takes several arguments:

  • uri: the connection string used to connect to NATS; it must follow the NATS connection string format.
  • topic: the subject to listen to.
  • format: the format of the messages, one of plaintext, raw, or json. With raw, the payload is read as raw bytes and inserted into the table as is. With plaintext, the payload is decoded from UTF-8 and stored as plain text.
  • schema: the table schema, required when the format is json. It specifies the column names, their data types, and the primary keys.
class InputSchema(pw.Schema):
    value: int


t = pw.io.nats.read(
    connection_string,
    topic="connector_example",
    schema=InputSchema,
    format="json",
)

The way the input connector behaves depends on the format of the input data.

  • raw or plaintext: the table has a single column, data, which holds the whole message payload.
  • json: the data is parsed from JSON. In this case, the schema argument is required.
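The difference between the three formats can be illustrated without Pathway. The function below is a plain-Python sketch that mimics the behavior described above for a single payload; decode_payload and its columns parameter are hypothetical names, and the code is not how the connector is implemented.

```python
import json


def decode_payload(payload: bytes, fmt: str, columns=None) -> dict:
    """Map one message payload to a row, following the three formats described above."""
    if fmt == "raw":
        return {"data": payload}                  # bytes, untouched
    if fmt == "plaintext":
        return {"data": payload.decode("utf-8")}  # decoded text
    if fmt == "json":
        if columns is None:
            raise ValueError("the json format needs the column names from the schema")
        parsed = json.loads(payload)
        return {name: parsed[name] for name in columns}
    raise ValueError(f"unknown format: {fmt}")


payload = b'{"value": 3}'
print(decode_payload(payload, "raw"))                      # {'data': b'{"value": 3}'}
print(decode_payload(payload, "plaintext"))                # {'data': '{"value": 3}'}
print(decode_payload(payload, "json", columns=["value"]))  # {'value': 3}
```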

Output connector

The output connector pw.io.nats.write sends the updates made to a table t to a single subject on a given NATS instance. The connector can serialize table updates as JSON. The raw and plaintext formats are also supported, either for single-column tables or when you explicitly specify the name of the column to send.

Usage: The output connector takes the following arguments:

  • table: the Pathway table to send to NATS.
  • uri: the connection string used to connect to NATS; it must follow the NATS connection string format.
  • topic: the subject on which the messages are sent.
  • format: json, raw or plaintext.
pw.io.nats.write(t, connection_string, topic="sum", format="json")
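On the receiving side, a consumer has to interpret the stream of updates. The sketch below assumes, as an illustration only, that each JSON message carries the table columns plus a diff field (1 for an added row, -1 for a removed row) and a time field; this layout is not stated in this guide, so check the actual messages before relying on it. The sample messages and the function apply_updates are made up.

```python
import json
from collections import Counter


def apply_updates(messages):
    """Replay update messages and return the rows that are currently present."""
    rows = Counter()
    for raw in messages:
        update = json.loads(raw)
        diff = update.pop("diff")  # assumed field: +1 insertion, -1 deletion
        update.pop("time", None)   # assumed field: not needed to rebuild the state
        key = tuple(sorted(update.items()))
        rows[key] += diff
    return [dict(key) for key, count in rows.items() if count > 0]


# Hypothetical messages: the sum is first 1, then replaced by 3.
messages = [
    b'{"sum": 1, "diff": 1, "time": 2}',
    b'{"sum": 1, "diff": -1, "time": 4}',
    b'{"sum": 3, "diff": 1, "time": 4}',
]
print(apply_updates(messages))  # [{'sum': 3}]
```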

Complete example

Let's go back to the example that computes the sum of the values in the column value received on the NATS subject connector_example in JSON format. The final version of the project contains two files: realtime_sum.py, which processes the stream using Pathway, and generate_stream.py, which generates the stream.

Here is realtime_sum.py:

realtime_sum.py
import pathway as pw


# Define a schema for the table.
# It sets all the columns and their types.
class InputSchema(pw.Schema):
    value: int


# Create connection string for the local NATS instance.
connection_string = "nats://localhost:4222/"

# Use the connection string below if authorization is needed:
# connection_string = "nats://username:password@localhost:4222/"

# Use Pathway NATS connector to read data.
t = pw.io.nats.read(
    connection_string,
    topic="connector_example",
    schema=InputSchema,
    format="json",
)

# Compute the sum (this part is independent of the connectors).
t = t.reduce(sum=pw.reducers.sum(t.value))

# Use the NATS connector to send the resulting output stream containing the sum.
pw.io.nats.write(t, connection_string, topic="sum", format="json")

# Launch the computation.
pw.run()

Don't forget pw.run(), otherwise no computation will be done! Once pw.run() is called, the computation runs until the process is killed. If you need a reminder on Pathway operations, take a look at our First-steps guide.

You can use the nats-py library to send messages to NATS from Python, as in the following generate_stream.py script:

generate_stream.py
import asyncio
import json
from nats.aio.client import Client as NATS


async def send_messages():
    nc = NATS()

    await nc.connect("nats://localhost:4222")
    try:
        topic = "connector_example"
        for i in range(10):
            message = {"value": i}
            await nc.publish(topic, json.dumps(message).encode("utf-8"))

    finally:
        await nc.close()


if __name__ == "__main__":
    asyncio.run(send_messages())

Please note that since the NATS server does not store messages in a buffer, the subject listener must connect first to receive messages. Therefore, you need to run realtime_sum.py before sending any messages.
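To check the result, you can compute by hand what the sum should be for the ten values sent by generate_stream.py. The snippet below is plain Python and independent of Pathway and NATS. Since several messages may be processed in a single update, not every intermediate sum is guaranteed to appear on the sum subject, but the last value should be the final one listed here.

```python
from itertools import accumulate

values = range(10)  # the values sent by generate_stream.py
running_sums = list(accumulate(values))

print(running_sums)      # [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
print(running_sums[-1])  # 45
```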