pw.xpacks.llm.question_answering
class pw.xpacks.llm.question_answering.AdaptiveRAGQuestionAnswerer(llm, indexer, *, default_llm_name=None, summarize_template=prompts.prompt_summarize, n_starting_documents=2, factor=2, max_iterations=4, strict_prompt=False)
Builds the logic and the API for an adaptive RAG application.
It lets you build a RAG app with the Pathway vector store and Pathway components. You can choose between two question answering strategies, a short (concise) or a long (detailed) response, which is set in the POST request. The class is LLM-agnostic: it works with proprietary or open-source LLMs.
It differs from BaseRAGQuestionAnswerer in that it adaptively chooses the number of chunks used as context for a question. First, only n_starting_documents chunks are used, and then the number is increased until an answer is found.
- Parameters
  - llm (BaseChat) – LLM instance for question answering. See https://pathway.com/developers/api-docs/pathway-xpacks-llm/llms for available models.
  - indexer (VectorStoreServer | DocumentStore) – Indexing object for search and retrieval, used for context augmentation.
  - default_llm_name (str | None) – Default LLM model to be used in queries. Only used if the model parameter is not specified in the POST request. Omitting it or setting it to None defaults to the model name set during the LLM's initialization.
  - summarize_template (UDF) – Template for text summarization. Defaults to pathway.xpacks.llm.prompts.prompt_summarize.
  - n_starting_documents (int) – Number of documents embedded in the first query.
  - factor (int) – Factor by which the number of documents increases in each subsequent query if an answer is not found.
  - max_iterations (int) – Number of times to ask a question, each time with a larger number of documents.
  - strict_prompt (bool) – Whether the LLM should be strictly instructed to return JSON. Increases performance in small open-source models; not needed in OpenAI GPT models.
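As a rough illustration of how the three sizing parameters interact, the sketch below lists the number of chunks used in each round. It assumes the count is multiplied by factor after every unanswered round, as described for answer_with_geometric_rag_strategy. The helper name chunk_schedule is hypothetical and not part of Pathway.

```python
def chunk_schedule(
    n_starting_documents: int = 2, factor: int = 2, max_iterations: int = 4
) -> list[int]:
    """Chunk counts per round, assuming geometric growth (hypothetical helper)."""
    return [n_starting_documents * factor**i for i in range(max_iterations)]


# Defaults of AdaptiveRAGQuestionAnswerer: start at 2, double each round, 4 rounds.
print(chunk_schedule())  # [2, 4, 8, 16]
print(chunk_schedule(n_starting_documents=1, factor=3, max_iterations=3))  # [1, 3, 9]
```

Under this assumption, the last round with the default settings would use up to 16 chunks, which is worth keeping in mind when sizing the LLM context window.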
Example:
import pathway as pw
from pathway.xpacks.llm import embedders, splitters, llms, parsers
from pathway.xpacks.llm.vector_store import VectorStoreServer
from pathway.udfs import DiskCache, ExponentialBackoffRetryStrategy
from pathway.xpacks.llm.question_answering import AdaptiveRAGQuestionAnswerer

# Assumption: the original snippet does not define DEFAULT_GPT_MODEL;
# replace the value with the OpenAI chat model you want to use.
DEFAULT_GPT_MODEL = "gpt-4o-mini"

my_folder = pw.io.fs.read(
    path="/PATH/TO/MY/DATA/*",  # replace with your folder
    format="binary",
    with_metadata=True,
)
sources = [my_folder]
app_host = "0.0.0.0"
app_port = 8000

# Parse, split, and embed the documents, then index them.
parser = parsers.ParseUnstructured()
text_splitter = splitters.TokenCountSplitter(max_tokens=400)
embedder = embedders.OpenAIEmbedder(cache_strategy=DiskCache())
vector_server = VectorStoreServer(
    *sources,
    embedder=embedder,
    splitter=text_splitter,
    parser=parser,
)
chat = llms.OpenAIChat(
    model=DEFAULT_GPT_MODEL,
    retry_strategy=ExponentialBackoffRetryStrategy(max_retries=6),
    cache_strategy=DiskCache(),
    temperature=0.05,
)
app = AdaptiveRAGQuestionAnswerer(
    llm=chat,
    indexer=vector_server,
)
app.build_server(host=app_host, port=app_port)
app.run_server()
answer_query(pw_ai_queries)
Create a RAG response with adaptive retrieval.
build_server(host, port, **rest_kwargs)
Adds HTTP connectors to input tables and connects them with table transformers.
list_documents(list_documents_queries)
Get the list of documents from the retriever.
retrieve(retrieve_queries)
Retrieve documents from the index.
serve_callable(route, schema=None, retry_strategy=None, cache_strategy=None, **additional_endpoint_kwargs)
Serve additional endpoints by wrapping callables. Expects an endpoint route. The schema is optional; providing one makes the webserver check the arguments.
Beware that if the schema is not set, incorrect types may cause a runtime error.
Example:
@rag_app.serve_callable(route="/agent")
async def some_func(user_query: str) -> str:
    # define your agent, or custom RAG using any framework or plain Python
    # ...
    messages = [{"role": "user", "content": user_query}]
    result = agent.invoke(messages)
    return result
statistics(statistics_queries)
Get statistics about indexed files.
summarize_query(summarize_queries)
Summarize the given texts.
class pw.xpacks.llm.question_answering.BaseContextProcessor
Base class for formatting documents into LLM context.
The abstract method docs_to_context defines how documents are converted to context.
class pw.xpacks.llm.question_answering.BaseRAGQuestionAnswerer(llm, indexer, *, default_llm_name=None, prompt_template=prompts.prompt_qa, context_processor=SimpleContextProcessor(), summarize_template=prompts.prompt_summarize, search_topk=6)
Builds the logic and the API for a basic RAG application.
Base class for building a RAG app with the Pathway vector store and Pathway components. It is LLM-agnostic: it works with proprietary or open-source LLMs.
- Parameters
  - llm (BaseChat) – LLM instance for question answering. See https://pathway.com/developers/api-docs/pathway-xpacks-llm/llms for available models.
  - indexer (VectorStoreServer | DocumentStore) – Indexing object for search and retrieval, used for context augmentation.
  - default_llm_name (str | None) – Default LLM model to be used in queries. Only used if the model parameter is not specified in the POST request. Omitting it or setting it to None defaults to the model name set during the LLM's initialization.
  - prompt_template (Union[str, Callable[[str, str], str], UDF]) – Template for document question answering with a short response. A string template, a callable, or a pw.udf function is expected. Defaults to pathway.xpacks.llm.prompts.prompt_qa. A string template needs to have context and query placeholders in curly brackets {}.
  - context_processor (Union[BaseContextProcessor, Callable[[list[dict] | list[dict[str, str | dict]]], str], UDF]) – Utility for representing the fetched documents to the LLM. A callable, a UDF, or a BaseContextProcessor is expected. Defaults to SimpleContextProcessor, which keeps the 'path' metadata and joins the documents with double newlines.
  - summarize_template (UDF) – Template for text summarization. Defaults to pathway.xpacks.llm.prompts.prompt_summarize.
  - search_topk (int) – Top-k parameter for the retrieval. Adjusts the number of chunks in the context.
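A string prompt_template has to contain both placeholders. The sketch below checks a template with the standard library before it is handed to the question answerer and shows how the placeholders could be filled. The helper has_required_placeholders is hypothetical, and rendering with str.format is an assumption made for illustration, not a description of Pathway internals.

```python
from string import Formatter

REQUIRED_FIELDS = {"context", "query"}


def has_required_placeholders(template: str) -> bool:
    """Return True if the template names both {context} and {query}."""
    # Formatter().parse yields (literal_text, field_name, format_spec, conversion).
    fields = {name for _, name, _, _ in Formatter().parse(template) if name}
    return REQUIRED_FIELDS <= fields


template = "Answer the question. Context: {context}. Question: {query}"
print(has_required_placeholders(template))  # True
print(has_required_placeholders("Question: {query}"))  # False

# Illustrative rendering only; the real app fills the placeholders itself.
print(template.format(context="Pathway reads Kafka topics.", query="What does Pathway read?"))
```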
Example:
import pathway as pw
from pathway.xpacks.llm import embedders, splitters, llms, parsers
from pathway.xpacks.llm.vector_store import VectorStoreServer
from pathway.udfs import DiskCache, ExponentialBackoffRetryStrategy
from pathway.xpacks.llm.question_answering import BaseRAGQuestionAnswerer
from pathway.xpacks.llm.servers import QASummaryRestServer

# Assumption: the original snippet does not define DEFAULT_GPT_MODEL;
# replace the value with the OpenAI chat model you want to use.
DEFAULT_GPT_MODEL = "gpt-4o-mini"

my_folder = pw.io.fs.read(
    path="/PATH/TO/MY/DATA/*",  # replace with your folder
    format="binary",
    with_metadata=True,
)
sources = [my_folder]
app_host = "0.0.0.0"
app_port = 8000

# Parse, split, and embed the documents, then index them.
parser = parsers.ParseUnstructured()
text_splitter = splitters.TokenCountSplitter(max_tokens=400)
embedder = embedders.OpenAIEmbedder(cache_strategy=DiskCache())
vector_server = VectorStoreServer(
    *sources,
    embedder=embedder,
    splitter=text_splitter,
    parser=parser,
)
chat = llms.OpenAIChat(
    model=DEFAULT_GPT_MODEL,
    retry_strategy=ExponentialBackoffRetryStrategy(max_retries=6),
    cache_strategy=DiskCache(),
    temperature=0.05,
)
# String template with the required {context} and {query} placeholders.
prompt_template = "Answer the question. Context: {context}. Question: {query}"
rag = BaseRAGQuestionAnswerer(
    llm=chat,
    indexer=vector_server,
    prompt_template=prompt_template,
)
app = QASummaryRestServer(app_host, app_port, rag)
app.run_server()
answer_query(pw_ai_queries)
Main function for RAG applications that answer questions based on available information.
build_server(host, port, **rest_kwargs)
Adds HTTP connectors to input tables and connects them with table transformers.
list_documents(list_documents_queries)
Get the list of documents from the retriever.
retrieve(retrieve_queries)
Retrieve documents from the index.
serve_callable(route, schema=None, retry_strategy=None, cache_strategy=None, **additional_endpoint_kwargs)
Serve additional endpoints by wrapping callables. Expects an endpoint route. The schema is optional; providing one makes the webserver check the arguments.
Beware that if the schema is not set, incorrect types may cause a runtime error.
Example:
@rag_app.serve_callable(route="/agent")
async def some_func(user_query: str) -> str:
    # define your agent, or custom RAG using any framework or plain Python
    # ...
    messages = [{"role": "user", "content": user_query}]
    result = agent.invoke(messages)
    return result
statistics(statistics_queries)
Get statistics about indexed files.
summarize_query(summarize_queries)
Summarize the given texts.
class pw.xpacks.llm.question_answering.DeckRetriever(indexer, *, search_topk=6)
Builds the logic for the retriever of slides.
- Parameters
  - indexer (SlidesDocumentStore | SlidesVectorStoreServer) – Document store for parsing and indexing slides.
  - search_topk (int) – Number of slides to be returned by the answer_query method.
answer_query(pw_ai_queries)
Return slides similar to the given query.
class pw.xpacks.llm.question_answering.RAGClient(host=None, port=None, url=None, timeout=90, additional_headers=None)
Connector for interacting with Pathway RAG applications. Either host and port, or url, must be set.
- Parameters
  - host (str | None) – The host of the RAG service.
  - port (int | None) – The port of the RAG service.
  - url (str | None) – The URL of the RAG service.
  - timeout (int | None) – Timeout for requests in seconds. Defaults to 90.
  - additional_headers (dict | None) – Additional headers for the requests.
pw_ai_answer(prompt, filters=None, model=None)
Return a RAG answer based on a given prompt and an optional filter.
- Parameters
  - prompt (str) – Question to be asked.
  - filters (str | None) – Optional metadata filter for the documents. Defaults to None, which means no filter is applied.
  - model (str | None) – Optional LLM model. If None, the server uses the app default.
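A client call might look like the sketch below. It assumes a Pathway RAG app is already listening on the host and port shown, both of which are placeholders. The wrapper ask and the stand-in EchoClient are hypothetical; the stand-in only mirrors the pw_ai_answer signature so the wrapper can be tried without a running server.

```python
def ask(client, question: str, filters=None, model=None):
    """Forward a question to any object exposing pw_ai_answer (hypothetical wrapper)."""
    if not question.strip():
        raise ValueError("question must not be empty")
    return client.pw_ai_answer(question, filters=filters, model=model)


class EchoClient:
    """Offline stand-in that mimics the RAGClient.pw_ai_answer signature."""

    def pw_ai_answer(self, prompt, filters=None, model=None):
        return f"echo: {prompt}"


print(ask(EchoClient(), "What is Pathway?"))  # echo: What is Pathway?


def ask_live_server():
    # Requires the pathway package and a running RAG app.
    # Host and port are placeholders, not values from the documentation.
    from pathway.xpacks.llm.question_answering import RAGClient

    client = RAGClient(host="localhost", port=8000)
    return ask(client, "What is Pathway?")
```

Keeping the wrapper independent of the concrete client makes it easy to unit test request handling before pointing it at a real server.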
pw_ai_summary(text_list, model=None)
Summarize a list of texts.
- Parameters
  - text_list (list[str]) – List of texts to summarize.
  - model (str | None) – Optional LLM model. If None, the server uses the app default.
pw_list_documents(filters=None, keys=['path'])
List indexed documents from the vector store with optional filtering.
- Parameters
  - filters (str | None) – Optional metadata filter for the documents.
  - keys (list[str]) – List of metadata keys to be included in the response. Defaults to ["path"]. Setting it to None retrieves all available metadata.
retrieve(query, k=3, metadata_filter=None, filepath_globpattern=None)
Retrieve the closest documents from the vector store based on a query.
- Parameters
  - query (str) – The query string.
  - k (int) – The number of results to retrieve.
  - metadata_filter (str | None) – Optional metadata filter for the documents. Defaults to None, which means no filter is applied.
  - filepath_globpattern (str | None) – Glob pattern for file paths.
statistics()
Retrieve stats from the vector store.
class pw.xpacks.llm.question_answering.SimpleContextProcessor(context_metadata_keys=<factory>, context_joiner='\n\n')
Context processor that filters metadata fields and joins the documents.
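To make the filtering and joining concrete, here is a plain-Python approximation: it keeps only selected metadata keys and joins the documents with a blank line. It is a sketch, not Pathway's implementation. The function name docs_to_context_sketch, the assumed document shape (a dict with text and metadata keys), and the JSON rendering of each document are all assumptions.

```python
import json


def docs_to_context_sketch(docs, context_metadata_keys=("path",), context_joiner="\n\n"):
    """Approximate a metadata-filtering context processor (hypothetical helper)."""
    rendered = []
    for doc in docs:
        metadata = doc.get("metadata", {})
        # Keep only the whitelisted metadata keys that are actually present.
        kept = {key: metadata[key] for key in context_metadata_keys if key in metadata}
        rendered.append(json.dumps({"text": doc["text"], **kept}))
    return context_joiner.join(rendered)


docs = [
    {"text": "Pathway reads Kafka.", "metadata": {"path": "a.md", "owner": "x"}},
    {"text": "Pathway reads CSV.", "metadata": {"path": "b.md"}},
]
# The "owner" key is dropped because it is not in context_metadata_keys.
print(docs_to_context_sketch(docs))
```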
pw.xpacks.llm.question_answering.answer_with_geometric_rag_strategy(questions, documents, llm_chat_model, n_starting_documents, factor, max_iterations, strict_prompt=False)
Queries the LLM chat while providing an increasing number of documents until an answer is found. Documents are taken from the documents argument. Initially, the first n_starting_documents documents are embedded in the query. If the LLM chat fails to find an answer, the number of documents is multiplied by factor and the question is asked again.
- Parameters
  - questions (ColumnReference[str]) – Column with questions to be asked to the LLM chat.
  - documents (ColumnReference[list[str]]) – Column with documents to be provided along with a question to the LLM chat.
  - llm_chat_model (BaseChat) – Chat model which will be queried for answers.
  - n_starting_documents (int) – Number of documents embedded in the first query.
  - factor (int) – Factor by which the number of documents increases in each subsequent query if an answer is not found.
  - max_iterations (int) – Number of times to ask a question, each time with a larger number of documents.
  - strict_prompt (bool) – Whether the LLM should be strictly instructed to return JSON. Increases performance in small open-source models; not needed in OpenAI GPT models.
- Returns
  A column with answers to the questions. If an answer is not found, None is returned.
Example:
import pandas as pd
import pathway as pw
from pathway.xpacks.llm.llms import OpenAIChat
from pathway.xpacks.llm.question_answering import answer_with_geometric_rag_strategy

chat = OpenAIChat()
df = pd.DataFrame(
    {
        "question": ["How do you connect to Kafka from Pathway?"],
        "documents": [
            [
                "`pw.io.csv.read` reads a table from one or several files with delimiter-separated values.",
                "`pw.io.kafka.read` is a generalized method to read the data from the given topic in Kafka.",
            ]
        ],
    }
)
t = pw.debug.table_from_pandas(df)
# Positional arguments: n_starting_documents=1, factor=2, max_iterations=2
answers = answer_with_geometric_rag_strategy(t.question, t.documents, chat, 1, 2, 2)
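The control flow of this strategy can be pictured without Pathway at all. The sketch below is a plain-Python approximation under stated assumptions: geometric_answer_sketch and toy_llm are hypothetical names, the stand-in model returns None when it cannot answer, and the real function operates on Pathway columns rather than Python lists.

```python
def geometric_answer_sketch(
    question, documents, ask_llm, n_starting_documents, factor, max_iterations
):
    """Ask with a geometrically growing prefix of documents (hypothetical helper)."""
    n_documents = n_starting_documents
    for _ in range(max_iterations):
        answer = ask_llm(question, documents[:n_documents])
        if answer is not None:
            return answer
        n_documents *= factor  # widen the context and ask again
    return None  # no answer found within max_iterations


def toy_llm(question, context):
    """Stand-in for a chat model: 'answers' only if a document mentions Kafka."""
    for doc in context:
        if "kafka" in doc.lower():
            return doc
    return None


documents = [
    "pw.io.csv.read reads delimiter-separated files.",
    "pw.io.kafka.read reads data from a Kafka topic.",
]
question = "How do you connect to Kafka from Pathway?"
# Round 1 sees one document and fails; round 2 sees two and succeeds.
print(geometric_answer_sketch(question, documents, toy_llm, 1, 2, 2))
```

With max_iterations set to 1, the same call would return None, because the only round sees just the first document.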
pw.xpacks.llm.question_answering.answer_with_geometric_rag_strategy_from_index(questions, index, documents_column, llm_chat_model, n_starting_documents, factor, max_iterations, metadata_filter=None, strict_prompt=False)
Queries the LLM chat while providing an increasing number of documents until an answer is found. Documents are taken from the index. Initially, the first n_starting_documents documents are embedded in the query. If the LLM chat fails to find an answer, the number of documents is multiplied by factor and the question is asked again.
- Parameters
  - questions (ColumnReference[str]) – Column with questions to be asked to the LLM chat.
  - index (DataIndex) – Index from which the closest documents are obtained.
  - documents_column (str | ColumnReference) – Name of the column, in the table passed to the index, that contains the documents.
  - llm_chat_model (BaseChat) – Chat model which will be queried for answers.
  - n_starting_documents (int) – Number of documents embedded in the first query.
  - factor (int) – Factor by which the number of documents increases in each subsequent query if an answer is not found.
  - max_iterations (int) – Number of times to ask a question, each time with a larger number of documents.
  - strict_prompt (bool) – Whether the LLM should be strictly instructed to return JSON. Increases performance in small open-source models; not needed in OpenAI GPT models.
- Returns
  A column with answers to the questions. If an answer is not found, None is returned.