pw.io.airbyte
pw.io.airbyte.read(config_file_path, streams, *, execution_type='local', mode='streaming', env_vars=None, service_user_credentials_file=None, gcp_region='europe-west1', gcp_job_name=None, enforce_method=None, refresh_interval_ms=60000, persistent_id=None)
Reads a table with an Airbyte connector that supports the incremental mode.
If the local execution type is selected, Pathway first attempts to find the
specified connector on PyPI and install its latest version in a separate virtual
environment. If the connector isn’t written in Python or isn’t found on PyPI, it
is executed using a Docker image. You can change this behavior with the
enforce_method parameter.
Using the enforce_method parameter is highly recommended in production
deployments, because autodetection can fail if the PyPI service is unavailable.
- Parameters
  - config_file_path (PathLike | str) – Path to the config file, created with the airbyte-serverless tool or via the Pathway CLI (which uses airbyte-serverless under the hood). The “source” section in this file must be properly configured in advance.
  - streams (Sequence[str]) – Airbyte stream names to be read.
  - execution_type (str) – Denotes how the Airbyte connector is run. If "local" is specified, the connector is executed on the local machine. If "remote" is used, the connector runs as a Google Cloud Run job.
  - mode (str) – Denotes how the engine polls new data from the source. Currently "streaming" and "static" are supported. If set to "streaming", it checks for updates every refresh_interval_ms milliseconds. The "static" mode only considers the available data and ingests all of it in one commit. The default value is "streaming".
  - env_vars (dict[str, str] | None) – Environment variables to be set in the Airbyte connector before its execution.
  - service_user_credentials_file (str | None) – Google API service user JSON file. Refer to the instructions in the developer’s user guide to obtain it. The credentials are required for the "remote" execution type.
  - gcp_region (str) – Google region for the cloud job.
  - gcp_job_name (str | None) – The name of the GCP job if the "remote" execution type is chosen. If unspecified, the name is autogenerated.
  - refresh_interval_ms (int) – Time in milliseconds between new data queries. Applicable if mode is set to "streaming".
  - enforce_method (str | None) – When set to "docker", Pathway will not try to locate and run the latest connector version from PyPI. When set to "pypi", Pathway will prefer the latest connector version available on PyPI. Use this option when you need to ensure a certain behavior on a local run.
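The value constraints in the parameter list can be checked before the connector is started. The following is a minimal sketch and not part of Pathway: check_read_args is a hypothetical helper, and the allowed values are simply the ones named in the parameter descriptions above.

```python
from typing import Optional, Sequence

# Allowed values, as named in the parameter descriptions above.
EXECUTION_TYPES = ("local", "remote")
MODES = ("streaming", "static")
ENFORCE_METHODS = (None, "docker", "pypi")


def check_read_args(
    streams: Sequence[str],
    execution_type: str = "local",
    mode: str = "streaming",
    enforce_method: Optional[str] = None,
) -> None:
    """Hypothetical pre-flight check: raise ValueError on an unsupported value."""
    # A bare string is a sequence of characters, not a sequence of stream names.
    if isinstance(streams, str) or len(streams) == 0:
        raise ValueError("streams must be a non-empty sequence of stream names")
    if execution_type not in EXECUTION_TYPES:
        raise ValueError(f"unsupported execution_type: {execution_type!r}")
    if mode not in MODES:
        raise ValueError(f"unsupported mode: {mode!r}")
    if enforce_method not in ENFORCE_METHODS:
        raise ValueError(f"unsupported enforce_method: {enforce_method!r}")
```

Calling check_read_args(["users"], mode="static") returns silently, while passing streams="users" raises a ValueError instead of letting the string be treated as a list of one-character stream names.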
Returns:
A table with a column data containing pw.Json values with the data read from
the connector. The format of this data corresponds to the one used in Airbyte.
Example:
The simplest way to test this connector is to use the Sample Data (Faker) data source provided by Airbyte.
To do that, you can create the Faker data source with the Pathway CLI command airbyte create-source.
The config file is located at ./connections/simple.yaml. It contains the basic
parameters of the test data source, such as the random seed and the number of records
to be generated. You don’t have to modify any of them to proceed with this test.
Now you can read from this configured source. It contains three streams: users,
products, and purchases. Let’s use the users stream, which leads to the following code:
import pathway as pw
users_table = pw.io.airbyte.read(
    "./connections/simple.yaml",
    streams=["users"],
)
Let’s proceed to a more complex example.
Suppose that you need to read a stream of commits in a GitHub repository. To do so, you can use the Airbyte GitHub connector.
abs create github --source "airbyte/source-github"
Then, you need to edit the created config file, located at ./connections/github.yaml.
To get started as quickly as possible, you can remove the uncommented option_title,
access_token, client_id and client_secret fields in the config and uncomment the
section “Another valid structure for credentials”. It requires a personal access
token (PAT), which can be obtained on the Tokens page on GitHub; note that you need
to be logged in.
You also need to set the repository name in the repositories field. For example,
you can specify pathwaycom/pathway. Then remove the unused optional fields, and
you’re ready to go.
Now, you can simply configure the Pathway connector and run:
import pathway as pw
commits_table = pw.io.airbyte.read(
    "./connections/github.yaml",
    streams=["commits"],
)
The resulting table will contain the JSON payloads with comprehensive information
about the commits. If mode is set to "streaming" (the default), new commits are
appended to this table as they are made.
In some cases, it is not necessary to poll for changes because the data is given
in full at the beginning and is not updated afterwards. For instance, in the first
example with users_table, you could also use the static mode of the connector:
users_table = pw.io.airbyte.read(
    "./connections/simple.yaml",
    streams=["users"],
    mode="static",
)
In the second example, you could use this mode to load the commits data at once and then terminate the connector:
commits_table = pw.io.airbyte.read(
    "./connections/github.yaml",
    streams=["commits"],
    mode="static",
)
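The static-mode examples above only define a table; to actually ingest the data, the table needs an output and the computation has to be started. The sketch below shows one way to do it, assuming the pw.io.jsonlines.write output connector and pw.run(). The helper name dump_stream and the output path are hypothetical.

```python
def dump_stream(config_path: str, stream: str, output_path: str) -> None:
    """Hypothetical helper: read one Airbyte stream in static mode and
    write the resulting rows to a JSON Lines file."""
    # Imported inside the function so that the helper can be defined
    # even where Pathway is not installed.
    import pathway as pw

    table = pw.io.airbyte.read(
        config_path,
        streams=[stream],
        mode="static",
    )
    # Attach an output; without one the pipeline has nothing to compute.
    pw.io.jsonlines.write(table, output_path)
    # Start the computation. In static mode it ends once the data is ingested.
    pw.run()
```

For the Faker source from the first example, this could be called as dump_stream("./connections/simple.yaml", "users", "./users.jsonl").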
While this is not the case for the GitHub connector, which is implemented in Python,
it’s worth noting that deploying code that runs with the "local" execution type may
be challenging because some connectors use Docker under the hood. That may lead to a
situation where you use Docker to deploy the code which, in turn, uses a Docker image
to run Airbyte’s data extraction routines. This problem is widely known as
Docker-in-Docker (DinD).
To avoid DinD, you may use the "remote" execution type. If chosen, it runs Airbyte’s
data extraction part on Google Cloud, which also saves CPU and memory on your
development machine or server. To enable it, you need to specify the "remote"
execution type and provide a path to the service account credentials file. Suppose
that the credentials are located in the file ./credentials.json. Then, running the
second example with the "remote" execution type looks as follows:
commits_table = pw.io.airbyte.read(
    "./connections/github.yaml",
    streams=["commits"],
    mode="static",
    execution_type="remote",
    service_user_credentials_file="./credentials.json",
)
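Since the "remote" execution type depends on the credentials file, it can help to fail early when the file is missing or unreadable. The following is a small sketch under that assumption: remote_read_kwargs is a hypothetical helper, not a Pathway function, and it only assembles the keyword arguments documented above.

```python
import json
from pathlib import Path
from typing import Any, Dict, Optional


def remote_read_kwargs(
    credentials_path: str,
    gcp_region: str = "europe-west1",
    gcp_job_name: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: keyword arguments for a "remote" run."""
    path = Path(credentials_path)
    if not path.is_file():
        raise FileNotFoundError(f"service account credentials not found: {path}")
    # Fail early if the file is not valid JSON (raises json.JSONDecodeError).
    json.loads(path.read_text(encoding="utf-8"))
    kwargs: Dict[str, Any] = {
        "execution_type": "remote",
        "service_user_credentials_file": str(path),
        "gcp_region": gcp_region,
    }
    # Leave the job name out so that it is autogenerated when not given.
    if gcp_job_name is not None:
        kwargs["gcp_job_name"] = gcp_job_name
    return kwargs
```

The result can be unpacked into the call from the example above, for instance pw.io.airbyte.read("./connections/github.yaml", streams=["commits"], mode="static", **remote_read_kwargs("./credentials.json")).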
Please keep in mind that Google Cloud Run jobs are billed based on vCPU time and
memory time, measured in vCPU-seconds and GiB-seconds respectively. That said, small
values of refresh_interval_ms are not advised for remote runs, as they may result in
more runs and consequently more vCPU and memory time spent, resulting in a bigger bill.
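To get a feel for how refresh_interval_ms affects the number of remote runs, a rough upper bound can be computed by hand. The sketch below assumes that every poll in "streaming" mode starts at most one remote run and ignores the duration of the run itself. Both are simplifying assumptions, and max_polls_per_day is a hypothetical helper.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000  # 86,400,000 milliseconds


def max_polls_per_day(refresh_interval_ms: int) -> int:
    """Upper bound on the number of polls per day for a given interval.

    Assumes one poll per interval and ignores the time each run takes.
    """
    if refresh_interval_ms <= 0:
        raise ValueError("refresh_interval_ms must be positive")
    return MS_PER_DAY // refresh_interval_ms


print(max_polls_per_day(60_000))   # default interval: 1440
print(max_polls_per_day(600_000))  # ten-minute interval: 144
```

Under these assumptions, raising the interval from the default one minute to ten minutes cuts the upper bound from 1440 to 144 polls per day.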