Creating a custom Python connector
In this tutorial, you will learn how to create a Python connector that will allow you to connect to your custom data source and feed data directly into Pathway.
To create a custom connector, you need to extend the pw.io.python.ConnectorSubject class provided by Pathway and implement the run method, which is responsible for reading the source and feeding the data into the buffer.
ConnectorSubject serves as a bridge between the data source and the Pathway engine. It comes with a special method, next, that lets you push data into the buffer.
There is also an extended explanation at the end of this article.
Simple scenario: reading from a static file
You will create a connector that turns a static file into a stream. Suppose you have a file cats.jsonl in which each line contains data in JSON format:
{ "key": 1, "genus": "otocolobus", "epithet": "manul" }
{ "key": 2, "genus": "felis", "epithet": "catus" }
{ "key": 3, "genus": "lynx", "epithet": "lynx" }
In the run method, let's read the file and send the contents to the buffer line by line, using the next method.
import json
import time

import pathway as pw


class FileStreamSubject(pw.io.python.ConnectorSubject):
    def run(self):
        with open("cats.jsonl") as file:
            for line in file:
                data = json.loads(line)  # one JSON object per line
                self.next(**data)  # push the row into the buffer
                time.sleep(1)  # pause so the file behaves like a stream
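To try the subject above, cats.jsonl has to exist in the working directory. A minimal sketch that writes the three sample rows, one JSON object per line; the helper name write_sample_file is made up for illustration and is not part of Pathway:

```python
import json
from pathlib import Path

# Sample rows from the tutorial, one dict per line of the file.
SAMPLE_ROWS = [
    {"key": 1, "genus": "otocolobus", "epithet": "manul"},
    {"key": 2, "genus": "felis", "epithet": "catus"},
    {"key": 3, "genus": "lynx", "epithet": "lynx"},
]


def write_sample_file(path="cats.jsonl"):
    """Write SAMPLE_ROWS to `path` in JSON Lines format (hypothetical helper)."""
    with open(path, "w") as file:
        for row in SAMPLE_ROWS:
            file.write(json.dumps(row) + "\n")
    return Path(path)


if __name__ == "__main__":
    write_sample_file()
```

Running this once before starting the pipeline gives FileStreamSubject a file to read.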
You can now read from the subject you created and run computations.
You need to call the pw.io.python.read method, passing your subject to it. In addition, you need to specify the schema of the data, indicating the format and fields that will be extracted as columns.
class InputSchema(pw.Schema):
    key: int = pw.column_definition(primary_key=True)
    genus: str
    epithet: str


table = pw.io.python.read(
    FileStreamSubject(),
    schema=InputSchema,
)

pw.io.csv.write(table, "output.csv")
pw.run()
Under the hood, the connector is started in a dedicated thread and keeps working for as long as the run method is running.
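The threading model can be pictured with plain Python. The sketch below is only an analogy and does not show Pathway's actual internals: ToySubject, its queue-based buffer, and the _CLOSED sentinel are all made up for illustration. A producer thread executes run and pushes rows with next, while the main thread drains the buffer until the producer finishes.

```python
import queue
import threading

_CLOSED = object()  # sentinel marking the end of the stream (illustrative)


class ToySubject:
    """Stand-in for a connector subject; NOT the Pathway class."""

    def __init__(self, rows):
        self._rows = rows
        self._buffer = queue.Queue()

    def next(self, **row):
        # Push one row, given as keyword arguments, into the buffer.
        self._buffer.put(row)

    def run(self):
        for row in self._rows:
            self.next(**row)

    def _thread_main(self):
        # Signal the end of the stream once run returns.
        try:
            self.run()
        finally:
            self._buffer.put(_CLOSED)

    def start(self):
        thread = threading.Thread(target=self._thread_main, daemon=True)
        thread.start()
        return thread

    def drain(self):
        # Yield rows until the sentinel arrives.
        while True:
            item = self._buffer.get()
            if item is _CLOSED:
                return
            yield item


subject = ToySubject([{"key": 1, "genus": "felis"}, {"key": 2, "genus": "lynx"}])
worker = subject.start()
received = list(subject.drain())
worker.join()
print(received)
```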
Advanced scenario: using external Python libraries
⚠️ Twitter has turned off its free tier streaming API.
In the second example, let's utilize an external library called Tweepy to load a stream of tweets.
Tweepy is a Python library for accessing the Twitter API. You can install it with a simple pip command:
pip install tweepy
As a first step, you need to create a TwitterClient class that extends tweepy.StreamingClient:
import tweepy


class TwitterClient(tweepy.StreamingClient):
    # TwitterSubject is defined later, so the annotations are quoted.
    _subject: "TwitterSubject"

    def __init__(self, subject: "TwitterSubject") -> None:
        # BEARER_TOKEN (your Twitter API bearer token) must be defined beforehand.
        super().__init__(BEARER_TOKEN)
        self._subject = subject

    def on_response(self, response) -> None:
        self._subject.next(
            key=response.data.id,
            text=response.data.text,
        )
The client holds the subject object, which will be described in a moment. The on_response method is called when a new response from a stream is received. This is the right place to convert the message to the desired format and send it to a subject's buffer.
Note that you pass a value for each column as a separate keyword argument in the next method.
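Since the streaming API may be unavailable, the conversion step can be checked offline. A minimal sketch, assuming a response object that exposes data.id and data.text as in the client above; RecordingSubject, forward_response, and the fake response are hypothetical stand-ins, not part of Tweepy or Pathway:

```python
from types import SimpleNamespace


class RecordingSubject:
    """Hypothetical stub that records rows instead of sending them to Pathway."""

    def __init__(self):
        self.rows = []

    def next(self, **row):
        self.rows.append(row)


def forward_response(subject, response):
    # Same mapping as in on_response: one keyword argument per column.
    subject.next(key=response.data.id, text=response.data.text)


fake_response = SimpleNamespace(data=SimpleNamespace(id=42, text="hello"))
stub = RecordingSubject()
forward_response(stub, fake_response)
print(stub.rows)
```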
As previously, you need to define the subject:
import pathway as pw


class TwitterSubject(pw.io.python.ConnectorSubject):
    _twitter_client: TwitterClient

    def __init__(self) -> None:
        super().__init__()
        self._twitter_client = TwitterClient(self)

    def run(self) -> None:
        self._twitter_client.sample()

    def on_stop(self) -> None:
        self._twitter_client.disconnect()
Three things are happening here:
- TwitterClient is created during subject initialization. The subject is accessed from within the TwitterClient, so you need to pass the subject to its constructor.
- The method run starts the stream of tweets. Once started, the stream flows indefinitely until it is closed or fails.
- The method on_stop is called when the stream is closed or in case of failure. It gives you a chance to perform some cleanup.
Finally, you call pw.io.python.read as before:
class InputSchema(pw.Schema):
    key: int = pw.column_definition(primary_key=True)
    text: str


table = pw.io.python.read(
    TwitterSubject(),
    schema=InputSchema,
)

pw.io.csv.write(table, "output.csv")
pw.run()
The complete working example can be found in the Pathway examples repository.
ConnectorSubject reference
In the examples above, you can see two implementations of the ConnectorSubject class. Now let's look in detail at the interface of this class.
Methods to implement
- run: main function responsible for consuming the data and feeding the buffer with one of the methods described below.
- on_stop: called when the stream is closed or in case of failure. A good place for doing all sorts of cleanup.
Embedded methods
- next: takes values as keyword arguments. Values for all columns of the table should be passed to this method. The values are sent as a single row to Pathway.
- next_bytes: sends bytes to Pathway in the data column. If you want to have bytes in a different column or in multiple columns, just use next.
- next_str: sends a string to Pathway in the data column. If you want to have a string in a different column or in multiple columns, just use next.
- commit: sends a commit message to Pathway. Pathway starts processing the data only once it is committed. You can commit manually using this method or rely on autocommits, which can be set in pw.io.python.read.
- close: indicates that there will be no more messages. It is called automatically when the run method finishes.
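Because next expects a value for every column, it can help to verify a row before sending it. A minimal sketch in plain Python; check_row and the COLUMNS set are hypothetical helpers, not part of the Pathway API, and the helper is deliberately strict about unexpected keys as well:

```python
def check_row(row, columns):
    """Raise ValueError if `row` does not have exactly the expected columns."""
    missing = sorted(set(columns) - set(row))
    extra = sorted(set(row) - set(columns))
    if missing or extra:
        raise ValueError(f"missing columns: {missing}, unexpected columns: {extra}")
    return row


COLUMNS = {"key", "genus", "epithet"}

# A complete row passes through unchanged.
row = check_row({"key": 1, "genus": "felis", "epithet": "catus"}, COLUMNS)

# An incomplete row is rejected before it would reach self.next(**row).
try:
    check_row({"key": 2, "genus": "lynx"}, COLUMNS)
except ValueError as error:
    message = str(error)
```

Inside run, the call would then read self.next(**check_row(data, COLUMNS)).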
Connector method reference
The pw.io.python.read connector method takes the following arguments:
- subject: connector subject to consume.
- schema: a schema describing your data, i.e. columns with their types and possibly other attributes. You can read more about defining schema in a dedicated tutorial.
- autocommit_duration_ms: the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed automatically and pushed into Pathway's dataflow.