LLM Chat

Out of the box, the LLM xpack provides wrappers for text generation and embedding LLMs. For text generation, you can use native wrappers for the OpenAI chat model and HuggingFace models running locally. Many other popular models, including Azure OpenAI, HuggingFace (when using their API) or Gemini can be used with the LiteLLM wrapper. To check the full list of providers supported by LiteLLM check LiteLLM documentation. Currently, Pathway provides wrappers for the following LLMs:

  • OpenAI
  • LiteLLM
  • Hugging Face Pipeline
  • Cohere

To use a wrapper, first create an instance of the wrapper, which you can then apply to a column containing prompts.

As mentioned on the previous page, we create a Pathway table to be used in the examples below:

import pathway as pw
query = pw.debug.table_from_markdown('''
message
How many 'r' there are in 'strawberry'?
''')

UDFs

Each wrapper is a UDF (User Defined Function), which allows users to define their own functions to interact with Pathway objects. A UDF, in general, is any function that takes some input, processes it, and returns an output. In the context of the Pathway library, UDFs enable seamless integration of custom logic, such as invoking LLMs for specific tasks.

In particular a UDF can serve as a wrapper for LLM calls, allowing users to pass prompts or other inputs to a model and retrieve the corresponding outputs. This design makes it easy to interact with Pathway tables and columns while incorporating the power of LLMs.

OpenAIChat

For OpenAI, you create a wrapper using the OpenAIChat class.

from pathway.xpacks.llm import llms

model = llms.OpenAIChat(
    model="gpt-4o-mini",
    api_key=os.environ["OPENAI_API_KEY"], # Read OpenAI API key from environmental variables
)
# Send queries from column `messages` in table `query` to OpenAI
responses = query.select(result=model(pw.this.messages))

Preparing queries

OpenAIChat expects messages to be in the format required by OpenAI API - that is a list of dictionaries, where each dictionary is one message in the conversation so far. If you want to ask single questions use pw.xpacks.llm.llm.prompt_chat_single_qa to wrap them so that they match the format expected by OpenAI API.

# Column `prompt` holds strings with questions to be sent to OpenAI chat 
responses = query.select(result=model(llms.prompt_chat_single_qa(pw.this.prompt)))

Model Parameters

OpenAI API takes a number of parameters, including model and api_key used in the code stubs above. OpenAIChat allows you to set their default value during the initialization of the class, but you can also override them during application.

model = llms.OpenAIChat(
    model="gpt-4o-mini",
    api_key=os.environ["OPENAI_API_KEY"], # Read OpenAI API key from environmental variables
    max_tokens=200, # Set default value of max_tokens to be 200
)
# As max_tokens is not set, value 200 will be used
responses = query.select(result=model(llms.prompt_chat_single_qa(pw.this.prompt)))
# Now value of max_tokens is taken from column `max_tokens`, overriding default value set when initializing OpenAIChat
responses = query.select(result=model(llms.prompt_chat_single_qa(pw.this.prompt), max_tokens(pw.this.max_tokens)))

LiteLLM

Pathway has a wrapper for LiteLLM - LiteLLMChat. For example, to use Gemini with LiteLLM, create an instance of LiteLLMChat and then apply it to the column with messages to be sent over API.

from pathway.xpacks.llm import llms

llms.model = llms.LiteLLMChat(
    model="gemini/gemini-pro", # Choose the model you want
    api_key=os.environ["GEMINI_API_KEY"], # Read GEMINI API key from environmental variables
)
# Ask Gemini questions from `prompt` column 
responses = query.select(result=model(llms.prompt_chat_single_qa(pw.this.prompt)))

With the wrapper for LiteLLM, Pathway allows you to use many popular LLMs.

Hugging Face Pipeline

For models from Hugging Face that you want to run locally, Pathway gives a separate wrapper called HFPipelineChat (for calling HuggingFace through API, use LiteLLM wrapper). When an instance of this wrapper is created, it initializes a HuggingFace pipeline, so any arguments to the pipeline - including the name of the model - must be set during the initialization of HFPipelineChat. Any parameters to pipeline.__call__ can be as before set during initialization or overridden during application.

from pathway.xpacks.llm import llms

model = llms.HFPipelineChat(
    model="gpt2", # Choose the model you want
)
responses = query.select(result=model(llms.prompt_chat_single_qa(pw.this.prompt)))

Cohere

Pathway has a wrapper for the Cohere Chat Services. The wrapper allows for augmenting the query with documents. The result contains cited documents along with the response.

from pathway.xpacks.llm import llms

model = llms.CohereChat()
docs = [{"text": "Pathway is a high-throughput, low-latency data processing framework that handles live data & streaming for you."},
{"text": "RAG stands for Retrieval Augmented Generation."}]

query_with_docs = query.select(*pw.this, docs=docs)
r = query_with_docs.select(ret=chat(llms.prompt_chat_single_qa("what is rag?"), documents=pw.this.docs))
parsed_table = r.select(response=pw.this.ret[0], citations=pw.this.ret[1])

Wrappers are asynchronous

Wrapper for OpenAI and LiteLLM, both for chat and embedding, are asynchronous, and Pathway allows you to set three parameters to set their behavior. These are:

  • capacity, which sets the number of concurrent operations allowed,
  • retry_strategy, which sets the strategy for handling retries in case of failures,
  • cache_strategy, which defines the cache mechanism.

These three parameters need to be set during the initialization of the wrapper. You can read more about them in the UDFs guide.

model = OpenAIChat(
    # maximum concurrent operations is 10
    capacity=10,
    # in case of failure, retry 5 times, each time waiting twice as long before retrying
    retry_strategy=pw.udfs.ExponentialBackoffRetryStrategy(max_retries=5, initial_delay=1000, backoff_factor=2),
    # if PATHWAY_PERSISTENT_STORAGE is set, then it is used to cache the calls
    cache_strategy=pw.udfs.DefaultCache(),
    # select the model
    model="gpt-3.5-turbo",
    # read OpenAI API key from environmental variables
    api_key=os.environ["OPENAI_API_KEY"],
)
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt)))

Embedders

Pathway also comes with wrappers for embedding models:

For more information on the embedders, refer to the Embedding section on the Document Indexing page