pw.io.leann
This module is available only with the following licenses: Pathway Scale, Pathway Enterprise.
write(table, index_path, text_column, *, metadata_columns=None, backend_name='hnsw', embedding_mode=None, embedding_model=None, embedding_options=None, name=None)
Write table data to a LEANN vector index.
LEANN is a storage-efficient vector database that uses graph-based selective recomputation to achieve up to 97% storage reduction compared to traditional vector databases while maintaining high recall.
The connector observes every Pathway minibatch. Whenever rows are added or
removed, it rebuilds the full LEANN index from the current snapshot of the
table. The result is written as a set of files that share index_path as
their prefix (e.g. ./articles.leann.hnsw, ./articles.leann.meta.json).
This keeps the index always consistent with the latest committed state of
the table.
Performance considerations. LEANN currently builds the index from scratch on every update — there is no incremental add or delete operation. If the document set is large and changes arrive frequently, rebuilding the full index after every minibatch will be slow. Use this connector with caution in streaming pipelines:
- Static mode is the ideal fit. When you run Pathway once to convert a collection from one format into a LEANN index, the index is built exactly once and the cost is fully amortized.
- Infrequent commits also work well. If your streaming pipeline commits rarely (large autocommit_duration_ms, or an external commit trigger), rebuilds happen seldom and the overhead stays manageable.
- High-frequency streaming over a large corpus is not a good fit. Every commit triggers a full rebuild; with many small commits and thousands of documents this can become a bottleneck. In that scenario, consider a vector store that supports incremental updates.
Limitations. Only str columns are accepted for text_column and
metadata_columns — passing a column of any other type raises a
ValueError at pipeline construction time. Rows whose text column is
empty or None are silently skipped and a warning is logged.
- Parameters
  - table (Table) – The Pathway table to index.
  - index_path (str | PathLike) – Prefix for the LEANN index files. LEANN writes several files with this value as the common prefix (e.g. providing "./articles.leann" produces "./articles.leann.hnsw", "./articles.leann.meta.json", and so on).
  - text_column (ColumnReference) – Column reference for the column containing text to embed (e.g. table.body). The column must belong to table and be of type str.
  - metadata_columns (list[ColumnReference] | None) – Column references for additional str columns to store alongside each vector (e.g. table.title, table.category). All columns must belong to table.
  - backend_name (Literal['hnsw', 'diskann']) – LEANN graph backend: "hnsw" (default) or "diskann".
  - embedding_mode (Literal['sentence-transformers', 'openai', 'mlx', 'ollama'] | None) – Embedding provider: "sentence-transformers", "openai", "mlx", or "ollama". When None, LEANN's own default is used.
  - embedding_model (str | None) – Specific model name, e.g. "facebook/contriever". When None, the provider's default model is used.
  - embedding_options (dict | None) – Additional options forwarded to the embedding provider, e.g. {"api_key": "...", "base_url": "..."}.
  - name (str | None) – Unique name for this connector instance, used in logs and persistence snapshots.
- Returns
None
Note:
- The index is fully rebuilt after every minibatch that contains changes. Existing index files are overwritten on each build.
- Requires the leann package. See https://github.com/yichuan-w/LEANN for installation instructions.
Example:
Suppose you have a CSV file articles.csv with columns title,
body, and category, and you want to build a LEANN vector index
over the article bodies so that you can run semantic search against it.
Start by defining the schema that matches your CSV:
import pathway as pw

class ArticleSchema(pw.Schema):
    title: str
    body: str
    category: str
Read the source file and register the LEANN sink. Pass the body
column as the text to embed; title and category are stored as
metadata that travels with each vector and can be returned alongside
search results:
table = pw.io.csv.read("articles.csv", schema=ArticleSchema)
pw.io.leann.write(
    table,
    index_path="./articles.leann",
    text_column=table.body,
    metadata_columns=[table.title, table.category],
    backend_name="hnsw",
    embedding_model="facebook/contriever",
)
Run the pipeline. In static mode Pathway processes the file once and writes the index; in streaming mode it keeps the index up to date as new articles arrive:
pw.run()