pw.io.chroma
This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.
Chroma stores records with a fixed shape — a
string id, an embedding vector, an optional document text, and a set
of scalar metadata values. The connector therefore maps the columns of the
written table onto those fields explicitly: the primary_key column becomes
the record id, the embedding column becomes the vector, the optional
document column becomes the stored text, and the metadata_columns become
the record metadata. The primary_key column is optional — when it is omitted,
the row’s internal Pathway key is used as the record id instead. The target
collection must already exist before the
pipeline starts — the connector does not create it, because the vector dimension
is fixed by the collection and is not part of a Pathway Live Data Framework type.
A row added to the table is stored in the collection, a row removed from the table is deleted from the collection by its id, and a changed row replaces the previous value with the same id, so the collection always mirrors the current state of the table.
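The mirroring rule above can be pictured with a small in-memory model. This is only a sketch: `apply_change` and the dictionary standing in for a collection are hypothetical names, not part of Pathway or Chroma. It assumes each change arrives as a record id, a payload, and a flag telling whether the row was added or removed, and that a changed row is modelled as a single overwrite.

```python
# Hypothetical in-memory stand-in for a Chroma collection: record id -> payload.
collection: dict[str, dict] = {}


def apply_change(store: dict[str, dict], record_id: str, payload: dict, is_addition: bool) -> None:
    """Apply one table change so the store keeps mirroring the table."""
    if is_addition:
        # An added or changed row overwrites whatever was stored under the same id.
        store[record_id] = payload
    else:
        # A removed row deletes the record with that id, if it exists.
        store.pop(record_id, None)


apply_change(collection, "1", {"document": "hello"}, True)   # row added
apply_change(collection, "2", {"document": "world"}, True)   # row added
apply_change(collection, "1", {"document": "hello!"}, True)  # row changed
apply_change(collection, "2", {"document": "world"}, False)  # row removed
```

After these four changes the store holds only record `"1"` with its latest payload, which matches the table's current state.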
The table below explains how each Pathway Live Data Framework type is stored in
Chroma. Because Chroma record ids are strings, the value of the primary_key
column is converted to its string form regardless of its Pathway type.
Serialization of Pathway Live Data Framework types into Chroma
| Framework’s type | Record field | Stored as |
|---|---|---|
| int (primary_key) | id | string: the integer’s decimal text, e.g. 42 → "42" |
| str (primary_key) | id | string: used as-is |
| bool (primary_key) | id | string: "true" / "false" |
| pointer (primary_key) | id | string: the pointer’s string form |
| list[float] / list[int] | embedding | a 32-bit float vector; float values are narrowed to 32-bit precision (Chroma stores embeddings as float32) |
| one-dimensional np.ndarray (float or int) | embedding | a 32-bit float vector, as above; multi-dimensional arrays are rejected when the computation starts |
| str (document) | document | string |
| str (metadata_columns) | metadata value | string |
| int (metadata_columns) | metadata value | integer |
| float (metadata_columns) | metadata value | float |
| bool (metadata_columns) | metadata value | boolean |
When an Optional metadata column has no value for a given row, that metadata
key is omitted from the record. Any other Pathway Live Data Framework type
passed as an embedding, document, or metadata column is rejected when the
computation starts.
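The conversions in the table can be illustrated in plain Python. The helpers below (`to_record_id`, `to_float32`, `to_record`) are hypothetical and only model the documented rules; they are not the connector's implementation, and the use of `str()` for non-boolean ids is an assumption.

```python
import struct


def to_record_id(value) -> str:
    """Convert a primary-key value to a string record id."""
    if isinstance(value, bool):
        # bool is checked first because bool is a subclass of int in Python.
        return "true" if value else "false"
    return str(value)


def to_float32(x: float) -> float:
    """Round-trip a Python float through 32-bit precision."""
    return struct.unpack("f", struct.pack("f", x))[0]


def to_record(key, embedding, document=None, metadata=None) -> dict:
    """Build a dict shaped like a Chroma record from one row's values."""
    record = {
        "id": to_record_id(key),
        # Integer components are converted to float, then narrowed to float32.
        "embedding": [to_float32(float(v)) for v in embedding],
        # Metadata keys whose value is None are omitted for this record.
        "metadata": {k: v for k, v in (metadata or {}).items() if v is not None},
    }
    if document is not None:
        record["document"] = document
    return record


record = to_record(42, [1, 0.5, 0.1], "hello", {"title": "Intro", "category": None})
```

Here the id becomes the string `"42"`, the `category` key is dropped because its value is `None`, and the component `0.1` no longer compares equal to the 64-bit `0.1` after narrowing.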
write(table, collection_name, *, primary_key=None, embedding, document=None, metadata_columns=None, host='localhost', port=8000, ssl=False, headers=None, tenant='default_tenant', database='default_database', name=None, sort_by=None)
Writes a Pathway Live Data Framework table to a Chroma collection over the server’s HTTP API.
The collection always mirrors the current state of the table: a row added to
the table is stored in the collection, a row removed from the table is
deleted from the collection, and a changed row replaces the previous value.
The primary_key column identifies each record; since Chroma record ids
are strings, its value is stored as its string form. When primary_key is
omitted, the row’s internal Pathway key is used as the record id instead.
Chroma records have a fixed shape, so the columns of table are mapped
onto Chroma’s record fields explicitly:
- primary_key → the record id,
- embedding → the record embedding vector,
- document (optional) → the record document text,
- metadata_columns (optional) → the record metadata.
The target collection must already exist before the pipeline starts; the
connector does not create it. Create it upfront, e.g. with
chromadb.HttpClient(...).create_collection(...).
- Parameters
  - table (Table) – The table to write.
  - collection_name (str) – Name of the Chroma collection to write to. It must already exist on the server.
  - primary_key (ColumnReference | None) – An optional column reference (e.g. table.doc_id) whose values are used as the Chroma record id. The column must belong to table; values are converted to str. If omitted (the default), the row’s internal Pathway key is used as the record id instead.
  - embedding (ColumnReference) – A column reference (e.g. table.vector) holding the embedding vector for each row. Must be of Pathway type list[float] or a 1-D numpy.ndarray.
  - document (ColumnReference | None) – An optional column reference (e.g. table.text) holding the document text stored alongside each vector. Must be of type str.
  - metadata_columns (Optional[Iterable[ColumnReference]]) – Optional column references for scalar metadata stored with each vector (e.g. table.title, table.category). Each value must be str, int, float, or bool. A None value drops that key for the affected record.
  - host (str) – Host of the Chroma server. Defaults to "localhost".
  - port (int) – Port of the Chroma server. Defaults to 8000.
  - ssl (bool) – Whether to connect over HTTPS. Defaults to False.
  - headers (dict[str, str] | None) – Optional HTTP headers forwarded with every request, e.g. an authorization token for Chroma Cloud.
  - tenant (str) – Chroma tenant to use. Defaults to "default_tenant".
  - database (str) – Chroma database to use. Defaults to "default_database".
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  - sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output within each minibatch will be sorted in ascending order by the given columns. When multiple columns are provided, the corresponding value tuples are compared lexicographically.
- Returns
  None
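The ordering rule described for sort_by (ascending, with value tuples compared lexicographically) can be shown on plain Python data. This is a sketch of the rule only, not of how the connector sorts internally; the `minibatch` rows and the column names `category` and `doc_id` are made up for illustration.

```python
from operator import itemgetter

# Hypothetical rows of one minibatch, in arrival order.
minibatch = [
    {"category": "b", "doc_id": 2},
    {"category": "a", "doc_id": 3},
    {"category": "a", "doc_id": 1},
]

# Columns to sort by, in priority order. With several columns, itemgetter
# returns a tuple per row, and Python compares tuples lexicographically.
sort_columns = ("category", "doc_id")
ordered = sorted(minibatch, key=itemgetter(*sort_columns))
```

Rows are ordered by `category` first, and `doc_id` only breaks ties between rows that share a category.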
Example:
Suppose you are building a document search pipeline and want to store
embeddings in Chroma running in a container
(docker run -p 8000:8000 chromadb/chroma).
Create the collection before starting the pipeline:
import chromadb
client = chromadb.HttpClient(host="localhost", port=8000)
client.create_collection("docs")
Define your Pathway Live Data Framework schema and build the table:
import pathway as pw

class DocSchema(pw.Schema):
    doc_id: int = pw.column_definition(primary_key=True)
    text: str
    embedding: list[float]

table = pw.debug.table_from_rows(
    DocSchema,
    [(1, "hello", [0.1, 0.2, 0.3, 0.4]), (2, "world", [0.5, 0.6, 0.7, 0.8])],
)
Attach the Chroma output connector, mapping each column to a Chroma record field:
pw.io.chroma.write(
    table,
    collection_name="docs",
    primary_key=table.doc_id,
    embedding=table.embedding,
    document=table.text,
)
pw.run(monitoring_level=pw.MonitoringLevel.NONE)