Build an AI pipeline with Pathway
In this guide, you will learn how to construct a dynamic, real-time LLM App using Pathway. Explore key features like real-time document indexing, adaptive learning from updated documentation, and managing user sessions. Dive into this exciting combination of technologies that brings a responsive, knowledge-growing application to life.
You can find ready-to-run LLM and RAG examples on our App Templates page.
Pathway makes handling real-time data easy. In this showcase we are going to demonstrate how Pathway can be used to build a chatbot that answers questions about the Pathway documentation. This interactive application will exhibit dynamic adaptability to changing data sources:
- User queries, to which responses must be generated in real time,
- Documentation entries, which should be incrementally re-indexed after each change.
The App first reads a corpus of documents from the connector of your choice. It preprocesses them and builds a vector index. It then listens to user queries coming as HTTP REST requests. Each query uses the index to retrieve relevant documentation snippets and uses the OpenAI API to provide a response in natural language. The bot is reactive to changes to the corpus of documents: once new snippets are provided, it reindexes them and starts to use the new knowledge to answer subsequent queries.
For the ready implementation of the app from this guide, visit our GitHub repository at llm-app.
Get started with Pathway Realtime Document AI pipelines with our step-by-step guide, from setup to live document sync. Explore built-in features like Similarity Search, Vector Index, and more!
Prerequisites
LLM dependencies are not part of the main Pathway package, so to install them run:
pip install pathway[xpack-llm]
Warmup: answering user queries without context
The simplest way to get started with a conversational AI model using Pathway is to create an application that answers user queries without any context. This application will leverage a RESTful API and apply a Large Language Model.
Key Insights from This Section
- How to use a REST connector.
- How to apply an LLM or any custom model to a user query.
This example is also present in the llm-app repository, in the examples/pipelines/contextless directory. To implement it yourself, start by importing the Pathway library and the OpenAI wrapper from the LLM xpack.
import os
import pathway as pw
from pathway.xpacks.llm.llms import OpenAIChat, prompt_chat_single_qa
# REST Connector config.
HTTP_HOST = os.environ.get("PATHWAY_REST_CONNECTOR_HOST", "127.0.0.1")
HTTP_PORT = os.environ.get("PATHWAY_REST_CONNECTOR_PORT", "8080")
# LLM model parameters
# For OPENAI API
API_KEY = os.environ["OPENAI_API_KEY"]
# Specific model from OpenAI. You can also use gpt-3.5-turbo for faster responses.
MODEL_LOCATOR = "gpt-4"
# Controls the stochasticity of the openai model output.
TEMPERATURE = 0.0
# Max completion tokens
MAX_TOKENS = 50
First, define the input schema for our application. This is done using pw.Schema, which helps to enforce the structure of the data being processed by Pathway. The schema in this example, QueryInputSchema, expects a query (the question or prompt from the user) and a user (the identifier for the user). Then, establish a RESTful connection using pw.io.http.rest_connector.
class QueryInputSchema(pw.Schema):
    query: str
    user: str

query, response_writer = pw.io.http.rest_connector(
    host=HTTP_HOST,
    port=int(HTTP_PORT),
    schema=QueryInputSchema,
    autocommit_duration_ms=50,
)
Here, query will be a Pathway stream that receives input from HTTP requests. response_writer is a function that you can use to write responses back to the HTTP client.
You can now construct the main query pipeline for your application. The model to use here is GPT-4 from OpenAI.
model = OpenAIChat(
    api_key=API_KEY,
    model=MODEL_LOCATOR,
    temperature=TEMPERATURE,
    max_tokens=MAX_TOKENS,
    retry_strategy=pw.udfs.FixedDelayRetryStrategy(),
    cache_strategy=pw.udfs.DefaultCache(),
)
response = query.select(
    query_id=pw.this.id, result=model(prompt_chat_single_qa(pw.this.query))
)
response_writer(response)
pw.run()
Now, run your application.
python app.py
Then, you can query it from a different terminal:
curl --data '{"user": "user", "query": "How to connect to Kafka in Pathway?"}' http://localhost:8080/
Context Enhancement for Better Responses
Despite GPT-4's extensive training, it may not recognize certain context-specific elements, such as Pathway documentation. The solution lies in adding pertinent documents to the context. This is where the role of a vector database becomes crucial.
Essential Learnings from This Section
- Generating vector embeddings using an LLM.
- Creating a k-Nearest Neighbors (k-NN) powered Index.
- Expanding the bot's capabilities to respond to user queries.
Let's consider a scenario where documents are stored in JSON Lines files in your filesystem, though it could equally apply to any cloud storage or other connector available in Pathway. Each document is represented as a separate line within these files. The JSON Lines format is particularly advantageous for managing large data sets that cannot fit into memory all at once. Each line in a JSON Lines file contains a separate, independent JSON object. This makes the format especially suitable for handling and streaming large data, as it doesn't require loading the entire file into memory.
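For illustration, a single line in such a file might look roughly like this (a hypothetical entry shown only to illustrate the format; the actual contents of the documentation file downloaded below will differ, but each line is one JSON object with a doc field, matching the schema defined later):
{"doc": "Pathway is a Python framework for building real-time data processing pipelines. ..."}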
Start by creating a directory where you will store your documents:
mkdir data
and download a file with the Pathway documentation:
wget 'https://public-pathway-releases.s3.eu-central-1.amazonaws.com/data/pathway-documentation.jsonl' -O 'data/pathway-documentation.jsonl' -q -nc
For each document and each query, embeddings are computed using a pre-trained language model. These embeddings are numerical representations of the documents: they are used to find the documents that are most relevant to each query. Pathway offers API integration with premier LLM service providers, including but not limited to OpenAI and HuggingFace. You can import the model interface for the provider of your choice and specify the API key and the model ID to call. By default, the embedder is text-embedding-ada-002 from OpenAI, which returns vectors of dimension 1536. Please check out openai-model-endpoint-compatibility for more information on the available models.
To implement this, remove the LLM query at the end of the program you obtained in the last section: you first need to retrieve context before querying the LLM. You should be left with the following code:
import os
import pathway as pw
from pathway.xpacks.llm.llms import OpenAIChat, prompt_chat_single_qa
# REST Connector config.
HTTP_HOST = os.environ.get("PATHWAY_REST_CONNECTOR_HOST", "127.0.0.1")
HTTP_PORT = os.environ.get("PATHWAY_REST_CONNECTOR_PORT", "8080")
# LLM model parameters
# For OPENAI API
API_KEY = os.environ["OPENAI_API_KEY"]
# Specific model from OpenAI. You can also use gpt-3.5-turbo for faster responses.
MODEL_LOCATOR = "gpt-4"
# Controls the stochasticity of the openai model output.
TEMPERATURE = 0.0
# Max completion tokens
MAX_TOKENS = 50
class QueryInputSchema(pw.Schema):
    query: str
    user: str

query, response_writer = pw.io.http.rest_connector(
    host=HTTP_HOST,
    port=int(HTTP_PORT),
    schema=QueryInputSchema,
    autocommit_duration_ms=50,
)
model = OpenAIChat(
    api_key=API_KEY,
    model=MODEL_LOCATOR,
    temperature=TEMPERATURE,
    max_tokens=MAX_TOKENS,
    retry_strategy=pw.udfs.FixedDelayRetryStrategy(),
    cache_strategy=pw.udfs.DefaultCache(),
)
Now, add an input connector, which will read the data directory.
class DocumentInputSchema(pw.Schema):
    doc: str

documents = pw.io.fs.read(
    "data/",
    format="json",
    schema=DocumentInputSchema,
    mode="streaming",
)
It's time to use the embeddings to find the closest contexts. In this example you'll use the OpenAI embedder, but you can substitute any other embedder from the LLM xpack.
from pathway.xpacks.llm.embedders import OpenAIEmbedder

EMBEDDER_LOCATOR = "text-embedding-ada-002"
EMBEDDING_DIMENSION = 1536

embedder = OpenAIEmbedder(
    api_key=API_KEY,
    model=EMBEDDER_LOCATOR,
    retry_strategy=pw.udfs.FixedDelayRetryStrategy(),
    cache_strategy=pw.udfs.DefaultCache(),
)
# Embed documents
enriched_documents = documents + documents.select(
    data=embedder(pw.this.doc)
)

# Embed queries
query += query.select(
    data=embedder(pw.this.query),
)
The query table now carries the computed embeddings:
| query | data
^X1MXHYY... | How to connect to Kafka in Pathway? | [-0.00027798660448752344, 0.0035375410225242376, -0.00889557134360075...]
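As mentioned above, the OpenAI embedder can be swapped for any other embedder from the LLM xpack. As a rough sketch (assuming the SentenceTransformerEmbedder wrapper and the intfloat/e5-large-v2 model, which requires the sentence-transformers package installed and returns 1024-dimensional vectors), a locally-run embedding model could be used instead:
from pathway.xpacks.llm.embedders import SentenceTransformerEmbedder

# A local embedding model instead of the OpenAI API (the model choice here is just an example).
embedder = SentenceTransformerEmbedder(model="intfloat/e5-large-v2")
# The index dimension must match the embedder output.
EMBEDDING_DIMENSION = 1024
If you go this route, keep EMBEDDING_DIMENSION consistent with the model you pick when building the index below.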
To achieve efficient retrieval of relevant documents, we leverage the power of KNN (K-Nearest Neighbors) indexing. By constructing an index using the generated embeddings, the KNN model allows us to quickly identify the documents that bear the most similarity to a given query. This technique is significantly faster and more efficient than conducting individual comparisons between the query and every document.
from pathway.stdlib.ml.index import KNNIndex

index = KNNIndex(
    enriched_documents.data, enriched_documents, n_dimensions=EMBEDDING_DIMENSION
)

query_context = query + index.get_nearest_items(
    query.data, k=3, collapse_rows=True
).select(documents_list=pw.this.doc)
Each query is now paired with the most relevant documentation snippets:
| query | documents_list
^X1MXHYY... | How to connect to Kafka in Pathway? | ('The documentation describes a ...', 'The pw.io.debezium.read() func...', 'This documentation lists the a...')
By implementing the build_prompt function, we consolidate the query and associated documents into one coherent string, allowing the model to use the given documents for contextual understanding when generating its response. This procedure also provides an opportunity to include specific directives and guidelines for the Large Language Model (LLM) to adhere to.
@pw.udf
def build_prompt(documents, query) -> str:
    docs_str = "\n".join(documents)
    prompt = (
        f"Given the following documents : \n {docs_str} \nanswer this query: {query}"
    )
    return prompt

prompt = query_context.select(
    prompt=build_prompt(pw.this.documents_list, pw.this.query)
)
The resulting table contains the prompt that will be sent to the model:
| prompt
^X1MXHYY... | Given the following documents...
Ultimately, we invoke the GPT-4 model with these thoughtfully crafted prompts and observe the sophistication of its generated responses.
response = prompt.select(
    query_id=pw.this.id,
    result=model(
        prompt_chat_single_qa(pw.this.prompt),
    ),
)
response_writer(response)
pw.run()
Run your application again:
python app.py
Then, in another terminal, run:
curl --data '{"user": "user", "query": "How to connect to Kafka in Pathway?"}' http://localhost:8080/
Real-time Adaptability: Automatic Updates with Pathway
A remarkable feature of Pathway is its automatic adaptability to changes. This feature makes Pathway an effective and efficient tool for real-time document indexing and query answering.
Once you have preprocessed your corpus and created the index, Pathway automatically detects any changes in the document directory and updates the vector index accordingly. This real-time reactivity ensures that the app's responses are always based on the most recent and relevant information available.
Let's put this feature to the test. Consider a scenario where you initially query the system with "How to use LLMs in Pathway?". Since the bot doesn't have any context about LLMs in Pathway, it wouldn't provide a satisfactory response at this point.
curl --data '{"user": "user", "query": "How to use LLMs in Pathway?"}' http://localhost:8080/
Next, add some additional documents providing context about Pathway to your data folder.
wget 'https://public-pathway-releases.s3.eu-central-1.amazonaws.com/data/pathway-documentation-extra.jsonl' -O 'data/pathway-documentation-extra.jsonl' -q -nc
Now, when you query the system with the same question again, Pathway automatically detects the newly added documents, updates the vector index, and the bot can provide a more appropriate response.
curl --data '{"user": "user", "query": "How to use LLMs in Pathway?"}' http://localhost:8080/
This real-time adaptability of Pathway is truly a game-changer when it comes to keeping your AI models updated with the latest data.
At this point, you should have a complete pipeline that not only sifts continuously through your document database to find the most relevant documents for a given query but also calls upon a Generative AI model to generate a detailed and coherent response based on these relevant documents.
The power of Pathway lies in its flexibility and robustness: you can tweak this pipeline to suit a variety of other applications, from customer support to medical literature review. The possibilities are truly endless.
Join our Discord community and dive into discussions on tricks and tips for mastering Retrieval Augmented Generation!