Consuming WebSockets streams

This tutorial guides you through creating a custom WebSocket connector that lets you interact with WebSocket data streams and process them as needed.

You will learn how to implement a custom Python connector that uses the aiohttp library to consume WebSocket data streams.

WebSocket is a general-purpose messaging protocol, so the message exchange differs from one API to another. This tutorial focuses on an API that requires a multi-step message exchange before it starts sending the relevant data streams. By adapting this example, you can connect to other WebSocket APIs.

Generic connector

Let's start by abstracting the logic of the WebSockets connection. For this purpose, you need to implement a class inheriting from pw.io.python.ConnectorSubject:

import pathway as pw
import asyncio
import aiohttp
from aiohttp.client_ws import ClientWebSocketResponse


class AIOHttpWebsocketSubject(pw.io.python.ConnectorSubject):
    _url: str

    def __init__(self, url: str):
        super().__init__()
        self._url = url

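    def run(self):
        async def consume():
            async with aiohttp.ClientSession() as session:
                async with session.ws_connect(self._url) as ws:
                    # Iterate over incoming messages until the connection closes.
                    async for msg in ws:
                        if msg.type == aiohttp.WSMsgType.CLOSE:
                            break
                        else:
                            # Delegate message handling to the subclass.
                            result = await self.on_ws_message(msg, ws)
                            # Forward every resulting row to Pathway.
                            for row in result:
                                self.next_json(row)

        # Run the consumer in a dedicated event loop and close it afterwards.
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(consume())
        finally:
            loop.close()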

    async def on_ws_message(self, msg, ws: ClientWebSocketResponse) -> list[dict]:
        ...

The run method contains the base message consumption logic. Note that the consume function is executed within a dedicated asyncio event loop. Message processing is delegated to the on_ws_message method, which subclasses must implement. The rows it returns are then sent to the buffer with self.next_json(), a method provided by the pw.io.python.ConnectorSubject superclass.
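
The dedicated-loop pattern can be tried out without a WebSocket server. The following is a minimal sketch using only the standard library. The names fake_stream, consume, and run_in_dedicated_loop are hypothetical stand-ins, and the sketch assumes that run may be invoked from a worker thread rather than the main thread.

```python
import asyncio
import threading


async def fake_stream(messages):
    # Hypothetical stand-in for `async for msg in ws`: yields messages one by one.
    for message in messages:
        await asyncio.sleep(0)  # give control back to the event loop
        yield message


async def consume(messages, sink):
    # Same shape as the connector's consume(): read, process, forward.
    async for message in fake_stream(messages):
        sink.append(message.upper())


def run_in_dedicated_loop(messages, sink):
    # A fresh event loop that belongs only to the calling thread.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(consume(messages, sink))
    finally:
        loop.close()


received = []
worker = threading.Thread(target=run_in_dedicated_loop, args=(["a", "b"], received))
worker.start()
worker.join()
```

Creating the loop inside the function keeps it independent of any event loop that may already exist in another thread.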

Real-world example

As an example, you will consume the Polygon.io Stocks API with a connector that subscribes to one-second aggregations of selected stocks.

import json


class PolygonSubject(AIOHttpWebsocketSubject):
    _api_key: str
    _symbols: str

    def __init__(self, url: str, api_key: str, symbols: str):
        super().__init__(url)
        self._api_key = api_key
        self._symbols = symbols

    async def on_ws_message(
        self, msg: aiohttp.WSMessage, ws: ClientWebSocketResponse
    ) -> list[dict]:
        if msg.type == aiohttp.WSMsgType.TEXT:
            result = []
            payload = json.loads(msg.data)
            for obj in payload:
                match obj:
                    case {"ev": "status", "status": "connected"}:
                        # make authorization request if connected successfully
                        await self._authorize(ws)
                    case {"ev": "status", "status": "auth_success"}:
                        # request a stream, once authenticated
                        await self._subscribe(ws)
                    case {"ev": "A"}:
                        # append data object to results list
                        result.append(obj)
                    case {"ev": "status", "status": "error"}:
                        raise RuntimeError(obj["message"])
                    case _:
                        raise RuntimeError(f"Unhandled payload: {obj}")
            return result
        else:
            return []

    async def _authorize(self, ws: ClientWebSocketResponse):
        await ws.send_json({"action": "auth", "params": self._api_key})

    async def _subscribe(self, ws: ClientWebSocketResponse):
        await ws.send_json({"action": "subscribe", "params": self._symbols})

Each message is a serialized JSON array of objects. However, the API requires authentication before it starts sending the data of interest. After a successful connection, indicated by a status message with the value connected, the connector sends an authorization message containing the API key. Once the API confirms the authorization with an auth_success status, the connector sends another message to subscribe to the selected data streams. From this point onward, you can expect to receive messages containing aggregations of stock market data. Error statuses and unrecognized objects raise a RuntimeError.
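
The exchange described above can also be written as a pure function, which makes the sequence easy to check without a network connection. This is only a sketch: plan_replies is a hypothetical helper, the sample messages and the "KEY" and "XYZ" values are made up, and error handling is left out for brevity.

```python
def plan_replies(payload, api_key, symbols):
    """Return (outgoing, rows) for one decoded message (a list of objects)."""
    outgoing = []  # messages the client should send back
    rows = []  # data objects to forward downstream
    for obj in payload:
        event, status = obj.get("ev"), obj.get("status")
        if event == "status" and status == "connected":
            # Connected: answer with the authorization request.
            outgoing.append({"action": "auth", "params": api_key})
        elif event == "status" and status == "auth_success":
            # Authorized: ask for the selected streams.
            outgoing.append({"action": "subscribe", "params": symbols})
        elif event == "A":
            # Aggregate data: pass it through unchanged.
            rows.append(obj)
    return outgoing, rows


# Hypothetical message sequence, in the order described above.
step1 = plan_replies([{"ev": "status", "status": "connected"}], "KEY", ".*")
step2 = plan_replies([{"ev": "status", "status": "auth_success"}], "KEY", ".*")
step3 = plan_replies([{"ev": "A", "sym": "XYZ"}], "KEY", ".*")
```

Each step returns the messages to send and the rows to emit, so the three stages (authorize, subscribe, receive data) can be inspected one at a time.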

Let's now define a schema that describes the structure of the resulting Pathway Table. Because the connector passes the incoming payload through unmodified, the schema mirrors the objects sent by the API.

class StockAggregates(pw.Schema):
    sym: str  # stock symbol
    o: float  # opening price
    v: int  # tick volume
    s: int  # starting tick timestamp
    e: int  # ending tick timestamp
    ...
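
Before wiring the schema into a pipeline, it can help to check a sample object against it. The sketch below uses plain Python and covers only the fields listed in the excerpt above. The helper missing_or_mistyped and the sample values are hypothetical, and accepting an int where a float is declared is an assumption made for this check, not a statement about how Pathway parses values.

```python
# Only the fields listed in the schema excerpt; the elided ones are left out.
LISTED_FIELDS = {"sym": str, "o": float, "v": int, "s": int, "e": int}


def missing_or_mistyped(row):
    """Return the names of listed fields that are absent or have the wrong type."""
    problems = []
    for name, expected in LISTED_FIELDS.items():
        value = row.get(name)
        # JSON may encode a whole-number price as an int, so accept int for float.
        accepted = (int, float) if expected is float else (expected,)
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, accepted):
            problems.append(name)
    return problems


# Hypothetical rows with made-up values.
sample = {"ev": "A", "sym": "XYZ", "o": 10.5, "v": 100, "s": 1000, "e": 2000}
ok = missing_or_mistyped(sample)
bad = missing_or_mistyped({"sym": "XYZ", "o": "10.5"})
```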

You can now use the previously defined subject to create an input table with pw.io.python.read:

URL = "wss://delayed.polygon.io/stocks"
API_KEY = "your-api-key"

subject = PolygonSubject(url=URL, api_key=API_KEY, symbols=".*")

table = pw.io.python.read(subject, schema=StockAggregates)

For this tutorial, you can use pw.io.subscribe to subscribe to the changes occurring within the table:

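import logging

# Make INFO-level messages visible (the root logger defaults to WARNING).
logging.basicConfig(level=logging.INFO)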

def on_change(
    key: pw.Pointer,
    row: dict,
    time: int,
    is_addition: bool,
):
    logging.info(f"{time}: {row}")


pw.io.subscribe(table, on_change)
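
A callback with the same parameters can also keep state. The following sketch is a hypothetical variant that remembers the latest row per symbol. It is called by hand here with made-up rows instead of being registered with pw.io.subscribe, and it assumes that is_addition is False when a row is retracted.

```python
latest_by_symbol = {}


def track_latest(key, row, time, is_addition):
    # Same parameters as on_change; `key` and `time` are unused in this sketch.
    symbol = row["sym"]
    if is_addition:
        latest_by_symbol[symbol] = row
    elif latest_by_symbol.get(symbol) == row:
        # Assumed meaning of is_addition=False: the row was retracted.
        del latest_by_symbol[symbol]


# Made-up rows, invoked manually for illustration.
track_latest(None, {"sym": "XYZ", "o": 10.0}, 1, True)
track_latest(None, {"sym": "XYZ", "o": 11.0}, 2, True)
track_latest(None, {"sym": "ABC", "o": 5.0}, 2, True)
track_latest(None, {"sym": "ABC", "o": 5.0}, 3, False)
```

After these calls, only the most recent XYZ row remains, since the ABC row was added and then retracted.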

Now, all that's left is to run the Pathway pipeline with pw.run:

pw.run()