pathway.io.s3_csv package


pathway.io.s3_csv.read(path, aws_s3_settings, value_columns, id_columns=None, csv_settings=None, mode='streaming', types=None, persistent_id=None, debug_data=None, **kwargs)

Reads a table from one or several objects in an Amazon S3 bucket.

If path is a prefix and several objects lie under it, they are ordered by modification time: the earlier an object was modified, the earlier it is passed to the engine.
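
The ordering rule above can be pictured with a small standalone sketch. It does not call Pathway or S3; the S3Object record and the ingestion_order helper are hypothetical names used only for illustration. How ties between equal modification times are resolved is not stated here, so the stable sort below is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List


@dataclass(frozen=True)
class S3Object:
    """Hypothetical stand-in for an object listing entry (key + modification time)."""
    key: str
    last_modified: datetime


def ingestion_order(objects: List[S3Object], prefix: str) -> List[S3Object]:
    """Return the objects under `prefix`, oldest modification time first."""
    matching = [obj for obj in objects if obj.key.startswith(prefix)]
    # sorted() is stable, so objects with equal timestamps keep their listing
    # order; that tie-breaking behaviour is an assumption of this sketch.
    return sorted(matching, key=lambda obj: obj.last_modified)


objects = [
    S3Object("animals/a.csv", datetime(2023, 1, 2, tzinfo=timezone.utc)),
    S3Object("animals/b.csv", datetime(2023, 1, 1, tzinfo=timezone.utc)),
    S3Object("other/c.csv", datetime(2022, 6, 1, tzinfo=timezone.utc)),
]

for obj in ingestion_order(objects, "animals/"):
    print(obj.key)
```

Here animals/b.csv would be passed first because it was modified earlier, and other/c.csv is skipped because it does not fall under the prefix.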

  • Parameters
    • path (str) – Path to an object or to a folder of objects in an Amazon S3 bucket.
    • aws_s3_settings (AwsS3Settings) – Connection parameters for the S3 account and the bucket.
    • value_columns (List[str]) – Names of the columns to be extracted from the files.
    • id_columns (Optional[List[str]]) – If the table should have a primary key generated from a subset of its columns, specify those columns here. Otherwise, the primary key will be generated randomly.
    • csv_settings (Optional[CsvParserSettings]) – The settings for the CSV parser.
    • mode (str) – If set to “streaming”, the engine will wait for new input files in the bucket that fall under the path prefix. If set to “static”, it will only consider the available data and ingest all of it in one commit. The default value is “streaming”.
    • types (Optional[Dict[str, PathwayType]]) – Dictionary mapping column names to the data types (pw.Type) of their values. This parameter is optional; if it is not provided, the default type is pw.Type.ANY.
    • persistent_id (Optional[int]) – (unstable) An identifier under which the state of the table will be persisted, or None if there is no need to persist the state of this table. When a program restarts, it restores the state of all input tables according to what was saved for their persistent_id. This makes it possible to resume computations from the point where they were last terminated.
    • debug_data – Static data that replaces the original data when debug mode is active.
  • Returns
    The table read.
  • Return type
    Table
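
The difference between a key derived from id_columns and a randomly generated key can be illustrated with a standalone sketch. This is not Pathway's actual key derivation, which is not described here; row_key is a hypothetical helper, and the use of SHA-256 and UUIDs is an assumption made purely for illustration.

```python
import hashlib
import uuid
from typing import Dict, List, Optional


def row_key(row: Dict[str, str], id_columns: Optional[List[str]] = None) -> str:
    """Illustrative key: deterministic when id_columns is given, random otherwise."""
    if id_columns is None:
        # No id columns: every row gets a fresh random key.
        return uuid.uuid4().hex
    # Hash only the selected columns, so rows that agree on them share a key.
    material = repr(tuple(row[column] for column in id_columns))
    return hashlib.sha256(material.encode("utf-8")).hexdigest()


first = {"owner": "Alice", "pet": "dog"}
second = {"owner": "Alice", "pet": "cat"}

# Same owner, so the keys match when only "owner" identifies a row.
print(row_key(first, ["owner"]) == row_key(second, ["owner"]))
# Without id columns, even the same row gets a different key each time.
print(row_key(first) == row_key(first))
```

The practical consequence is that a key derived from columns stays the same for a given row across runs, while a random key does not.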

Example:

import os
import pathway as pw

# Credentials are taken from the environment rather than hard-coded.
t = pw.io.s3_csv.read(
    "animals/",  # prefix: every object under it is read
    aws_s3_settings=pw.io.s3_csv.AwsS3Settings(
        bucket_name="datasets",
        region="eu-west-3",
        access_key=os.environ["S3_ACCESS_KEY"],
        secret_access_key=os.environ["S3_SECRET_ACCESS_KEY"],
    ),
    value_columns=["owner", "pet"],
)
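
A variant of this call that uses the optional mode and id_columns parameters from the signature above might look like the sketch below. The wrapper function read_animals_static is a hypothetical name; it takes an already constructed AwsS3Settings object as its argument and imports pathway lazily, so merely defining it needs neither the library nor credentials. Choosing "owner" as the key column is an assumption about the data.

```python
def read_animals_static(aws_s3_settings):
    """Read everything currently under "animals/" in a single commit.

    `aws_s3_settings` is expected to be an AwsS3Settings object built
    elsewhere, as in the example above.
    """
    import pathway as pw  # imported lazily; requires pathway to be installed

    return pw.io.s3_csv.read(
        "animals/",
        aws_s3_settings=aws_s3_settings,
        value_columns=["owner", "pet"],
        # Assumption: "owner" identifies a row, so the primary key is
        # derived from it instead of being generated randomly.
        id_columns=["owner"],
        # "static" ingests only the data that is already available.
        mode="static",
    )
```

With mode="static" the call does not wait for new files to appear under the prefix, which can be convenient for batch runs and tests.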