pw.io.chroma

This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.

Chroma stores records with a fixed shape — a string id, an embedding vector, an optional document text, and a set of scalar metadata values. The connector therefore maps the columns of the written table onto those fields explicitly: the primary_key column becomes the record id, the embedding column becomes the vector, the optional document column becomes the stored text, and the metadata_columns become the record metadata. The primary_key column is optional — when it is omitted, the row’s internal Pathway key is used as the record id instead. The target collection must already exist before the pipeline starts — the connector does not create it, because the vector dimension is fixed by the collection and is not part of a Pathway Live Data Framework type.

A row added to the table is stored in the collection, a row removed from the table is deleted from the collection by its id, and a changed row replaces the previous value with the same id, so the collection always mirrors the current state of the table.
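The mirroring behaviour described above can be pictured with a small in-memory model. This is only a sketch, not the connector's implementation: the `apply_changes` helper and the plain dictionary standing in for a Chroma collection are hypothetical, and it assumes a changed row arrives as a removal of the old value followed by an addition of the new value under the same id.

```python
def apply_changes(collection, changes):
    """Apply (record_id, value, diff) changes to a dict standing in for a collection.

    diff > 0 means the row was added, diff < 0 means the row was removed.
    A changed row is modelled as a removal plus an addition with the same id.
    """
    # Handle removals first so that an update within one batch
    # always ends with the new value stored under the id.
    for record_id, _value, diff in changes:
        if diff < 0:
            collection.pop(record_id, None)
    for record_id, value, diff in changes:
        if diff > 0:
            collection[record_id] = value
    return collection


collection = {}
apply_changes(collection, [("1", "hello", +1), ("2", "world", +1)])        # two rows added
apply_changes(collection, [("1", "hello", -1), ("1", "hello again", +1)])  # row "1" changed
apply_changes(collection, [("2", "world", -1)])                            # row "2" removed
print(collection)  # {'1': 'hello again'}
```

After the three batches the model holds exactly the rows that are still present in the table, which is the property the connector maintains for the real collection.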

The table below explains how each Pathway Live Data Framework type is stored in Chroma. Because Chroma record ids are strings, the value of the primary_key column is converted to its string form regardless of its Pathway type.

Pathway Live Data Framework types serialization into Chroma

| Framework’s type | Record field | Stored as |
| int (primary_key) | id | string: the integer’s decimal text, e.g. 42 becomes "42" |
| str (primary_key) | id | string, used as-is |
| bool (primary_key) | id | string: "true" / "false" |
| pointer (primary_key) | id | string: the pointer’s string form |
| list[float] / list[int] | embedding | a 32-bit float vector; float values are narrowed to 32-bit precision (Chroma stores embeddings as float32) |
| one-dimensional np.ndarray (float or int) | embedding | a 32-bit float vector, as above; multi-dimensional arrays are rejected when the computation starts |
| str (document) | document | string |
| str (metadata_columns) | metadata value | string |
| int (metadata_columns) | metadata value | integer |
| float (metadata_columns) | metadata value | float |
| bool (metadata_columns) | metadata value | boolean |

An Optional metadata column whose value is missing for a given row is stored with that metadata key omitted for that record. Any other Pathway Live Data Framework type passed as an embedding, document, or metadata column is rejected when the computation starts.
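The mapping rules above can be illustrated with a standard-library sketch that turns one row into a record-shaped dictionary. The helper names (`to_record_id`, `to_float32`, `to_record`) and the dictionary layout are hypothetical and are not part of the Pathway or Chroma APIs; the sketch only mimics the documented rules (string ids, float32 narrowing, omitted metadata keys for missing values).

```python
import struct


def to_record_id(value):
    # bool is checked before the generic case because bool is a subclass of int,
    # and the table above lists "true" / "false" as the stored form.
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


def to_float32(x):
    # Round-trip through a 4-byte float to reproduce narrowing to 32-bit precision.
    return struct.unpack("f", struct.pack("f", float(x)))[0]


def to_record(row, primary_key, embedding, document=None, metadata_columns=()):
    record = {
        "id": to_record_id(row[primary_key]),
        "embedding": [to_float32(x) for x in row[embedding]],
    }
    if document is not None:
        record["document"] = row[document]
    # A missing (None) metadata value is dropped: the key is omitted for this record.
    record["metadata"] = {k: row[k] for k in metadata_columns if row[k] is not None}
    return record


row = {"doc_id": 42, "vector": [0.1, 2], "text": "hello", "title": "Intro", "category": None}
record = to_record(row, "doc_id", "vector", document="text",
                   metadata_columns=["title", "category"])
print(record["id"])         # the string "42"
print(record["embedding"])  # 0.1 is no longer exactly 0.1 after narrowing
print(record["metadata"])   # {'title': 'Intro'}
```

Note that the narrowed value of 0.1 differs slightly from the original 64-bit float, so values read back from Chroma should be compared with a tolerance rather than for exact equality.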

write(table, collection_name, *, primary_key=None, embedding, document=None, metadata_columns=None, host='localhost', port=8000, ssl=False, headers=None, tenant='default_tenant', database='default_database', name=None, sort_by=None)

Writes a Pathway Live Data Framework table to a Chroma collection over the server’s HTTP API.

The collection always mirrors the current state of the table: a row added to the table is stored in the collection, a row removed from the table is deleted from the collection, and a changed row replaces the previous value. The primary_key column identifies each record; since Chroma record ids are strings, its value is stored as its string form. When primary_key is omitted, the row’s internal Pathway key is used as the record id instead.

Chroma records have a fixed shape, so the columns of table are mapped onto Chroma’s record fields explicitly:

  • primary_key → the record id,
  • embedding → the record embedding vector,
  • document (optional) → the record document text,
  • metadata_columns (optional) → the record metadata.

The target collection must already exist before the pipeline starts; the connector does not create it. Create it upfront, e.g. with chromadb.HttpClient(...).create_collection(...).

  • Parameters
    • table (Table) – The table to write.
    • collection_name (str) – Name of the Chroma collection to write to. It must already exist on the server.
    • primary_key (ColumnReference | None) – An optional column reference (e.g. table.doc_id) whose values are used as the Chroma record id. The column must belong to table; values are converted to str. If omitted (the default), the row’s internal Pathway key is used as the record id instead.
    • embedding (ColumnReference) – A column reference (e.g. table.vector) holding the embedding vector for each row. Must be of Pathway type list[float], list[int], or a 1-D numpy.ndarray; values are stored as 32-bit floats.
    • document (ColumnReference | None) – An optional column reference (e.g. table.text) holding the document text stored alongside each vector. Must be of type str.
    • metadata_columns (Optional[Iterable[ColumnReference]]) – Optional column references for scalar metadata stored with each vector (e.g. table.title, table.category). Each value must be str, int, float, or bool. A None value drops that key for the affected record.
    • host (str) – Host of the Chroma server. Defaults to "localhost".
    • port (int) – Port of the Chroma server. Defaults to 8000.
    • ssl (bool) – Whether to connect over HTTPS. Defaults to False.
    • headers (dict[str, str] | None) – Optional HTTP headers forwarded with every request, e.g. an authorization token for Chroma Cloud.
    • tenant (str) – Chroma tenant to use. Defaults to "default_tenant".
    • database (str) – Chroma database to use. Defaults to "default_database".
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output within each minibatch will be sorted in ascending order by the given columns. When multiple columns are provided, the corresponding value tuples are compared lexicographically.
  • Returns
    None
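The connection parameters listed above can be gathered in one place when the Chroma server is remote. The following is a minimal sketch: the `chroma_connection_kwargs` helper, the host name, and the token value are hypothetical, and the `Authorization: Bearer` header is an assumption, since the header a given server expects depends on how its authentication is configured.

```python
def chroma_connection_kwargs(host, token=None, port=8000, ssl=False):
    """Collect the connection-related keyword arguments of pw.io.chroma.write."""
    kwargs = {
        "host": host,
        "port": port,
        "ssl": ssl,
        "tenant": "default_tenant",
        "database": "default_database",
    }
    if token is not None:
        # Assumption: the server accepts a bearer token in the Authorization header.
        kwargs["headers"] = {"Authorization": f"Bearer {token}"}
    return kwargs


kwargs = chroma_connection_kwargs(
    "chroma.example.com", token="my-secret-token", port=443, ssl=True
)
print(sorted(kwargs))

# Hypothetical usage with the connector (needs a licensed Pathway install
# and a reachable Chroma server, so it is left commented out here):
# pw.io.chroma.write(table, collection_name="docs", embedding=table.embedding, **kwargs)
```

Keeping these values in a single dictionary makes it easier to switch between a local container and a remote deployment without touching the column mapping.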

Example:

Suppose you are building a document search pipeline and want to store embeddings in Chroma running in a container (docker run -p 8000:8000 chromadb/chroma).

Create the collection before starting the pipeline:

import chromadb

# Connect to the running Chroma server and create the target collection.
client = chromadb.HttpClient(host="localhost", port=8000)
client.create_collection("docs")

Define your Pathway Live Data Framework schema and build the table:

import pathway as pw


class DocSchema(pw.Schema):
    doc_id: int = pw.column_definition(primary_key=True)  # used as the Chroma record id
    text: str  # stored as the record document
    embedding: list[float]  # stored as the record embedding

table = pw.debug.table_from_rows(
    DocSchema,
    [(1, "hello", [0.1, 0.2, 0.3, 0.4]), (2, "world", [0.5, 0.6, 0.7, 0.8])],
)

Attach the Chroma output connector, mapping each column to a Chroma record field:

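pw.io.chroma.write(
    table,
    collection_name="docs",
    primary_key=table.doc_id,  # record id
    embedding=table.embedding,  # embedding vector
    document=table.text,  # document text
)
# Run the pipeline; monitoring output is disabled here.
pw.run(monitoring_level=pw.MonitoringLevel.NONE)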