Implementation with LlamaIndex and LangChain
Below is an interesting hosted showcase built by combining LlamaIndex with real-time data processing via Pathway, which you can try on your own. In the left sidebar of the Streamlit interface, you can connect a SharePoint or Google Drive folder and see the tool in action. This is a very popular use case for companies.
Let's see how you can build this.
Sample Tutorial/Implementation for LlamaIndex
Creating a real-time Retrieval-Augmented Generation (RAG) application using Pathway and LlamaIndex involves several steps, from setting up your environment to running a fully integrated application. Here's a step-by-step tutorial to guide you through the process:
Prerequisites
- Ensure Docker, Dropbox, and Python are installed on your machine.
- Familiarity with Docker and Python programming is beneficial.
- Important Note: While the steps below describe a non-Dockerized setup, using Docker is highly recommended as a best practice: it ensures consistency across different environments and simplifies the setup process. The last thing you want to tackle is the "it works on my machine" problem when it's working for your peers. Also, if you're in an enterprise setup, containerization is usually the de facto standard.
- Installation
First, we need to install necessary packages. This includes LlamaIndex for retrieval functionalities and Pathway for data processing and indexing.
# Install LlamaIndex and Pathway packages using pip
pip install llama-index-embeddings-openai # For embeddings using OpenAI models
pip install llama-index-retrievers-pathway # For the Pathway retriever in LlamaIndex
pip install pathway # The Pathway package for data processing and indexing
pip install llama-index # Main LlamaIndex package
- Preparing Your Data
Create a directory to store your data and download a sample dataset. This is where Pathway will monitor for any changes to re-index the updated content.
# Create a directory for data and download sample data
mkdir -p data/
wget 'https://gist.githubusercontent.com/link_to_your_data' -O data/sample_data.md
Replace the wget URL with the actual link to your sample data.
- Configuring Your Environment
Set up your environment variables, including the OpenAI API key if you're using OpenAI models for embeddings. This key is required for accessing OpenAI's API services.
import os
import getpass
# Set up the OpenAI API key for embedding operations
if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API Key: ")
- Logging Configuration
Configuring logging helps you monitor the pipeline's execution and debug issues when necessary.
import logging
import sys
# Configure basic logging to stdout to monitor the process
logging.basicConfig(stream=sys.stdout, level=logging.ERROR)
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
- Defining Data Sources
Specify which data sources Pathway should monitor. This can include local directories, cloud storage, etc. Pathway supports a variety of sources, making it versatile for different use cases.
import pathway as pw
# Define the data sources Pathway will monitor
data_sources = [
    pw.io.fs.read("./data", format="binary", mode="streaming", with_metadata=True),
    # Add more sources as needed
]
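Pathway also ships connectors for external sources. As a sketch, a Google Drive folder could be added as an extra streaming source; the folder ID and credentials file below are placeholders, and the exact connector arguments may differ across Pathway versions (see the Google Drive connector link in the conclusion):
# Sketch: monitor a Google Drive folder in addition to the local directory.
# "your_drive_folder_id" and "credentials.json" are placeholder values.
gdrive_source = pw.io.gdrive.read(
    object_id="your_drive_folder_id",
    service_user_credentials_file="credentials.json",
    mode="streaming",
    with_metadata=True,
)
data_sources.append(gdrive_source)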
- Creating the Indexing Pipeline
This section defines the document processing pipeline. We split the text and then embed it using OpenAI models before indexing.
from llama_index.retrievers.pathway import PathwayVectorServer
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import TokenTextSplitter
# Setup for embedding model
embed_model = OpenAIEmbedding(embed_batch_size=10)
# Define transformations for the indexing pipeline
transformations_example = [
TokenTextSplitter(chunk_size=150, chunk_overlap=10, separator=" "),
embed_model,
]
# Initialize the processing pipeline with defined transformations
processing_pipeline = PathwayVectorServer(
    *data_sources,
    transformations=transformations_example,
)
- Running the Server
Start the Pathway server to begin monitoring the data sources and indexing new or updated documents.
# Specify host and port for the Pathway server
PATHWAY_HOST = "127.0.0.1"
PATHWAY_PORT = 8754
# Run the Pathway server in a non-blocking mode
processing_pipeline.run_server(host=PATHWAY_HOST, port=PATHWAY_PORT, with_cache=False, threaded=True)
- Retrieval with LlamaIndex 🦙
Configure LlamaIndex to use the indexed data for retrieval. This involves setting up the PathwayRetriever.
from llama_index.retrievers.pathway import PathwayRetriever
# Initialize the PathwayRetriever with the server's host and port
retriever = PathwayRetriever(host=PATHWAY_HOST, port=PATHWAY_PORT)
Now you can perform queries against the indexed data:
# Perform a retrieval query
response = retriever.retrieve("What is pathway?")
print(response)
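If you want an LLM-generated answer rather than raw retrieved chunks, you can plug the retriever into a standard LlamaIndex query engine. A minimal sketch, assuming the default OpenAI LLM settings (using the API key set earlier):
from llama_index.core.query_engine import RetrieverQueryEngine

# Build a query engine on top of the Pathway-backed retriever; it fetches the
# freshest indexed chunks and synthesizes an answer with the configured LLM
query_engine = RetrieverQueryEngine.from_args(retriever)
answer = query_engine.query("What is Pathway?")
print(answer)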
This setup provides a foundation for building applications that require real-time data processing and retrieval. Remember, deploying this setup within a Docker container is recommended to avoid dependency issues and to ensure consistency and ease of deployment.
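A minimal Dockerfile sketch for such a deployment could look like the following. It assumes you have collected the Python steps above into a script called main.py and the pip packages into requirements.txt (both hypothetical file names, so adjust them to your project layout):
# Sketch of a container image for the pipeline; file names are placeholders.
FROM python:3.11-slim

WORKDIR /app

# Install the packages listed in the Installation step (pin versions for reproducibility)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the pipeline code and the monitored data directory
COPY . .

# Start the indexing pipeline / Pathway server
CMD ["python", "main.py"]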
Conclusion
This integration guide between Pathway and LlamaIndex gives you a solid starting point. Below are a few additional links and examples that may be helpful.
- Pathway Retriever | LlamaIndex Documentation
- Pathway Reader | LlamaIndex Documentation
- Connecting various data sources such as Google Drive | Pathway documentation
- Showcase: Pathway + LlamaIndex + Streamlit | GitHub
If you're a first-time LLM/RAG app developer, consider going for a more minimalistic approach to showcase an impactful project.
The key thing is the utility of your project, not so much whether you use Pathway's LLM App end-to-end or couple it with LlamaIndex/LangChain, etc., to harness the power of real-time LLM applications. 😄
Additional Links
- Building a RAG Application using LlamaIndex and Pathway | Tutorial on Streamlit/Snowflake
- Building Reactive RAG apps with LangChain and Pathway
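Since this section's title also mentions LangChain, here is a minimal sketch of querying the same running Pathway server from LangChain instead of LlamaIndex. It assumes the PathwayVectorClient integration from the langchain-community package (installed separately via pip); double-check the constructor arguments against the tutorial linked above, as they may differ between versions:
from langchain_community.vectorstores import PathwayVectorClient

# Sketch: connect to the Pathway server started earlier (same host/port as above)
client = PathwayVectorClient(host="127.0.0.1", port=8754)

# Run a similarity search against the live, continuously updated index
docs = client.similarity_search("What is Pathway?")
print(docs)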