Overview of LLM xpack

The LLM xpack provides the tools you need to use Large Language Models in Pathway. It includes wrappers for the most common LLM services and utilities, so you can work with LLMs with little setup code.

Wrappers for LLMs

Out of the box, the LLM xpack provides wrappers for text generation and embedding LLMs. For text generation, you can use native wrappers for the OpenAI chat model and for HuggingFace models running locally. Many other popular models, including Azure OpenAI, HuggingFace (through their API), and Gemini, can be used with the wrapper for LiteLLM. For the full list of supported providers, see the LiteLLM documentation.

Each wrapper is a UDFClass. To use it, first create an instance of the wrapper, then apply that instance to a column with prompts. For OpenAI, you create the wrapper with the OpenAIChat class.

import os
import pathway as pw
from pathway.xpacks.llm.llms import OpenAIChat

model = OpenAIChat(
    model="gpt-3.5-turbo",
    api_key=os.environ["OPENAI_API_KEY"], # Read OpenAI API key from environmental variables
)
# Send queries from column `messages` in table `query` to OpenAI
responses = query.select(result=model(pw.this.messages))

Preparing queries

OpenAIChat expects messages in the format required by the OpenAI API: a list of dictionaries, where each dictionary is one message in the conversation so far. If you want to ask single questions, use pw.xpacks.llm.llms.prompt_chat_single_qa to wrap them.

from pathway.xpacks.llm.llms import OpenAIChat, prompt_chat_single_qa


model = OpenAIChat(
    model="gpt-3.5-turbo",
    api_key=os.environ["OPENAI_API_KEY"], # Read OpenAI API key from environmental variables
)
# Column `prompt` holds strings with questions to be sent to OpenAI chat 
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt)))
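To make the expected message format concrete, the sketch below builds the list-of-dictionaries structure in plain Python, without Pathway. The helper `single_question_messages` is hypothetical, and the `"user"` role it assigns is an assumption; it only illustrates the shape of the data and is not the implementation of `prompt_chat_single_qa`.

```python
import json


def single_question_messages(question: str) -> list:
    """Wrap one question as a one-message conversation (hypothetical helper)."""
    # The "user" role is an assumption made for this illustration.
    return [{"role": "user", "content": question}]


# A longer conversation is a list with one dictionary per message so far.
conversation = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is Pathway?"},
]

messages = single_question_messages("What is Pathway?")
print(json.dumps(messages))
```

Each dictionary carries a `role` and a `content` key, which is the structure a column of messages would need to hold for every row.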

Model Parameters

The OpenAI API accepts several parameters, including the model and api_key used in the code stubs above. OpenAIChat lets you set their default values when you initialize the class, and you can also override them when you apply the wrapper.

model = OpenAIChat(
    model="gpt-3.5-turbo",
    api_key=os.environ["OPENAI_API_KEY"], # Read OpenAI API key from environmental variables
    max_tokens=200, # Set default value of max_tokens to be 200
)
# As max_tokens is not set, value 200 will be used
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt)))
# Now value of max_tokens is taken from column `max_tokens`, overriding default value set when initializing OpenAIChat
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt), max_tokens=pw.this.max_tokens))
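The precedence rule described here (defaults fixed at initialization, overridden per call) can be sketched in plain Python. The class `DefaultedCall` and its `resolve` method are hypothetical names used only for illustration; they are not part of Pathway and make no claim about how OpenAIChat is implemented.

```python
from typing import Any, Dict


class DefaultedCall:
    """Hypothetical stand-in for a wrapper that stores default keyword arguments."""

    def __init__(self, **defaults: Any) -> None:
        self.defaults = defaults

    def resolve(self, **overrides: Any) -> Dict[str, Any]:
        # Values passed at call time win over the defaults given at construction.
        return {**self.defaults, **overrides}


wrapper = DefaultedCall(model="gpt-3.5-turbo", max_tokens=200)
default_params = wrapper.resolve()                  # no override: max_tokens stays 200
overridden_params = wrapper.resolve(max_tokens=50)  # override: max_tokens becomes 50
```

The stored defaults are never mutated, so one call's override does not leak into the next call.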

What about other models?

So far we focused on the wrapper for the OpenAI chat model, but other wrappers work in the same way!

Pathway has two more wrappers for chat models - LiteLLMChat and HFPipelineChat. For example, to use Gemini with LiteLLM, create an instance of LiteLLMChat and then apply it to the column with the messages to be sent over the API.

model = LiteLLMChat(
    model="gemini/gemini-pro", # Choose the model you want
    api_key=os.environ["GEMINI_API_KEY"], # Read GEMINI API key from environmental variables
)
# Ask Gemini questions from `prompt` column 
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt)))

With the wrapper for LiteLLM, Pathway allows you to use many popular LLMs. For HuggingFace models that you want to run locally, Pathway provides a separate wrapper, HFPipelineChat (to call HuggingFace through its API, use the LiteLLM wrapper). When an instance of this wrapper is created, it initializes a HuggingFace pipeline, so any arguments to the pipeline - including the name of the model - must be set during the initialization of HFPipelineChat. Any parameters to pipeline.__call__ can, as before, be set during initialization or overridden during application.

model = HFPipelineChat(
    model="gpt2", # Choose the model you want
)
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt)))

You can check an example with HuggingFace running locally in the llm-app repository.

Pathway also comes with wrappers for embedding models - OpenAIEmbedder, LiteLLMEmbedder and SentenceTransformersEmbedder. Each of them can be applied to a column of strings and returns a column in which every value is a list of floats - the embedding of the corresponding string.

embedder = OpenAIEmbedder(
    model="text-embedding-ada-002", # model for embedding
    api_key=os.environ["OPENAI_API_KEY"], # Read OpenAI API key from environmental variables
)
# calculate embedding for column `text` in table `documents`
responses = documents.select(result=embedder(pw.this.text))

Asynchrony

Wrappers for OpenAI and LiteLLM, both for chat and embedding, are asynchronous. Pathway lets you configure their behavior with three parameters:

  • capacity, which sets the number of concurrent operations allowed,
  • retry_strategy, which sets the strategy for handling retries in case of failures,
  • cache_strategy, which defines the cache mechanism.

These three parameters need to be set during the initialization of the wrapper.

model = OpenAIChat(
    capacity=5, # maximum concurrent operations is 5
    # in case of failure, retry 5 times, each time waiting twice as long before retrying
    retry_strategy=pw.asynchronous.ExponentialBackoffRetryStrategy(max_retries=5, initial_delay=1000, backoff_factor=2),
    # if PATHWAY_PERSISTENT_STORAGE is set, then it is used to cache the calls
    cache_strategy=pw.asynchronous.DefaultCache(),
    model="gpt-3.5-turbo",
    api_key=os.environ["OPENAI_API_KEY"], # Read OpenAI API key from environmental variables
)
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt)))
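To illustrate what a capacity limit and an exponential backoff retry strategy mean in practice, here is a minimal sketch using only `asyncio` from the standard library. The functions `call_with_retries`, `run_all` and `make_flaky_job` are hypothetical and are not Pathway's implementation; delays here are in seconds and deliberately tiny so that the example finishes quickly.

```python
import asyncio


async def call_with_retries(func, *, max_retries=5, initial_delay=0.01, backoff_factor=2):
    """Retry an async callable, multiplying the wait by backoff_factor after each failure."""
    delay = initial_delay
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            await asyncio.sleep(delay)
            delay *= backoff_factor


async def run_all(jobs, capacity=5):
    """Run all jobs, allowing at most `capacity` of them in flight at once."""
    semaphore = asyncio.Semaphore(capacity)

    async def guarded(job):
        async with semaphore:
            return await call_with_retries(job)

    return await asyncio.gather(*(guarded(job) for job in jobs))


def make_flaky_job(value, failures):
    """Build a job that fails `failures` times before returning `value`."""
    state = {"left": failures}

    async def job():
        if state["left"] > 0:
            state["left"] -= 1
            raise RuntimeError("temporary failure")
        return value

    return job


results = asyncio.run(run_all([make_flaky_job(i, failures=i % 3) for i in range(8)]))
```

Caching is left out of this sketch; conceptually it would store each result keyed by the call's arguments so that a repeated call does not hit the API again.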

Creating a Pathway LLM pipeline

You can now combine these wrappers to create an LLM pipeline using Pathway. To learn how to do this, read our tutorial.

Preparing documents for LLMs

The Pathway xpack for LLMs provides tools for preparing your documents and texts in order to use them with LLMs. You can use ParseUnstructured for parsing your documents into texts and TokenCountSplitter for dividing texts into smaller chunks.

Parsing documents

Use the ParseUnstructured class to parse documents in Pathway. Underneath, it uses the Unstructured library to parse your documents. To use it, read the contents of a file into a Pathway Table using any connector of your choice, then apply an instance of the ParseUnstructured class to get a Pathway Table with the parsed content of the documents.

ParseUnstructured has an argument mode, which takes one of three values:

  • single: the whole document is returned as one string,
  • paged: one string is returned for each page of the document,
  • elements: Unstructured's division into elements is preserved.

The mode argument can be set either during initialization or during application of ParseUnstructured.

import os
import pathway as pw
from pathway.xpacks.llm.parsers import ParseUnstructured

files = pw.io.fs.read(
    os.environ.get("DATA_DIR"),
    mode="streaming",
    format="binary",
    autocommit_duration_ms=50,
)
parser = ParseUnstructured(mode="elements")
documents = files.select(elements=parser(pw.this.data))
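The difference between the three modes can be sketched on toy data in plain Python. The function `group_elements` is hypothetical, the `page_number` metadata key is an assumption, and the way texts are joined is a simplification; this is not how ParseUnstructured is implemented.

```python
from collections import defaultdict

# Each element is (text, metadata); `page_number` is an assumed metadata key.
elements = [
    ("Title", {"page_number": 1}),
    ("First paragraph.", {"page_number": 1}),
    ("Second paragraph.", {"page_number": 2}),
]


def group_elements(elements, mode):
    """Group parsed elements according to the chosen mode (illustrative only)."""
    if mode == "elements":
        return list(elements)  # keep the element-level division
    if mode == "single":
        # One string for the whole document.
        return [("\n\n".join(text for text, _ in elements), {})]
    if mode == "paged":
        # One string per page, in page order.
        pages = defaultdict(list)
        for text, metadata in elements:
            pages[metadata["page_number"]].append(text)
        return [
            ("\n\n".join(texts), {"page_number": page})
            for page, texts in sorted(pages.items())
        ]
    raise ValueError(f"unknown mode: {mode}")


by_page = group_elements(elements, "paged")
```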

For each document, ParseUnstructured returns a list of tuples, each holding the parsed text and the associated metadata returned by Unstructured. To place each text in its own row of the table, use the flatten function.

documents = documents.flatten(pw.this.elements) # flatten list into multiple rows
documents = documents.select(text=pw.this.elements[0], metadata=pw.this.elements[1]) # extract text and metadata from tuple
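The effect of flattening and then unpacking the tuples can be pictured with ordinary Python lists. This is only an analogy on made-up rows, with dictionaries standing in for table rows; it does not use Pathway.

```python
# One row per document: each row holds a list of (text, metadata) tuples.
rows = [
    {"elements": [("Intro", {"page": 1}), ("Details", {"page": 2})]},
    {"elements": [("Summary", {"page": 1})]},
]

# Flatten: emit one row per list item.
flattened = [{"elements": element} for row in rows for element in row["elements"]]

# Select: unpack each tuple into separate `text` and `metadata` fields.
unpacked = [
    {"text": row["elements"][0], "metadata": row["elements"][1]}
    for row in flattened
]
```

Two input rows with three elements in total become three output rows, one per parsed element.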

Splitting texts

Once you have some texts in a Pathway Table, you can use the TokenCountSplitter class to divide them into smaller chunks. It tries to split the text so that each chunk has between min_tokens and max_tokens tokens, without cutting sentences in half.

TokenCountSplitter has three parameters - min_tokens, max_tokens and encoding_name - and each of them can be overridden when the splitter is called. min_tokens and max_tokens, as mentioned above, set the minimum and maximum length of each chunk, whereas encoding_name is the name of the tiktoken encoding to be used.

from pathway.xpacks.llm.splitters import TokenCountSplitter

# "cl100k_base" is one of the encodings shipped with tiktoken
splitter = TokenCountSplitter(min_tokens=100, max_tokens=300, encoding_name="cl100k_base")
texts = documents.select(chunk=splitter(pw.this.text))

TokenCountSplitter returns data in the same format as ParseUnstructured: for each row, it returns a list of tuples, where each tuple consists of a string with the text of a chunk and a dictionary with the associated metadata.
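The idea of packing whole sentences into chunks bounded by a token count can be sketched as a simple greedy loop. The function `split_by_token_count` is hypothetical, whitespace-separated words stand in for tiktoken tokens, and the greedy policy is an assumption; the actual splitter may make different choices.

```python
import re


def split_by_token_count(text, min_tokens=5, max_tokens=12):
    """Greedily pack whole sentences into chunks of about min_tokens..max_tokens tokens."""

    def count(sentence):
        # Whitespace-separated words stand in for tiktoken tokens.
        return len(sentence.split())

    sentences = re.split(r"(?<=[.!?])\s+", text.strip())
    chunks, current = [], []
    for sentence in sentences:
        current_size = sum(count(s) for s in current)
        # Close the chunk first if this sentence would push it over the maximum.
        if current and current_size + count(sentence) > max_tokens:
            chunks.append(" ".join(current))
            current = []
        current.append(sentence)
        # Close the chunk as soon as it is long enough.
        if sum(count(s) for s in current) >= min_tokens:
            chunks.append(" ".join(current))
            current = []
    if current:
        chunks.append(" ".join(current))
    # Output shape: a list of (chunk text, metadata dictionary) tuples.
    return [(chunk, {}) for chunk in chunks]


text = "Pathway is fast. It handles streams. Chunks keep sentences whole. Done."
chunks = split_by_token_count(text, min_tokens=5, max_tokens=8)
```

In this sketch a single sentence longer than `max_tokens` is kept whole and therefore exceeds the limit, and the final chunk may fall below `min_tokens`.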

With these tools, it is easy to build a Pathway pipeline that serves as a Vector Store and updates on each data change. You can check such an example in the llm-app repository. Because this is a common pipeline, Pathway provides a class VectorStore which implements it.

Ready-to-use Vector Store

Pathway Vector Store enables building a document index on top of your documents and allows for easy-to-manage, always up-to-date LLM pipelines accessible using a RESTful API. It maintains an index of your documents and allows for querying for documents closest to a given query. It is implemented using two classes - VectorStoreServer and VectorStoreClient.

The VectorStoreServer class implements the pipeline for indexing your documents and runs an HTTP REST server for nearest neighbors queries. You can use VectorStoreServer by itself to use Pathway as a Vector Store and query it over REST. Alternatively, query VectorStoreServer with VectorStoreClient, which implements wrappers for the REST calls.
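Conceptually, a nearest neighbors query compares the embedding of the query with the embeddings of the indexed documents and returns the closest ones. The brute-force sketch below uses cosine similarity as the closeness measure, which is an assumption; `cosine_similarity` and `nearest_documents` are hypothetical helpers, and nothing here reflects how VectorStoreServer builds or searches its index.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def nearest_documents(index, query_vector, k=2):
    """Return the k texts whose embeddings are most similar to the query."""
    ranked = sorted(
        index,
        key=lambda item: cosine_similarity(item[1], query_vector),
        reverse=True,
    )
    return [text for text, _ in ranked[:k]]


# Toy 2-dimensional "embeddings"; real embedders return much longer vectors.
index = [
    ("cats purr", [1.0, 0.1]),
    ("dogs bark", [0.9, 0.3]),
    ("stocks fell", [0.0, 1.0]),
]
top = nearest_documents(index, query_vector=[1.0, 0.0], k=2)
```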

You can learn more about Vector Store in Pathway in a dedicated tutorial.

Discuss tricks & tips for RAG

Join our Discord community and dive into discussions on tricks and tips for mastering Retrieval Augmented Generation.
