Implementation with LlamaIndex

Creating a real-time Retrieval-Augmented Generation (RAG) application with Pathway and LlamaIndex involves several steps, from setting up your environment to running a fully integrated application. Here's a step-by-step tutorial to guide you through the process.

Installation

First, install the necessary packages: LlamaIndex for the retrieval functionality and Pathway for data processing and indexing.

!pip install -qU "pathway[all]"
!pip install -qU llama-index
!pip install -qU llama-index-retrievers-pathway
!pip install -qU llama-index-embeddings-openai

Preparing Your Data

Create a directory to store your data and download a sample document. Pathway will monitor this directory for changes and re-index any updated content.

!mkdir -p 'data/'
!wget -q -P ./data/ https://github.com/pathwaycom/llm-app/raw/main/examples/pipelines/demo-question-answering/data/IdeanomicsInc_20160330_10-K_EX-10.26_9512211_EX-10.26_Content%20License%20Agreement.pdf

Configuring Your Environment

Set up your environment variables, including the OpenAI API key if you're using OpenAI models for embeddings. This key is required for accessing OpenAI's API services.

import os

# Needed for the OpenAI embedder and the default LLM used below; you can switch the
# embedding provider, see the documentation:
# https://pathway.com/developers/api-docs/pathway-xpacks-llm/embedders
os.environ["OPENAI_API_KEY"] = "sk-"  # replace with your actual key
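
If you prefer not to hard-code the key in the notebook, a minimal alternative (a sketch using the standard-library getpass module) is to prompt for it at runtime:

import os
from getpass import getpass

# Prompt for the key only if it is not already set in the environment
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("OpenAI API key: ")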

Logging Configuration

Configuring logging makes it easier to monitor the pipeline's execution and to debug it if necessary.

import logging
import sys

# Log errors to stdout so problems in the pipeline are easy to spot;
# lower the level to logging.INFO for more verbose output
logging.basicConfig(stream=sys.stdout, level=logging.ERROR)

Defining Data Sources

Specify which data sources Pathway should monitor. These can include local directories, cloud storage, and more; Pathway supports a variety of connectors, making it versatile for different use cases (a cloud example is sketched after the code below).

import pathway as pw

data_sources = []
# Create a Pathway connector that tracks all the files in the ./data directory
data_sources.append(
    pw.io.fs.read(
        "./data",
        format="binary",
        mode="streaming",
        with_metadata=True,
    )
)
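
As a hedged illustration of a cloud source, a shared Google Drive folder could be tracked in much the same way via Pathway's Drive connector. The folder ID and the service-account credentials file below are placeholders, and the exact parameter names should be verified against the connector documentation for your Pathway version.

# Optional, illustrative: also track a Google Drive folder (placeholders below)
# data_sources.append(
#     pw.io.gdrive.read(
#         object_id="<your-drive-folder-id>",                 # placeholder folder ID
#         service_user_credentials_file="credentials.json",   # placeholder credentials path
#         with_metadata=True,
#     )
# )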

Creating the Indexing Pipeline

This section defines the document processing pipeline: we parse the PDFs, split the text into chunks, and embed the chunks with an OpenAI model before indexing.

from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import TokenTextSplitter
from pathway.xpacks.llm.vector_store import VectorStoreServer
from pathway.xpacks.llm import parsers

# Setup for embedding model
embed_model = OpenAIEmbedding(embed_batch_size=10)

parser = parsers.PypdfParser()

# Define transformations for the indexing pipeline
transformations_example = [
    TokenTextSplitter(
        chunk_size=150,
        chunk_overlap=10,
        separator=" ",
    ),
    embed_model,
]

# Initialize the processing pipeline with defined transformations
processing_pipeline = VectorStoreServer.from_llamaindex_components(
    *data_sources,
    transformations=transformations_example,
    parser=parser
)

Running the Server

Start the Pathway server to begin monitoring the data sources and indexing new or updated documents.

# Specify host and port for the Pathway server
pathway_host: str = "0.0.0.0"
pathway_port: int = 8000
# Run the Pathway server
processing_pipeline.run_server(
    host=pathway_host, port=pathway_port, with_cache=False, threaded=True
)
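
Because the server runs in a background thread (threaded=True), the notebook keeps executing. Before querying, it can be useful to confirm that documents have been ingested. The snippet below is a sketch: the VectorStoreClient class and its get_vectorstore_statistics method come from Pathway's vector store module, but double-check both against your installed version and adjust the wait time to your environment.

import time
from pathway.xpacks.llm.vector_store import VectorStoreClient

# Give the threaded server a moment to start up and index the documents
time.sleep(30)

client = VectorStoreClient(host=pathway_host, port=pathway_port)
# Reports indexing statistics (e.g. number of indexed files) once ingestion has run
print(client.get_vectorstore_statistics())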

Retrieval with LlamaIndex 🦙

Configure LlamaIndex to use the indexed data for retrieval. This involves setting up the PathwayRetriever.

from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.retrievers.pathway import PathwayRetriever

retriever = PathwayRetriever(host=pathway_host, port=pathway_port)

query_engine = RetrieverQueryEngine.from_args(
    retriever
)

Now you can perform queries against the indexed data:

response = query_engine.query("What are terms and conditions?")
print(str(response))
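
Because Pathway watches the data directory in streaming mode, the index stays up to date without restarting the server. As a quick illustrative check, you can copy another PDF into ./data, wait for it to be picked up, and query again; the source path, the wait time, and the query below are placeholders:

import shutil
import time

# Copy another PDF into the monitored directory (the source path is a placeholder)
shutil.copy("/path/to/another_document.pdf", "./data/")

# Give the pipeline a moment to parse, embed, and index the new file
time.sleep(30)

# New queries can now also retrieve content from the freshly added document
response = query_engine.query("What does the newly added document cover?")
print(str(response))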

This setup provides a foundation for building applications that require real-time data processing and retrieval. For deployment, running everything inside a Docker container is recommended: it avoids dependency conflicts and keeps the environment consistent and reproducible.
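
A minimal Dockerfile could look roughly like the one below. It is only a sketch, assuming the pipeline code from this tutorial is saved as main.py next to the data/ directory (both names are assumptions); pin package versions as needed for your setup.

FROM python:3.11

WORKDIR /app

# Install the same packages used in this tutorial (consider pinning versions)
RUN pip install -U "pathway[all]" llama-index llama-index-retrievers-pathway llama-index-embeddings-openai

# Copy the pipeline code (assumed to be main.py) and the data directory
COPY . .

EXPOSE 8000

# Pass the API key at runtime, e.g. docker run -e OPENAI_API_KEY=... <image>
CMD ["python", "main.py"]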

Conclusion

This guide to integrating Pathway and LlamaIndex should give you what you need to get started. Below are a few additional pointers that may be helpful.

If you're a first-time LLM/RAG app developer, consider starting with a more minimalistic approach; a small, focused project can still be impactful.

The key thing is the utility of your project, not whether you use Pathway's LLM App end-to-end or combine it with LlamaIndex, LangChain, and similar tools to harness the power of real-time LLM applications. 😄