pw.io.pinecone

This module is available only with one of the following licenses: Pathway Scale, Pathway Enterprise.

Pathway writes each row of the table as a single Pinecone record: the value of the primary_key column becomes the record id (converted to a string), the vector column (list[float] or a 1-D numpy.ndarray, with length equal to the index dimension) becomes the dense vector, and the metadata columns become the record metadata.

Metadata columns are converted by type as follows:

Pathway type    Pinecone type
int, float      number
bool            boolean
str             string
list[str]       list of strings

A None metadata value is omitted, as Pinecone rejects null metadata. A metadata column of any other type (for example bytes, a nested pw.Json object, or a numeric array) is not supported and raises an error that names the offending column.
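The conversion rules above can be sketched in plain Python. This is only an illustration of the documented behavior, not the connector's actual code: `to_pinecone_metadata` is a hypothetical helper, and the exception type and message wording are assumptions.

```python
from typing import Any


def to_pinecone_metadata(row: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper mirroring the documented conversion rules."""
    metadata: dict[str, Any] = {}
    for column, value in row.items():
        if value is None:
            # None is omitted because Pinecone rejects null metadata.
            continue
        if isinstance(value, (bool, int, float, str)):
            # Scalars map to Pinecone boolean, number, and string values.
            metadata[column] = value
        elif isinstance(value, (list, tuple)) and all(
            isinstance(item, str) for item in value
        ):
            # list[str] is stored as a list of strings.
            metadata[column] = list(value)
        else:
            # Any other type is unsupported; the error names the column.
            raise TypeError(
                f"metadata column {column!r} has unsupported type "
                f"{type(value).__name__}"
            )
    return metadata
```

For instance, a row with a `None` value simply loses that key, while a `bytes` value raises an error whose message mentions the column name.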

write(table, index_name, *, primary_key=None, vector, api_key=None, host=None, namespace='', metadata_columns=None, batch_size=100, name=None, sort_by=None)

Writes a Pathway Live Data Framework table to a Pinecone index.

The Pinecone index is kept in sync with the current state of the table. When a row is added to the table it is upserted into the index, and when a row is removed from the table the corresponding record is deleted; an update replaces the previous record for that id. Only the current state is reflected — the connector does not append a change log, so no time or diff columns are written.
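To make the "current state only" semantics concrete, here is a minimal sketch that uses a plain dictionary as a stand-in for the Pinecone index. The event names (`"upsert"`, `"delete"`) and the `apply_changes` helper are hypothetical and chosen for illustration; they are not part of the connector's API.

```python
from typing import Optional

Change = tuple[str, str, Optional[list[float]]]  # (kind, record id, vector)


def apply_changes(
    index: dict[str, list[float]], changes: list[Change]
) -> dict[str, list[float]]:
    """Apply table changes to a dict standing in for the Pinecone index."""
    for kind, record_id, vector in changes:
        if kind == "upsert":
            # A new row inserts a record; an updated row replaces it.
            index[record_id] = vector
        elif kind == "delete":
            # A removed row deletes the corresponding record.
            index.pop(record_id, None)
        else:
            raise ValueError(f"unknown change kind: {kind!r}")
    return index


index = apply_changes(
    {},
    [
        ("upsert", "a", [0.1, 0.2]),
        ("upsert", "b", [0.3, 0.4]),
        ("upsert", "a", [0.9, 0.8]),  # update: replaces the record for "a"
        ("delete", "b", None),        # removal: the record for "b" is dropped
    ],
)
```

After these four changes the stand-in index holds a single record, `"a"`, with its latest vector. No history of the intermediate states is kept, which mirrors the absence of time and diff columns.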

The Pinecone record id is taken from the primary_key column when given. By default (primary_key=None) the table’s internal row key is used as the id: it is unique by construction and matches the way the dataflow shards rows, so writing runs in parallel across workers. A primary_key column instead lets you choose a meaningful id, but its values must uniquely identify rows; to guarantee that, writing then runs on a single worker and a collision (two rows with the same id) raises an error rather than silently overwriting.

The vector column provides the dense embedding and every remaining column — or just the columns listed in metadata_columns — is stored as record metadata. None metadata values are dropped; a metadata value of a type Pinecone cannot store raises an error naming the offending column. See the connector documentation for the full type-conversion table. For high-dimensional vectors, batch_size may be reduced automatically so a single request stays within Pinecone’s size limit.
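The automatic batch_size reduction can be pictured roughly as follows. This is a sketch under stated assumptions rather than the connector's real logic: the 2 MB request limit, the per-value and per-record byte estimates, and the `effective_batch_size` helper are all assumptions made for illustration.

```python
def effective_batch_size(
    batch_size: int,
    dimension: int,
    *,
    max_request_bytes: int = 2 * 1024 * 1024,  # assumed request size limit
    bytes_per_value: int = 16,                 # assumed size of one serialized float
    overhead_bytes: int = 256,                 # assumed id + metadata overhead per record
    max_records: int = 1000,                   # per-request record cap from the docs
) -> int:
    """Estimate how many records fit in one upsert request."""
    per_record = dimension * bytes_per_value + overhead_bytes
    fits_in_request = max_request_bytes // per_record
    # Never exceed the requested size or the record cap, and always send
    # at least one record per request.
    return max(1, min(batch_size, max_records, fits_in_request))
```

With these assumed numbers, 4-dimensional vectors keep the requested batch size of 100, whereas 3072-dimensional vectors end up with a smaller batch because each record takes far more space.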

The target index must already exist before the pipeline starts and its dimension must match the length of the produced vectors; the connector does not create it.

  • Parameters
    • table (Table) – The table to write.
    • index_name (str) – Name of the Pinecone index to write to.
    • primary_key (ColumnReference | None) – A column reference (e.g. table.doc_id) whose values are used as the Pinecone record id. The column must belong to table and its values must uniquely identify rows (the id selects the record to upsert or delete); a collision raises an error at runtime and writing runs on a single worker. When None (the default), the table’s internal row key is used as the id — always unique and written in parallel across workers — but the ids are then Pathway’s internal keys rather than your own values.
    • vector (ColumnReference) – A column reference holding the dense embedding (list[float] or a 1-D numpy.ndarray). The column must belong to table.
    • api_key (str | None) – Pinecone API key. When None, the PINECONE_API_KEY environment variable is used.
    • host (str | None) – Control-plane host. Leave as None for Pinecone cloud (https://api.pinecone.io); set it to a local URL such as "http://localhost:5080" to target Pinecone Local.
    • namespace (str) – Pinecone namespace to write to. Defaults to the index’s default namespace.
    • metadata_columns (Optional[Iterable[ColumnReference]]) – Column references for the columns to store as record metadata. When None, every column except primary_key and vector is stored. All columns must belong to table.
    • batch_size (int) – Maximum number of records sent to Pinecone per upsert call (capped at Pinecone’s per-request limit of 1000).
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output within each mini-batch will be sorted in ascending order by the given columns.
  • Returns
    None

Example:

Suppose you have a couple of documents, each already turned into a 4-dimensional embedding, and you want them to be searchable in Pinecone — and to stay in sync as the documents change.

The first thing to take care of is the index itself: the connector never creates it, so it has to exist before the pipeline starts, and its dimension must match the length of your vectors (4 here).

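from pinecone import Pinecone, ServerlessSpec

# Create the index up front; the connector never creates it.
pc = Pinecone(api_key="YOUR_API_KEY")
pc.create_index(
    name="docs",
    dimension=4,  # must match the length of the embedding vectors
    metric="cosine",
    spec=ServerlessSpec(cloud="aws", region="us-east-1"),
)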

With the index in place, you describe the data. Each document has an id that will become the Pinecone record id, the embedding itself, and a title you would like to keep around — here as a small static table, but in a real pipeline this would come from a connector:

import pathway as pw


class DocSchema(pw.Schema):
    doc_id: str = pw.column_definition(primary_key=True)
    embedding: list[float]
    title: str

table = pw.debug.table_from_rows(
    DocSchema,
    [("a", [0.1, 0.2, 0.3, 0.4], "First"), ("b", [0.5, 0.6, 0.7, 0.8], "Second")],
)

Now you point the connector at the index, spelling out which column is the record id and which one holds the vector. Everything left over — in this case title — is stored as record metadata automatically, so there is nothing else to configure:

pw.io.pinecone.write(
    table,
    index_name="docs",
    primary_key=table.doc_id,
    vector=table.embedding,
    api_key="YOUR_API_KEY",
)

Notice that nothing has reached Pinecone yet: like every Pathway pipeline, the write is lazy. It is the final pw.run() that actually starts the dataflow, upserts the two records, and from then on keeps the index in sync with any later change to table:

pw.run(monitoring_level=pw.MonitoringLevel.NONE)