How it Works

This pipeline can use several Pathway connectors to read data from the local file system, Google Drive, and Microsoft SharePoint. The connectors poll these sources with low latency and track modifications, so whenever a tracked file changes, the change is reflected in the internal collections. The contents are read into a single Pathway Table as binary objects.

Those binary objects are then parsed with the “unstructured” library and split into chunks. The pipeline embeds the resulting chunks using the OpenAI API.

Finally, the embeddings are indexed using Pathway's machine-learning library. You can then query the resulting index with simple HTTP requests to the endpoints mentioned above.
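For instance, once the server is running you can query it over HTTP. Below is a minimal sketch, assuming the question-answering endpoint used in Pathway's public demos (/v1/pw_ai_answer) and the host and port from config.yaml; substitute whichever endpoints and port your deployment actually exposes.

import requests

# Ask the running RAG server a question over HTTP.
# The endpoint path and port are assumptions based on Pathway's public demos;
# adjust them to match your config.yaml and the endpoints listed above.
response = requests.post(
    "http://localhost:8000/v1/pw_ai_answer",
    json={"prompt": "What are the main findings in the indexed documents?"},
    timeout=60,
)
print(response.json())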

Understanding your RAG pipeline

This folder contains several objects:

  • app.py, the application code using Pathway and written in Python;
  • config.yaml, the file containing configuration stubs for the data sources, the OpenAI LLM model, and the web server. Customize it if you want to change the LLM model, use the Google Drive data source, or change the filesystem directories to be indexed (see the sketch after this list);
  • requirements.txt, the dependencies for your pipeline. It can be passed to pip install -r ... to install everything that is needed to launch the pipeline locally;
  • Dockerfile, the Docker configuration for running the pipeline in the container;
  • .env, a short environment variables configuration file where the OpenAI key must be stored;
  • data/, a folder with exemplary files that can be used for the test runs.
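To give a sense of its shape, here is a minimal illustrative sketch of config.yaml, inferred from the keys that app.py reads (sources, llm_config, host_config); the exact fields and values in your copy may differ.

# Illustrative config.yaml sketch (field names inferred from app.py; adjust to your setup).
sources:
  - kind: local
    config:
      path: data/            # folder with the files to index
llm_config:
  model: gpt-3.5-turbo       # any OpenAI chat model name
host_config:
  host: 0.0.0.0
  port: 8000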

Let's understand your application code in app.py

Your app.py file follows a sequence of steps. Before diving into the code, here is an overview:

  1. Set Up Your License Key: You ensure you have the necessary access to Pathway features.
  2. Configure Logging: Set up logging to monitor what’s happening in your application.
  3. Load Environment Variables: Manage sensitive data securely.
  4. Define Data Sources Function: Handle data from various sources seamlessly.
  5. Main Function with Click: Use command-line interaction to control your pipeline.
  6. Initialize Embedder: Convert text to embeddings for further processing.
  7. Initialize Chat Model: Set up your language model for generating responses.
  8. Set Up Vector Store: Manage and retrieve document embeddings efficiently.
  9. Set Up RAG Application: Combine retrieval and generation for effective question answering.
  10. Build and Run Server: Start your server to handle real-time requests.
app.py
import logging
import sys
import click
import pathway as pw
import yaml
from dotenv import load_dotenv
from pathway.udfs import DiskCache, ExponentialBackoffRetryStrategy
from pathway.xpacks.llm import embedders, llms, parsers, splitters
from pathway.xpacks.llm.question_answering import BaseRAGQuestionAnswerer
from pathway.xpacks.llm.vector_store import VectorStoreServer

# Set your Pathway license key here to use advanced features.
# You can obtain a free license key from: https://pathway.com/get-license
# If you're using the Community version, you can comment this out.
pw.set_license_key("demo-license-key-with-telemetry")

# Set up basic logging to capture key events and errors.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Load environment variables (e.g., API keys) from the .env file.
load_dotenv()

# Function to handle data sources. The example uses a local folder first,
# but you can easily connect to Google Drive or SharePoint as alternative data sources.
def data_sources(source_configs) -> list[pw.Table]:
    sources = []
    for source_config in source_configs:
        if source_config["kind"] == "local":
            # Reading data from a local directory (default is the 'data' folder).
            # This is the first option used in this example, but it's flexible for other sources.
            source = pw.io.fs.read(
                **source_config["config"],
                format="binary",
                with_metadata=True,
            )
            sources.append(source)

        elif source_config["kind"] == "gdrive":
            # Reading data from a Google Drive folder.
            # Requires OAuth credentials specified in the config.yaml file.
            source = pw.io.gdrive.read(
                **source_config["config"],
                with_metadata=True,
            )
            sources.append(source)

        elif source_config["kind"] == "sharepoint":
            try:
                # Import the SharePoint connector for reading data from SharePoint.
                import pathway.xpacks.connectors.sharepoint as io_sp
                
                # Reading data from a SharePoint folder.
                # Note: The SharePoint connector is part of Pathway's commercial offering.
                source = io_sp.read(**source_config["config"], with_metadata=True)
                sources.append(source)
            except ImportError:
                # If SharePoint is configured but the connector isn't available, exit the program.
                print(
                    "The Pathway Sharepoint connector is part of the commercial offering. "
                    "Please contact us for a commercial license."
                )
                sys.exit(1)

    return sources

# Command-line interface (CLI) function to run the app with a specified config file.
@click.command()
@click.option("--config_file", default="config.yaml", help="Config file to be used.")
def run(config_file: str = "config.yaml"):
    # Load the configuration from the YAML file.
    with open(config_file) as config_f:
        configuration = yaml.safe_load(config_f)

    # Fetch the GPT model from the YAML configuration.
    GPT_MODEL = configuration["llm_config"]["model"]

    # Initialize the OpenAI Embedder to handle embeddings with caching enabled.
    embedder = embedders.OpenAIEmbedder(
        model="text-embedding-ada-002",
        cache_strategy=DiskCache(),
    )

    # Set up OpenAI's GPT model for answering questions with retry and caching strategies.
    chat = llms.OpenAIChat(
        model=GPT_MODEL,
        retry_strategy=ExponentialBackoffRetryStrategy(max_retries=6),
        cache_strategy=DiskCache(),
        temperature=0.05,  # Low temperature for less random responses.
    )

    # Host and port configuration for running the server.
    host_config = configuration["host_config"]
    host, port = host_config["host"], host_config["port"]

    # Initialize the vector store for storing document embeddings in memory.
    # This vector store updates the index dynamically whenever the data source changes
    # and can scale to handle over a million documents.
    doc_store = VectorStoreServer(
        *data_sources(configuration["sources"]),
        embedder=embedder,
        splitter=splitters.TokenCountSplitter(max_tokens=400),  # Split documents by token count.
        parser=parsers.ParseUnstructured(),  # Parse unstructured data for better handling.
    )

    # Create a RAG (Retrieve and Generate) question-answering application.
    rag_app = BaseRAGQuestionAnswerer(llm=chat, indexer=doc_store)

    # Build the server to handle requests at the specified host and port.
    rag_app.build_server(host=host, port=port)

    # Run the server with caching enabled, and handle errors without shutting down.
    rag_app.run_server(with_cache=True, terminate_on_error=False)

# Entry point to execute the app if the script is run directly.
if __name__ == "__main__":
    run()

Possible Modifications

  • Change Input Folders: Update paths to new data folders.
  • Modify LLM: Switch to a different language model.
  • Change Embedder: Use an alternative embedder from embedders (see the sketch after this list).
  • Update Index: Configure a different indexing method.
  • Host and Port: Adjust the host and port settings for different environments.
  • Run Options: Enable or disable caching and specify a new cache folder.
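As an illustration of the embedder swap, here is a minimal sketch of replacing the OpenAI embedder in app.py with a local one. SentenceTransformerEmbedder and the model name are assumptions about what is available in your Pathway version; keep the OpenAI embedder if they are not.

# Hypothetical alternative embedder (assumes SentenceTransformerEmbedder is
# available in your Pathway version; the model name is illustrative).
from pathway.xpacks.llm import embedders

embedder = embedders.SentenceTransformerEmbedder(model="intfloat/e5-large-v2")

# The rest of the pipeline stays unchanged: pass this embedder to VectorStoreServer.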

You can also create new components by extending the pw.UDF class and implementing the __wrapped__ function.
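For example, here is a minimal sketch of a custom component; the class name and behavior are purely illustrative.

import pathway as pw

class UppercaseText(pw.UDF):
    # Illustrative custom component: upper-cases every input string.
    # Pathway calls __wrapped__ on each value of the column the UDF is applied to.
    def __wrapped__(self, text: str) -> str:
        return text.upper()

# Usage sketch: apply the component to a column of a Pathway table.
# uppercase = UppercaseText()
# table = table.select(shouting=uppercase(pw.this.text))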

Conclusion

This example demonstrates how to set up a powerful RAG pipeline whose knowledge base is always up to date. We've only scratched the surface, and there's more to explore:

  • Re-ranking: Prioritize the most relevant results for your specific query.
  • Knowledge Graphs: Leverage relationships between entities to improve understanding.
  • Hybrid Indexing: Combine different indexing strategies for optimal retrieval.
  • Adaptive Reranking: Iteratively enlarge the context for optimal accuracy; see our next tutorial on adaptive RAG.

Stay tuned for future examples exploring these RAG techniques with Pathway!

Enjoy building your RAG project! If you have any questions or need further assistance, feel free to reach out to the Pathway team or check with your peers from the bootcamp cohort.

What if you want to use a Multimodal LLM like GPT-4o

That's a great idea. Multimodal LLMs like GPT-4o excel at parsing images, charts, and tables, which can significantly improve accuracy even for text-based use cases.

For example, imagine you're building a RAG project with Google Drive as a data source, and that Drive folder contains financial documents full of charts and tables. Below is an example where Pathway parses the tables as images and uses GPT-4o to produce a much more accurate response.
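If you want to experiment with this on the pipeline above, the minimal first step is to point the chat model at GPT-4o, either in config.yaml or directly in app.py as sketched below. Note that this alone does not enable image or table parsing; the full multimodal setup lives in the linked example.

# Minimal change to the pipeline above: use GPT-4o as the chat model.
# This does not by itself parse tables as images; see the linked
# gpt_4o_multimodal_rag example for the complete multimodal setup.
chat = llms.OpenAIChat(
    model="gpt-4o",
    retry_strategy=ExponentialBackoffRetryStrategy(max_retries=6),
    cache_strategy=DiskCache(),
    temperature=0.0,
)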

See the gpt_4o_multimodal_rag example on GitHub.