pw.io.weaviate

Pathway writes tables to a Weaviate collection with pw.io.weaviate.write. The connector keeps the collection in sync with the table: additions and updates upsert objects, and deletions remove them. The target collection must already exist — the connector never creates or alters its schema or vector index.

NOTE: This connector is available when using one of the following licenses only: Pathway Live Data Framework Scale, Pathway Live Data Framework Enterprise. See the pricing page for details.

Object identity

Weaviate addresses every object by a UUID. When a primary_key column is given, its value is encoded in that UUID exactly as weaviate.util.generate_uuid5 does (uuid5(NAMESPACE_DNS, str(key))), so re-writing the same key upserts in place and the row for a key k can always be fetched with collection.query.fetch_object_by_id(generate_uuid5(k)). The key column is not stored as a property, which also lets it be named id — a name Weaviate otherwise reserves and forbids as a property.
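
The derivation described above can be reproduced with the Python standard library alone. This is a minimal sketch: the helper name object_uuid is hypothetical (it is not part of Pathway or Weaviate), and it relies only on the formula stated above, uuid5(NAMESPACE_DNS, str(key)).

```python
import uuid


def object_uuid(key) -> str:
    """Hypothetical helper: UUID for a primary_key value, uuid5(NAMESPACE_DNS, str(key))."""
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, str(key)))


# The mapping is deterministic, so the same key always yields the same UUID.
first = object_uuid(42)
second = object_uuid(42)

# The key goes through str() first, so under this formula the integer 42
# and the string "42" map to the same UUID.
same_as_text = object_uuid("42")
```

With the Weaviate Python client installed, weaviate.util.generate_uuid5(42) should produce the same value, which can then be passed to collection.query.fetch_object_by_id.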

When primary_key is omitted, the UUID is derived from the row’s internal Pathway key instead. Objects are still upserted in place, but there is no column-value-based UUID to look them up by.

Authentication

Pass api_key to authenticate with Weaviate. Extra request headers can be supplied through headers — for example to provide the API key of a server-side vectorizer module ({"X-OpenAI-Api-Key": "..."}).
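
One way to keep credentials out of the pipeline code is to collect the connection arguments from environment variables. The sketch below only builds a dictionary of keyword arguments matching the write signature. The helper name and every environment variable name are hypothetical choices for illustration.

```python
import os


def weaviate_connection_kwargs(env=os.environ) -> dict:
    """Hypothetical helper: build connection kwargs for pw.io.weaviate.write.

    All environment variable names used here are assumptions.
    """
    kwargs = {
        "http_host": env.get("WEAVIATE_HOST", "localhost"),
        "http_port": int(env.get("WEAVIATE_PORT", "8080")),
        "http_secure": env.get("WEAVIATE_SECURE", "false").lower() == "true",
        "api_key": env.get("WEAVIATE_API_KEY"),  # None means no API key is passed
    }
    # Only add the vectorizer header when a key for it is available.
    openai_key = env.get("OPENAI_API_KEY")
    if openai_key:
        kwargs["headers"] = {"X-OpenAI-Api-Key": openai_key}
    return kwargs


# Example with an explicit mapping instead of the real environment.
example = weaviate_connection_kwargs(
    {
        "WEAVIATE_HOST": "weaviate.internal",
        "WEAVIATE_SECURE": "true",
        "WEAVIATE_API_KEY": "secret",
    }
)
```

The result can then be unpacked into the call, for example pw.io.weaviate.write(table, "Docs", **weaviate_connection_kwargs()).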

Type mapping

Every column other than primary_key and vector becomes an object property. With Weaviate’s auto-schema, Pathway types are stored as follows. Declaring a property explicitly in the collection’s schema overrides this with the declared type (for example, declare a property as int to preserve integer typing).

Pathway type            | Weaviate property type | Notes
------------------------|------------------------|------
int                     | number                 | Stored as a number under auto-schema (read back as a float); declare the property as int in the collection to keep integer typing.
float                   | number                 |
bool                    | boolean                |
str                     | text                   |
bytes                   | text                   | Base64-encoded.
list[float] / list[int] | number[]               | When the column is not the one passed as vector.
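
When reading objects back under auto-schema, the mapping above means that int columns come back as floats and bytes columns come back as Base64 text. The following sketch shows one way to undo both on the reader's side. The helper and the sample property names are hypothetical, and it assumes the standard Base64 alphabet, which the mapping above does not spell out.

```python
import base64


def restore_properties(props: dict, int_columns=(), bytes_columns=()) -> dict:
    """Hypothetical helper: convert read-back properties to their Pathway-side types."""
    restored = dict(props)  # copy, so the caller's dict is left untouched
    for name in int_columns:
        restored[name] = int(restored[name])  # 12.0 -> 12
    for name in bytes_columns:
        # Assumption: standard Base64 alphabet.
        restored[name] = base64.b64decode(restored[name])
    return restored


# Stand-in for the properties of an object read back from Weaviate.
stored = {
    "page_count": 12.0,
    "thumbnail": base64.b64encode(b"\x89PNG").decode("ascii"),
    "title": "Intro",
}
restored = restore_properties(
    stored, int_columns=["page_count"], bytes_columns=["thumbnail"]
)
```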

The column passed as vector is stored as the object’s vector embedding (kept as 32-bit floats), not as a property. When vector is omitted, objects are written without an explicit vector — appropriate when the collection has a server-side vectorizer. Weaviate reserves the property names id and vector, so a non-key, non-vector column with either name is rejected.
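
Because the vector is kept as 32-bit floats, values read back from Weaviate will generally not be bit-identical to the 64-bit Python floats that were sent. The sketch below imitates that rounding with the standard library (the helper name as_float32 is hypothetical) and compares with a tolerance instead of exact equality.

```python
import math
import struct


def as_float32(value: float) -> float:
    """Round a 64-bit Python float to the nearest 32-bit float."""
    return struct.unpack("f", struct.pack("f", value))[0]


sent = [0.1, 0.2, 0.3, 0.4]
stored = [as_float32(x) for x in sent]

# Exact equality fails for these values after the 32-bit round trip...
exact_match = stored == sent
# ...so compare with a relative tolerance suited to 32-bit precision.
close_match = all(math.isclose(a, b, rel_tol=1e-6) for a, b in zip(stored, sent))
```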

write(table, collection_name, *, primary_key=None, vector=None, http_host='localhost', http_port=8080, http_secure=False, api_key=None, headers=None, batch_size=100, concurrency=8, name=None, sort_by=None)

Writes a Pathway Live Data Framework table to a Weaviate collection.

Each row addition (diff = 1) upserts an object and each row deletion (diff = -1) removes it, keeping the collection in sync with the table. The target collection must already exist. See the connector documentation for how objects are identified, how each Pathway type is stored, and how to authenticate.
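
The diff semantics can be pictured with a toy in-memory model. This is not the connector's implementation: a plain dict stands in for the collection, the function name apply_changes is hypothetical, and the sketch assumes that an update reaches the connector as a deletion of the old row followed by an addition of the new one.

```python
def apply_changes(store: dict, changes) -> dict:
    """Toy model: apply (key, properties, diff) changes to a dict-backed 'collection'."""
    for key, properties, diff in changes:
        if diff == 1:
            store[key] = properties  # upsert: insert or overwrite in place
        elif diff == -1:
            store.pop(key, None)  # delete; a missing key is ignored
        else:
            raise ValueError(f"unexpected diff: {diff}")
    return store


collection = {}
# Two additions.
apply_changes(collection, [(1, {"title": "a"}, 1), (2, {"title": "b"}, 1)])
# An update to key 1, modeled as a deletion followed by an addition (assumption).
apply_changes(collection, [(1, {"title": "a"}, -1), (1, {"title": "a2"}, 1)])
# A deletion of key 2.
apply_changes(collection, [(2, {"title": "b"}, -1)])
```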

  • Parameters
    • table (Table) – The table to write.
    • collection_name (str) – Name of the Weaviate collection to write to. It must already exist.
    • primary_key (ColumnReference | None) – An optional column reference (e.g. table.doc_id) whose values are used to derive each object’s UUID; the column is not stored as a property. When omitted, the UUID is derived from the row’s internal Pathway key. The column must belong to table.
    • vector (ColumnReference | None) – An optional column reference (e.g. table.embedding) holding the vector embedding for each object. When given, that column is written as the object’s vector and not as a property. The column must belong to table.
    • http_host (str) – Host of the Weaviate server.
    • http_port (int) – Port of the Weaviate server.
    • http_secure (bool) – Whether the connection uses TLS (https).
    • api_key (str | None) – An optional API key used to authenticate with Weaviate.
    • headers (dict[str, str] | None) – Optional additional headers sent with every request, e.g. {"X-OpenAI-Api-Key": "..."} to authorize a server-side vectorizer.
    • batch_size (int) – Number of objects grouped together per write.
    • concurrency (int) – Maximum number of writes performed in parallel per worker.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output within each mini-batch will be sorted in ascending order by the given columns. When multiple columns are provided, the corresponding value tuples are compared lexicographically.
  • Returns
    None

Example:

Suppose you are building a document search pipeline and want to store embeddings in Weaviate running locally (e.g. via the official Docker image, which serves its API on port 8080).

Create the collection before starting the pipeline. The none vectorizer keeps the embeddings the pipeline sends instead of recomputing them server-side:

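import weaviate
from weaviate.classes.config import Configure

# Connect to the local Weaviate instance on its default ports.
client = weaviate.connect_to_local()
try:
    # No server-side vectorizer: Weaviate stores the vectors the pipeline sends.
    client.collections.create(
        "Docs",
        vectorizer_config=Configure.Vectorizer.none(),
    )
finally:
    # Release the connection even if creation fails.
    client.close()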
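
Since the connector never creates the collection, a setup script is likely to run more than once, and creating a collection that already exists is expected to fail. A minimal sketch of an idempotent setup step follows. The helper name ensure_collection is hypothetical, and it assumes the v4 Python client, whose client.collections object offers exists and create.

```python
def ensure_collection(client, name: str = "Docs") -> bool:
    """Hypothetical helper: create `name` without a vectorizer unless it already exists.

    Returns True when the collection was created, False when it was already there.
    """
    if client.collections.exists(name):
        return False
    # Imported lazily so the helper can be defined without the client installed.
    from weaviate.classes.config import Configure

    client.collections.create(
        name,
        vectorizer_config=Configure.Vectorizer.none(),
    )
    return True
```

It would be called between weaviate.connect_to_local() and client.close(), in place of the unconditional create call.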

Define your Pathway Live Data Framework schema and build the table:

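import pathway as pw


class DocSchema(pw.Schema):
    # doc_id identifies each row; it is passed to the connector as primary_key below.
    doc_id: int = pw.column_definition(primary_key=True)
    # embedding holds the vector that the connector writes for each object.
    embedding: list[float]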

table = pw.debug.table_from_rows(
    DocSchema,
    [(1, [0.1, 0.2, 0.3, 0.4]), (2, [0.5, 0.6, 0.7, 0.8])],
)

Attach the Weaviate output connector, mapping the primary key and the vector column:

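pw.io.weaviate.write(
    table,
    collection_name="Docs",
    primary_key=table.doc_id,  # encoded in each object's UUID, not stored as a property
    vector=table.embedding,  # written as the object's vector, not as a property
)
# Run the pipeline with the monitoring dashboard disabled.
pw.run(monitoring_level=pw.MonitoringLevel.NONE)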
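
After the pipeline has run, the written objects can be checked by fetching them by UUID. The sketch below is one possible way to do that, not part of the connector. The helper name fetch_doc is hypothetical, and it assumes the v4 Python client, where client.collections.get returns a collection whose query.fetch_object_by_id accepts include_vector.

```python
import uuid


def fetch_doc(client, doc_id, collection_name: str = "Docs"):
    """Hypothetical helper: fetch the object written for a given primary key value."""
    # Same derivation the connector uses for primary_key: uuid5(NAMESPACE_DNS, str(key)).
    object_id = uuid.uuid5(uuid.NAMESPACE_DNS, str(doc_id))
    collection = client.collections.get(collection_name)
    # include_vector=True asks Weaviate to return the stored embedding as well.
    return collection.query.fetch_object_by_id(object_id, include_vector=True)


# Usage against a running local instance (not executed here):
#   client = weaviate.connect_to_local()
#   obj = fetch_doc(client, 1)
#   client.close()
```

The returned vector is stored as 32-bit floats, so compare it to the original embedding with a tolerance and not with exact equality.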