Consuming WebSockets streams
In this tutorial, you will be guided through creating a custom WebSocket connector that lets you consume WebSocket data streams and process them as needed.
You will learn how to implement a custom Python connector that uses the aiohttp library to consume WebSocket data streams.
The WebSocket protocol does not prescribe how an API structures its messages, so the communication process can differ from one service to another. This tutorial focuses on an API that requires a multi-step message exchange before it sends the relevant data streams. By adapting the message handling in this example, you can connect to other WebSocket APIs as well.
Generic connector
Let's start by abstracting the logic of the WebSocket connection. For this purpose, you need to implement a class inheriting from pw.io.python.ConnectorSubject:
import pathway as pw
import asyncio
import aiohttp
from aiohttp.client_ws import ClientWebSocketResponse
class AIOHttpWebsocketSubject(pw.io.python.ConnectorSubject):
    _url: str
    def __init__(self, url: str):
        super().__init__()
        self._url = url
    def run(self):
        async def consume():
            async with aiohttp.ClientSession() as session:
                async with session.ws_connect(self._url) as ws:
                    async for msg in ws:
                        if msg.type == aiohttp.WSMsgType.CLOSE:
                            break
                        else:
                            # delegate message handling to the subclass
                            result = await self.on_ws_message(msg, ws)
                            # push every resulting row to the Pathway buffer
                            for row in result:
                                self.next_json(row)
        # run the coroutine in a dedicated event loop
        asyncio.new_event_loop().run_until_complete(consume())
    async def on_ws_message(self, msg, ws: ClientWebSocketResponse) -> list[dict]:
        # implemented by subclasses: return the rows extracted from msg
        ...
The run method contains the base message consumption logic. Note that the consume function is executed within a dedicated asyncio event loop. The message processing logic is delegated to the abstract on_ws_message method, which subclasses must implement. Each row it returns is then sent to the buffer using the self.next_json() method, which is provided by the pw.io.python.ConnectorSubject superclass.
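To see how run and on_ws_message divide the work, the same flow can be reproduced offline. The sketch below is a hypothetical, standard-library-only stand-in and not part of Pathway or aiohttp: FakeWebSocket replays canned text frames in place of a real connection, and SketchSubject.next_json collects rows in a list instead of sending them to Pathway.

```python
import asyncio
import json


class FakeWebSocket:
    """Hypothetical stand-in for aiohttp's ClientWebSocketResponse."""

    def __init__(self, frames: list[str]):
        self._frames = frames

    def __aiter__(self):
        # Returning an async generator makes the object usable with `async for`.
        return self._iterate()

    async def _iterate(self):
        for frame in self._frames:
            yield frame


class SketchSubject:
    """Mimics the run / on_ws_message split of AIOHttpWebsocketSubject."""

    def __init__(self, frames: list[str]):
        self._frames = frames
        self.rows: list[dict] = []

    def next_json(self, row: dict) -> None:
        # In Pathway, this call would push the row to the input buffer.
        self.rows.append(row)

    def run(self) -> None:
        async def consume():
            ws = FakeWebSocket(self._frames)
            async for msg in ws:
                for row in await self.on_ws_message(msg, ws):
                    self.next_json(row)

        # A fresh event loop, as in the connector above (closed afterwards).
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(consume())
        finally:
            loop.close()

    async def on_ws_message(self, msg: str, ws: FakeWebSocket) -> list[dict]:
        # The subclass hook: here every frame is assumed to be a JSON list of rows.
        return json.loads(msg)


subject = SketchSubject(['[{"x": 1}]', '[]', '[{"x": 2}, {"x": 3}]'])
subject.run()
print(subject.rows)  # [{'x': 1}, {'x': 2}, {'x': 3}]
```

Only the message hook is specific to a data source, which is why the Polygon connector that follows overrides on_ws_message (plus __init__ to store its parameters) and leaves run untouched.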
Real-world example
As an example, you will consume the Polygon.io Stocks API with a connector that subscribes to one-second aggregates of selected stocks.
import json
class PolygonSubject(AIOHttpWebsocketSubject):
    _api_key: str
    _symbols: str
    def __init__(self, url: str, api_key: str, symbols: str):
        super().__init__(url)
        self._api_key = api_key
        self._symbols = symbols
    async def on_ws_message(
        self, msg: aiohttp.WSMessage, ws: ClientWebSocketResponse
    ) -> list[dict]:
        if msg.type == aiohttp.WSMsgType.TEXT:
            result = []
            payload = json.loads(msg.data)
            for obj in payload:
                match obj:
                    case {"ev": "status", "status": "connected"}:
                        # make authorization request if connected successfully
                        await self._authorize(ws)
                    case {"ev": "status", "status": "auth_success"}:
                        # request a stream, once authenticated
                        await self._subscribe(ws)
                    case {"ev": "status", "status": "success"}:
                        # subscription confirmed, nothing to do
                        pass
                    case {"ev": "A"}:
                        # append data object to results list
                        result.append(obj)
                    case {"ev": "status", "status": "error"}:
                        raise RuntimeError(obj["message"])
                    case _:
                        raise RuntimeError(f"Unhandled payload: {obj}")
            return result
        else:
            # ignore non-text frames
            return []
    async def _authorize(self, ws: ClientWebSocketResponse):
        await ws.send_json({"action": "auth", "params": self._api_key})
    async def _subscribe(self, ws: ClientWebSocketResponse):
        await ws.send_json({"action": "subscribe", "params": self._symbols})
Each message is a JSON-serialized list of objects. However, the API requires authentication before it starts sending messages with the data of interest. After a successful connection, indicated by a status message with the status "connected", we send an authorization message containing the API key. Once authorization succeeds (status "auth_success"), we send another message to subscribe to the selected data streams. From this point onward, we expect to receive messages containing aggregates of stock market data, that is, objects with "ev" set to "A".
Let's now define a schema that describes the structure of the resulting Pathway table. Since the connector passes the payload through unmodified, the schema mirrors the fields of the aggregate objects sent by the API.
class StockAggregates(pw.Schema):
    sym: str  # stock symbol
    o: float  # opening price
    v: int  # tick volume
    s: int  # starting tick timestamp (Unix milliseconds)
    e: int  # ending tick timestamp (Unix milliseconds)
    ...
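The s and e fields are Unix timestamps in milliseconds. If human-readable window bounds are needed downstream, a conversion along these lines could be applied to each row; the helper name window_bounds and the sample row are hypothetical, and the values are invented.

```python
from datetime import datetime, timezone


def window_bounds(row: dict) -> tuple[datetime, datetime]:
    """Convert the s/e millisecond timestamps of an aggregate to UTC datetimes."""
    start = datetime.fromtimestamp(row["s"] / 1000, tz=timezone.utc)
    end = datetime.fromtimestamp(row["e"] / 1000, tz=timezone.utc)
    return start, end


# Hypothetical row shaped like StockAggregates; the values are invented.
sample = {"sym": "ABC", "o": 1.0, "v": 10, "s": 1_700_000_000_000, "e": 1_700_000_001_000}
start, end = window_bounds(sample)
print(start.isoformat(), (end - start).total_seconds())
```

For a one-second aggregate, the difference between the two bounds should be one second, as in the sample above.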
You can now use the previously defined subject to create an input table with pw.io.python.read:
URL = "wss://delayed.polygon.io/stocks"
API_KEY = "your-api-key"
# "A.*": per-second aggregates ("A" channel) for all tickers
subject = PolygonSubject(url=URL, api_key=API_KEY, symbols="A.*")
table = pw.io.python.read(subject, schema=StockAggregates)
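Hardcoding the key is fine for a quick test. A common alternative is to read it from an environment variable so it stays out of the source code; the helper load_api_key and the variable name POLYGON_API_KEY are assumptions for illustration, not something Pathway or Polygon.io requires.

```python
import os


def load_api_key(var: str = "POLYGON_API_KEY") -> str:
    """Read the API key from the environment, failing early if it is missing."""
    key = os.environ.get(var)
    if not key:
        raise RuntimeError(f"Set the {var} environment variable to your Polygon API key")
    return key
```

With this helper, API_KEY = load_api_key() would replace the hardcoded string, and a missing key is reported before any connection is attempted.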
For this tutorial, you can use pw.io.subscribe to react to changes occurring in the table:
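import logging
# show INFO-level messages; the default logging level is WARNING
logging.basicConfig(level=logging.INFO)
def on_change(
    key: pw.Pointer,
    row: dict,
    time: int,
    is_addition: bool,
):
    # called for every change in the table
    logging.info(f"{time}: {row}")
pw.io.subscribe(table, on_change)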
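Each callback invocation describes a single change: is_addition is True when a row enters the table and False when it is removed. The stdlib-only sketch below is a hypothetical helper, not part of Pathway, that replays such (key, row, time, is_addition) events to keep a snapshot of the current table contents. Its method uses the same parameter names as on_change above, so it should be usable as the callback, but that is an untested assumption; the keys and rows in the usage example are invented.

```python
class TableSnapshot:
    """Hypothetical helper: rebuilds the current table state from change events."""

    def __init__(self):
        self.rows: dict[object, dict] = {}

    def on_change(self, key, row: dict, time: int, is_addition: bool) -> None:
        if is_addition:
            self.rows[key] = row
        elif self.rows.get(key) == row:
            # Only drop the stored row if it is the one being retracted, so the
            # result does not depend on whether an old value's retraction arrives
            # before or after the addition of its replacement.
            self.rows.pop(key)


snapshot = TableSnapshot()
events = [
    ("k1", {"sym": "ABC", "o": 1.0}, 2, True),
    ("k2", {"sym": "XYZ", "o": 5.0}, 2, True),
    ("k1", {"sym": "ABC", "o": 1.5}, 4, True),
    ("k1", {"sym": "ABC", "o": 1.0}, 4, False),
    ("k2", {"sym": "XYZ", "o": 5.0}, 6, False),
]
for event in events:
    snapshot.on_change(*event)
print(snapshot.rows)  # {'k1': {'sym': 'ABC', 'o': 1.5}}
```

Since the Polygon connector only appends aggregates, you would mostly see additions in this pipeline; handling retractions matters once the table is transformed further, for example by grouping.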
Now, all that's left is to run the Pathway pipeline with pw.run:
pw.run()