LLM Chat
Out of the box, the LLM xpack provides wrappers for text generation and embedding LLMs. For text generation, you can use native wrappers for the OpenAI chat model and for HuggingFace models running locally. Many other popular models, including Azure OpenAI, HuggingFace (when using their API), and Gemini, can be used through the LiteLLM wrapper. For the full list of providers supported by LiteLLM, check the LiteLLM documentation. Currently, Pathway provides wrappers for the following LLMs:
- OpenAI
- LiteLLM
- Hugging Face Pipeline
- Cohere
To use a wrapper, first create an instance of the wrapper, which you can then apply to a column containing prompts.
As mentioned on the previous page, we create a Pathway table to be used in the examples below:
import pathway as pw
query = pw.debug.table_from_markdown('''
prompt
How many 'r's are there in 'strawberry'?
''')
UDFs
Each wrapper is a UDF (User Defined Function), which allows users to define their own functions to interact with Pathway objects. A UDF, in general, is any function that takes some input, processes it, and returns an output. In the context of the Pathway library, UDFs enable seamless integration of custom logic, such as invoking LLMs for specific tasks.
In particular a UDF can serve as a wrapper for LLM calls, allowing users to pass prompts or other inputs to a model and retrieve the corresponding outputs. This design makes it easy to interact with Pathway tables and columns while incorporating the power of LLMs.
OpenAIChat
For OpenAI, you create a wrapper using the OpenAIChat class.
import os
from pathway.xpacks.llm import llms
model = llms.OpenAIChat(
    model="gpt-4o-mini",
    api_key=os.environ["OPENAI_API_KEY"],  # Read the OpenAI API key from environment variables
)
# Send queries from column `messages` in table `query` to OpenAI
responses = query.select(result=model(pw.this.messages))
Preparing queries
OpenAIChat expects messages in the format required by the OpenAI API: a list of dictionaries, where each dictionary is one message in the conversation so far. To ask single questions, wrap them with pw.xpacks.llm.llms.prompt_chat_single_qa so that they match the format expected by the OpenAI API.
# Column `prompt` holds strings with questions to be sent to OpenAI chat
responses = query.select(result=model(llms.prompt_chat_single_qa(pw.this.prompt)))
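To make the expected shape concrete, the sketch below builds a one-message conversation by hand in plain Python. The helper name `single_question_messages` is hypothetical, and the `"user"` role is an assumption of this sketch; the role that prompt_chat_single_qa actually assigns is not stated on this page.

```python
import json


def single_question_messages(question: str) -> list[dict[str, str]]:
    """Wrap one question as a one-message conversation (hypothetical helper)."""
    # Each dictionary is one message; "role" and "content" are the keys used
    # by the OpenAI chat format. The "user" role is an assumption here.
    return [{"role": "user", "content": question}]


messages = single_question_messages("How many 'r's are there in 'strawberry'?")
print(json.dumps(messages, indent=2))
```

A longer conversation would simply be a longer list, with one dictionary per earlier message.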
Model Parameters
The OpenAI API takes a number of parameters, including model and api_key used in the code stubs above. OpenAIChat allows you to set their default values when initializing the class, and you can also override them when applying the wrapper.
model = llms.OpenAIChat(
    model="gpt-4o-mini",
    api_key=os.environ["OPENAI_API_KEY"],  # Read the OpenAI API key from environment variables
    max_tokens=200,  # Set the default value of max_tokens to 200
)
# As max_tokens is not set, the default value 200 will be used
responses = query.select(result=model(llms.prompt_chat_single_qa(pw.this.prompt)))
# Now the value of max_tokens is taken from column `max_tokens`, overriding the default set when initializing OpenAIChat
responses = query.select(result=model(llms.prompt_chat_single_qa(pw.this.prompt), max_tokens=pw.this.max_tokens))
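The default-then-override behavior described above can be pictured with a small plain-Python sketch. `FakeChat` is a hypothetical stand-in, not Pathway's implementation: it makes no API calls and only records which parameters would accompany each request.

```python
class FakeChat:
    """Hypothetical stand-in for a chat wrapper; makes no API calls."""

    def __init__(self, **defaults):
        # Parameters given at initialization become the defaults.
        self.defaults = defaults

    def __call__(self, messages, **overrides):
        # Parameters given at application time take precedence over defaults.
        params = {**self.defaults, **overrides}
        return {"messages": messages, **params}


model = FakeChat(model="gpt-4o-mini", max_tokens=200)
question = [{"role": "user", "content": "What is RAG?"}]

default_call = model(question)                  # falls back to max_tokens=200
override_call = model(question, max_tokens=50)  # overrides the default

print(default_call["max_tokens"], override_call["max_tokens"])  # prints: 200 50
```

Parameters that are not overridden, such as `model` here, keep the value set at initialization.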
LiteLLM
Pathway has a wrapper for LiteLLM: LiteLLMChat. For example, to use Gemini with LiteLLM, create an instance of LiteLLMChat and then apply it to the column with messages to be sent over the API.
import os
from pathway.xpacks.llm import llms
model = llms.LiteLLMChat(
    model="gemini/gemini-pro",  # Choose the model you want
    api_key=os.environ["GEMINI_API_KEY"],  # Read the Gemini API key from environment variables
)
# Ask Gemini questions from the `prompt` column
responses = query.select(result=model(llms.prompt_chat_single_qa(pw.this.prompt)))
With the wrapper for LiteLLM, Pathway allows you to use many popular LLMs.
Hugging Face Pipeline
For models from Hugging Face that you want to run locally, Pathway provides a separate wrapper called HFPipelineChat (for calling HuggingFace through the API, use the LiteLLM wrapper). When an instance of this wrapper is created, it initializes a HuggingFace pipeline, so any arguments to the pipeline, including the name of the model, must be set during the initialization of HFPipelineChat. As before, any parameters to pipeline.__call__ can be set during initialization or overridden during application.
from pathway.xpacks.llm import llms
model = llms.HFPipelineChat(
    model="gpt2",  # Choose the model you want
)
responses = query.select(result=model(llms.prompt_chat_single_qa(pw.this.prompt)))
Cohere
Pathway has a wrapper for the Cohere Chat Services. The wrapper allows for augmenting the query with documents. The result contains cited documents along with the response.
from pathway.xpacks.llm import llms
chat = llms.CohereChat()
# Example documents used to augment the query
docs = [{"text": "Pathway is a high-throughput, low-latency data processing framework that handles live data & streaming for you."},
        {"text": "RAG stands for Retrieval Augmented Generation."}]
# Add the documents as a `docs` column next to the existing columns
query_with_docs = query.select(*pw.this, docs=docs)
r = query_with_docs.select(ret=chat(llms.prompt_chat_single_qa("what is rag?"), documents=pw.this.docs))
# The result holds the response first and the cited documents second
parsed_table = r.select(response=pw.this.ret[0], citations=pw.this.ret[1])
Wrappers are asynchronous
Wrappers for OpenAI and LiteLLM, both for chat and embedding, are asynchronous, and Pathway lets you configure their behavior with three parameters:
- capacity, which sets the number of concurrent operations allowed,
- retry_strategy, which sets the strategy for handling retries in case of failures,
- cache_strategy, which defines the cache mechanism.
These three parameters need to be set during the initialization of the wrapper. You can read more about them in the UDFs guide.
model = llms.OpenAIChat(
    # maximum concurrent operations is 10
    capacity=10,
    # in case of failure, retry 5 times, each time waiting twice as long before retrying
    retry_strategy=pw.udfs.ExponentialBackoffRetryStrategy(max_retries=5, initial_delay=1000, backoff_factor=2),
    # if PATHWAY_PERSISTENT_STORAGE is set, then it is used to cache the calls
    cache_strategy=pw.udfs.DefaultCache(),
    # select the model
    model="gpt-3.5-turbo",
    # read the OpenAI API key from environment variables
    api_key=os.environ["OPENAI_API_KEY"],
)
responses = query.select(result=model(llms.prompt_chat_single_qa(pw.this.prompt)))
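To illustrate what a concurrency limit and an exponential backoff retry strategy mean in practice, here is a plain asyncio sketch. It is not Pathway's implementation: `flaky_call`, `call_with_retries`, and `run_all` are hypothetical names, the delays are in seconds and kept tiny so the example finishes quickly, and holding the concurrency slot while waiting to retry is a simplification chosen for this sketch.

```python
import asyncio

attempts = {}  # number of attempts made so far, per prompt


async def flaky_call(prompt: str) -> str:
    """Hypothetical remote call that fails on the first attempt for each prompt."""
    attempts[prompt] = attempts.get(prompt, 0) + 1
    await asyncio.sleep(0.01)
    if attempts[prompt] == 1:
        raise ConnectionError("simulated transient failure")
    return f"answer to {prompt!r}"


async def call_with_retries(prompt, semaphore, max_retries, initial_delay, backoff_factor):
    delay = initial_delay
    async with semaphore:  # at most `capacity` calls are inside this block at once
        for attempt in range(max_retries + 1):
            try:
                return await flaky_call(prompt)
            except ConnectionError:
                if attempt == max_retries:
                    raise  # retries exhausted, surface the failure
                await asyncio.sleep(delay)
                delay *= backoff_factor  # wait longer before the next retry


async def run_all(prompts, capacity=2, max_retries=5, initial_delay=0.01, backoff_factor=2):
    semaphore = asyncio.Semaphore(capacity)
    tasks = [
        call_with_retries(p, semaphore, max_retries, initial_delay, backoff_factor)
        for p in prompts
    ]
    return await asyncio.gather(*tasks)  # results keep the order of `prompts`


results = asyncio.run(run_all(["q1", "q2", "q3"]))
print(results)
```

With a capacity of 2 and three prompts, the third call starts only after one of the first two has finished, and every call succeeds on its second attempt.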
Embedders
Pathway also comes with wrappers for embedding models.
For more information on the embedders, refer to the Embedding section on the Document Indexing page.