Creating a custom Python connector

In this tutorial, you will learn how to create a custom Python connector that allows you to connect to your own data source and feed data directly into Pathway.

To create a custom connector, you need to extend the pw.io.python.ConnectorSubject class provided by Pathway and implement the run method, which is responsible for reading the source and feeding the data into the buffer.

ConnectorSubject serves as a bridge between the data source and the Pathway engine. It comes with a special method, next, that lets you push data into the buffer. You will find an extended explanation of its interface at the end of this article.
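
In its simplest form, a subject is a class with a run method that pushes rows through next. Here is a minimal sketch (the class name and the value column are placeholders, not part of the Pathway API):

import pathway as pw

class MySubject(pw.io.python.ConnectorSubject):
    def run(self):
        # Push a single row with one column named "value".
        self.next(value=42)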

Simple scenario: reading from a static file

You will create a connector that turns a static file into a stream. Suppose you have a file cats.jsonl in which each line contains data in JSON format:

{ "key": 1, "genus": "otocolobus", "epithet": "manul" }
{ "key": 2, "genus": "felis", "epithet": "catus" }
{ "key": 3, "genus": "lynx", "epithet": "lynx" }

In the run method, let's read the file and send the contents to the buffer line by line, using the next method.

import json
import time
import pathway as pw

class FileStreamSubject(pw.io.python.ConnectorSubject):
    def run(self):
        with open("cats.jsonl") as file:
            for line in file:
                data = json.loads(line)
                # Each key of the parsed JSON object becomes a column value.
                self.next(**data)
                # Sleep between rows to emulate a live stream.
                time.sleep(1)

You can now read from the subject you created and run computations.

You need to call the method pw.io.python.read, passing your subject to it. In addition, you need to specify the schema of the data, indicating the format and fields that will be extracted as columns.

class InputSchema(pw.Schema):
    key: int = pw.column_definition(primary_key=True)
    genus: str
    epithet: str

table = pw.io.python.read(
    FileStreamSubject(),
    schema=InputSchema,
)

pw.io.csv.write(table, "output.csv")

pw.run()

Under the hood, the connector is started in a dedicated thread and continues to work as long as the run method is running.
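
For example, if you wanted the connector to keep working after it reaches the end of the file, picking up lines appended later, the run method could poll the file in a loop. Here is a sketch of such a subject (assuming whole lines are appended atomically; the one-second polling interval is an arbitrary choice):

import json
import time
import pathway as pw

class TailingFileSubject(pw.io.python.ConnectorSubject):
    def run(self):
        with open("cats.jsonl") as file:
            while True:
                line = file.readline()
                if line:
                    # A complete new line is available: push it as a row.
                    self.next(**json.loads(line))
                else:
                    # End of file reached: wait for more data to be appended.
                    time.sleep(1)

Since the loop never returns, the connector keeps running until the computation is stopped.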

Advanced scenario: using external Python libraries

⚠️ Twitter has turned off its free tier streaming API.

In the second example, let's use an external library called Tweepy to load a stream of tweets.

Tweepy is a Python library for accessing the Twitter API. You can install it with a simple pip command:

pip install tweepy

As a first step, you need to create a TwitterClient class that extends tweepy.StreamingClient:

import tweepy

class TwitterClient(tweepy.StreamingClient):
    # String annotation: TwitterSubject is defined later in this article.
    _subject: "TwitterSubject"

    def __init__(self, subject: "TwitterSubject") -> None:
        # BEARER_TOKEN is your Twitter API authentication token.
        super().__init__(BEARER_TOKEN)
        self._subject = subject

    def on_response(self, response) -> None:
        # Forward each incoming tweet to the subject's buffer,
        # passing one keyword argument per column.
        self._subject.next(
            key=response.data.id,
            text=response.data.text,
        )

The client holds the subject object, which will be described in a moment. The on_response method is called whenever a new response arrives from the stream. This is the right place to convert the message to the desired format and send it to the subject's buffer. Note that you pass a value for each column as a separate keyword argument to the next method.

As previously, you need to define the subject:

import pathway as pw

class TwitterSubject(pw.io.python.ConnectorSubject):
    _twitter_client: TwitterClient

    def __init__(self) -> None:
        super().__init__()
        self._twitter_client = TwitterClient(self)

    def run(self) -> None:
        # Start streaming a sample of real-time tweets.
        self._twitter_client.sample()

    def on_stop(self) -> None:
        # Close the connection to the Twitter API.
        self._twitter_client.disconnect()

Three things are happening here:

  1. TwitterClient is created during subject initialization. The subject is accessed from within the TwitterClient, so you need to pass the subject to its constructor.
  2. The method run starts the stream of tweets. Once started, the stream flows indefinitely until it is closed or a failure occurs.
  3. The method on_stop is called when the stream is closed or when a failure occurs, giving you a chance to perform some cleanup.

Finally, you call pw.io.python.read as before:

class InputSchema(pw.Schema):
    key: int = pw.column_definition(primary_key=True)
    text: str

table = pw.io.python.read(
    TwitterSubject(),
    schema=InputSchema,
)

pw.io.csv.write(table, "output.csv")

pw.run()

The complete working example can be found in the Pathway examples repository.

ConnectorSubject reference

In the examples above, you saw two implementations of the ConnectorSubject class. Let's now look at the interface of this class in detail.

Methods to implement

  • run: the main function, responsible for consuming the data and feeding the buffer with one of the methods described below.
  • on_stop: called when the stream is closed or in case of failure. A good place to perform any necessary cleanup.

Embedded methods

  • next: takes values as keyword arguments. Values for all columns of the table should be passed to this method. The values are sent to Pathway as a single row.
  • next_bytes: sends bytes to Pathway in the data column. If you want bytes in a different column, or in multiple columns, use next instead.
  • next_str: sends a string to Pathway in the data column. If you want strings in a different column, or in multiple columns, use next instead.
  • commit: sends a commit message to Pathway. Pathway only starts processing data once it has been committed. You can commit manually using this method or rely on autocommits that can be configured in pw.io.python.read (see the sketch after this list).
  • close: indicates that there will be no more messages. It is called automatically when the run method finishes.
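
As an illustration, here is a sketch of a subject that combines next_str with manual commits (the batch size of 10 is an arbitrary choice, and the file is the cats.jsonl from the first scenario):

import pathway as pw

class LineSubject(pw.io.python.ConnectorSubject):
    def run(self):
        with open("cats.jsonl") as file:
            for i, line in enumerate(file):
                # Each line is sent as a row with a single "data" column.
                self.next_str(line.strip())
                # Commit every 10 rows so Pathway can start processing them.
                if (i + 1) % 10 == 0:
                    self.commit()

On the reading side, the schema of such a table would consist of a single data column of type str.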

Connector method reference

The pw.io.python.read connector method takes the following arguments:

  • subject: connector subject to consume.
  • schema: a schema describing your data, i.e. columns with their types and possibly other attributes. You can read more about defining a schema in a dedicated tutorial.
  • autocommit_duration_ms: the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed automatically and pushed into Pathway's dataflow, as in the sketch below.
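
For example, reusing FileStreamSubject and InputSchema from the first scenario, you could have updates committed automatically every second (a sketch; the duration is an arbitrary choice):

table = pw.io.python.read(
    FileStreamSubject(),
    schema=InputSchema,
    autocommit_duration_ms=1000,
)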