Customizing Pathway Templates with Your Own Components

When you configure Pathway Templates with YAML, you can use your own components through mapping tags, either to implement components that are not available in the Pathway library or to add extra processing steps. This article explains how to import custom components. It does not cover the basic syntax of Pathway YAML configuration files; for that, read the YAML syntax guide.

How are components imported?

To refer to a component after !, provide its full module path, e.g. pw.xpacks.llm.llms.OpenAIChat. If you give only the name of the object, with no module, it is assumed to come from Python's builtins. For example, you can use !str to convert an object to a string.

one_as_string: !str
  object: 1
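
To make the example above concrete, here is a minimal plain-Python sketch of what it appears to amount to, assuming the keys of the mapping under a tag are passed to the tagged callable as keyword arguments. The `mapping` variable is a hypothetical stand-in for the parsed YAML, not something Pathway exposes.

```python
# Hypothetical parsed form of the YAML mapping placed under the !str tag.
mapping = {"object": 1}

# Assumption: the mapping keys become keyword arguments of the callable,
# so this is the same call as str(object=1).
one_as_string = str(**mapping)

print(repr(one_as_string))
```

Under that assumption, `one_as_string` holds a string rather than the integer written in the YAML file.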

This means that any object you want to use must be referenced together with the name of its module, which may simply be the name of the Python file it is defined in. For example, if you want to use the function foo defined in utils.py, located in the same directory as app.py and app.yaml, so that the file tree looks as follows:

.
├── app.py
├── app.yaml
└── utils.py

you would refer to it as !utils.foo.
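
The lookup rule described in this section can be sketched in plain Python. This is only an illustration of the idea under stated assumptions, not Pathway's actual implementation: the `resolve` function and the `ALIASES` table are hypothetical names, and the sketch simply imports the longest module prefix it can find and then walks the remaining attributes.

```python
import builtins
import importlib

# Hypothetical alias table; the article names pw as the only recognized abbreviation.
ALIASES = {"pw": "pathway"}


def resolve(tag: str):
    """Resolve a dotted tag name such as 'utils.foo' to a Python object."""
    parts = tag.split(".")
    if len(parts) == 1:
        # No module given: look the name up in builtins (e.g. 'str').
        return getattr(builtins, tag)
    parts[0] = ALIASES.get(parts[0], parts[0])
    # Try the longest importable module prefix first, then walk attributes.
    for i in range(len(parts) - 1, 0, -1):
        try:
            obj = importlib.import_module(".".join(parts[:i]))
        except ImportError:
            continue
        for attr in parts[i:]:
            obj = getattr(obj, attr)
        return obj
    raise ImportError(f"cannot resolve {tag!r}")


# Standard-library names stand in for !utils.foo here so the sketch runs anywhere.
loads = resolve("json.loads")
to_string = resolve("str")
```

With this sketch, `resolve("utils.foo")` would import utils.py when it is on the import path, which is the case when it sits next to app.py and the app is started from that directory.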

The following examples show how to use this in your pipelines.

Altering metadata

Say you want to add metadata that is not created by the input connector. For example, your use case is running RAG on data about funds, and each file in the dataset is named in the format {ISIN number}_{Fund Name}_{Currency}.pdf. The filename is already part of the metadata, but you may want to exploit this format and add the ISIN and currency as separate metadata fields to make filtering easier, which the connector cannot do automatically.

First, in utils.py, write a function that adds those fields to the metadata.

utils.py
import pathway as pw

@pw.udf
def add_isin_and_currency(metadata: pw.Json) -> dict:
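    metadata_dict = metadata.as_dict()
    # Filenames follow the format {ISIN number}_{Fund Name}_{Currency}.pdf
    path = metadata_dict["path"]
    filename = path.split('/')[-1]
    # The ISIN is the part before the first underscore
    isin = filename.split('_')[0]
    # The last part is "{Currency}.pdf"; [:-4] drops the ".pdf" extension
    currency = filename.split('_')[-1][:-4]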
    metadata_dict["isin"] = isin
    metadata_dict["currency"] = currency
    return metadata_dict


def augment_metadata(sources: list[pw.Table]) -> list[pw.Table]:
    return [t.with_columns(_metadata=add_isin_and_currency(pw.this._metadata)) for t in sources]
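
The filename parsing inside add_isin_and_currency can be checked on its own, without running a Pathway pipeline. The sketch below is a plain-Python variant that uses pathlib instead of manual slicing; the helper name `parse_fund_filename` and the sample filename are made up for illustration.

```python
from pathlib import PurePosixPath


def parse_fund_filename(path: str) -> dict[str, str]:
    """Extract ISIN and currency from '{ISIN number}_{Fund Name}_{Currency}.pdf'."""
    # stem is the filename without its directory and without the extension
    stem = PurePosixPath(path).stem
    isin = stem.split("_", 1)[0]        # text before the first underscore
    currency = stem.rsplit("_", 1)[-1]  # text after the last underscore
    return {"isin": isin, "currency": currency}


# Hypothetical file name that follows the format described above.
example = parse_fund_filename("data/XX0000000000_Example Fund_EUR.pdf")
print(example)
```

Splitting only on the first and last underscore keeps the result correct even when the fund name itself contains underscores.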

Then, in the app.yaml from demo-question-answering, apply this function to $sources to obtain the augmented tables, which are then passed to $document_store.

app.yaml
$sources:
  # File System connector, reading data locally.
  - !pw.io.fs.read
    path: data
    format: binary
    with_metadata: true

$sources_with_metadata: !utils.augment_metadata
  sources: $sources

$document_store: !pw.xpacks.llm.document_store.DocumentStore
  docs: $sources_with_metadata
  parser: $parser
  splitter: $splitter
  retriever_factory: $retriever_factory

Parser for JSONs

As another example, suppose you need a parser that is not part of Pathway's LLM xpack. Say the data streams in from Kafka and each message is a JSON object whose "content" key holds the text to be indexed. You can then replace the parser in the DocumentStore with one that extracts this string from each message.

json_parser.py
import pathway as pw
import json

class JsonParser(pw.UDF):
    def __wrapped__(self, message: str) -> list[tuple[str, dict]]:
        message_dict = json.loads(message)
        return [(message_dict["content"], {})]

app.yaml
$sources:
  - !pw.io.kafka.read
    # Kafka configuration, check https://pathway.com/developers/api-docs/pathway-io/kafka for needed arguments
    format: plaintext

$parser: !json_parser.JsonParser {}

$document_store: !pw.xpacks.llm.document_store.DocumentStore
  docs: $sources
  parser: $parser
  splitter: $splitter
  retriever_factory: $retriever_factory
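
The body of JsonParser can be tried out on a sample message outside of Pathway. The sketch below repeats the same logic as a plain function; `parse_message`, the sample payload, and its extra "offset" key are hypothetical and only serve to show the shape of the result.

```python
import json


def parse_message(message: str) -> list[tuple[str, dict]]:
    """Return a single (text, metadata) pair built from a JSON-encoded message."""
    message_dict = json.loads(message)
    # Only the "content" field is indexed; the metadata dict is left empty.
    return [(message_dict["content"], {})]


# Hypothetical Kafka message payload, serialized the way a producer might send it.
sample = json.dumps({"content": "Text to be indexed", "offset": 7})
chunks = parse_message(sample)
print(chunks)
```

Each message yields exactly one pair here, and a message without a "content" key raises a KeyError in this sketch.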

Importing objects from external libraries

You can, of course, also use objects from external libraries. If you do, use the full name of the library rather than a commonly used abbreviation, e.g. pandas rather than pd; the only abbreviation recognized by the YAML parser is pw for pathway. You do not need to import components from external libraries anywhere yourself: pw.load_yaml, which parses the YAML configuration files, takes care of that.

app.yaml
$dataframe: !pandas.DataFrame
  "data": ["foo", "bar", "baz"]

$sources:
  - !pw.debug.table_from_pandas
    df: $dataframe