In this tutorial, you will learn how to create a Python connector that will allow you to connect to your custom data source and feed data directly into Pathway.
In order to create a custom connector, you need to extend the `pw.io.python.ConnectorSubject` class provided by Pathway and implement the `run` method, which is responsible for reading the source and feeding the data into the buffer. `ConnectorSubject` serves as a bridge between the data source and the Pathway engine. It comes with several methods allowing you to push data into the buffer. You can find more about those functions in the dedicated documentation; there is also an extended explanation at the end of this article.
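As a preview, here is a minimal sketch of such a subject (the class name and the pushed values are illustrative; two complete examples follow below):

```python
import pathway as pw


class MySubject(pw.io.python.ConnectorSubject):
    def run(self):
        # Read from your data source and push each message into
        # the buffer, e.g. as a JSON-like dictionary.
        self.next_json({"key": 0, "value": "hello"})
        # When run returns, the stream is considered finished.
```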
You will create a connector that turns a static file into a stream. Suppose you have a file `cats.jsonl` in which each line contains data in JSON format:

```json
{ "key": 1, "genus": "otocolobus", "epithet": "manul" }
{ "key": 2, "genus": "felis", "epithet": "catus" }
{ "key": 3, "genus": "lynx", "epithet": "lynx" }
```
In the `run` method, let's read the file and send its contents to the buffer line by line, using the `next_str` method:

```python
import time

import pathway as pw


class FileStreamSubject(pw.io.python.ConnectorSubject):
    def run(self):
        with open("cats.jsonl") as file:
            for line in file:
                # Push each line into the buffer as a string.
                self.next_str(line)
                # Pause between lines to simulate a live stream.
                time.sleep(1)
```
You can now read from the subject you created and run computations. You need to call the method `pw.io.python.read`, passing your subject to it. In addition, you need to specify the shape of the data, indicating the format and fields that will be extracted as columns:

```python
table = pw.io.python.read(
    FileStreamSubject(),
    value_columns=["genus", "epithet"],
    primary_key=["key"],
    format="json",
)

pw.io.csv.write(table, "output.csv")

pw.run()
```
Under the hood, the connector is started in a dedicated thread and keeps working as long as the `run` method is running.
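To illustrate this lifetime, here is a sketch of a bounded subject (the class name and payloads are illustrative): it emits a fixed number of rows and then lets `run` return, at which point the connector thread stops and the stream is finished.

```python
import time

import pathway as pw


class CountdownSubject(pw.io.python.ConnectorSubject):
    def run(self):
        # Emit ten rows, one per second, to mimic a finite stream.
        for i in range(10):
            self.next_json({"key": i, "value": f"message {i}"})
            time.sleep(1)
        # run returns here: the thread stops and the stream ends.
```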
In the second example, let's utilize an external library called `tweepy` to load a stream of tweets. `tweepy` is a Python library for accessing the Twitter API. You can install it with a simple `pip install tweepy`.

As a first step, you need to create a `TwitterClient` class that extends `tweepy.StreamingClient`:
```python
import tweepy


class TwitterClient(tweepy.StreamingClient):
    _subject: TwitterSubject

    def __init__(self, subject: TwitterSubject) -> None:
        # BEARER_TOKEN is your Twitter API bearer token.
        super().__init__(BEARER_TOKEN)
        self._subject = subject

    def on_response(self, response) -> None:
        # Convert each incoming tweet to a dictionary and push it
        # into the subject's buffer.
        self._subject.next_json(
            {
                "key": response.data.id,
                "text": response.data.text,
            }
        )
```
The client holds the subject object, which will be described in a moment. The `on_response` method is called when a new response from the stream is received. This is the right place to convert the message to the desired format and send it to the subject's buffer. This time you can use the `next_json` method, which accepts a dictionary.
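For intuition, since `next_str` sends a string after converting it into bytes, pushing a dictionary with `next_json` should be roughly equivalent to serializing it yourself and using `next_str`. A sketch under that assumption (the class name is illustrative):

```python
import json

import pathway as pw


class EquivalentCallsSubject(pw.io.python.ConnectorSubject):
    def run(self):
        message = {"key": 1, "text": "hello"}
        # Push the dictionary directly...
        self.next_json(message)
        # ...or serialize it yourself and push the string.
        self.next_str(json.dumps(message))
```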
As previously, you need to define the subject:
import pathway as pwclass TwitterSubject(pw.io.python.ConnectorSubject): _twitter_client: TwitterClient def __init__(self) -> None: super().__init__() self._twitter_client = TwitterClient(self) def run(self) -> None: self._twitter_client.sample() def on_stop(self) -> None: self._twitter_client.disconnect()
Three things are happening here:

- `TwitterClient` is created during subject initialization. The subject is accessed from within the `TwitterClient`, so you need to pass the subject to its constructor.
- `run` starts the stream of tweets. Once started, the stream will flow indefinitely until it is closed or in case of failure.
- `on_stop` is called when the stream is closed or in case of failure. You have a chance to perform some cleanup.

Finally, you call `pw.io.python.read` as before:
```python
table = pw.io.python.read(
    TwitterSubject(),
    primary_key=["key"],
    value_columns=["text"],
)

pw.io.csv.write(table, "output.csv")

pw.run()
```
A complete working example can be found in the Pathway examples repository.
In the examples above, you could see two implementations of the `ConnectorSubject` class. Now let's look in detail at the interface of this class:

- `run`: the main function, responsible for consuming the data and feeding the buffer with one of the methods described below.
- `on_stop`: called when the stream is closed or in case of failure. A good place for doing all sorts of cleanup.
- `next_bytes`: sends a message in the form of `bytes`.
- `next_str`: takes a message in the form of a string and sends it after converting it into `bytes`.
- `next_json`: takes a message in the form of a dict representing JSON and sends it after converting it into `bytes`.
- `close`: indicates that there will be no more messages. It is called automatically when the `run` method finishes.
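Of these methods, `next_bytes` is the only one not used in the examples above. A minimal sketch (the class name and payloads are illustrative):

```python
import pathway as pw


class RawBytesSubject(pw.io.python.ConnectorSubject):
    def run(self):
        for payload in [b"alpha", b"beta", b"gamma"]:
            # Push each message to the buffer as raw bytes.
            self.next_bytes(payload)
```

Read with the `raw` format, described below, each of these messages would land in the single `data` column of the resulting table.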
The `pw.io.python.read` connector method takes the following arguments:
- `subject`: the connector subject to consume.
- `format`: in the examples above, we used the `json` format. It is also possible to use the `raw` format. For raw data, there is only one column `data` in the resulting table, where all entries are dumped.
- `value_columns`: if the format is not `raw`, the list of the columns which should be expected.
- `primary_key`: if the table should have a primary key generated from a subset of its columns, that set of columns should be specified in this field. Otherwise, the primary key is generated as uuid4.
- `types`: a dictionary containing the mapping between the columns and the data types of the values in those columns. This parameter is optional; if not provided, the default type is string.
- `autocommit_duration_ms`: the maximum time between two commits. Every `autocommit_duration_ms` milliseconds, the updates received by the connector are committed and pushed into Pathway's computation graph.
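Putting the optional parameters together, here is a sketch of a call to `pw.io.python.read` on the `FileStreamSubject` from the beginning of this tutorial. It assumes that the `types` mapping accepts Pathway type objects such as `pw.Type.INT`; check the connector documentation for the exact accepted values.

```python
import pathway as pw

table = pw.io.python.read(
    FileStreamSubject(),
    value_columns=["genus", "epithet"],
    primary_key=["key"],
    format="json",
    # Assumption: pw.Type enumerates the supported column types.
    types={"key": pw.Type.INT, "genus": pw.Type.STRING, "epithet": pw.Type.STRING},
    # Commit the received updates at least every 100 ms.
    autocommit_duration_ms=100,
)
```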