# Creating a custom Python connector
In this tutorial, you will learn how to create a Python connector that will allow you to connect to your custom data source and feed data directly into Pathway.
To create a custom connector, you need to extend the `pw.io.python.ConnectorSubject` class provided by Pathway and implement the `run` method, which is responsible for reading the source and feeding the data into the buffer.

`ConnectorSubject` serves as a bridge between the data source and the Pathway engine. It comes with several methods allowing you to push data into the buffer. You can find more about those functions in the dedicated documentation, and there is an extended explanation at the end of this article.
## Simple scenario: reading from a static file
You will create a connector that turns a static file into a stream. Suppose you have a file `cats.jsonl` in which each line contains data in JSON format:

```jsonl
{ "key": 1, "genus": "otocolobus", "epithet": "manul" }
{ "key": 2, "genus": "felis", "epithet": "catus" }
{ "key": 3, "genus": "lynx", "epithet": "lynx" }
```
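The file is in the JSON Lines format: each line is a standalone JSON object, which is what lets a connector emit it line by line. You can verify this property with the standard library alone (no Pathway needed):

```python
import json

# The same three lines as in cats.jsonl above.
lines = [
    '{ "key": 1, "genus": "otocolobus", "epithet": "manul" }',
    '{ "key": 2, "genus": "felis", "epithet": "catus" }',
    '{ "key": 3, "genus": "lynx", "epithet": "lynx" }',
]

# Each line parses independently into one record.
records = [json.loads(line) for line in lines]
```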
In the `run` method, let's read the file and send its contents to the buffer line by line, using the `next_str` method.
```python
import time

import pathway as pw


class FileStreamSubject(pw.io.python.ConnectorSubject):
    def run(self):
        with open("cats.jsonl") as file:
            for line in file:
                self.next_str(line)
                time.sleep(1)
```
You can now read from the subject you created and run computations. You need to call the method `pw.io.python.read`, passing your subject to it. In addition, you need to specify the schema of the data, indicating the format and the fields that will be extracted as columns.
```python
class InputSchema(pw.Schema):
    key: int = pw.column_definition(primary_key=True)
    genus: str
    epithet: str


table = pw.io.python.read(
    FileStreamSubject(),
    schema=InputSchema,
    format="json",
)

pw.io.csv.write(table, "output.csv")

pw.run()
```
Under the hood, the connector is started in a dedicated thread and keeps working as long as the `run` method is running.
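The thread-and-buffer mechanics can be sketched with the standard library. This is an illustration of the general pattern, not Pathway's actual implementation:

```python
import queue
import threading

# A shared buffer between the producer thread and the consumer.
buffer: "queue.Queue[str | None]" = queue.Queue()


def run() -> None:
    # The "connector" pushes messages into the buffer...
    for line in ['{"key": 1}\n', '{"key": 2}\n']:
        buffer.put(line)
    # ...and signals completion when run() returns (akin to close()).
    buffer.put(None)


# The subject's run() executes in a dedicated thread.
worker = threading.Thread(target=run)
worker.start()

# The consumer drains the buffer until the completion sentinel.
received = []
while (msg := buffer.get()) is not None:
    received.append(msg)
worker.join()
```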
## Advanced scenario: using external Python libraries
In the second example, let's use an external library called Tweepy to load a stream of tweets. Tweepy is a Python library for accessing the Twitter API. You can install it with a simple pip command:

```shell
pip install tweepy
```
As a first step, you need to create a `TwitterClient` class that extends `tweepy.StreamingClient`:
```python
import tweepy


class TwitterClient(tweepy.StreamingClient):
    _subject: TwitterSubject

    def __init__(self, subject: TwitterSubject) -> None:
        super().__init__(BEARER_TOKEN)
        self._subject = subject

    def on_response(self, response) -> None:
        self._subject.next_json(
            {
                "key": response.data.id,
                "text": response.data.text,
            }
        )
```
The client holds the subject object, which will be described in a moment. The `on_response` method is called when a new response is received from the stream. This is the right place to convert the message to the desired format and send it to the subject's buffer. This time you can use the `next_json` method, which accepts a dictionary.
As before, you need to define the subject:
```python
import pathway as pw


class TwitterSubject(pw.io.python.ConnectorSubject):
    _twitter_client: TwitterClient

    def __init__(self) -> None:
        super().__init__()
        self._twitter_client = TwitterClient(self)

    def run(self) -> None:
        self._twitter_client.sample()

    def on_stop(self) -> None:
        self._twitter_client.disconnect()
```
Three things are happening here:

1. `TwitterClient` is created during subject initialization. The subject is accessed from within the `TwitterClient`, so you need to pass the subject to its constructor.
2. The `run` method starts the stream of tweets. Once started, the stream flows indefinitely until it is closed or a failure occurs.
3. The `on_stop` method is called when the stream is closed or in case of failure, giving you a chance to perform cleanup.
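The callback flow between the client and the subject can be mimicked with plain Python objects. The classes below are illustrative stand-ins, not part of Tweepy or Pathway:

```python
class FakeSubject:
    """Stands in for a ConnectorSubject: just collects pushed dicts."""

    def __init__(self) -> None:
        self.messages: list[dict] = []

    def next_json(self, message: dict) -> None:
        self.messages.append(message)


class FakeClient:
    """Stands in for a streaming client: invokes on_response per item."""

    def __init__(self, subject: FakeSubject) -> None:
        # The subject is passed to the constructor, as in TwitterClient.
        self._subject = subject

    def sample(self) -> None:
        # A real client would receive these responses from the network.
        for key, text in [(1, "meow"), (2, "purr")]:
            self.on_response(key, text)

    def on_response(self, key: int, text: str) -> None:
        # Convert each response and push it to the subject's buffer.
        self._subject.next_json({"key": key, "text": text})


subject = FakeSubject()
FakeClient(subject).sample()
```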
Finally, you call `pw.io.python.read` as before:
```python
class InputSchema(pw.Schema):
    key: int = pw.column_definition(primary_key=True)
    text: str


table = pw.io.python.read(
    TwitterSubject(),
    schema=InputSchema,
)

pw.io.csv.write(table, "output.csv")

pw.run()
```
The complete working example can be found in the Pathway examples repository.
## ConnectorSubject reference
In the examples above, you saw two implementations of the `ConnectorSubject` class. Now let's look at the interface of this class in detail.
### Methods to implement
- `run`: the main method, responsible for consuming the data and feeding the buffer with one of the methods described below.
- `on_stop`: called when the stream is closed or in case of failure. A good place for doing all sorts of cleanup.
### Embedded methods
- `next_bytes`: sends a message in the form of `bytes`.
- `next_str`: takes a message in the form of a string and sends it after converting it into `bytes`.
- `next_json`: takes a message in the form of a dict representing JSON and sends it after converting it into `bytes`.
- `close`: indicates that there will be no more messages. Called automatically when the `run` method finishes.
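Conceptually, the three `next_*` variants converge on bytes. The conversions they describe can be reproduced with the standard library; this is a sketch of the idea, not Pathway's internal code:

```python
import json

message = {"key": 1, "genus": "otocolobus"}

# next_json conceptually serializes the dict to a JSON string...
text = json.dumps(message)

# ...next_str then converts a string to bytes...
payload = text.encode("utf-8")

# ...and next_bytes would accept `payload` directly, unchanged.
```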
## Connector method reference
The `pw.io.python.read` connector method takes the following arguments:
- `subject`: the connector subject to consume.
- `format`: in the examples above, we used the `json` format. It is also possible to use the `raw` format. For raw data, there is only one column `data` in the resulting table, where all entries are dumped.
- `value_columns`: if the format is not `raw`, the list of columns which should be expected.
- `primary_key`: in case the table should have a primary key generated according to a subset of its columns, the set of columns should be specified in this field. Otherwise, the primary key will be generated as uuid4.
- `types`: a dictionary containing the mapping between the columns and the data types of the values of those columns. This parameter is optional, and if not provided, the default type is string.
- `autocommit_duration_ms`: the maximum time between two commits. Every `autocommit_duration_ms` milliseconds, the updates received by the connector are committed and pushed into Pathway's dataflow.
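The difference between the `raw` and `json` formats can be illustrated with plain dicts. This is a conceptual sketch of the resulting rows, not actual Pathway output:

```python
import json

line = '{ "key": 1, "genus": "otocolobus", "epithet": "manul" }'

# format="raw": the whole message lands in a single "data" column.
raw_row = {"data": line}

# format="json": the message is parsed and its fields become columns.
json_row = json.loads(line)
```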