LangChain and Pathway: RAG Apps with always-up-to-date knowledge
With the LangChain integration, you can now use Pathway in your RAG applications to give LLMs always-up-to-date knowledge from your documents.
Pathway is now available in LangChain, a framework for developing applications powered by large language models (LLMs). You can now query Pathway and access up-to-date documents for your RAG applications from LangChain using PathwayVectorClient.
With this new integration, you can use the Pathway Vector Store natively in LangChain. This guide is a quick dive into Pathway + LangChain, showing how to create a real-time RAG solution.
Prerequisites
To work with LangChain you need to install the langchain package, as it is not a dependency of Pathway. The example in this guide also uses the OpenAIEmbeddings class, for which you need the langchain_openai package.
!pip install "pathway[all]"
!pip install -U langchain-community langchainhub langchain-openai
!pip install langchain
Using LangChain components in Pathway Vector Store
When using the Pathway VectorStoreServer, you can use a LangChain embedder and splitter for processing documents. To do that, use the from_langchain_components class method.
To start, you need to create a folder Pathway will listen to. Feel free to skip this if you already have a folder on which you want to build your RAG application. You can also use Google Drive, Sharepoint, or any other source from pathway-io.
!mkdir -p 'data/'
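If your documents live in Google Drive rather than a local folder, you can swap the filesystem connector used later in this guide for the Google Drive connector from pathway-io. Below is a minimal sketch, assuming you have a Google service account credentials file and the ID of the folder to index; both values are placeholders to replace:
import pathway as pw
# Track a Google Drive folder in streaming mode; `object_id` and the
# credentials file path below are placeholders.
drive_source = pw.io.gdrive.read(
    object_id="YOUR_FOLDER_ID",
    service_user_credentials_file="credentials.json",
    with_metadata=True,
)
The rest of this guide sticks to the local data folder.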
We’ve included a sample PDF file, IdeanomicsInc_20160330_10-K_EX-10.26_9512211_EX-10.26_Content License Agreement.pdf. It’s a content license agreement, which you’ll use as the knowledge base for this demonstration.
!wget -q -P ./data/ https://github.com/pathwaycom/llm-app/raw/main/examples/pipelines/demo-question-answering/data/IdeanomicsInc_20160330_10-K_EX-10.26_9512211_EX-10.26_Content%20License%20Agreement.pdf
To run this example you also need to set the OpenAI API key.
import os
# needed for the OpenAI embedder and the LLM we will use below, you can change the embedding provider, see the documentation:
# https://pathway.com/developers/api-docs/pathway-xpacks-llm/embedders
os.environ["OPENAI_API_KEY"] = "sk-"
api_key = os.environ["OPENAI_API_KEY"]
DATA_PATH = "./data"
os.makedirs(DATA_PATH, exist_ok=True)
To run the server, use the Pathway filesystem connector to read files from the data folder.
import pathway as pw
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from pathway.xpacks.llm.vector_store import VectorStoreServer
from pathway.xpacks.llm import parsers
# read the text files under the data folder, we can also read from Google Drive, Sharepoint, etc.
# See connectors documentation: https://pathway.com/developers/user-guide/connect/pathway-connectors to learn more
data_sources = []
data_sources.append(
    pw.io.fs.read(
        "./data",
        format="binary",
        mode="streaming",
        with_metadata=True,
    )  # This creates a `pathway` connector that tracks
    # all the files in the ./data directory
)
Then pass them to the server, which will split them using CharacterTextSplitter and embed them using OpenAIEmbeddings, both from LangChain.
embeddings = OpenAIEmbeddings(api_key=api_key)
splitter = CharacterTextSplitter()
# define the document processing steps
parser = parsers.PypdfParser()
server = VectorStoreServer.from_langchain_components(
    *data_sources, embedder=embeddings, splitter=splitter, parser=parser
)
# host and port of the RAG app
pathway_host: str = "0.0.0.0"
pathway_port: int = 8000
server.run_server(
    host=pathway_host, port=pathway_port, with_cache=False, threaded=True
)
The server is now running and ready for querying, either with the VectorStoreClient from the Pathway LLM xpack or with the PathwayVectorClient from langchain-community described in the next section.
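If you want to query the server directly from Pathway, without going through LangChain, a minimal sketch with the native client could look like this (assuming the VectorStoreClient API from the Pathway LLM xpack; the query text is only an example):
from pathway.xpacks.llm.vector_store import VectorStoreClient
# Connect to the server started above.
native_client = VectorStoreClient(host=pathway_host, port=pathway_port)
# Retrieve the most relevant chunks for the query.
print(native_client.query("What are the terms and conditions?", k=3))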
Using Pathway as a Vector Store in LangChain pipelines
Once you have a VectorStoreServer running, you can access it from a LangChain pipeline by using PathwayVectorClient. To do that, you need to provide either the url, or the host and port of the running VectorStoreServer. In the code example below, you will connect to the VectorStoreServer defined in the previous section, so make sure it's running before making queries.
from langchain_community.vectorstores import PathwayVectorClient
client = PathwayVectorClient(host=pathway_host, port=pathway_port)
query = "What are the terms and conditions?"
docs = client.similarity_search(query)
print(docs)
As you can see, the similarity search returns the document chunks most relevant to the query. Add new data to the folder Pathway is listening to, then run the query again to see how the results reflect the newly indexed files.
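For example, since the pipeline above uses a PDF parser, you can copy another PDF into the watched folder and rerun the query; the source path below is a placeholder for a PDF you have locally:
!cp /path/to/another_document.pdf ./data/
# Pathway picks the new file up automatically - no server restart needed.
docs = client.similarity_search("What are the terms and conditions?")
print(docs)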
RAG pipeline in LangChain
The next step is to write a chain in LangChain. The example below implements a simple RAG pipeline that, given a question, retrieves documents from the Pathway Vector Store. These are then used as context for the question in a prompt sent to the OpenAI chat model.
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
retriever = client.as_retriever()
template = """
You are a smart assistant that helps users with their documents on Google Drive and SharePoint.
Given a context, respond to the user question.
CONTEXT:
{context}
QUESTION: {question}
YOUR ANSWER:"""
prompt = ChatPromptTemplate.from_template(template)
llm = ChatOpenAI()
chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
Now you have a RAG chain written in LangChain that uses Pathway as its Vector Store. Test it by asking a question.
chain.invoke("What are the two parties in the contract?")
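If you prefer the answer streamed token by token, LCEL chains also expose stream. A small sketch, with the question being just an example:
# Stream the answer as it is generated instead of waiting for the full string.
for chunk in chain.stream("What are the two parties in the contract?"):
    print(chunk, end="", flush=True)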
Vector Store statistics
Just like the VectorStoreClient from the Pathway LLM xpack, PathwayVectorClient gives you two methods for getting information about indexed documents. The first one is get_vectorstore_statistics, which gives essential statistics on the state of the vector store, such as the number of indexed files and the timestamp of the last update. The second one is get_input_files, which returns the list of indexed files along with their associated metadata.
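For example, with the client from the previous section still connected (the exact values will depend on your data):
# Number of indexed files and the timestamp of the last update.
print(client.get_vectorstore_statistics())
# List of indexed files together with their metadata.
print(client.get_input_files())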