pw.io.milvus
write(table, uri, collection_name, *, primary_key, name=None, sort_by=None)
This connector is available only with one of the following licenses: Pathway Scale, Pathway Enterprise.
Writes a Pathway table to a Milvus collection.
Each row addition (diff = 1) is sent to Milvus as an upsert and each row
deletion (diff = -1) is sent as a delete. The value of the primary_key
column is used as the Milvus primary key. The column must belong to table;
passing a column from a different table raises a ValueError.
The target collection must already exist before the pipeline starts and its
schema must be compatible with the table’s columns. Pathway cannot create
it automatically because the vector field dimension is not part of the
Python type (a list of floats carries no size) and is only known once
the first row arrives. Use pymilvus.MilvusClient.create_collection to
create the collection upfront.
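Because the dimension has to be fixed in the collection schema before any row is written, one option is to derive it from a sample of embeddings you already have. The helper below is a hypothetical sketch (`infer_vector_dim` is not part of Pathway or pymilvus). It only checks that every sample vector has the same length and returns that length, which could then be passed as `dim` when defining the vector field.

```python
from typing import Iterable, Sequence


def infer_vector_dim(embeddings: Iterable[Sequence[float]]) -> int:
    """Return the common length of all sample embeddings.

    Hypothetical helper for illustration; raises ValueError if the sample
    is empty or the vectors disagree on length.
    """
    dims = {len(vector) for vector in embeddings}
    if not dims:
        raise ValueError("need at least one sample embedding")
    if len(dims) > 1:
        raise ValueError(f"inconsistent embedding lengths: {sorted(dims)}")
    return dims.pop()


# Sample vectors are made up for the illustration.
sample = [[0.1, 0.2, 0.3, 0.4], [0.5, 0.6, 0.7, 0.8]]
dim = infer_vector_dim(sample)
```

Failing early on inconsistent lengths is a design choice of this sketch: a mismatch would otherwise only surface when Milvus rejects a row at write time.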
Within every mini-batch, deletes are applied before upserts so that update pairs (retraction followed by insertion of the same key) are handled correctly.
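To see why the ordering matters, consider an update to one key within a single mini-batch: it arrives as a retraction of the old row plus an insertion of the new one. The sketch below models a collection as a plain dictionary. `apply_minibatch` is a hypothetical stand-in for illustration, not the connector's actual implementation.

```python
def apply_minibatch(store, changes):
    """Apply (key, row, diff) changes to a dict-backed store.

    Illustrative model only: diff == -1 is a delete, diff == 1 an upsert.
    Deletes are applied first, mirroring the ordering described above.
    """
    for key, _row, diff in changes:
        if diff == -1:
            store.pop(key, None)
    for key, row, diff in changes:
        if diff == 1:
            store[key] = row
    return store


store = {1: {"doc_id": 1, "title": "old"}}
# An update arrives as a retraction plus an insertion of the same key;
# the order inside the batch is deliberately "insert, then retract".
changes = [
    (1, {"doc_id": 1, "title": "new"}, 1),
    (1, {"doc_id": 1, "title": "old"}, -1),
]
apply_minibatch(store, changes)
```

If the changes were applied in arrival order instead, the delete would run after the upsert and key 1 would disappear from the store.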
Supported type mappings (Pathway → Milvus):
| Pathway type | Milvus field type | Notes |
|---|---|---|
| int | INT64 | |
| float | DOUBLE | |
| str | VARCHAR | Field must declare max_length |
| bool | BOOL | |
| pw.Json | JSON | Wrapper unwrapped automatically |
| list of float | FLOAT_VECTOR | Dimension set in the collection schema |
| bytes | BINARY_VECTOR | Dimension set in the collection schema |
| numpy.ndarray | FLOAT_VECTOR or BINARY_VECTOR | Must be 1-D; converted to list |
Any other Pathway type raises a TypeError before reaching pymilvus,
with a message that names the offending column and lists the supported types.
A multi-dimensional numpy.ndarray raises a ValueError.
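The mapping can also be checked against your own schema before the pipeline starts. The sketch below is a hypothetical pre-flight helper, not Pathway's internal check: `milvus_field_type` and `SUPPORTED` are names invented here, and it covers only the plain Python annotations from the table (pw.Json and numpy.ndarray are left out to keep it dependency-free).

```python
from typing import get_args, get_origin

# Mapping taken from the table above; keys are plain Python annotations.
SUPPORTED = {
    int: "INT64",
    float: "DOUBLE",
    str: "VARCHAR",
    bool: "BOOL",
    bytes: "BINARY_VECTOR",
}


def milvus_field_type(column: str, annotation) -> str:
    """Return the Milvus field type name for a column annotation.

    Hypothetical helper: it mimics the documented behavior by raising a
    TypeError that names the column and lists the supported types.
    """
    if get_origin(annotation) is list and get_args(annotation) == (float,):
        return "FLOAT_VECTOR"
    if annotation in SUPPORTED:
        return SUPPORTED[annotation]
    supported = ", ".join(t.__name__ for t in SUPPORTED) + ", list[float]"
    raise TypeError(
        f"column {column!r}: unsupported type {annotation!r}; supported: {supported}"
    )


# Example column annotations (illustrative).
columns = {"doc_id": int, "title": str, "embedding": list[float]}
fields = {name: milvus_field_type(name, tp) for name, tp in columns.items()}
```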
The uri parameter is passed directly to pymilvus.MilvusClient.
Use a local .db file path (e.g. "./milvus.db") to use
Milvus Lite, an embedded
single-file database that requires no server, which is convenient for
development and testing. Use a server address (e.g.
"http://localhost:19530") to connect to a running Milvus instance
for production workloads.
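One way to switch between the two modes without editing pipeline code is to read the URI from the environment. This is a minimal sketch under assumptions: the `MILVUS_URI` variable name and both helper functions are hypothetical, and the `.db` suffix check simply mirrors the convention described above.

```python
import os


def resolve_milvus_uri(default: str = "./milvus.db") -> str:
    """Return the URI from the (assumed) MILVUS_URI variable, else a local file."""
    return os.environ.get("MILVUS_URI", default)


def uses_milvus_lite(uri: str) -> bool:
    """Treat a path ending in .db as a Milvus Lite file, per the convention above."""
    return uri.endswith(".db")


uri = resolve_milvus_uri()
mode = "Milvus Lite" if uses_milvus_lite(uri) else "Milvus server"
```

The resulting `uri` can then be passed unchanged to both `pymilvus.MilvusClient` and `pw.io.milvus.write`.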
- Parameters
  - table (Table) – The table to write.
  - uri (str) – URI passed to pymilvus.MilvusClient. Use a local .db file path for Milvus Lite (e.g. "./milvus.db") or a server address for a running Milvus instance (e.g. "http://localhost:19530").
  - collection_name (str) – Name of the Milvus collection to write to.
  - primary_key (ColumnReference) – A column reference (e.g. table.doc_id) whose values are used as the Milvus primary key. The column must belong to table.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  - sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output within each mini-batch will be sorted in ascending order by the given columns. When multiple columns are provided, the corresponding value tuples are compared lexicographically.
- Returns
  None
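The lexicographic ordering described for sort_by behaves like sorting on tuples in plain Python. The sketch below uses made-up rows and column names to show the effect of two sort columns. It illustrates the comparison rule only and does not call Pathway.

```python
# Rows of one hypothetical mini-batch.
rows = [
    {"doc_id": 3, "category": "b", "score": 0.2},
    {"doc_id": 1, "category": "a", "score": 0.9},
    {"doc_id": 2, "category": "a", "score": 0.1},
    {"doc_id": 4, "category": "b", "score": 0.1},
]

# Equivalent in spirit to sort_by=[table.category, table.score]:
# compare by category first, then by score to break ties.
sort_columns = ("category", "score")
ordered = sorted(rows, key=lambda row: tuple(row[c] for c in sort_columns))
ordered_ids = [row["doc_id"] for row in ordered]
```

Here rows with category "a" come first, and within each category the lower score precedes the higher one.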
Example:
Suppose you are building a document search pipeline and want to store
embeddings in Milvus. The example below uses Milvus Lite — a local
single-file database that requires no running server, which is convenient
for development. For production, replace "./milvus.db" with your
server URI, e.g. "http://localhost:19530".
Create the collection before starting the pipeline. The schema
must define an integer primary key and a FLOAT_VECTOR field whose
dimension matches the embeddings your pipeline will produce:
import pathway as pw
from pymilvus import DataType, MilvusClient
client = MilvusClient("./milvus.db")
schema = client.create_schema(auto_id=False)
schema.add_field("doc_id", DataType.INT64, is_primary=True)
schema.add_field("embedding", DataType.FLOAT_VECTOR, dim=4)
index_params = client.prepare_index_params()
index_params.add_index("embedding", metric_type="COSINE", index_type="FLAT")
client.create_collection("docs", schema=schema, index_params=index_params)
client.close()
Define your Pathway schema and build the table:
class DocSchema(pw.Schema):
    doc_id: int = pw.column_definition(primary_key=True)
    embedding: list[float]

table = pw.debug.table_from_rows(
    DocSchema,
    [(1, [0.1, 0.2, 0.3, 0.4]), (2, [0.5, 0.6, 0.7, 0.8])],
)
Attach the Milvus output connector and specify which column maps to the Milvus primary key field:
pw.io.milvus.write(
    table,
    uri="./milvus.db",
    collection_name="docs",
    primary_key=table.doc_id,
)
pw.run(monitoring_level=pw.MonitoringLevel.NONE)