pw.io.qdrant

This module is available only with one of the following licenses: Pathway Live Data Framework Scale or Pathway Live Data Framework Enterprise.

write(table, url, collection_name, *, vector, api_key=None, batch_size=256, name=None)

Writes a Pathway Live Data Framework table to a Qdrant collection.

Each row addition (diff = 1) is sent to Qdrant as a point upsert and each row deletion (diff = -1) removes the corresponding point. The vector column provides the point’s vector and every other column of table is stored in the point’s payload. The point id is assigned internally rather than taken from a column, so keep any identifier you need as an ordinary column (it is stored in the payload).
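The row-to-point mapping can be sketched in plain Python as one row splitting into a vector and a payload. The `split_row` helper and the dict-based row are hypothetical illustrations, not part of the Pathway API. The point id is left out on purpose, because the connector assigns it internally.

```python
def split_row(row: dict, vector_column: str) -> tuple:
    # Hypothetical helper mirroring the documented mapping: the vector column
    # becomes the point vector, and every other column goes into the payload.
    vector = row[vector_column]
    payload = {name: value for name, value in row.items() if name != vector_column}
    return vector, payload


row = {"doc_id": 1, "embedding": [0.1, 0.2, 0.3, 0.4], "title": "a"}
vector, payload = split_row(row, "embedding")
print(vector)   # [0.1, 0.2, 0.3, 0.4]
print(payload)  # {'doc_id': 1, 'title': 'a'}
```

Because `doc_id` stays in the payload, it remains available for payload filtering even though it does not become the point id.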

Within each minibatch, an insertion always wins over a deletion of the same key, so an update (a retraction of the old row plus an insertion of the new one) replaces the point instead of removing it.
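The rule can be modelled in a few lines of plain Python. This is a sketch of the documented semantics, not Pathway's implementation. The `consolidate` function and the `(key, row, diff)` tuple layout are hypothetical.

```python
def consolidate(minibatch):
    """Reduce (key, row, diff) updates to the upserts and deletes to send.

    An insertion (diff = 1) wins over a deletion (diff = -1) of the same key,
    regardless of the order in which they appear in the minibatch.
    """
    upserts = {}
    deletes = set()
    for key, row, diff in minibatch:
        if diff == 1:
            upserts[key] = row
            deletes.discard(key)  # a later insertion cancels an earlier deletion
        elif diff == -1 and key not in upserts:
            deletes.add(key)      # a deletion never overrides an insertion
    return upserts, deletes


# Key 1 is updated (retraction + insertion); key 2 is only deleted.
upserts, deletes = consolidate([
    (1, {"title": "old"}, -1),
    (1, {"title": "new"}, 1),
    (2, {"title": "b"}, -1),
])
print(upserts)  # {1: {'title': 'new'}}
print(deletes)  # {2}
```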

If the target collection does not exist yet, it is created on the first write with a single Cosine-distance vector whose size matches the written vectors. Create the collection yourself beforehand if you need a different distance metric or vector configuration.
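Cosine distance compares only the direction of two vectors, so rescaling an embedding does not change its score. Below is a minimal plain-Python sketch of the underlying similarity. The `cosine_similarity` helper is illustrative and is not a Pathway or Qdrant function.

```python
import math


def cosine_similarity(a, b):
    # dot(a, b) / (|a| * |b|): Qdrant's Cosine metric scores points by this
    # quantity, so only the direction of each vector matters.
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


v = [0.1, 0.2, 0.3, 0.4]
scaled = [10 * x for x in v]
print(round(cosine_similarity(v, scaled), 6))  # 1.0
```

If vector magnitude carries meaning for your embeddings, or your model was trained for dot-product or Euclidean search, create the collection beforehand. One way is the `qdrant-client` package's `QdrantClient.create_collection` method with `vectors_config=VectorParams(size=..., distance=Distance.DOT)`, where `size` matches the vectors you write.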

The vector column must hold a list[float] or a 1-D numpy.ndarray. Payload columns may be of any type the Pathway Live Data Framework JSON serializer supports (int, float, str, bool, pw.Json, lists, tuples, bytes, and 1-D numpy.ndarray).
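The accepted vector types can be expressed as a validation step. The sketch below uses a hypothetical helper, `to_point_vector`, that accepts a `list[float]` or a 1-D `numpy.ndarray` and rejects anything else. It mirrors the documented contract and is not the connector's own code.

```python
import numpy as np


def to_point_vector(value):
    # Hypothetical check: normalise an accepted vector value to list[float].
    if isinstance(value, np.ndarray):
        if value.ndim != 1:
            raise ValueError(f"expected a 1-D array, got shape {value.shape}")
        return [float(x) for x in value]
    if isinstance(value, list):
        return [float(x) for x in value]
    raise TypeError(f"unsupported vector type: {type(value).__name__}")


print(to_point_vector(np.array([0.1, 0.2, 0.3])))  # [0.1, 0.2, 0.3]
```

A 2-D array such as `np.zeros((2, 3))` is rejected, because a point carries a single flat vector.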

  • Parameters
    • table (Table) – The table to write.
    • url (str) – URL of the Qdrant instance’s gRPC endpoint, e.g. "http://localhost:6334" (Qdrant’s gRPC port is 6334 by default).
    • collection_name (str) – Name of the Qdrant collection to write to.
    • vector (ColumnReference) – A column reference (e.g. table.embedding) holding the point vector as a list[float] or 1-D numpy.ndarray. The column must belong to table.
    • api_key (str | None) – Optional API key used to authenticate with Qdrant Cloud or a secured instance.
    • batch_size (int) – Maximum number of points sent to Qdrant in a single upsert or delete request. A minibatch larger than this is split into several requests, which keeps the size of each request bounded even when vectors are high-dimensional.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  • Returns
    None
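The `batch_size` splitting can be pictured as slicing the points of one minibatch into consecutive chunks. The sketch below works under that assumption, and `split_into_requests` is a hypothetical name.

```python
def split_into_requests(points, batch_size=256):
    # Cut a minibatch into consecutive requests of at most batch_size points.
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    return [points[i:i + batch_size] for i in range(0, len(points), batch_size)]


requests = split_into_requests(list(range(600)))
print([len(r) for r in requests])  # [256, 256, 88]
```

Take the default `batch_size=256` with 1536-dimensional vectors, assuming they travel as 32-bit floats. Each request then carries at most 256 × 1536 × 4 = 1,572,864 bytes (1.5 MiB) of raw vector data, before payload and protocol overhead. Lowering `batch_size` shrinks that bound proportionally.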

Example:

Suppose you are building a document search pipeline and want to store embeddings in Qdrant.

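import pathway as pw

class DocSchema(pw.Schema):
    # doc_id determines the Pathway row key; it is also stored in the point payload
    doc_id: int = pw.column_definition(primary_key=True)
    # the point vector, passed to the connector as vector=table.embedding
    embedding: list[float]
    title: str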

table = pw.debug.table_from_rows(
    DocSchema,
    [(1, [0.1, 0.2, 0.3, 0.4], "a"), (2, [0.5, 0.6, 0.7, 0.8], "b")],
)

Attach the Qdrant output connector, pointing at the vector column. The remaining columns (here doc_id and title) are stored in the point payload:

pw.io.qdrant.write(
    table,
    url="http://localhost:6334",
    collection_name="docs",
    vector=table.embedding,
)
pw.run(monitoring_level=pw.MonitoringLevel.NONE)