pw.io.pinecone
This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.
Pathway writes each row of the table as a single Pinecone record: the value of the
primary_key column becomes the record id (converted to a string), the
vector column (list[float] or a 1-D numpy.ndarray, with length equal
to the index dimension) becomes the dense vector, and the metadata columns become
the record metadata.
Metadata columns are converted by type as follows:
| Pathway type | Pinecone type |
|---|---|
| int, float | number |
| bool | boolean |
| str | string |
| list[str] | list of strings |
A None metadata value is omitted, as Pinecone rejects null metadata. A
metadata column of any other type (for example bytes, a nested pw.Json
object, or a numeric array) is not supported and raises an error that names the
offending column.
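The conversion rules above can be sketched in plain Python. This is only an illustration: `to_pinecone_metadata` is a hypothetical helper, not part of the Pathway API, and the exact exception type and message raised by the connector are assumptions.

```python
from typing import Any


def to_pinecone_metadata(row: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper that mirrors the conversion table above."""
    metadata: dict[str, Any] = {}
    for column, value in row.items():
        if value is None:
            # None is omitted because Pinecone rejects null metadata.
            continue
        if isinstance(value, (bool, int, float, str)):
            # Scalars map to boolean, number, or string unchanged.
            metadata[column] = value
        elif isinstance(value, list) and all(isinstance(v, str) for v in value):
            # Only lists of strings are accepted as list metadata.
            metadata[column] = value
        else:
            # Assumed error shape: the message names the offending column.
            raise TypeError(
                f"metadata column {column!r} has unsupported type "
                f"{type(value).__name__}"
            )
    return metadata


converted = to_pinecone_metadata(
    {"title": "First", "pages": 12, "draft": False, "tags": ["a", "b"], "note": None}
)
print(converted)
```

In this sketch the `note` column disappears from the result, while a `bytes` value would raise an error mentioning its column name.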
write(table, index_name, *, primary_key=None, vector, api_key=None, host=None, namespace='', metadata_columns=None, batch_size=100, name=None, sort_by=None)
Writes a Pathway Live Data Framework table to a Pinecone index.
The Pinecone index is kept in sync with the current state of the table. When a
row is added to the table it is upserted into the index, and when a row is
removed from the table the corresponding record is deleted; an update replaces
the previous record for that id. Only the current state is reflected — the
connector does not append a change log, so no time or diff columns are
written.
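To make the "current state only" behaviour concrete, here is a minimal simulation in which a plain dictionary stands in for the Pinecone index. The `(record_id, record, diff)` tuple format and the `apply_changes` function are illustrative assumptions, not the connector's internals.

```python
def apply_changes(
    index: dict[str, dict], changes: list[tuple[str, dict, int]]
) -> None:
    """Apply one batch of (record_id, record, diff) changes to a dict
    that stands in for the index."""
    # Deletions go first, so an update (a removal plus an addition for the
    # same id) ends with the new record in place.
    for record_id, _record, diff in changes:
        if diff < 0:
            index.pop(record_id, None)
    for record_id, record, diff in changes:
        if diff > 0:
            index[record_id] = record


index: dict[str, dict] = {}
# Two rows are added to the table.
apply_changes(index, [("a", {"title": "First"}, 1), ("b", {"title": "Second"}, 1)])
# Row "a" is updated: the old version is removed and the new one added.
apply_changes(
    index,
    [("a", {"title": "First"}, -1), ("a", {"title": "First, revised"}, 1)],
)
# Row "b" is removed from the table.
apply_changes(index, [("b", {"title": "Second"}, -1)])
print(index)
```

After the three batches only the revised record for `a` remains. No history of the earlier versions is kept, which is what distinguishes a state-synchronizing writer from one that appends a change log.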
The Pinecone record id is taken from the primary_key column when given. By
default (primary_key=None) the table’s internal row key is used as the id:
it is unique by construction and matches the way the dataflow shards rows, so
writing runs in parallel across workers. A primary_key column instead lets
you choose a meaningful id, but its values must uniquely identify rows; to
guarantee that, writing then runs on a single worker and a collision (two rows
with the same id) raises an error rather than silently overwriting.
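The uniqueness requirement on a user-chosen id can be illustrated with a small check. This is a sketch under stated assumptions: `build_record_ids` is a hypothetical function, and the connector's actual error type and message may differ.

```python
def build_record_ids(values: list[object]) -> list[str]:
    """Convert primary-key values to string ids, rejecting duplicates."""
    seen: set[str] = set()
    ids: list[str] = []
    for value in values:
        # Record ids are strings, so each value is converted first.
        record_id = str(value)
        if record_id in seen:
            # A collision is reported instead of silently overwriting.
            raise ValueError(f"duplicate record id {record_id!r}")
        seen.add(record_id)
        ids.append(record_id)
    return ids


ids = build_record_ids(["a", "b", 42])
print(ids)
```

Checking every id against all ids seen so far needs a single shared view of the data, which is one way to see why this mode is described as running on a single worker.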
The vector column provides the dense embedding and every remaining column —
or just the columns listed in metadata_columns — is stored as record
metadata. None metadata values are dropped; a metadata value of a type
Pinecone cannot store raises an error naming the offending column. See the
connector documentation for the full type-conversion table. For
high-dimensional vectors, batch_size may be reduced automatically so a
single request stays within Pinecone’s size limit.
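One way such an automatic reduction of batch_size could work is sketched below. The request size limit, the bytes used per serialized value, and the per-record overhead are all assumed numbers chosen for illustration, and `effective_batch_size` is a hypothetical function; the connector's real calculation is not documented here.

```python
def effective_batch_size(
    batch_size: int,
    dimension: int,
    *,
    max_request_bytes: int = 2 * 1024 * 1024,  # assumed request size limit
    bytes_per_value: int = 16,  # assumed size of one serialized float
    overhead_per_record: int = 1024,  # assumed room for id and metadata
    max_records: int = 1000,  # per-request record cap
) -> int:
    """Return how many records fit in one request, never more than requested."""
    record_bytes = dimension * bytes_per_value + overhead_per_record
    fits = max(1, max_request_bytes // record_bytes)
    return max(1, min(batch_size, max_records, fits))


small_vectors = effective_batch_size(100, dimension=4)
large_vectors = effective_batch_size(100, dimension=3072)
print(small_vectors, large_vectors)
```

With these assumed numbers, 4-dimensional vectors keep the requested batch size, while 3072-dimensional vectors are sent in smaller batches.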
The target index must already exist before the pipeline starts and its dimension must match the length of the produced vectors; the connector does not create it.
- Parameters
  - table (Table) – The table to write.
  - index_name (str) – Name of the Pinecone index to write to.
  - primary_key (ColumnReference | None) – A column reference (e.g. table.doc_id) whose values are used as the Pinecone record id. The column must belong to table and its values must uniquely identify rows, because the id selects the record to upsert or delete. With a primary_key column, writing runs on a single worker and a collision raises an error at runtime. When None (the default), the table’s internal row key is used as the id: it is always unique and is written in parallel across workers, but the ids are then Pathway’s internal keys rather than your own values.
  - vector (ColumnReference) – A column reference holding the dense embedding (list[float] or a 1-D numpy.ndarray). The column must belong to table.
  - api_key (str | None) – Pinecone API key. When None, the PINECONE_API_KEY environment variable is used.
  - host (str | None) – Control-plane host. Leave as None for Pinecone cloud (https://api.pinecone.io); set it to a local URL such as "http://localhost:5080" to target Pinecone Local.
  - namespace (str) – Pinecone namespace to write to. Defaults to the index’s default namespace.
  - metadata_columns (Optional[Iterable[ColumnReference]]) – Column references for the columns to store as record metadata. When None, every column except primary_key and vector is stored. All columns must belong to table.
  - batch_size (int) – Maximum number of records sent to Pinecone per upsert call (capped at Pinecone’s per-request limit of 1000).
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  - sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output within each mini-batch will be sorted in ascending order by the given columns.
- Returns
  None
Example:
Suppose you have a couple of documents, each already turned into a 4-dimensional embedding, and you want them to be searchable in Pinecone — and to stay in sync as the documents change.
The first thing to take care of is the index itself: the connector never
creates it, so it has to exist before the pipeline starts, and its
dimension must match the length of your vectors (4 here).
from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key="YOUR_API_KEY")
pc.create_index(
    name="docs",
    dimension=4,
    metric="cosine",
    spec=ServerlessSpec(cloud="aws", region="us-east-1"),
)
With the index in place, you describe the data. Each document has an id that
will become the Pinecone record id, the embedding itself, and a title you
would like to keep around — here as a small static table, but in a real
pipeline this would come from a connector:
import pathway as pw

class DocSchema(pw.Schema):
    doc_id: str = pw.column_definition(primary_key=True)
    embedding: list[float]
    title: str

table = pw.debug.table_from_rows(
    DocSchema,
    [("a", [0.1, 0.2, 0.3, 0.4], "First"), ("b", [0.5, 0.6, 0.7, 0.8], "Second")],
)
Now you point the connector at the index, spelling out which column is the
record id and which one holds the vector. Everything left over — in this case
title — is stored as record metadata automatically, so there is nothing
else to configure:
pw.io.pinecone.write(
    table,
    index_name="docs",
    primary_key=table.doc_id,
    vector=table.embedding,
    api_key="YOUR_API_KEY",
)
Notice that nothing has reached Pinecone yet: like every Pathway pipeline, the
write is lazy. It is the final pw.run() that actually starts the dataflow,
upserts the two records, and from then on keeps the index in sync with any
later change to table:
pw.run(monitoring_level=pw.MonitoringLevel.NONE)