pw.io.s3
class pw.io.s3.AwsS3Settings(*, bucket_name=None, access_key=None, secret_access_key=None, with_path_style=False, region=None, endpoint=None, session_token=None)
[source]Stores Amazon S3 connection settings. You may also use this class to store configuration settings for any custom S3 installation, however you will need to specify the region and the endpoint.
- Parameters
- bucket_name – Name of S3 bucket.
- access_key – Access key for the bucket.
- secret_access_key – Secret access key for the bucket.
- with_path_style – Whether to use path-style requests.
- region – Region of the bucket.
- endpoint – Custom endpoint in case of self-hosted storage.
- session_token – Session token, an alternative way to authenticate to S3.
classmethod new_from_path(s3_path)
sourceConstructs settings from S3 path. The engine will look for the credentials in environment variables and in local AWS profiles. It will also automatically detect the region of the bucket.
This method may fail if there are no credentials or they are incorrect. It may also fail if the bucket does not exist.
- Parameters
s3_path (str
) – full path to the object in the forms3://<bucket_name>/<path>
. - Returns
Configuration object.
class pw.io.s3.DigitalOceanS3Settings(bucket_name, *, access_key=None, secret_access_key=None, region=None)
[source]Stores Digital Ocean S3 connection settings.
- Parameters
- bucket_name – Name of Digital Ocean S3 bucket.
- access_key – Access key for the bucket.
- secret_access_key – Secret access key for the bucket.
- region – Region of the bucket.
class pw.io.s3.WasabiS3Settings(bucket_name, *, access_key=None, secret_access_key=None, region=None)
[source]Stores Wasabi S3 connection settings.
- Parameters
- bucket_name – Name of Wasabi S3 bucket.
- access_key – Access key for the bucket.
- secret_access_key – Secret access key for the bucket.
- region – Region of the bucket.
pw.io.s3.read(path, format, *, aws_s3_settings=None, schema=None, mode='streaming', with_metadata=False, csv_settings=None, json_field_paths=None, downloader_threads_count=None, persistent_id=None, autocommit_duration_ms=1500, debug_data=None)
sourceReads a table from one or several objects in Amazon S3 bucket in the given format.
In case the prefix of S3 path is specified, and there are several objects lying under this prefix, their order is determined according to their modification times: the smaller the modification time is, the earlier the file will be passed to the engine.
- Parameters
- path (
str
) – Path to an object or to a folder of objects in Amazon S3 bucket. - aws_s3_settings (
AwsS3Settings
|None
) – Connection parameters for the S3 account and the bucket. - format (
str
) – Format of data to be read. Currentlycsv
,json
,plaintext
,plaintext_by_object
andbinary
formats are supported. The difference betweenplaintext
andplaintext_by_object
is how the input is tokenized: if theplaintext
option is chosen, it’s split by the newlines. Otherwise, the files are split in full and one row will correspond to one file. In case thebinary
format is specified, the data is read as raw bytes without UTF-8 parsing. - schema (
type
[Schema
] |None
) – Schema of the resulting table. Not required forplaintext_by_object
andbinary
formats: if they are chosen, the contents of the read objects are stored in the columndata
. - mode (
str
) – If set tostreaming
, the engine waits for the new objects under the given path prefix. Set it tostatic
, it only considers the available data and ingest all of it. Default value isstreaming
. - with_metadata (
bool
) – When set to true, the connector will add an additional column named_metadata
to the table. This column will be a JSON field that will contain an optional fieldmodified_at
. Additionally, the column will also have an optional field namedowner
containing an ID of the object owner. Finally, the column will also contain a field namedpath
that will show the full path to the object within a bucket from where a row was filled. - csv_settings (
CsvParserSettings
|None
) – Settings for the CSV parser. This parameter is used only in case the specified format iscsv
. - json_field_paths (
dict
[str
,str
] |None
) – If the format isjson
, this field allows to map field names into path in the read json object. For the field which require such mapping, it should be given in the format<field_name>: <path to be mapped>
, where the path to be mapped needs to be a JSON Pointer (RFC 6901). - downloader_threads_count (
int
|None
) – The number of threads created to download the contents of the bucket under the given path. It defaults to the number of cores available on the machine. It is recommended to increase the number of threads if your bucket contains many small files. - persistent_id (
str
|None
) – (unstable) An identifier, under which the state of the table will be persisted orNone
, if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for theirpersistent_id
. This way it’s possible to configure the start of computations from the moment they were terminated last time. - debug_data (
Any
) – Static data replacing original one when debug mode is active.
- path (
- Returns
Table – The table read.
Example:
Let’s consider an object store, which is hosted in Amazon S3. The store contains
datasets in the respective bucket and is located in the region eu-west-3
. The goal
is to read the dataset, located under the path animals/
in this bucket.
Let’s suppose that the format of the dataset rows is jsonlines.
Then, the code may look as follows:
import os
import pathway as pw
class InputSchema(pw.Schema):
owner: str
pet: str
t = pw.io.s3.read(
"animals/",
aws_s3_settings=pw.io.s3.AwsS3Settings(
bucket_name="datasets",
region="eu-west-3",
access_key=os.environ["S3_ACCESS_KEY"],
secret_access_key=os.environ["S3_SECRET_ACCESS_KEY"],
),
format="json",
schema=InputSchema,
)
In case you are dealing with a public bucket, the parameters access_key
and
secret_access_key
can be omitted. In this case, the read part will look as
follows:
t = pw.io.s3.read(
"animals/",
aws_s3_settings=pw.io.s3.AwsS3Settings(
bucket_name="datasets",
region="eu-west-3",
),
format="json",
schema=InputSchema,
)
It’s not obligatory to choose one of the available input tokenizations. You
can also read the objects in full, thus creating a pw.Table
where each row
corresponds to a single object read in full. To do that, you need to specify
binary
as a format:
t = pw.io.s3.read(
"animals/",
aws_s3_settings=pw.io.s3.AwsS3Settings(
bucket_name="datasets",
region="eu-west-3",
),
format="binary",
)
Similarly you can also enable the UTF-8 parsing of the objects read, resulting in having a table of plaintext files:
t = pw.io.s3.read(
"animals/",
aws_s3_settings=pw.io.s3.AwsS3Settings(
bucket_name="datasets",
region="eu-west-3",
),
format="plaintext_by_object",
)
Note that it’s also possible to infer the bucket name and credentials from the path,
if it’s given in a full form with s3://
prefix. For instance, in the example above
you can also connect as follows:
t = pw.io.s3.read("s3://datasets/animals", format="binary")
Note that you need to be logged in S3 for the credentials auto-detection to work.
Finally, you can also read the data from self-hosted S3 buckets, or generally those
where the endpoint path differs from the standard AWS paths. To do that, you can make
use of the endpoint
field of pw.io.s3.AwsS3Settings
class. One of the natural
examples for that may be the min.io S3 buckets. That is, if you have a min.io S3
bucket instance, one of the ways to connect to it via this connector would be the
first to create the settings object with the custom endpoint and path style:
custom_settings = pw.io.s3.AwsS3Settings(
endpoint="avv749.stackhero-network.com",
bucket_name="datasets",
access_key=os.environ["MINIO_S3_ACCESS_KEY"],
secret_access_key=os.environ["MINIO_S3_SECRET_ACCESS_KEY"],
with_path_style=True,
region="eu-west-3",
)
And you can connect with the usage of this created custom settings format:
t = pw.io.s3.read(
"animals/",
aws_s3_settings=custom_settings,
format="binary",
)
Please note that the min.io connection via generic S3 connector is given only as an
example: you may use pw.io.minio.read
connector which wouldn’t require any custom
settings object creation from you.
pw.io.s3.read_from_digital_ocean(path, do_s3_settings, format, *, schema=None, mode='streaming', with_metadata=False, csv_settings=None, json_field_paths=None, downloader_threads_count=None, persistent_id=None, autocommit_duration_ms=1500, debug_data=None)
sourceReads a table from one or several objects in Digital Ocean S3 bucket.
In case the prefix of S3 path is specified, and there are several objects lying under this prefix, their order is determined according to their modification times: the smaller the modification time is, the earlier the file will be passed to the engine.
- Parameters
- path (
str
) – Path to an object or to a folder of objects in S3 bucket. - do_s3_settings (
DigitalOceanS3Settings
) – Connection parameters for the account and the bucket. - format (
str
) – Format of data to be read. Currentlycsv
,json
,plaintext
,plaintext_by_object
andbinary
formats are supported. The difference betweenplaintext
andplaintext_by_object
is how the input is tokenized: if theplaintext
option is chosen, it’s split by the newlines. Otherwise, the files are split in full and one row will correspond to one file. In case thebinary
format is specified, the data is read as raw bytes without UTF-8 parsing. - schema (
type
[Schema
] |None
) – Schema of the resulting table. Not required forplaintext_by_object
andbinary
formats: if they are chosen, the contents of the read objects are stored in the columndata
. - mode (
str
) – If set tostreaming
, the engine waits for the new objects under the given path prefix. Set it tostatic
, it only considers the available data and ingest all of it. Default value isstreaming
. - with_metadata (
bool
) – When set to true, the connector will add an additional column named_metadata
to the table. This column will be a JSON field that will contain an optional fieldmodified_at
. Additionally, the column will also have an optional field namedowner
containing an ID of the object owner. Finally, the column will also contain a field namedpath
that will show the full path to the object within a bucket from where a row was filled. - csv_settings (
CsvParserSettings
|None
) – Settings for the CSV parser. This parameter is used only in case the specified format is “csv”. - json_field_paths (
dict
[str
,str
] |None
) – If the format is “json”, this field allows to map field names into path in the read json object. For the field which require such mapping, it should be given in the format<field_name>: <path to be mapped>
, where the path to be mapped needs to be a JSON Pointer (RFC 6901). - downloader_threads_count (
int
|None
) – The number of threads created to download the contents of the bucket under the given path. It defaults to the number of cores available on the machine. It is recommended to increase the number of threads if your bucket contains many small files. - persistent_id (
str
|None
) – (unstable) An identifier, under which the state of the table will be persisted orNone
, if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for theirpersistent_id
. This way it’s possible to configure the start of computations from the moment they were terminated last time. - debug_data (
Any
) – Static data replacing original one when debug mode is active.
- path (
- Returns
Table – The table read.
Example:
Let’s consider an object store, which is hosted in Digital Ocean S3. The store
contains CSV datasets in the respective bucket and is located in the region ams3.
The goal is to read the dataset, located under the path animals/
in this bucket.
Then, the code may look as follows:
import os
import pathway as pw
class InputSchema(pw.Schema):
owner: str
pet: str
t = pw.io.s3.read_from_digital_ocean(
"animals/",
do_s3_settings=pw.io.s3.DigitalOceanS3Settings(
bucket_name="datasets",
region="ams3",
access_key=os.environ["DO_S3_ACCESS_KEY"],
secret_access_key=os.environ["DO_S3_SECRET_ACCESS_KEY"],
),
format="csv",
schema=InputSchema,
)
Please note that this connector is interoperable with the AWS S3 connector,
therefore all examples concerning different data formats in pw.io.s3.read
also
work with Digital Ocean version.
pw.io.s3.read_from_wasabi(path, wasabi_s3_settings, format, *, schema=None, mode='streaming', with_metadata=False, csv_settings=None, json_field_paths=None, downloader_threads_count=None, persistent_id=None, autocommit_duration_ms=1500, debug_data=None)
sourceReads a table from one or several objects in Wasabi S3 bucket.
In case the prefix of S3 path is specified, and there are several objects lying under this prefix, their order is determined according to their modification times: the smaller the modification time is, the earlier the file will be passed to the engine.
- Parameters
- path (
str
) – Path to an object or to a folder of objects in S3 bucket. - wasabi_s3_settings (
WasabiS3Settings
) – Connection parameters for the account and the bucket. - format (
str
) – Format of data to be read. Currentlycsv
,json
,plaintext
,plaintext_by_object
andbinary
formats are supported. The difference betweenplaintext
andplaintext_by_object
is how the input is tokenized: if theplaintext
option is chosen, it’s split by the newlines. Otherwise, the files are split in full and one row will correspond to one file. In case thebinary
format is specified, the data is read as raw bytes without UTF-8 parsing. - schema (
type
[Schema
] |None
) – Schema of the resulting table. Not required forplaintext_by_object
andbinary
formats: if they are chosen, the contents of the read objects are stored in the columndata
. - mode (
str
) – If set tostreaming
, the engine waits for the new objects under the given path prefix. Set it tostatic
, it only considers the available data and ingest all of it. Default value isstreaming
. - with_metadata (
bool
) – When set to true, the connector will add an additional column named_metadata
to the table. This column will be a JSON field that will contain an optional fieldmodified_at
. Additionally, the column will also have an optional field namedowner
containing an ID of the object owner. Finally, the column will also contain a field namedpath
that will show the full path to the object within a bucket from where a row was filled. - csv_settings (
CsvParserSettings
|None
) – Settings for the CSV parser. This parameter is used only in case the specified format is “csv”. - json_field_paths (
dict
[str
,str
] |None
) – If the format is “json”, this field allows to map field names into path in the read json object. For the field which require such mapping, it should be given in the format<field_name>: <path to be mapped>
, where the path to be mapped needs to be a JSON Pointer (RFC 6901). - downloader_threads_count (
int
|None
) – The number of threads created to download the contents of the bucket under the given path. It defaults to the number of cores available on the machine. It is recommended to increase the number of threads if your bucket contains many small files. - persistent_id (
str
|None
) – (unstable) An identifier, under which the state of the table will be persisted orNone
, if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for theirpersistent_id
. This way it’s possible to configure the start of computations from the moment they were terminated last time. - debug_data (
Any
) – Static data replacing original one when debug mode is active.
- path (
- Returns
Table – The table read.
Example:
Let’s consider an object store, which is hosted in Wasabi S3. The store
contains CSV datasets in the respective bucket and is located in the region us-west-1
.
The goal is to read the dataset, located under the path animals/
in this bucket.
Then, the code may look as follows:
import os
import pathway as pw
class InputSchema(pw.Schema):
owner: str
pet: str
t = pw.io.s3.read_from_wasabi(
"animals/",
wasabi_s3_settings=pw.io.s3.WasabiS3Settings(
bucket_name="datasets",
region="us-west-1",
access_key=os.environ["WASABI_S3_ACCESS_KEY"],
secret_access_key=os.environ["WASABI_S3_SECRET_ACCESS_KEY"],
),
format="csv",
schema=InputSchema,
)
Please note that this connector is interoperable with the AWS S3 connector,
therefore all examples concerning different data formats in pw.io.s3.read
also
work with Wasabi version.