LangChain and Pathway: RAG Apps with always-up-to-date knowledge
You can now use Pathway in your RAG applications to give LLMs always-up-to-date knowledge from your documents, thanks to the LangChain integration.
Pathway is now available in LangChain, a framework for developing applications powered by large language models (LLMs). You can now query Pathway and access up-to-date documents for your RAG applications from LangChain using PathwayVectorClient.
With this new integration, you will be able to use Pathway Vector Store natively in LangChain. In this guide, you will take a quick dive into Pathway + LangChain and learn how to create a simple, yet powerful RAG solution.
Prerequisites
To work with LangChain you need to install the langchain package, as it is not a dependency of Pathway. The example in this guide also uses the OpenAIEmbeddings class, for which you need the langchain_openai package.
!pip install langchain
!pip install langchain_community
!pip install langchain_openai
Using LangChain components in Pathway Vector Store
When using the Pathway VectorStoreServer, you can use a LangChain embedder and splitter for processing documents. To do that, use the from_langchain_components class method.
To start, you need to create a folder Pathway will listen to. Feel free to skip this if you already have a folder on which you want to build your RAG application. You can also use Google Drive, SharePoint, or any other source from pathway-io; a sketch of a Google Drive source follows the command below.
!mkdir -p 'data/'
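If you want to read from Google Drive instead of a local folder, below is a minimal sketch using the pathway-io Google Drive connector. The folder ID and credentials path are placeholders, and the exact parameter set may differ between Pathway versions, so verify it against the pathway-io connector documentation.
import pathway as pw

# Hypothetical Google Drive source; replace the placeholders with your own
# folder ID and service account credentials file. Check the parameter names
# against the pathway-io documentation for your Pathway version.
drive_data = pw.io.gdrive.read(
    object_id="YOUR_GDRIVE_FOLDER_ID",
    service_user_credentials_file="credentials.json",
    with_metadata=True,
)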
To run this example you also need to set the OpenAI API key, or change the embedder.
import os
import getpass
if "OPENAI_API_KEY" in os.environ:
api_key = os.environ["OPENAI_API_KEY"]
else:
api_key = getpass.getpass("OpenAI API Key:")
To run the server, use the Pathway filesystem connector to read files from the data folder.
import pathway as pw
from pathway.xpacks.llm.vector_store import VectorStoreServer
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
data = pw.io.fs.read(
    "./data",
    format="binary",
    mode="streaming",
    with_metadata=True,
)
And then pass them to the server, which will split them using CharacterTextSplitter and embed them using OpenAIEmbeddings, both from LangChain.
embeddings = OpenAIEmbeddings(api_key=api_key)
splitter = CharacterTextSplitter()
host = "127.0.0.1"
port = 8666
server = VectorStoreServer.from_langchain_components(
    data, embedder=embeddings, splitter=splitter
)
server.run_server(
    host,
    port=port,
    with_cache=True,
    cache_backend=pw.persistence.Backend.filesystem("./Cache"),
    threaded=True,
)
The server is now running and ready for querying, either with a VectorStoreClient from the Pathway LLM xpack or with a PathwayVectorClient from langchain-community, described in the next section.
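As a quick check, here is a minimal sketch of querying the running server with Pathway's native client, assuming the host and port variables defined above and that your Pathway version exposes VectorStoreClient with a query method:
from pathway.xpacks.llm.vector_store import VectorStoreClient

# Query the running server directly, without LangChain.
native_client = VectorStoreClient(host=host, port=port)
print(native_client.query("What is Pathway?", k=3))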
Using Pathway as a Vector Store in LangChain pipelines
Once you have a VectorStoreServer running, you can access it from a LangChain pipeline by using PathwayVectorClient. To do that you need to provide either the url, or the host and port, of the running VectorStoreServer. In the code example below, you will connect to the VectorStoreServer defined in the previous section, so make sure it's running before making queries. Alternatively, you can also use the publicly available demo pipeline to test your client; its REST API is accessible at https://demo-document-indexing.pathway.stream. This demo ingests documents from Google Drive and SharePoint.
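For example, here is a minimal sketch of connecting to the public demo pipeline by its url instead of a host and port; the query string is just an illustration:
from langchain_community.vectorstores import PathwayVectorClient

# Connect to the publicly available demo pipeline by URL.
demo_client = PathwayVectorClient(url="https://demo-document-indexing.pathway.stream")
print(demo_client.similarity_search("What is Pathway?"))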
from langchain_community.vectorstores import PathwayVectorClient
client = PathwayVectorClient(host=host, port=port)
query = "What is Pathway?"
docs = client.similarity_search(query)
print(docs)
As you can see, the search returns little useful context, since the folder contains no relevant documents yet, but this is where Pathway shines. Add new data to the folder Pathway is listening to, then run the query again to see how the results change. To do that, download the Pathway repository README into the data folder:
!wget 'https://raw.githubusercontent.com/pathwaycom/pathway/main/README.md' -O 'data/pathway_readme.md' -q -nc
Try the query again with the new data:
docs = client.similarity_search(query)
print(docs)
RAG pipeline in LangChain
The next step is to write a chain in LangChain. The example below implements a simple RAG that, given a question, retrieves documents from the Pathway Vector Store. These are then used as context for the question in a prompt sent to the OpenAI chat model.
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
retriever = client.as_retriever()
template = """
You are a smart assistant that helps users with their documents on Google Drive and SharePoint.
Given a context, respond to the user question.
CONTEXT:
{context}
QUESTION: {question}
YOUR ANSWER:"""
prompt = ChatPromptTemplate.from_template(template)
llm = ChatOpenAI()
chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
Now you have a RAG chain written in LangChain that uses Pathway as its Vector Store. Test it by asking a question.
chain.invoke("What is Pathway?")
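If you want to control how many documents are passed as context, you can tune the retriever with LangChain's standard search_kwargs argument. Below is a small sketch that reuses the prompt, llm, and output parser defined above; the value of k is just an example:
# Limit the retriever to the top 3 documents per query.
retriever = client.as_retriever(search_kwargs={"k": 3})

chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
print(chain.invoke("What is Pathway?"))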
Vector Store statistics
Just like the VectorStoreClient from the Pathway LLM xpack, PathwayVectorClient gives you two methods for getting information about the indexed documents. The first one is get_vectorstore_statistics, which gives essential statistics on the state of the vector store, like the number of indexed files and the timestamp of the most recently updated one. The second one is get_input_files, which returns the list of indexed files along with the associated metadata.
print(client.get_vectorstore_statistics())
print(client.get_input_files())