Introduction to the LLM xpack
The LLM xpack provides you all the tools you need to use Large Language Models in Pathway. Wrappers for most common LLM services and utilities are included, making working with LLMs as easy as it can be.
You can find ready-to-run LLM and RAG examples on our App Templates page.
Wrappers for LLMs
Out of the box, the LLM xpack provides wrappers for text generation and embedding LLMs. For text generation, you can use native wrappers for the OpenAI chat model and HuggingFace models running locally. Many other popular models, including Azure OpenAI, HuggingFace (when using their API) or Gemini can be used with the wrapper for LiteLLM. To check the full list of providers supported by LiteLLM check LiteLLM documentation.
Each wrapper is a UDFClass. To use it, first create an instance of the wrapper, which you can then apply to a column with prompts. For OpenAI, you create a wrapper with OpenAIChat
class.
model = OpenAIChat(
model="gpt-3.5-turbo",
api_key=os.environ["OPENAI_API_KEY"], # Read OpenAI API key from environmental variables
)
# Send queries from column `messages` in table `query` to OpenAI
responses = query.select(result=model(pw.this.messages))
Preparing queries
OpenAIChat
expects messages to be in the format required by OpenAI API - that is a list of dictionaries, where each dictionary is one message in the conversation so far. If you want to ask single questions use pw.xpacks.llm.llm.prompt_chat_single_qa
to wrap them.
from pathway.xpack.llm.llms import prompt_chat_single_qa
model = OpenAIChat(
model="gpt-3.5-turbo",
api_key=os.environ["OPENAI_API_KEY"], # Read OpenAI API key from environmental variables
)
# Column `prompt` holds strings with questions to be sent to OpenAI chat
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt)))
Model Parameters
OpenAI API takes a number of parameters, including model
and api_key
used in the code stubs above. OpenAIChat
allows you to set their default value during the initialization of the class, but you can also override them during application.
model = OpenAIChat(
model="gpt-3.5-turbo",
api_key=os.environ["OPENAI_API_KEY"], # Read OpenAI API key from environmental variables
max_tokens=200, # Set default value of max_tokens to be 200
)
# As max_tokens is not set, value 200 will be used
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt)))
# Now value of max_tokens is taken from column `max_tokens`, overriding default value set when initializing OpenAIChat
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt), max_tokens(pw.this.max_tokens)))
What about other models?
So far we focused on the wrapper for the OpenAI chat model, but other wrappers work in the same way!
Pathway has two more wrappers for chat models - LiteLLMChat
and HFPipelineChat
. For example, to use Gemini with LiteLLM, create an instance of LiteLLMChat
and then apply it to the column with messages to be sent over API.
model = LiteLLMChat(
model="gemini/gemini-pro", # Choose the model you want
api_key=os.environ["GEMINI_API_KEY"], # Read GEMINI API key from environmental variables
)
# Ask Gemini questions from `prompt` column
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt)))
With the wrapper for LiteLLM, Pathway allows you to use many popular LLMs. For models from HuggingFace that you want to run locally, Pathway gives a separate wrapper (for calling HuggingFace through API, use LiteLLM wrapper), called HFPipelineChat
. When an instance of this wrapper is created, it initializes a HuggingFace pipeline
, so any arguments to the pipeline
- including the name of the model - must be set during the initialization of HFPipelineChat
. Any parameters to pipeline.__call__
can be as before set during initialization or overridden during application.
model = HFPipelineChat(
model="gpt2", # Choose the model you want
)
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt)))
Pathway also comes with wrappers for embedding models - OpenAIEmbedder
, LiteLLMEmbedder
and SentenceTransformersEmbedder
. Each of them can be applied to a column of strings and returns a column with numpy arrays - the embeddings.
embedder = OpenAIEmbedder(
model="text-embedding-ada-002", # model for embedding
api_key=os.environ["OPENAI_API_KEY"], # Read OpenAI API key from environmental variables
)
# calculate embedding for column `text` in table `documents`
responses = documents.select(result=embedder(pw.this.text))
Asynchrony
Wrapper for OpenAI and LiteLLM, both for chat and embedding, are asynchronous, and Pathway allows you to set three parameters to set their behavior. These are:
capacity
, which sets the number of concurrent operations allowed,retry_strategy
, which sets the strategy for handling retries in case of failures,cache_strategy
, which defines the cache mechanism.
These three parameters need to be set during the initialization of the wrapper. You can read more about them in the UDFs guide.
model = OpenAIChat(
capacity=5, # maximum concurrent operations is 5
# in case of failure, retry 5 times, each time waiting twice as long before retrying
retry_strategy=pw.udfs.ExponentialBackoffRetryStrategy(max_retries=5, initial_delay=1000, backoff_factor=2),
# if PATHWAY_PERSISTENT_STORAGE is set, then it is used to cache the calls
cache_strategy=pw.udfs.DefaultCache(),
model="gpt-3.5-turbo",
api_key=os.environ["OPENAI_API_KEY"], # Read OpenAI API key from environmental variables
)
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt)))
Creating a Pathway LLM pipeline
You can now combine these wrappers to create an LLM pipeline using Pathway. To learn how to do this, read our tutorial.
Preparing documents for LLMs
The Pathway xpack for LLMs provides tools for preparing your documents and texts in order to use them with LLMs. You can use ParseUnstructured
for parsing your documents into texts and TokenCountSplitter
for dividing texts into smaller chunks.
Parsing documents
Use the ParseUnstructured
class to parse documents in Pathway. Underneath, it uses the Unstructured library to parse your documents. To use it, you need to read the contents of a file into a Pathway Table using any connector of your choice. Then, apply an instance of the ParseUnstructured
class to get a Pathway Table with parsed content of documents. ParseUnstructured
has an argument mode
which takes one of three values: single
, paged
or elements
. If set to single
, the whole document is returned as one string, if set to paged
then there is a string for each page in the document, and if set to elements
then Unstructured's division into elements is preserved. The mode
argument can be set either during initialization or application of ParseUnstructured
.
import os
import pathway as pw
from pathway.xpacks.llm.parsers import ParseUnstructured
files = pw.io.fs.read(
os.environ.get("DATA_DIR"),
mode="streaming",
format="binary",
autocommit_duration_ms=50,
)
parser = ParseUnstructured(mode="elements")
documents = files.select(elements=parser(pw.this.data))
ParseUnstructured
for a document returns a list of tuples with parsed text and associated metadata returned from Unstructured. If you want to have each string with text in another row of the table, you should use the flatten
function.
documents = documents.flatten(pw.this.elements) # flatten list into multiple rows
documents = documents.select(text=pw.this.elements[0], metadata=pw.this.elements[1]) # extract text and metadata from tuple
Splitting texts
Once you have some texts in a Pathway Table, you can use the TokenCountSplitter
class to divide them into smaller chunks. It tries to split the text in such a way that each part has between min_token
and max_token
tokens, but so that sentences are not cut in half.
TokenCountSplitter
has three parameters - min_token
, max_token
and encoding
- and each of them can be overridden during the call of the function. min_token
and max_token
, as mentioned above, set the minimum and maximum length of each chunk, whereas encoding
is the name of the tiktoken encoding to be used.
from pathway.xpacks.llm.splitters import TokenCountSplitter
splitter = TokenCountSplitter(min_tokens=100, max_tokens=300, encoding)
texts = documents.select(chunk=splitter(pw.this.text))
TokenCountSplitter
returns data in the same format as ParseUnstructured
- that is for each row it returns a list of tuples, where each tuple consists of a string with the text of a chunk and a dictionary with associated metadata.
With these tools it is easy to create in Pathway a pipeline serving as a Vector Store, but which updates on each data change. You can check such an example in the llm-app repository. As it is a common pipeline, Pathway provides a class VectorStore
which implements this pipeline.
Ready-to-use Vector Store
Pathway Vector Store enables building a document index on top of your documents and allows for easy-to-manage, always up-to-date LLM pipelines accessible using a RESTful API. It maintains an index of your documents and allows for querying for documents closest to a given query. It is implemented using two classes - VectorStoreServer
and VectorStoreClient
.
The VectorStoreServer
class implements the pipeline for indexing your documents and runs an HTTP REST server for nearest neighbors queries. You can use VectorStoreServer
by itself to use Pathway as a Vector Store, and you then query it using REST. Alternatively, use VectorStoreClient
for querying VectorStoreServer
which implements wrappers for REST calls.
You can learn more about Vector Store in Pathway in a dedicated tutorial.
Integrating with LlamaIndex and LangChain
Vector Store offer integrations with both LlamaIndex and LangChain. These allow you to incorporate Vector Store Client in your LlamaIndex and LangChain pipelines or use LlamaIndex and LangChain components in the Vector Store. Read more about the integrations in the article on LlamaIndex and on LangChain.
Rerankers
Rerankers allow you to evaluate whether a document is relevant to the question asked. A typical application of rerankers is to implement a two-stage retrieval. First, some number of documents is retrieved from a vector store, purposely more than you wish to embed in the query as a context. Then all these documents are ranked with a reranker, and then only the best of them are left, while the rest are discarded.
Pathway offers three rerankers:
LLMReranker
asks an LLN chat of your choice to rank the relevance of a document against a query on a scale from 1 to 5,CrossEncoderReranker
is a wrapper on CrossEncoder from thesentence_transformers
,EncoderReranker
uses embeddings from the sentence_transformers library to calculate the relevance of a document against a query.
Additionally, once you rank the documents, you can use rerank_topk_filter
to choose k
best documents.
Modular RAGs
To combine all the pieces into a RAG, you can use one of modular RAGs available in the LLM xpack. BaseRAGQuestionAnswerer
is a standard RAG, that given a query obtains k
best documents from the vector store, and sends them along the question to the LLM chat. AdaptiveRAGQuestionAnswerer
tries to limit the number of documents sent to the LLM chat to save tokens - to do that it initially sends only a small number of documents to the chat, which is increased until an answer is found. To read more, why that can save tokens without sacrificing accuracy, check our showcase on Adaptive RAG.
Both these RAGs are customizable with an LLM model used to answer questions, a vector store for retrieving documents and templates for embedding context chunks in the question.
Join our Discord community and dive into discussions on tricks and tips for mastering Retrieval Augmented Generation