Creating a custom Python connector

In this tutorial, you will learn how to create a Python connector that will allow you to connect to your custom data source and feed data directly into Pathway.

In order to create a custom connector, you need to extend the pw.io.python.ConnectorSubject class provided by Pathway and implement the run method, which is responsible for reading the source and feeding the data into the buffer.

ConnectorSubject serves as a bridge between the data source and the Pathway engine. It comes with several methods allowing you to push data into the buffer. You can find more about those functions in the dedicated documentation. There is also an extended explanation at the end of this article.

Simple scenario: reading from a static file

You will create a connector that turns a static file into a stream. Suppose you have a file cats.jsonl in which each line contains data in JSON format:

{ "key": 1, "genus": "otocolobus", "epithet": "manul" }
{ "key": 2, "genus": "felis", "epithet": "catus" }
{ "key": 3, "genus": "lynx", "epithet": "lynx" }

In the run method, let's read the file and send the contents to the buffer line by line, using next_str method.

import time
import pathway as pw

class FileStreamSubject(pw.io.python.ConnectorSubject):
  def run(self):
    with open("cats.jsonl") as file:
      for line in file:
        self.next_str(line)
        time.sleep(1)

You can now read from the subject you created and run computations.

You need to call the method pw.io.python.read, passing your subject to it. In addition, you need to specify the schema of the data, indicating the format and fields that will be extracted as columns.

class InputSchema(pw.Schema):
  key: int = pw.column_definition(primary_key=True)
  genus: str
  epithet: str

table = pw.io.python.read(
  FileStreamSubject(),
  schema=InputSchema,
  format="json"
)

pw.io.csv.write(table, "output.csv")

pw.run()

Under the hood, the connector will be started in a dedicated thread and will continue to work as long as the run method is spinning.

Advanced scenario: using external Python libraries

In the second example, let's utilize an external library called Tweepy to load a stream of tweets.

Tweepy is a Python library for accessing the Twitter API. You can install it with a simple pip command:

pip install tweepy

As a first step, you need to create a TwitterClient class that extends tweepy.StreamingClient:

import tweepy

class TwitterClient(tweepy.StreamingClient):
    _subject: TwitterSubject

    def __init__(self, subject: TwitterSubject) -> None:
        super().__init__(BEARER_TOKEN)
        self._subject = subject

    def on_response(self, response) -> None:
        self._subject.next_json(
            {
                "key": response.data.id,
                "text": response.data.text,
            }
        )

The client holds the subject object, which will be described in a moment. The on_response method is called when a new response from a stream is received. This is the right place to convert the message to the desired format and send it to a subject's buffer. This time you can use the next_json method which accepts a dictionary.

As previously, you need to define the subject:

import pathway as pw

class TwitterSubject(pw.io.python.ConnectorSubject):
    _twitter_client: TwitterClient

    def __init__(self) -> None:
        super().__init__()
        self._twitter_client = TwitterClient(self)

    def run(self) -> None:
        self._twitter_client.sample()

    def on_stop(self) -> None:
        self._twitter_client.disconnect()

Three things are happening here:

  1. TwitterClient is created during subject initialization. The subject is accessed from within the TwitterClient, so you need to pass the subject to its constructor.
  2. The method run starts the stream of tweets. Once started, the stream will flow indefinitely until it is closed or in case of failure.
  3. The method on_stop is called when the stream is closed or in case of failure. You have a chance to perform some cleanup.

Finally, you call pw.io.python.read as before:

class InputSchema(pw.Schema):
  key: int = pw.column_definition(primary_key=True)
  text: str

table = pw.io.python.read(
    TwitterSubject(),
    schema=InputSchema
)

pw.io.csv.write(table, "output.csv")

pw.run()

The complete working example can be found in the Pathway examples repository.

ConnectorSubject reference

In the examples above, you can see two implementations of the ConnectorSubject class. Now let's look in detail at the interface of this class.

Methods to implement

  • run: main function responsible for consuming the data and feeding the buffer with one of the methods described below.
  • on_stop: called when the stream is closed or in case of failure. A good place for doing all sorts of cleanup.

Embedded methods

  • next_bytes: sends a message in the form of bytes.
  • next_str: takes a message in the form of a string and sends it after converting it into bytes.
  • next_json: takes a message in the form of a dict representing JSON and sends it after converting it into bytes.
  • close: indicates that there will be no more messages. Will be called automatically when the run method finishes.

Connector method reference

The pw.io.python.read connector method takes the following arguments:

  • subject: connector subject to consume.
  • format: in the examples above, we used the json format. It is also possible to use the raw format. For raw data, there is only one column data in the resulting table, where all entries are dumped.
  • value_columns: if the format is not raw, it is the list of the columns which should be expected.
  • primary_key: in case the table should have a primary key generated according to a subset of its columns, the set of columns should be specified in this field. Otherwise, the primary key will be generated as uuid4.
  • types: a dictionary containing the mapping between the columns and the data types of the values of those columns. This parameter is optional, and if not provided the default type is string.
  • autocommit_duration_ms: the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway's computation graph.
connectorPython