pathway.io.s3 package

Functions

pw.io.s3.read(path, aws_s3_settings, format, *, schema=None, mode='streaming', csv_settings=None, json_field_paths=None, persistent_id=None, autocommit_duration_ms=1500, debug_data=None)

Reads a table from one or several objects in an Amazon S3 bucket in the given format.

If the path is a prefix with several objects under it, the objects are ordered by modification time: the earlier an object was modified, the earlier it is passed to the engine.
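
The ordering rule above can be illustrated with a minimal sketch in plain Python. The object keys and timestamps are made up for illustration, and the sort is only a model of the documented behavior, not the connector's actual implementation.

```python
from datetime import datetime, timezone

# Hypothetical listing of objects under the prefix "animals/":
# (object key, last-modified timestamp). The values are illustrative only.
listing = [
    ("animals/c.jsonl", datetime(2023, 5, 3, tzinfo=timezone.utc)),
    ("animals/a.jsonl", datetime(2023, 5, 2, tzinfo=timezone.utc)),
    ("animals/b.jsonl", datetime(2023, 5, 1, tzinfo=timezone.utc)),
]


def ingestion_order(objects):
    """Return object keys sorted by modification time, oldest first."""
    return [key for key, modified in sorted(objects, key=lambda item: item[1])]


order = ingestion_order(listing)
print(order)
```

In this model the order depends only on the timestamps, not on the object names.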

  • Parameters
    • path (str) – Path to an object or to a folder of objects in an Amazon S3 bucket.
    • aws_s3_settings (AwsS3Settings) – Connection parameters for the S3 account and the bucket.
    • format (str) – Format of data to be read. Currently “csv”, “json” and “plaintext” formats are supported.
    • schema (Optional[Type[Schema]]) – Schema of the resulting table.
    • mode (str) – If set to “streaming”, the engine waits for new objects under the given path prefix. If set to “static”, it considers only the data already available and ingests all of it. The default value is “streaming”.
    • csv_settings (Optional[CsvParserSettings]) – Settings for the CSV parser. This parameter is used only if the specified format is “csv”.
    • json_field_paths (Optional[Dict[str, str]]) – If the format is “json”, this parameter maps field names to paths in the JSON object being read. Each field that requires such a mapping is given as <field_name>: <path to be mapped>, where the path must be a JSON Pointer (RFC 6901).
    • persistent_id (Optional[str]) – (unstable) An identifier under which the state of the table is persisted, or None if the state of this table does not need to be persisted. When a program restarts, it restores the state of all input tables according to what was saved for their persistent_id. This makes it possible to resume computations from the point at which they were last terminated.
    • debug_data (Any) – Static data that replaces the original data when debug mode is active.
  • Returns
    Table – The table read.
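
To show what a JSON Pointer path in json_field_paths refers to, here is a minimal sketch of RFC 6901 resolution in plain Python. The helper resolve_json_pointer and the sample record are hypothetical and only illustrate the path syntax; this is not the connector's own code.

```python
def resolve_json_pointer(document, pointer):
    """Resolve an RFC 6901 JSON Pointer against a parsed JSON document."""
    if pointer == "":
        return document
    if not pointer.startswith("/"):
        raise ValueError("a non-empty JSON Pointer must start with '/'")
    current = document
    for token in pointer[1:].split("/"):
        # RFC 6901 escapes: "~1" stands for "/" and "~0" stands for "~".
        # "~1" has to be decoded before "~0".
        token = token.replace("~1", "/").replace("~0", "~")
        if isinstance(current, list):
            current = current[int(token)]
        else:
            current = current[token]
    return current


# Hypothetical nested record; the field names are illustrative.
record = {"owner": {"name": "Alice"}, "pets": [{"kind": "dog"}, {"kind": "cat"}]}

# Mapping in the <field_name>: <path to be mapped> form described above.
json_field_paths = {"owner": "/owner/name", "pet": "/pets/0/kind"}

row = {
    field: resolve_json_pointer(record, path)
    for field, path in json_field_paths.items()
}
print(row)
```

A mapping of this shape would let a flat schema with the columns owner and pet be filled from nested JSON records.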

Example:

Let’s consider an object store hosted in Amazon S3. The store keeps datasets in a bucket located in the region eu-west-3. The goal is to read the dataset located under the path animals/ in this bucket.

Let’s suppose that the dataset rows are in the JSON Lines format.

Then, the code may look as follows:

import os

import pathway as pw


# Schema of the rows to read: each record has an owner and a pet.
class InputSchema(pw.Schema):
    owner: str
    pet: str


t = pw.io.s3.read(
    "animals/",
    aws_s3_settings=pw.io.s3.AwsS3Settings(
        bucket_name="datasets",
        region="eu-west-3",
        # Credentials are taken from the environment rather than hard-coded.
        access_key=os.environ["S3_ACCESS_KEY"],
        secret_access_key=os.environ["S3_SECRET_ACCESS_KEY"],
    ),
    format="json",
    schema=InputSchema,
)
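
To make the input format concrete, here is a sketch of what JSON Lines content matching InputSchema could look like and how each line corresponds to a row. The sample records are invented, and the parsing uses Python's json module only to illustrate the format; it is not how the connector reads data.

```python
import json

# Hypothetical contents of one object under "animals/": one JSON object per line.
sample = '{"owner": "Alice", "pet": "dog"}\n{"owner": "Bob", "pet": "cat"}\n'

# Each non-empty line is an independent JSON document and corresponds to one row.
rows = [json.loads(line) for line in sample.splitlines() if line.strip()]

for row in rows:
    print(row["owner"], row["pet"])
```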

If you are dealing with a public bucket, the parameters access_key and secret_access_key can be omitted. In this case, the read call looks as follows:

t = pw.io.s3.read(
    "animals/",
    aws_s3_settings=pw.io.s3.AwsS3Settings(
        bucket_name="datasets",
        region="eu-west-3",
    ),
    format="json",
    schema=InputSchema,
)
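
If the same program should work with both private and public buckets, one option is to pass the credentials only when they are set. The sketch below builds the keyword arguments in plain Python. The helper name build_s3_settings_kwargs is hypothetical, and the environment variable names follow the example above.

```python
import os


def build_s3_settings_kwargs(bucket_name, region, env=None):
    """Build keyword arguments for AwsS3Settings (hypothetical helper).

    Credentials are included only when both variables are present, so the
    same code path also works for public buckets.
    """
    env = os.environ if env is None else env
    kwargs = {"bucket_name": bucket_name, "region": region}
    access_key = env.get("S3_ACCESS_KEY")
    secret_access_key = env.get("S3_SECRET_ACCESS_KEY")
    if access_key and secret_access_key:
        kwargs["access_key"] = access_key
        kwargs["secret_access_key"] = secret_access_key
    return kwargs


# With no credentials in the environment mapping, only bucket and region remain.
public_kwargs = build_s3_settings_kwargs("datasets", "eu-west-3", env={})
print(public_kwargs)
```

The resulting dictionary could then be unpacked into the settings object, for example pw.io.s3.AwsS3Settings(**public_kwargs).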

pw.io.s3.read_from_digital_ocean(path, do_s3_settings, format, *, schema=None, mode='streaming', csv_settings=None, json_field_paths=None, persistent_id=None, autocommit_duration_ms=1500, debug_data=None)

Reads a table from one or several objects in a Digital Ocean S3 bucket.

If the path is a prefix with several objects under it, the objects are ordered by modification time: the earlier an object was modified, the earlier it is passed to the engine.

  • Parameters
    • path (str) – Path to an object or to a folder of objects in an S3 bucket.
    • do_s3_settings (DigitalOceanS3Settings) – Connection parameters for the account and the bucket.
    • format (str) – Format of data to be read. Currently “csv”, “json” and “plaintext” formats are supported.
    • schema (Optional[Type[Schema]]) – Schema of the resulting table.
    • mode (str) – If set to “streaming”, the engine waits for new objects under the given path prefix. If set to “static”, it considers only the data already available and ingests all of it. The default value is “streaming”.
    • csv_settings (Optional[CsvParserSettings]) – Settings for the CSV parser. This parameter is used only if the specified format is “csv”.
    • json_field_paths (Optional[Dict[str, str]]) – If the format is “json”, this parameter maps field names to paths in the JSON object being read. Each field that requires such a mapping is given as <field_name>: <path to be mapped>, where the path must be a JSON Pointer (RFC 6901).
    • persistent_id (Optional[str]) – (unstable) An identifier under which the state of the table is persisted, or None if the state of this table does not need to be persisted. When a program restarts, it restores the state of all input tables according to what was saved for their persistent_id. This makes it possible to resume computations from the point at which they were last terminated.
    • debug_data (Any) – Static data that replaces the original data when debug mode is active.
  • Returns
    Table – The table read.
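
The difference between the two values of mode can be pictured with a small model in plain Python. This is only a conceptual sketch, assuming that streaming amounts to repeatedly picking up objects that have not been seen yet. The function names and listings are hypothetical, and the real connector may work differently.

```python
def read_static(listing):
    """Static mode model: ingest what is available now, then stop."""
    return list(listing)


def read_streaming(listings):
    """Streaming mode model: keep yielding objects not seen in earlier listings."""
    seen = set()
    for listing in listings:  # each item models one later look at the prefix
        for key in listing:
            if key not in seen:
                seen.add(key)
                yield key


# Hypothetical successive states of the "animals/" prefix.
snapshots = [
    ["animals/1.csv"],
    ["animals/1.csv", "animals/2.csv"],
    ["animals/1.csv", "animals/2.csv", "animals/3.csv"],
]

static_result = read_static(snapshots[0])
streaming_result = list(read_streaming(snapshots))
print(static_result)
print(streaming_result)
```

In this model the static read sees only the first snapshot, while the streaming read eventually picks up every object that appears under the prefix.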

Example:

Let’s consider an object store hosted in Digital Ocean S3. The store keeps CSV datasets in a bucket located in the region ams3. The goal is to read the dataset located under the path animals/ in this bucket.

Then, the code may look as follows:

import os

import pathway as pw


# Schema of the rows to read: each record has an owner and a pet.
class InputSchema(pw.Schema):
    owner: str
    pet: str


t = pw.io.s3.read_from_digital_ocean(
    "animals/",
    do_s3_settings=pw.io.s3.DigitalOceanS3Settings(
        bucket_name="datasets",
        region="ams3",
        # Credentials are taken from the environment rather than hard-coded.
        access_key=os.environ["DO_S3_ACCESS_KEY"],
        secret_access_key=os.environ["DO_S3_SECRET_ACCESS_KEY"],
    ),
    format="csv",
    schema=InputSchema,
)
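
For reference, here is a sketch of CSV content that would match InputSchema. It assumes a header row and a comma delimiter; the sample rows are invented, and Python's csv module is used only to illustrate the layout, since the connector's actual parsing is governed by csv_settings.

```python
import csv
import io

# Hypothetical contents of one CSV object under "animals/"; the header row
# names the columns declared in InputSchema.
sample = "owner,pet\nAlice,dog\nBob,cat\n"

rows = list(csv.DictReader(io.StringIO(sample)))

for row in rows:
    print(row["owner"], row["pet"])
```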

pw.io.s3.read_from_wasabi(path, wasabi_s3_settings, format, *, schema=None, mode='streaming', csv_settings=None, json_field_paths=None, persistent_id=None, autocommit_duration_ms=1500, debug_data=None)

Reads a table from one or several objects in a Wasabi S3 bucket.

If the path is a prefix with several objects under it, the objects are ordered by modification time: the earlier an object was modified, the earlier it is passed to the engine.

  • Parameters
    • path (str) – Path to an object or to a folder of objects in an S3 bucket.
    • wasabi_s3_settings (WasabiS3Settings) – Connection parameters for the account and the bucket.
    • format (str) – Format of data to be read. Currently “csv”, “json” and “plaintext” formats are supported.
    • schema (Optional[Type[Schema]]) – Schema of the resulting table.
    • mode (str) – If set to “streaming”, the engine waits for new objects under the given path prefix. If set to “static”, it considers only the data already available and ingests all of it. The default value is “streaming”.
    • csv_settings (Optional[CsvParserSettings]) – Settings for the CSV parser. This parameter is used only if the specified format is “csv”.
    • json_field_paths (Optional[Dict[str, str]]) – If the format is “json”, this parameter maps field names to paths in the JSON object being read. Each field that requires such a mapping is given as <field_name>: <path to be mapped>, where the path must be a JSON Pointer (RFC 6901).
    • persistent_id (Optional[str]) – (unstable) An identifier under which the state of the table is persisted, or None if the state of this table does not need to be persisted. When a program restarts, it restores the state of all input tables according to what was saved for their persistent_id. This makes it possible to resume computations from the point at which they were last terminated.
    • debug_data (Any) – Static data that replaces the original data when debug mode is active.
  • Returns
    Table – The table read.

Example:

Let’s consider an object store hosted in Wasabi S3. The store keeps CSV datasets in a bucket located in the region us-west-1. The goal is to read the dataset located under the path animals/ in this bucket.

Then, the code may look as follows:

import os

import pathway as pw


# Schema of the rows to read: each record has an owner and a pet.
class InputSchema(pw.Schema):
    owner: str
    pet: str


t = pw.io.s3.read_from_wasabi(
    "animals/",
    wasabi_s3_settings=pw.io.s3.WasabiS3Settings(
        bucket_name="datasets",
        region="us-west-1",
        # Credentials are taken from the environment rather than hard-coded.
        access_key=os.environ["WASABI_S3_ACCESS_KEY"],
        secret_access_key=os.environ["WASABI_S3_SECRET_ACCESS_KEY"],
    ),
    format="csv",
    schema=InputSchema,
)
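
The three readers on this page differ mainly in the function name and in the name of the settings argument. If a program needs to choose the provider at run time, a lookup table is one possible approach. The sketch below is plain Python; the provider labels and the helper reader_call are hypothetical, while the function and argument names are taken from the signatures above.

```python
# Reader function and settings-argument name for each provider, as listed in
# the signatures above. The provider keys ("aws", ...) are hypothetical labels.
READERS = {
    "aws": ("read", "aws_s3_settings"),
    "digital_ocean": ("read_from_digital_ocean", "do_s3_settings"),
    "wasabi": ("read_from_wasabi", "wasabi_s3_settings"),
}


def reader_call(provider, path, settings, **options):
    """Return (function name, positional args, keyword args) for a provider."""
    try:
        function_name, settings_argument = READERS[provider]
    except KeyError:
        raise ValueError(f"unknown provider: {provider!r}") from None
    kwargs = {settings_argument: settings, **options}
    return function_name, (path,), kwargs


# The settings value is a placeholder here; in real code it would be the
# provider's settings object, such as pw.io.s3.WasabiS3Settings(...).
name, args, kwargs = reader_call("wasabi", "animals/", settings=None, format="csv")
print(name, args, kwargs)
```

With Pathway imported as pw, the call itself could then be made as getattr(pw.io.s3, name)(*args, **kwargs).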