pw.ml

⚠️ For a more complete suite of Machine Learning tools and capabilities, please explore our Enterprise version

pw.ml.classifiers.knn_lsh_classifier_train(data, L, type='euclidean', **kwargs)

Build the LSH index over data. L is the number of repetitions of the LSH scheme. Returns an LSH projector of type (queries: Table, k: Any) -> Table.

pw.ml.classifiers.knn_lsh_classify(knn_model, data_labels, queries, k)

Classify the queries. The knn_model is used to retrieve the k closest datapoints for each query. Each query is then labeled by a majority vote over the labels of the retrieved datapoints, as given in data_labels.
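
The voting step described above can be sketched in plain Python. This is a minimal illustration, not Pathway's implementation: the helper names `majority_vote` and `classify`, the dictionary-based inputs, and the tie-breaking behavior are all assumptions made for the example.

```python
from collections import Counter


def majority_vote(neighbor_labels):
    """Return the most frequent label, or None when no neighbor was retrieved."""
    if not neighbor_labels:
        return None
    # most_common(1) returns [(label, count)]; on a tie, current CPython
    # returns the label that was seen first.
    return Counter(neighbor_labels).most_common(1)[0][0]


def classify(neighbors_per_query, data_labels):
    """Label each query from the labels of its retrieved neighbors.

    neighbors_per_query: {query_id: [datapoint_id, ...]} (hypothetical k-NN result)
    data_labels: {datapoint_id: label}
    """
    return {
        query_id: majority_vote([data_labels[i] for i in neighbor_ids])
        for query_id, neighbor_ids in neighbors_per_query.items()
    }


# Hypothetical data: three labeled points, one query with neighbors, one without.
data_labels = {"a": "cat", "b": "dog", "c": "cat"}
neighbors = {"q1": ["a", "b", "c"], "q2": []}
predictions = classify(neighbors, data_labels)
print(predictions)
```

A query for which the approximate index retrieves no neighbors gets no label in this sketch (`None`).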

pw.ml.classifiers.knn_lsh_euclidean_classifier_train(data, d, M, L, A)

Build the LSH index over data using Euclidean distance. d is the dimension of the data and L is the number of repetitions of the LSH scheme. M and A are specific to LSH with Euclidean distance: M is the number of random projections used to create each bucket, and A is the width of each bucket on each projection.
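
To make the roles of d, M, L and A concrete, here is a plain-Python sketch of the textbook random-projection LSH scheme for Euclidean distance. Whether Pathway's index follows this scheme in detail is an assumption; the functions `make_euclidean_lsh` and `share_bucket` and the `seed` argument are hypothetical and exist only for this illustration.

```python
import math
import random


def make_euclidean_lsh(d, M, L, A, seed=0):
    """Return a function mapping a d-dimensional point to L bucket ids.

    Each bucket id is a tuple of M integers: the point is projected on M
    random directions, shifted by a random offset, and cut into cells of width A.
    """
    rng = random.Random(seed)
    repetitions = [
        [([rng.gauss(0.0, 1.0) for _ in range(d)], rng.uniform(0.0, A)) for _ in range(M)]
        for _ in range(L)
    ]

    def bucket_ids(point):
        ids = []
        for projections in repetitions:
            ids.append(tuple(
                math.floor((sum(w * x for w, x in zip(direction, point)) + offset) / A)
                for direction, offset in projections
            ))
        return ids

    return bucket_ids


def share_bucket(ids_a, ids_b):
    """Two points are candidate neighbors if they collide in at least one repetition."""
    return any(a == b for a, b in zip(ids_a, ids_b))


bucket_ids = make_euclidean_lsh(d=3, M=3, L=4, A=10.0)
point_ids = bucket_ids([1.0, 2.0, 3.0])
print(point_ids)
print(share_bucket(point_ids, bucket_ids([1.1, 2.0, 2.9])))
```

In this scheme, a larger A or a smaller M makes buckets coarser, so more points collide. A larger L gives each pair of points more chances to collide.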

pw.ml.classifiers.knn_lsh_generic_classifier_train(data, lsh_projection, distance_function, L)

Build the LSH index over data using a generic LSH projection (lsh_projection) and its associated distance (distance_function). L is the number of repetitions of the LSH scheme. Returns an LSH projector of type (queries: Table, k: Any) -> Table.

pw.ml.classifiers.knn_lsh_train(data, L, type='euclidean', **kwargs)

Build the LSH index over data. L is the number of repetitions of the LSH scheme. Returns an LSH projector of type (queries: Table, k: Any) -> Table.

class pw.ml.index.KNNIndex(data_embedding, data, n_dimensions, n_or=20, n_and=10, bucket_length=10.0, distance_type='euclidean', metadata=None)


An approximate K-Nearest Neighbors (KNN) index implemented in Pathway with the Locality-Sensitive Hashing (LSH) algorithm. The index is designed to efficiently find the nearest neighbors of a given query embedding within a dataset. It is approximate in the sense that it may return fewer than k records per query or skip some closer points. If it returns too few points too often, increase bucket_length. If it skips points too often, increase n_or or adjust the other parameters. Note that changing the parameters affects the time and memory requirements.

  • Parameters
    • data_embedding (pw.ColumnExpression) – The column expression representing embeddings in the data.
    • data (pw.Table) – The table containing the data to be indexed.
    • n_dimensions (int) – number of dimensions in the data.
    • n_or (int) – number of ORs. Defaults to 20.
    • n_and (int) – number of ANDs. Defaults to 10.
    • bucket_length (float) – bucket length (after projecting on a line). Defaults to 10.0.
    • distance_type (str) – “euclidean” and “cosine” metrics are supported. Defaults to “euclidean”.
    • metadata (pw.ColumnExpression) – optional column expression representing dict of the metadata.
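
How n_and and n_or trade missed neighbors against extra candidates can be illustrated with the standard LSH amplification formula. The sketch assumes that a single hash function puts two points in the same bucket with probability p, that the hash functions are independent, and that n_and and n_or play the usual AND/OR roles. None of this is confirmed by the text above, and `candidate_probability` is a hypothetical helper.

```python
def candidate_probability(p, n_and=10, n_or=20):
    """Probability that two points collide in at least one of n_or repetitions,
    where a single repetition requires n_and independent hash collisions."""
    if not 0.0 <= p <= 1.0:
        raise ValueError("p must be a probability")
    return 1.0 - (1.0 - p ** n_and) ** n_or


# Compare a few single-hash collision probabilities under the default settings.
for p in (0.5, 0.8, 0.95):
    print(p, round(candidate_probability(p), 4))
```

Under these assumptions, raising n_or increases the chance that a true neighbor is retrieved, which is consistent with the advice above to increase n_or when points are skipped. Raising n_and makes each repetition stricter and reduces the number of candidates.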

get_nearest_items(query_embedding, k=3, collapse_rows=True, with_distances=False, metadata_filter=None)

This method queries the index with the given queries and returns the ‘k’ most relevant documents for each query in the stream. With this method, the documents associated with a query are updated whenever new, more relevant documents appear. If you don’t want query results to be updated in the future, see get_nearest_items_asof_now.

  • Parameters
    • query_embedding (ColumnReference) – column of embedding vectors precomputed from the query.
    • k (ColumnExpression | int) – The number of most relevant documents to return for each query. Can be constant for all queries or set per query. If you want to set k per query, pass a reference to the column. Defaults to 3.
    • collapse_rows (bool) – Determines the format of the output. If set to True, multiple rows corresponding to a single query will be collapsed into a single row, with each column containing a tuple of values from the original rows. If set to False, the output will retain the multi-row format for each query. Defaults to True.
    • with_distances (bool) – whether to output distances. Defaults to False.
    • metadata_filter (pw.ColumnExpression) – optional column expression evaluating to the text of a metadata filtering query in the JMESPath format. Only documents satisfying the filter are searched. Can be constant for all queries or set per query.
  • Returns
    pw.Table
  • If collapse_rows is set to True: Returns a table where each row corresponds to a unique query. Each column in the row contains a tuple (or list) of values, aggregating up to ‘k’ matches from the dataset. For example:

            | name                        | age
^YYY4HAB... | ()                          | ()
^X1MXHYY... | ('bluejay', 'cat', 'eagle') | (43, 42, 41)

  • If collapse_rows is set to False: Returns a table where each row represents a match from the dataset for a given query. Multiple rows can correspond to the same query, up to ‘k’ matches. Example:

name    | age | embedding | query_id
        |     |           | ^YYY4HAB...
bluejay | 43  | (4, 3, 2) | ^X1MXHYY...
cat     | 42  | (3, 3, 2) | ^X1MXHYY...
eagle   | 41  | (2, 3, 2) | ^X1MXHYY...
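
The relationship between the two output formats can be sketched in plain Python, using the rows from the tables above. The query ids `q_empty` and `q_birds` are hypothetical stand-ins for the truncated ids, and `collapse` is an illustrative helper rather than part of Pathway.

```python
def collapse(rows, query_ids, columns):
    """Group multi-row matches into one row per query, with one tuple per column."""
    # Start every query with empty tuples so queries without matches still appear.
    collapsed = {q: {c: () for c in columns} for q in query_ids}
    for row in rows:
        for c in columns:
            collapsed[row["query_id"]][c] += (row[c],)
    return collapsed


# Multi-row format: one row per (query, match) pair.
rows = [
    {"name": "bluejay", "age": 43, "query_id": "q_birds"},
    {"name": "cat", "age": 42, "query_id": "q_birds"},
    {"name": "eagle", "age": 41, "query_id": "q_birds"},
]
collapsed = collapse(rows, query_ids=["q_empty", "q_birds"], columns=["name", "age"])
print(collapsed["q_birds"])
print(collapsed["q_empty"])
```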

Example:

import pathway as pw
from pathway.stdlib.ml.index import KNNIndex
import pandas as pd
class InputSchema(pw.Schema):
    document: str
    embeddings: list[float]
    metadata: dict
documents = pw.debug.table_from_pandas(
    pd.DataFrame.from_records([
        {"document": "document 1", "embeddings":[1,-1, 0], "metadata":{"foo": 1}},
        {"document": "document 2", "embeddings":[1, 1, 0], "metadata":{"foo": 2}},
        {"document": "document 3", "embeddings":[0, 0, 1], "metadata":{"foo": 3}},
    ]),
    schema=InputSchema
)
index = KNNIndex(documents.embeddings, documents, n_dimensions=3)
queries = pw.debug.table_from_pandas(
    pd.DataFrame.from_records([
        {"query": "What is doc 3 about?", "embeddings":[.1, .1, .1]},
        {"query": "What is doc -5 about?", "embeddings":[-1, 10, -10]},
    ])
)
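# Retrieve the 2 nearest documents for each query and drop the metadata column.
relevant_docs = index.get_nearest_items(queries.embeddings, k=2).without(pw.this.metadata)
pw.debug.compute_and_print(relevant_docs)
# Rebuild the index with metadata so that results can be filtered with a JMESPath query.
index = KNNIndex(documents.embeddings, documents, n_dimensions=3, metadata=documents.metadata)
relevant_docs_meta = index.get_nearest_items(queries.embeddings, k=2, metadata_filter="foo >= `3`")
pw.debug.compute_and_print(relevant_docs_meta)
# Streaming case: data points and queries arrive at different times (__time__).
data = pw.debug.table_from_markdown(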
    '''
     x | y | __time__
     2 | 3 |     2
     0 | 0 |     2
     2 | 2 |     6
    -3 | 3 |    10
    '''
).select(coords=pw.make_tuple(pw.this.x, pw.this.y))
queries = pw.debug.table_from_markdown(
    '''
     x | y | __time__ | __diff__
     1 | 1 |     4    |     1
    -3 | 1 |     8    |     1
    '''
).select(coords=pw.make_tuple(pw.this.x, pw.this.y))
index = KNNIndex(data.coords, data, n_dimensions=2)
answers = queries + index.get_nearest_items(queries.coords, k=2).select(
    nn=pw.this.coords
)
pw.debug.compute_and_print_update_stream(answers, include_id=False)

get_nearest_items_asof_now(query_embedding, k=3, collapse_rows=True, with_distances=False, metadata_filter=None)

This method queries the index with the given queries and returns the ‘k’ most relevant documents for each query in the stream. Queries that have already been answered are not updated when new documents appear later.

  • Parameters
    • query_embedding (ColumnReference) – column of embedding vectors precomputed from the query.
    • k (ColumnExpression | int) – The number of most relevant documents to return for each query. Can be constant for all queries or set per query. If you want to set k per query, pass a reference to the column. Defaults to 3.
    • collapse_rows (bool) – Determines the format of the output. If set to True, multiple rows corresponding to a single query will be collapsed into a single row, with each column containing a tuple of values from the original rows. If set to False, the output will retain the multi-row format for each query. Defaults to True.
    • with_distances (bool) – whether to output distances. Defaults to False.
    • metadata_filter (pw.ColumnExpression) – optional column expression evaluating to the text of a metadata filtering query in the JMESPath format. Only documents satisfying the filter are searched. Can be constant for all queries or set per query.
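
The difference between get_nearest_items and get_nearest_items_asof_now can be illustrated with a small brute-force simulation in plain Python. It reuses the points and timestamps from this page's streaming example and uses exact Euclidean distance instead of LSH. The functions `nearest`, `answers_asof_now` and `answers_updated` are hypothetical and do not reflect how Pathway computes its results.

```python
def nearest(query, points, k):
    """Exact k nearest neighbors by squared Euclidean distance (ties keep input order)."""
    def sq_dist(p):
        return sum((a - b) ** 2 for a, b in zip(p, query))
    return sorted(points, key=sq_dist)[:k]


# (coords, arrival_time) pairs, mirroring the __time__ column of the example.
data = [((2, 3), 2), ((0, 0), 2), ((2, 2), 6), ((-3, 3), 10)]
queries = [((1, 1), 4), ((-3, 1), 8)]


def answers_asof_now(k=2):
    # Each query only sees the data that arrived no later than the query itself.
    return {q: nearest(q, [p for p, t in data if t <= t_q], k) for q, t_q in queries}


def answers_updated(k=2):
    # Answers keep being revised, so the final state reflects all the data.
    return {q: nearest(q, [p for p, _ in data], k) for q, _ in queries}


frozen = answers_asof_now()
final = answers_updated()
print(frozen)
print(final)
```

In this simulation the query (-3, 1), asked at time 8, keeps its original answer in the "as of now" variant, while the updating variant later replaces one neighbor with the point (-3, 3) that arrives at time 10.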

Example:

import pathway as pw
from pathway.stdlib.ml.index import KNNIndex
data = pw.debug.table_from_markdown(
    '''
     x | y | __time__
     2 | 3 |     2
     0 | 0 |     2
     2 | 2 |     6
    -3 | 3 |    10
    '''
).select(coords=pw.make_tuple(pw.this.x, pw.this.y))
queries = pw.debug.table_from_markdown(
    '''
     x | y | __time__ | __diff__
     1 | 1 |     4    |     1
    -3 | 1 |     8    |     1
    '''
).select(coords=pw.make_tuple(pw.this.x, pw.this.y))
index = KNNIndex(data.coords, data, n_dimensions=2)
# Each query is answered with the data available when it arrives and is not revised later.
answers = queries + index.get_nearest_items_asof_now(queries.coords, k=2).select(
    nn=pw.this.coords
)
pw.debug.compute_and_print_update_stream(answers, include_id=False)

pw.ml.hmm.create_hmm_reducer(graph, beam_size=None, num_results_kept=None)

Generates a reducer that performs decoding of a Hidden Markov Model.

  • Parameters
    • graph (DiGraph) – directed graph representing the state transitions of the HMM.
    • beam_size (int | None) – limits the search. Defaults to None
    • num_results_kept (int | None) – Maximum number of previous results returned. Defaults to None (keep all the results).
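
As an illustration of what decoding an HMM involves, the following plain-Python sketch runs Viterbi decoding over a two-state model with the same probabilities as the example on this page. It is not the reducer's implementation: the use of Viterbi, the uniform start over both states, and the reading of beam_size as "keep only the best-scoring states after each step" are assumptions, and `viterbi_decode` is a hypothetical function.

```python
import math

STATES = ["HUNGRY", "FULL"]
# P(observation | state)
EMISSION = {
    ("HUNGRY", "GRUMPY"): 0.9, ("HUNGRY", "HAPPY"): 0.1,
    ("FULL", "GRUMPY"): 0.7, ("FULL", "HAPPY"): 0.3,
}
# P(next state | previous state), keyed as (previous, next)
TRANSITION = {
    ("HUNGRY", "HUNGRY"): 0.4, ("HUNGRY", "FULL"): 0.6,
    ("FULL", "HUNGRY"): 0.6, ("FULL", "FULL"): 0.4,
}


def viterbi_decode(observations, beam_size=None):
    """Most likely state sequence for a non-empty list of observations."""
    # scores[s] is the best log-probability of any path ending in state s.
    scores = {s: math.log(EMISSION[(s, observations[0])]) for s in STATES}
    backpointers = []
    for obs in observations[1:]:
        if beam_size is not None:
            # Assumed beam semantics: keep only the top-scoring states.
            kept = sorted(scores, key=scores.get, reverse=True)[:beam_size]
            scores = {s: scores[s] for s in kept}
        new_scores, pointers = {}, {}
        for s in STATES:
            prev = max(scores, key=lambda p: scores[p] + math.log(TRANSITION[(p, s)]))
            new_scores[s] = (
                scores[prev]
                + math.log(TRANSITION[(prev, s)])
                + math.log(EMISSION[(s, obs)])
            )
            pointers[s] = prev
        scores = new_scores
        backpointers.append(pointers)
    # Walk the backpointers from the best final state to recover the path.
    state = max(scores, key=scores.get)
    path = [state]
    for pointers in reversed(backpointers):
        state = pointers[state]
        path.append(state)
    return path[::-1]


observations = ["HAPPY", "HAPPY", "GRUMPY", "GRUMPY", "HAPPY", "GRUMPY"]
decoded = viterbi_decode(observations)
print(decoded)
```

Working in log-probabilities, as the example's `log_transition_ppb` and `calc_emission_log_ppb` attributes also do, turns products of small probabilities into sums and avoids numerical underflow on long observation sequences.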

Example:

import pathway as pw
import networkx as nx
import numpy as np
from typing import Literal
from functools import partial
MANUL_STATES = Literal["HUNGRY", "FULL"]
MANUL_OBSERVATIONS = Literal["GRUMPY", "HAPPY"]
input_observations = pw.debug.table_from_markdown('''
    observation | __time__
     HAPPY      |     1
     HAPPY      |     2
     GRUMPY     |     3
     GRUMPY     |     4
     HAPPY      |     5
     GRUMPY     |     6
''')
def get_manul_hmm() -> nx.DiGraph:
   g = nx.DiGraph()
   def _calc_emission_log_ppb(
       observation: MANUL_OBSERVATIONS, state: MANUL_STATES
   ) -> float:
       if state == "HUNGRY":
           if observation == "GRUMPY":
               return np.log(0.9)
           if observation == "HAPPY":
               return np.log(0.1)
       if state == "FULL":
           if observation == "GRUMPY":
               return np.log(0.7)
           if observation == "HAPPY":
               return np.log(0.3)
   g.add_node(
       "HUNGRY", calc_emission_log_ppb=partial(_calc_emission_log_ppb, state="HUNGRY")
   )
   g.add_node(
       "FULL", calc_emission_log_ppb=partial(_calc_emission_log_ppb, state="FULL")
   )
   g.add_edge("HUNGRY", "HUNGRY", log_transition_ppb=np.log(0.4))
   g.add_edge("HUNGRY", "FULL", log_transition_ppb=np.log(0.6))
   g.add_edge("FULL", "HUNGRY", log_transition_ppb=np.log(0.6))
   g.add_edge("FULL", "FULL", log_transition_ppb=np.log(0.4))
   g.graph["start_nodes"] = ["HUNGRY", "FULL"]
   return g
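# Wrap the HMM decoder as a reducer that keeps the 3 most recent results.
hmm_reducer = pw.reducers.udf_reducer(pw.stdlib.ml.hmm.create_hmm_reducer(get_manul_hmm(), num_results_kept=3))
# Decode the hidden states from the stream of observations.
decoded = input_observations.reduce(decoded_state=hmm_reducer(pw.this.observation))
pw.debug.compute_and_print_update_stream(decoded, include_id=False)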