pw.io.s3

class pw.io.s3.AwsS3Settings(*, bucket_name=None, access_key=None, secret_access_key=None, with_path_style=False, region=None, endpoint=None)

Stores Amazon S3 connection settings. You can also use this class to store configuration settings for any custom S3 installation; in that case, you need to specify the region and the endpoint.
  • Parameters
    • bucket_name – Name of the S3 bucket.
    • access_key – Access key for the bucket.
    • secret_access_key – Secret access key for the bucket.
    • with_path_style – Whether to use path-style requests.
    • region – Region of the bucket.
    • endpoint – Custom endpoint in case of self-hosted storage.
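The with_path_style flag chooses between the two S3 addressing styles. Path-style requests put the bucket name in the URL path, which is typically what self-hosted S3-compatible storage needs when bucket subdomains cannot be resolved; virtual-hosted-style requests put the bucket name in the host name. The sketch below only illustrates the two URL shapes; object_url and the example endpoint are hypothetical, and the connector builds its requests internally.

```python
from urllib.parse import urlsplit, urlunsplit


def object_url(endpoint: str, bucket: str, key: str, path_style: bool) -> str:
    """Hypothetical helper: build an object URL in one of the two S3 addressing styles."""
    parts = urlsplit(endpoint)
    if path_style:
        # Path-style: the bucket name is the first segment of the URL path.
        return urlunsplit((parts.scheme, parts.netloc, f"/{bucket}/{key}", "", ""))
    # Virtual-hosted-style: the bucket name becomes a subdomain of the endpoint host.
    return urlunsplit((parts.scheme, f"{bucket}.{parts.netloc}", f"/{key}", "", ""))


print(object_url("http://localhost:9000", "datasets", "animals/a.jsonl", path_style=True))
# http://localhost:9000/datasets/animals/a.jsonl
```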

classmethod new_from_path(s3_path)

Constructs settings from an S3 path. The engine looks for the credentials in environment variables and in local AWS profiles, and automatically detects the region of the bucket.

This method may fail if the credentials are missing or incorrect, or if the bucket does not exist.

  • Parameters
    s3_path (str) – Full path to the object in the form s3://<bucket_name>/<path>.
  • Returns
    Configuration object.
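new_from_path expects a path of the form s3://<bucket_name>/<path>. As a rough illustration of how such a path breaks down into a bucket name and an object key, the standard-library sketch below splits it; split_s3_path is a hypothetical helper, not part of pw.io.s3, and the engine may parse paths differently.

```python
from urllib.parse import urlparse


def split_s3_path(s3_path: str) -> tuple[str, str]:
    """Hypothetical helper: split 's3://<bucket_name>/<path>' into (bucket_name, path)."""
    parsed = urlparse(s3_path)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an s3:// path: {s3_path!r}")
    # The bucket is the network location; the object key is the path without its leading "/".
    return parsed.netloc, parsed.path.lstrip("/")


print(split_s3_path("s3://datasets/animals/"))
# ('datasets', 'animals/')
```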

class pw.io.s3.DigitalOceanS3Settings(bucket_name, *, access_key=None, secret_access_key=None, region=None)

Stores Digital Ocean S3 connection settings.
  • Parameters
    • bucket_name – Name of the Digital Ocean S3 bucket.
    • access_key – Access key for the bucket.
    • secret_access_key – Secret access key for the bucket.
    • region – Region of the bucket.

class pw.io.s3.WasabiS3Settings(bucket_name, *, access_key=None, secret_access_key=None, region=None)

Stores Wasabi S3 connection settings.
  • Parameters
    • bucket_name – Name of the Wasabi S3 bucket.
    • access_key – Access key for the bucket.
    • secret_access_key – Secret access key for the bucket.
    • region – Region of the bucket.

pw.io.s3.read(path, format, *, aws_s3_settings=None, schema=None, mode='streaming', csv_settings=None, json_field_paths=None, persistent_id=None, autocommit_duration_ms=1500, debug_data=None)

Reads a table from one or several objects in an Amazon S3 bucket in the given format.

If the path is a prefix with several objects under it, their order is determined by their modification times: the earlier an object was modified, the earlier it is passed to the engine.
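The ordering rule can be illustrated with a small sketch: given a listing of objects with their last-modified timestamps, sorting by timestamp gives the order in which they are handed over. The listing and the ingestion_order helper are hypothetical and only mimic the rule stated above; they are not the engine's implementation.

```python
from datetime import datetime, timezone

# Hypothetical listing of the objects under the prefix "animals/".
objects = [
    {"Key": "animals/b.jsonl", "LastModified": datetime(2023, 5, 2, tzinfo=timezone.utc)},
    {"Key": "animals/a.jsonl", "LastModified": datetime(2023, 5, 3, tzinfo=timezone.utc)},
    {"Key": "animals/c.jsonl", "LastModified": datetime(2023, 5, 1, tzinfo=timezone.utc)},
]


def ingestion_order(listing):
    """Return object keys sorted so that the least recently modified object comes first."""
    return [obj["Key"] for obj in sorted(listing, key=lambda obj: obj["LastModified"])]


print(ingestion_order(objects))
# ['animals/c.jsonl', 'animals/b.jsonl', 'animals/a.jsonl']
```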

  • Parameters
    • path (str) – Path to an object or to a folder of objects in an Amazon S3 bucket.
    • aws_s3_settings (AwsS3Settings | None) – Connection parameters for the S3 account and the bucket.
    • format (str) – Format of data to be read. Currently “csv”, “json” and “plaintext” formats are supported.
    • schema (type[Schema] | None) – Schema of the resulting table.
    • mode (str) – If set to “streaming”, the engine waits for new objects under the given path prefix. If set to “static”, it only considers the data that is currently available and ingests all of it. Default value is “streaming”.
    • csv_settings (CsvParserSettings | None) – Settings for the CSV parser. This parameter is used only if the specified format is “csv”.
    • json_field_paths (dict[str, str] | None) – If the format is “json”, this parameter maps field names to paths in the parsed JSON object. Each field that requires such a mapping is given in the format <field_name>: <path to be mapped>, where the path to be mapped must be a JSON Pointer (RFC 6901).
    • persistent_id (str | None) – (unstable) An identifier under which the state of the table is persisted, or None if there is no need to persist the state of this table. When a program restarts, it restores the state of all input tables according to what was saved for their persistent_id. This makes it possible to resume computations from the point where they were last terminated.
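    • autocommit_duration_ms – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph. Default value is 1500.
    • debug_data (Any) – Static data that replaces the original data when debug mode is active.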
  • Returns
    Table – The table read.
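json_field_paths expects JSON Pointers as defined in RFC 6901; for example, json_field_paths={"owner": "/owner/name"} would take the owner column from a nested field. To show how such a pointer selects a value, the sketch below resolves pointers against a parsed JSON object with the standard library. The resolve_pointer helper and the sample row are hypothetical; the connector resolves pointers internally in its own way.

```python
import json


def resolve_pointer(document, pointer: str):
    """Resolve an RFC 6901 JSON Pointer against a parsed JSON document."""
    if pointer == "":
        return document  # The empty pointer refers to the whole document.
    if not pointer.startswith("/"):
        raise ValueError(f"invalid JSON Pointer: {pointer!r}")
    current = document
    for token in pointer[1:].split("/"):
        # RFC 6901 unescaping: "~1" becomes "/" first, then "~0" becomes "~".
        token = token.replace("~1", "/").replace("~0", "~")
        current = current[int(token)] if isinstance(current, list) else current[token]
    return current


row = json.loads('{"owner": {"name": "Alice"}, "pets": ["cat", "dog"]}')
# A hypothetical mapping in the spirit of json_field_paths: field name -> JSON Pointer.
field_paths = {"owner": "/owner/name", "pet": "/pets/0"}
print({field: resolve_pointer(row, path) for field, path in field_paths.items()})
# {'owner': 'Alice', 'pet': 'cat'}
```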

Example:

Consider an object store hosted in Amazon S3. The datasets are stored in a bucket named datasets, located in the eu-west-3 region. The goal is to read the dataset located under the path animals/ in this bucket.

Suppose that the dataset rows are stored in the JSON Lines format, with one JSON object per line.

Then, the code may look as follows:

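import os

import pathway as pw


class InputSchema(pw.Schema):
    owner: str
    pet: str


# Read every object under the "animals/" prefix of the "datasets" bucket.
# The credentials are taken from environment variables.
t = pw.io.s3.read(
    "animals/",
    aws_s3_settings=pw.io.s3.AwsS3Settings(
        bucket_name="datasets",
        region="eu-west-3",
        access_key=os.environ["S3_ACCESS_KEY"],
        secret_access_key=os.environ["S3_SECRET_ACCESS_KEY"],
    ),
    format="json",  # JSON Lines: one JSON object per line
    schema=InputSchema,
)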

If you are dealing with a public bucket, the access_key and secret_access_key parameters can be omitted. In that case, the read call looks as follows:

t = pw.io.s3.read(
    "animals/",
    aws_s3_settings=pw.io.s3.AwsS3Settings(
        bucket_name="datasets",
        region="eu-west-3",
    ),
    format="json",
    schema=InputSchema,
)
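For reference, an object in the JSON Lines format that matches InputSchema holds one JSON object per line, with keys named after the schema columns. The sample contents below are hypothetical; the snippet only parses them with the standard library to show the expected shape of the data.

```python
import json

# Hypothetical contents of one object under animals/ in the JSON Lines format.
raw = '{"owner": "Alice", "pet": "dog"}\n{"owner": "Bob", "pet": "cat"}\n'

# Each non-empty line is a standalone JSON object, i.e. one row of the table.
rows = [json.loads(line) for line in raw.splitlines() if line.strip()]
print(rows)
# [{'owner': 'Alice', 'pet': 'dog'}, {'owner': 'Bob', 'pet': 'cat'}]
```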

pw.io.s3.read_from_digital_ocean(path, do_s3_settings, format, *, schema=None, mode='streaming', csv_settings=None, json_field_paths=None, persistent_id=None, autocommit_duration_ms=1500, debug_data=None)

Reads a table from one or several objects in a Digital Ocean S3 bucket.

If the path is a prefix with several objects under it, their order is determined by their modification times: the earlier an object was modified, the earlier it is passed to the engine.

  • Parameters
    • path (str) – Path to an object or to a folder of objects in the S3 bucket.
    • do_s3_settings (DigitalOceanS3Settings) – Connection parameters for the account and the bucket.
    • format (str) – Format of data to be read. Currently “csv”, “json” and “plaintext” formats are supported.
    • schema (type[Schema] | None) – Schema of the resulting table.
    • mode (str) – If set to “streaming”, the engine waits for new objects under the given path prefix. If set to “static”, it only considers the data that is currently available and ingests all of it. Default value is “streaming”.
    • csv_settings (CsvParserSettings | None) – Settings for the CSV parser. This parameter is used only if the specified format is “csv”.
    • json_field_paths (dict[str, str] | None) – If the format is “json”, this parameter maps field names to paths in the parsed JSON object. Each field that requires such a mapping is given in the format <field_name>: <path to be mapped>, where the path to be mapped must be a JSON Pointer (RFC 6901).
    • persistent_id (str | None) – (unstable) An identifier under which the state of the table is persisted, or None if there is no need to persist the state of this table. When a program restarts, it restores the state of all input tables according to what was saved for their persistent_id. This makes it possible to resume computations from the point where they were last terminated.
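    • autocommit_duration_ms – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph. Default value is 1500.
    • debug_data (Any) – Static data that replaces the original data when debug mode is active.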
  • Returns
    Table – The table read.

Example:

Consider an object store hosted in Digital Ocean S3. The CSV datasets are stored in a bucket named datasets, located in the ams3 region. The goal is to read the dataset located under the path animals/ in this bucket.

Then, the code may look as follows:

import os

import pathway as pw


class InputSchema(pw.Schema):
    owner: str
    pet: str


t = pw.io.s3.read_from_digital_ocean(
    "animals/",
    do_s3_settings=pw.io.s3.DigitalOceanS3Settings(
        bucket_name="datasets",
        region="ams3",
        access_key=os.environ["DO_S3_ACCESS_KEY"],
        secret_access_key=os.environ["DO_S3_SECRET_ACCESS_KEY"],
    ),
    format="csv",
    schema=InputSchema,
)
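With format="csv", each object is a CSV file. Assuming the first line is a header row whose column names match InputSchema (an assumption about the data, not a statement about the parser's defaults), the sketch below parses hypothetical sample contents with Python's csv module only to show the expected shape of the rows.

```python
import csv
import io

# Hypothetical contents of one CSV object under animals/.
# The header row is assumed to name the columns of InputSchema.
raw = "owner,pet\nAlice,dog\nBob,cat\n"

# DictReader uses the header row as keys for every following line.
rows = list(csv.DictReader(io.StringIO(raw)))
print(rows)
```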

pw.io.s3.read_from_wasabi(path, wasabi_s3_settings, format, *, schema=None, mode='streaming', csv_settings=None, json_field_paths=None, persistent_id=None, autocommit_duration_ms=1500, debug_data=None)

Reads a table from one or several objects in a Wasabi S3 bucket.

If the path is a prefix with several objects under it, their order is determined by their modification times: the earlier an object was modified, the earlier it is passed to the engine.

  • Parameters
    • path (str) – Path to an object or to a folder of objects in the S3 bucket.
    • wasabi_s3_settings (WasabiS3Settings) – Connection parameters for the account and the bucket.
    • format (str) – Format of data to be read. Currently “csv”, “json” and “plaintext” formats are supported.
    • schema (type[Schema] | None) – Schema of the resulting table.
    • mode (str) – If set to “streaming”, the engine waits for new objects under the given path prefix. If set to “static”, it only considers the data that is currently available and ingests all of it. Default value is “streaming”.
    • csv_settings (CsvParserSettings | None) – Settings for the CSV parser. This parameter is used only if the specified format is “csv”.
    • json_field_paths (dict[str, str] | None) – If the format is “json”, this parameter maps field names to paths in the parsed JSON object. Each field that requires such a mapping is given in the format <field_name>: <path to be mapped>, where the path to be mapped must be a JSON Pointer (RFC 6901).
    • persistent_id (str | None) – (unstable) An identifier under which the state of the table is persisted, or None if there is no need to persist the state of this table. When a program restarts, it restores the state of all input tables according to what was saved for their persistent_id. This makes it possible to resume computations from the point where they were last terminated.
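    • autocommit_duration_ms – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph. Default value is 1500.
    • debug_data (Any) – Static data that replaces the original data when debug mode is active.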
  • Returns
    Table – The table read.

Example:

Consider an object store hosted in Wasabi S3. The CSV datasets are stored in a bucket named datasets, located in the us-west-1 region. The goal is to read the dataset located under the path animals/ in this bucket.

Then, the code may look as follows:

import os

import pathway as pw


class InputSchema(pw.Schema):
    owner: str
    pet: str


t = pw.io.s3.read_from_wasabi(
    "animals/",
    wasabi_s3_settings=pw.io.s3.WasabiS3Settings(
        bucket_name="datasets",
        region="us-west-1",
        access_key=os.environ["WASABI_S3_ACCESS_KEY"],
        secret_access_key=os.environ["WASABI_S3_SECRET_ACCESS_KEY"],
    ),
    format="csv",
    schema=InputSchema,
)