Customizing AI Pipelines with YAML configuration files

Pathway offers a number of ready-to-use and easy to deploy AI Pipelines. To fit them to your needs without the need to alter Python code, you can configure them with YAML configuration files. Pathway uses a custom YAML parser to make configuring templates easier, and this guide explores capabilities of YAML parser.

Mapping tags

YAML format allows for assigning tags to key-value mappings, by prepending a chosen string with !. In Pathway configuration files, these tags are used to reference Python objects. In case of callables, they are called with arguments taken from the mapping. For example, this can be used to define a source Table using an input connector:

source: !pw.io.fs.read
  path: data
  format: binary
  with_metadata: true

Since classes are also callables, this syntax can also be used to initialize objects.

llm: !pw.xpacks.llm.llms.OpenAIChat
  model: "gpt-3.5-turbo"
  retry_strategy: !pw.udfs.ExponentialBackoffRetryStrategy
    max_retries: 6
  cache_strategy: !pw.udfs.DiskCache
  temperature: 0.05
  capacity: 8

While these examples refer to components from Pathway package, you can use it for importing any object - in particular you can use your own functions and classes.

Referencing Enums

If the Python object referred to with ! is not a callable, the associated map needs to be empty and the object is used as is. In LLM templates this is used to set value of an argument that is an enum, e.g. BruteForceKnnFactory requires metric argument to be given a value from pw.stdlib.indexing.BruteForceKnnMetricKind enum:

retriever_factory: !pw.stdlib.indexing.BruteForceKnnFactory
  reserved_space: 1000
  embedder: $embedder
  metric: !pw.stdlib.indexing.BruteForceKnnMetricKind.COS
  dimensions: 1536

Defining Schemas

If you need to define a Schema in the YAML file, the easiest way to do it is to use one of the functions for inline definition of schema. For example, defining a Schema with field text of type str by using pw.schema_from_types looks like this:

schema: !pw.schema_from_types
  text: str

Variables

Identifiers starting with $ are given a new meaning - these denote variables to be used later in the configuration file.

$retry_strategy: !pw.udfs.ExponentialBackoffRetryStrategy
  max_retries: 6

llm: !pw.xpacks.llm.llms.OpenAIChat
  model: "gpt-3.5-turbo"
  retry_strategy: $retry_strategy
  cache_strategy: !pw.udfs.DiskCache
  temperature: 0.05
  capacity: 8

embedder: !pw.xpacks.llm.embedders.OpenAIEmbedder
  model: "text-embedding-ada-002"
  retry_strategy: $retry_strategy
  cache_strategy: !pw.udfs.DiskCache

Example: Demo-Question-Answering

To see YAMLs in practice let's look at the demo-question-answering pipeline. Note, that it differs from adaptive RAG, multimodal RAG and private RAG by the YAML configuration file - their Python code is the same.

Here is the content of app.yaml from demo-question-answering:

$sources:
  - !pw.io.fs.read
    path: data
    format: binary
    with_metadata: true

  # - !pw.xpacks.connectors.sharepoint.read 
  #   url: $SHAREPOINT_URL
  #   tenant: $SHAREPOINT_TENANT
  #   client_id: $SHAREPOINT_CLIENT_ID
  #   cert_path: sharepointcert.pem
  #   thumbprint: $SHAREPOINT_THUMBPRINT
  #   root_path: $SHAREPOINT_ROOT
  #   with_metadata: true
  #   refresh_interval: 30

  # - !pw.io.gdrive.read
  #   object_id: $DRIVE_ID
  #   service_user_credentials_file: gdrive_indexer.json
  #   name_pattern:
  #     - "*.pdf"
  #     - "*.pptx"
  #   object_size_limit: null
  #   with_metadata: true
  #   refresh_interval: 30

$llm: !pw.xpacks.llm.llms.OpenAIChat
  model: "gpt-3.5-turbo"
  retry_strategy: !pw.udfs.ExponentialBackoffRetryStrategy
    max_retries: 6
  cache_strategy: !pw.udfs.DiskCache
  temperature: 0.05
  capacity: 8

$embedder: !pw.xpacks.llm.embedders.OpenAIEmbedder
  model: "text-embedding-ada-002"
  cache_strategy: !pw.udfs.DiskCache

$splitter: !pw.xpacks.llm.splitters.TokenCountSplitter
  max_tokens: 400

$parser: !pw.xpacks.llm.parsers.ParseUnstructured

$retriever_factory: !pw.stdlib.indexing.BruteForceKnnFactory
  reserved_space: 1000
  embedder: $embedder
  metric: !pw.stdlib.indexing.BruteForceKnnMetricKind.COS
  dimensions: 1536
  

$document_store: !pw.xpacks.llm.document_store.DocumentStore
  docs: $sources
  parser: $parser
  splitter: $splitter
  retriever_factory: $retriever_factory

question_answerer: !pw.xpacks.llm.question_answering.BaseRAGQuestionAnswerer
  llm: $llm
  indexer: $document_store


# Change host and port by uncommenting these lines
# host: "0.0.0.0"
# port: 8000

# Cache configuration
# with_cache: true

# If `terminate_on_error` is true then the program will terminate whenever any error is encountered.
# Defaults to false, uncomment the following line if you want to set it to true
# terminate_on_error: true

This demo needs the question_answerer to be defined in the configuration file, and allows to override values of host, port, with_cache and terminate_on_error. The first thing you can try to do, is to add another input connector, e.g. that connects to files from Google Drive. The stub is already present in the file, so just uncomment it and fill object_id and service_user_credentials_file.

