pw.io.airbyte

pw.io.airbyte.read(config_file_path, streams, *, mode='streaming', refresh_interval_ms=1000)

Reads a table with an Airbyte connector that supports the incremental mode.

  • Parameters
    • config_file_path (PathLike | str) – Path to the config file, created with airbyte-serverless tool. The “source” section in this file must be properly configured in advance.
    • streams (Sequence[str]) – Airbyte stream names to be read.
    • mode (str) – denotes how the engine polls the new data from the source. Currently “streaming” and “static” are supported. If set to “streaming”, it will check for updates every refresh_interval_ms milliseconds. “static” mode will only consider the available data and ingest all of it in one commit. The default value is “streaming”.
    • refresh_interval_ms (int) – time in milliseconds between new data queries. Applicable if mode is set to “streaming”.
    • autocommit_duration_ms – the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.

Returns:

A table with a single column data, containing pw.Json values with the data read from the connector. The format of this data corresponds to the one used in Airbyte.

Example:

The simplest way to test this connector is to use the Sample Data (Faker) data source provided by Airbyte.

To do that, you first need to install the airbyte-serverless tool. It can be installed with pip:
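
pip install airbyte-serverless

Once the tool is installed, you can create the Faker data source as follows: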

abs create simple --source "airbyte/source-faker:0.1.4"

The config file is located at ./connections/simple.yaml. It contains the basic parameters of the test data source, such as the random seed and the number of records to be generated. You don’t have to modify any of them to proceed with this testing.

Now, you can simply read from this configured source. It contains three streams: Users, Products, and Purchases. Let’s use the stream Users, which leads us to the following code:

import pathway as pw
users_table = pw.io.airbyte.read(  
    "./connections/simple.yaml",
    streams=["Users"],
)
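
Each row of users_table holds the full record in the data column as pw.Json. If you want to work with individual fields, you can extract them with Pathway’s JSON accessors. A minimal sketch, assuming the Faker Users stream exposes id and name fields (the exact field names depend on the connector version):

names_table = users_table.select(
    # field names below are assumptions; check the payloads produced by your connector version
    user_id=pw.this.data["id"].as_int(),   # numeric field extracted from the JSON payload
    name=pw.this.data["name"].as_str(),    # string field extracted from the JSON payload
)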

Let’s proceed to a more complex example.

Suppose that you need to read a stream of commits in a GitHub repository. To do so, you can use the Airbyte GitHub connector.

abs create github --source "airbyte/source-github"

Then, you need to edit the created config file, located at ./connections/github.yaml.

To get started as quickly as possible, you can remove the uncommented option_title, access_token, client_id, and client_secret fields from the config while uncommenting the section “Another valid structure for credentials”. It requires a PAT (Personal Access Token), which can be obtained on the Tokens page in GitHub - please note that you need to be logged in.

Then, you also need to set the repository name in the repositories field. For example, you can specify pathwaycom/pathway. Finally, remove the unused optional fields, and you’re ready to go.

Now, you can simply configure the Pathway connector and run:

import pathway as pw
commits_table = pw.io.airbyte.read(  
    "./connections/github.yaml",
    streams=["commits"],
)

The resulting table will contain JSON payloads with comprehensive information about the commits. If the mode is set to "streaming" (the default), new commits will be appended to this table as they are made.
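
If you want the connector to poll GitHub less frequently, or to control how often the received updates are committed into the computation graph, you can adjust the parameters described above. A sketch with example values (the numbers here are arbitrary):

commits_table = pw.io.airbyte.read(
    "./connections/github.yaml",
    streams=["commits"],
    mode="streaming",
    refresh_interval_ms=60000,       # check GitHub for new data once a minute
    autocommit_duration_ms=10000,    # commit the received updates at least every 10 seconds
)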

In some cases, polling for changes is not necessary because the data is given in full at the beginning and is not updated afterwards. For instance, in the first example with the users_table, you could also use the static mode of the connector:

users_table = pw.io.airbyte.read(  
    "./connections/simple.yaml",
    streams=["Users"],
    mode="static",
)
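
In static mode the connector reads everything that is available and finishes, which makes it convenient for quick local inspection. For example, you could print the ingested rows with Pathway’s debug helper (a sketch; for larger streams prefer writing to an output connector):

pw.debug.compute_and_print(users_table)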

In the second example, you could use this mode to load the commit data all at once and then terminate the connector:

import pathway as pw
commits_table = pw.io.airbyte.read(  
    "./connections/github.yaml",
    streams=["commits"],
    mode="static",
)
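
To actually run such a pipeline and persist the results, you can attach an output connector and start the computation. A minimal sketch, assuming you want the raw payloads in a local JSON Lines file (the output path is arbitrary):

pw.io.jsonlines.write(commits_table, "commits.jsonl")  # write the JSON payloads to a local file
pw.run()  # build and execute the computation graph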