pw.io.minio

class pw.io.minio.MinIOSettings(endpoint, bucket_name, access_key, secret_access_key, *, with_path_style=True, region=None)

Stores MinIO bucket connection settings.
  • Parameters
    • endpoint – Endpoint for the bucket.
    • bucket_name – Name of a bucket.
    • access_key – Access key for the bucket.
    • secret_access_key – Secret access key for the bucket.
    • region – Region of the bucket.
    • with_path_style – Whether to use path-style addresses for bucket access. Defaults to True, since this is the most common way to access MinIO; set it to False if your deployment uses a custom configuration.
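To make the with_path_style choice concrete, here is a small stdlib-only sketch of the two S3 addressing styles. The helper object_url, the https scheme, and the host minio.example.com are hypothetical; Pathway builds its requests internally, and this function is not part of its API.

```python
def object_url(endpoint: str, bucket: str, key: str, with_path_style: bool = True) -> str:
    """Build an illustrative HTTPS URL for an object in an S3-compatible store.

    Hypothetical helper: path style puts the bucket in the URL path,
    virtual-hosted style puts it in the host name as a subdomain.
    """
    key = key.lstrip("/")
    if with_path_style:
        # Path style: https://<endpoint>/<bucket>/<key>
        return f"https://{endpoint}/{bucket}/{key}"
    # Virtual-hosted style: https://<bucket>.<endpoint>/<key>
    return f"https://{bucket}.{endpoint}/{key}"


print(object_url("minio.example.com", "datasets", "animals/pets.csv"))
print(object_url("minio.example.com", "datasets", "animals/pets.csv", with_path_style=False))
```

Virtual-hosted style requires a DNS entry for every bucket subdomain, which self-hosted deployments often lack; that is one reason path style is the usual choice for MinIO.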

pw.io.minio.read(path, minio_settings, format, *, schema=None, mode='streaming', csv_settings=None, json_field_paths=None, persistent_id=None, autocommit_duration_ms=1500, debug_data=None)

Reads a table from one or several objects in an S3 bucket hosted in MinIO.

If path is a prefix with several objects under it, the objects are processed in order of their modification times: the earlier an object was modified, the earlier it is passed to the engine.
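The ordering rule can be modeled in plain Python. The object keys and timestamps below are made up, and the sort only mirrors the documented behavior; it is not Pathway's code. Note that part-3.csv comes before part-2.csv because it was modified earlier, regardless of its name.

```python
from datetime import datetime, timezone

# Hypothetical listing of objects under the prefix "animals/":
# (object key, last modification time).
listing = [
    ("animals/part-2.csv", datetime(2024, 1, 3, tzinfo=timezone.utc)),
    ("animals/part-1.csv", datetime(2024, 1, 1, tzinfo=timezone.utc)),
    ("animals/part-3.csv", datetime(2024, 1, 2, tzinfo=timezone.utc)),
]


def ingestion_order(objects):
    """Return object keys sorted so that the least recently modified comes first."""
    return [key for key, _modified in sorted(objects, key=lambda obj: obj[1])]


print(ingestion_order(listing))
```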

  • Parameters
    • path (str) – Path to an object or to a folder of objects in the MinIO S3 bucket.
    • minio_settings (MinIOSettings) – Connection parameters for the MinIO account and the bucket.
    • format (str) – Format of data to be read. Currently “csv”, “json” and “plaintext” formats are supported.
    • schema (type[Schema] | None) – Schema of the resulting table.
    • mode (str) – If set to “streaming”, the engine waits for new objects under the given path prefix. If set to “static”, it only considers the currently available data and ingests all of it. Default value is “streaming”.
    • csv_settings (CsvParserSettings | None) – Settings for the CSV parser. Used only when format is “csv”.
    • json_field_paths (dict[str, str] | None) – If the format is “json”, this parameter maps field names to paths in the parsed JSON object. Each field that requires such a mapping should be given as <field_name>: <path to be mapped>, where the path is a JSON Pointer (RFC 6901).
    • persistent_id (str | None) – (unstable) An identifier under which the state of the table will be persisted, or None if there is no need to persist the state of this table. When a program restarts, it restores the state of all input tables according to what was saved for their persistent_id. This makes it possible to resume computations from the point where they were last terminated.
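    • autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway's computation graph.
    • debug_data (Any) – Static data that replaces the original data when debug mode is active.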
  • Returns
    Table – The table read.
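The json_field_paths mapping relies on JSON Pointers. The stdlib sketch below resolves such pointers against a parsed object to show which value each field would receive. The record, the field names, and resolve_pointer are hypothetical, and the resolver omits some RFC 6901 details (for example, it does not reject array indices with leading zeros). With the connector itself you would only pass the dictionary, for example json_field_paths={"owner": "/owner/name"}.

```python
import json


def resolve_pointer(document, pointer: str):
    """Resolve an RFC 6901 JSON Pointer against a parsed JSON document (simplified)."""
    if pointer == "":
        return document  # The empty pointer refers to the whole document.
    if not pointer.startswith("/"):
        raise ValueError(f"invalid JSON Pointer: {pointer!r}")
    current = document
    for token in pointer[1:].split("/"):
        # Unescape in the order required by RFC 6901: "~1" -> "/", then "~0" -> "~".
        token = token.replace("~1", "/").replace("~0", "~")
        if isinstance(current, list):
            current = current[int(token)]  # Array elements are addressed by index.
        else:
            current = current[token]
    return current


record = json.loads('{"owner": {"name": "Alice"}, "pets": [{"kind": "dog"}, {"kind": "cat"}]}')

# Hypothetical mapping in the shape accepted by json_field_paths.
json_field_paths = {"owner": "/owner/name", "pet": "/pets/1/kind"}

row = {field: resolve_pointer(record, path) for field, path in json_field_paths.items()}
print(row)
```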

Example:

Suppose a table is stored in CSV format in a MinIO S3 bucket. You can use this method to connect to the bucket and read its contents.

It may look as follows:

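import os

import pathway as pw


# Columns of the CSV objects stored under the "animals/" prefix.
class InputSchema(pw.Schema):
    owner: str
    pet: str


# Credentials come from environment variables instead of being hard-coded.
# mode is left at its default, "streaming", so new objects keep being ingested.
t = pw.io.minio.read(
    "animals/",
    minio_settings=pw.io.minio.MinIOSettings(
        bucket_name="datasets",
        endpoint="avv749.stackhero-network.com",
        access_key=os.environ["MINIO_S3_ACCESS_KEY"],
        secret_access_key=os.environ["MINIO_S3_SECRET_ACCESS_KEY"],
    ),
    format="csv",
    schema=InputSchema,
)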
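To picture what t would hold, the sketch below parses one hypothetical CSV object with the standard library and lines its records up with the fields of InputSchema. It assumes each object starts with a header row naming the columns. It models the expected result only, not how Pathway's CSV parser works; InputRow (a plain stand-in for InputSchema) and sample_object are made up.

```python
import csv
import io
from typing import NamedTuple


class InputRow(NamedTuple):
    """Hypothetical plain-Python stand-in for InputSchema: one attribute per column."""
    owner: str
    pet: str


# Hypothetical content of one object under "animals/", with a header row.
sample_object = "owner,pet\nAlice,dog\nBob,cat\n"


def parse_rows(text: str) -> list[InputRow]:
    """Map each CSV record to the schema fields by header name, not by position."""
    reader = csv.DictReader(io.StringIO(text))
    return [InputRow(**{field: record[field] for field in InputRow._fields}) for record in reader]


print(parse_rows(sample_object))
```

Matching by header name means the column order inside the file does not have to follow the order of the schema fields.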