$sources:
  - !pw.io.fs.read
    path: data
    format: binary
    with_metadata: true

  - !pw.io.gdrive.read
    object_id: FILL_YOUR_DRIVE_ID
    service_user_credentials_file: FILL_PATH_TO_CREDENTIALS_FILE
    name_pattern:
      - "*.pdf"
      - "*.pptx"
    object_size_limit: null
    with_metadata: true
    refresh_interval: 30

$llm: !pw.xpacks.llm.llms.OpenAIChat
  model: "gpt-3.5-turbo"
  retry_strategy: !pw.udfs.ExponentialBackoffRetryStrategy
    max_retries: 6
  cache_strategy: !pw.udfs.DiskCache
  temperature: 0.05
  capacity: 8

$embedder: !pw.xpacks.llm.embedders.OpenAIEmbedder
  model: "text-embedding-ada-002"
  cache_strategy: !pw.udfs.DiskCache

$splitter: !pw.xpacks.llm.splitters.TokenCountSplitter
  max_tokens: 400

$parser: !pw.xpacks.llm.parsers.ParseUnstructured

$retriever_factory: !pw.stdlib.indexing.BruteForceKnnFactory
  reserved_space: 1000
  embedder: $embedder
  metric: !pw.stdlib.indexing.BruteForceKnnMetricKind.COS
  dimensions: 1536
  

$document_store: !pw.xpacks.llm.document_store.DocumentStore
  docs: $sources
  parser: $parser
  splitter: $splitter
  retriever_factory: $retriever_factory

question_answerer: !pw.xpacks.llm.question_answering.BaseRAGQuestionAnswerer
  llm: $llm
  indexer: $document_store

If you want to change the provider of LLM models, you can change values of llm and embedder. By changing llm to be LiteLLMChat, that uses local api_base, and embedder to be SentenceTransformerEmbedder you obtain a local RAG that does not call external services (this pipeline is now very similar to private RAG from the llm-app).

$sources:
  - !pw.io.fs.read
    path: data
    format: binary
    with_metadata: true

$llm_model: "ollama/mistral"

$llm: !pw.xpacks.llm.llms.LiteLLMChat
  model: $llm_model
  retry_strategy: !pw.udfs.ExponentialBackoffRetryStrategy
    max_retries: 6
  cache_strategy: !pw.udfs.DiskCache
  temperature: 0
  top_p: 1
  format: "json"  # only available in Ollama local deploy, not usable in Mistral API
  api_base: "http://localhost:11434"

$embedding_model: "avsolatorio/GIST-small-Embedding-v0"

$embedder: !pw.xpacks.llm.embedders.SentenceTransformerEmbedder
  model: $embedding_model
  call_kwargs: 
    show_progress_bar: False

$splitter: !pw.xpacks.llm.splitters.TokenCountSplitter
  max_tokens: 400

$parser: !pw.xpacks.llm.parsers.ParseUnstructured

$retriever_factory: !pw.stdlib.indexing.BruteForceKnnFactory
  reserved_space: 1000
  embedder: $embedder
  metric: !pw.engine.BruteForceKnnMetricKind.COS
  dimensions: 1536
  

$document_store: !pw.xpacks.llm.document_store.DocumentStore
  docs: $sources
  parser: $parser
  splitter: $splitter
  retriever_factory: $retriever_factory

question_answerer: !pw.xpacks.llm.question_answering.BaseRAGQuestionAnswerer
  llm: $llm
  indexer: $document_store

Alternatively, you may wish to improve indexing capabilities by using the HybridIndex. In this example you'll use Hybrid Index that combines vector based index - BruteForceKNN - and index based on text search - TantivyBM25.

$sources:
  - !pw.io.fs.read
    path: data
    format: binary
    with_metadata: true

$llm: !pw.xpacks.llm.llms.OpenAIChat
  model: "gpt-3.5-turbo"
  retry_strategy: !pw.udfs.ExponentialBackoffRetryStrategy
    max_retries: 6
  cache_strategy: !pw.udfs.DiskCache
  temperature: 0.05
  capacity: 8

$embedder: !pw.xpacks.llm.embedders.OpenAIEmbedder
  model: "text-embedding-ada-002"
  cache_strategy: !pw.udfs.DiskCache

$splitter: !pw.xpacks.llm.splitters.TokenCountSplitter
  max_tokens: 400

$parser: !pw.xpacks.llm.parsers.ParseUnstructured

$knn_index: !pw.stdlib.indexing.BruteForceKnnFactory
  reserved_space: 1000
  embedder: $embedder
  metric: !pw.engine.BruteForceKnnMetricKind.COS
  dimensions: 1536

$bm25_index: !pw.stdlib.indexing.TantivyBM25Factory

$hybrid_index_factory: !pw.stdlib.indexing.HybridIndexFactory
  retriever_factories:
    - $knn_index
    - $bm25_index

$document_store: !pw.xpacks.llm.document_store.DocumentStore
  docs: $sources
  parser: $parser
  splitter: $splitter
  retriever_factory: $hybrid_index_factory

question_answerer: !pw.xpacks.llm.question_answering.BaseRAGQuestionAnswerer
  llm: $llm
  indexer: $document_store

These are just a few examples, but you can use any components from LLM xpack to have a pipeline that fully meets your need!