pw.io.milvus

write(table, uri, collection_name, *, primary_key, name=None, sort_by=None)

This connector is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.

Writes a Pathway table to a Milvus collection.

Each row addition (diff = 1) is sent to Milvus as an upsert and each row deletion (diff = -1) is sent as a delete. The value of the primary_key column is used as the Milvus primary key. The column must belong to table; passing a column from a different table raises a ValueError.

The target collection must already exist before the pipeline starts and its schema must be compatible with the table’s columns. Pathway cannot create it automatically because the vector field dimension is not part of the Python type (a list of floats carries no size) and is only known once the first row arrives. Use pymilvus.MilvusClient.create_collection to create the collection upfront.

Within every mini-batch, deletes are applied before upserts so that update pairs (retraction followed by insertion of the same key) are handled correctly.
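The ordering rule can be illustrated with a small in-memory model. This is only a sketch of the idea, not Pathway's implementation: a plain dict stands in for the Milvus collection, and `apply_mini_batch` and the `(key, row, diff)` tuple layout are hypothetical.

```python
def apply_mini_batch(store: dict, changes: list) -> None:
    """Apply (key, row, diff) changes: all deletes first, then all upserts."""
    deletes = [key for key, _row, diff in changes if diff == -1]
    upserts = [(key, row) for key, row, diff in changes if diff == 1]
    for key in deletes:
        store.pop(key, None)  # deleting a missing key is a no-op
    for key, row in upserts:
        store[key] = row  # insert or overwrite


store = {1: {"doc_id": 1, "title": "old"}}

# An update is a retraction plus an insertion of the same key. The pair is
# listed insertion-first here to show that the position inside the batch
# does not affect the outcome.
batch = [
    (1, {"doc_id": 1, "title": "new"}, 1),
    (1, {"doc_id": 1, "title": "old"}, -1),
]
apply_mini_batch(store, batch)
```

If upserts were applied first, the delete for key 1 would remove the freshly written row; applying deletes first leaves the new version in place.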

Supported type mappings (Pathway → Milvus):

| Pathway type | Milvus field type | Notes |
| --- | --- | --- |
| int | INT64 | |
| float | DOUBLE | |
| str | VARCHAR | Field must declare max_length |
| bool | BOOL | |
| pw.Json | JSON | Wrapper unwrapped automatically |
| list of float | FLOAT_VECTOR | Dimension set in the collection schema |
| bytes | BINARY_VECTOR | Dimension set in the collection schema |
| numpy.ndarray | FLOAT_VECTOR or BINARY_VECTOR | Must be 1-D; converted to list |

Any other Pathway type raises a TypeError before reaching pymilvus, with a message that names the offending column and lists the supported types. A multi-dimensional numpy.ndarray raises a ValueError.
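To make the mapping and the error behaviour concrete, here is a simplified, dependency-free sketch. It is not the connector's code: `milvus_field_type` is a hypothetical function that inspects a sample Python value rather than the Pathway schema, and it leaves out `pw.Json` and `numpy.ndarray` so that it runs without extra packages.

```python
# Subset of the mapping above, keyed by exact Python type.
SCALAR_TYPES = {
    int: "INT64",
    float: "DOUBLE",
    str: "VARCHAR",
    bool: "BOOL",
    bytes: "BINARY_VECTOR",
}


def milvus_field_type(column: str, value) -> str:
    """Return the Milvus field type name for a sample value, or raise TypeError."""
    # Exact type lookup keeps bool separate from int (bool subclasses int).
    field_type = SCALAR_TYPES.get(type(value))
    if field_type is not None:
        return field_type
    if isinstance(value, list) and all(type(x) is float for x in value):
        return "FLOAT_VECTOR"
    supported = ", ".join(t.__name__ for t in SCALAR_TYPES) + ", list of float"
    raise TypeError(
        f"column {column!r} has unsupported type {type(value).__name__}; "
        f"supported types: {supported}"
    )
```

For example, `milvus_field_type("embedding", [0.1, 0.2])` yields `"FLOAT_VECTOR"`, while passing a `set` raises a `TypeError` that names the column.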

The uri parameter is passed directly to pymilvus.MilvusClient. Use a local .db file path (e.g. "./milvus.db") to use Milvus Lite, an embedded single-file database that requires no server, which is convenient for development and testing. Use a server address (e.g. "http://localhost:19530") to connect to a running Milvus instance for production workloads.
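One way to switch between Milvus Lite in development and a server in production is to read the URI from the environment. The sketch below is only a suggestion: the `MILVUS_URI` variable name and both helper functions are hypothetical, and the `.db` suffix check simply mirrors the convention described above.

```python
import os


def resolve_milvus_uri(default: str = "./milvus.db") -> str:
    """Return MILVUS_URI if set (hypothetical variable), else a Milvus Lite path."""
    return os.environ.get("MILVUS_URI", default)


def looks_like_milvus_lite(uri: str) -> bool:
    """Heuristic: a local path ending in .db is treated as a Milvus Lite file."""
    return uri.endswith(".db") and "://" not in uri


uri = resolve_milvus_uri()
```

The resulting `uri` can then be passed unchanged to both `pymilvus.MilvusClient` and `pw.io.milvus.write`, so the collection setup and the connector always point at the same instance.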

  • Parameters
    • table (Table) – The table to write.
    • uri (str) – URI passed to pymilvus.MilvusClient. Use a local .db file path for Milvus Lite (e.g. "./milvus.db") or a server address for a running Milvus instance (e.g. "http://localhost:19530").
    • collection_name (str) – Name of the Milvus collection to write to.
    • primary_key (ColumnReference) – A column reference (e.g. table.doc_id) whose values are used as the Milvus primary key. The column must belong to table.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output within each mini-batch will be sorted in ascending order by the given columns. When multiple columns are provided, the corresponding value tuples are compared lexicographically.
  • Returns
    None

Example:

Suppose you are building a document search pipeline and want to store embeddings in Milvus. The example below uses Milvus Lite, a local single-file database that requires no running server, which is convenient for development. For production, replace "./milvus.db" with your server URI, e.g. "http://localhost:19530".

Create the collection before starting the pipeline. The schema must define an integer primary key and a FLOAT_VECTOR field whose dimension matches the embeddings your pipeline will produce:

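import pathway as pw
from pymilvus import DataType, MilvusClient

# Open (or create) the local Milvus Lite database file.
client = MilvusClient("./milvus.db")

# Primary keys come from the pipeline, so auto-generated IDs are disabled.
schema = client.create_schema(auto_id=False)
schema.add_field("doc_id", DataType.INT64, is_primary=True)
# dim must match the length of the embeddings the pipeline produces.
schema.add_field("embedding", DataType.FLOAT_VECTOR, dim=4)

# Index the vector field so the collection can be searched.
index_params = client.prepare_index_params()
index_params.add_index("embedding", metric_type="COSINE", index_type="FLAT")

client.create_collection("docs", schema=schema, index_params=index_params)
client.close()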

Define your Pathway schema and build the table:

class DocSchema(pw.Schema):
    doc_id: int = pw.column_definition(primary_key=True)
    embedding: list[float]

table = pw.debug.table_from_rows(
    DocSchema,
    [(1, [0.1, 0.2, 0.3, 0.4]), (2, [0.5, 0.6, 0.7, 0.8])],
)

Attach the Milvus output connector and specify which column maps to the Milvus primary key field:

pw.io.milvus.write(
    table,
    uri="./milvus.db",
    collection_name="docs",
    primary_key=table.doc_id,
)
pw.run(monitoring_level=pw.MonitoringLevel.NONE)