Customizing Pathway Templates with Your Own Components
When using YAML to configure the Pathway Templates, you can plug in your own components with the mapping tags - either to implement components not available in the Pathway library or to add extra processing steps. This article goes through the details of importing custom components. It does not, however, cover the basic syntax of the Pathway YAML configuration files - for that, read the YAML syntax guide.
How are components imported?
To refer to any component after !, you need to provide its full module name, e.g. pw.xpacks.llm.llms.OpenAIChat. If only the name of the object is given, without a module, it is assumed to come from the builtins. For example, you can use !str to convert an object to a string:
one_as_string: !str
  object: 1
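This sets one_as_string to the result of calling Python's built-in str with the keyword argument object=1, i.e. the string "1". A minimal sketch of checking this yourself - assuming pw.load_yaml (described in the last section of this guide) accepts an inline YAML string as well as a file object:

import pathway as pw

# Parse the snippet above; the !str tag calls the built-in str(object=1).
# Passing the YAML as an inline string is an assumption here.
config = pw.load_yaml("one_as_string: !str\n  object: 1\n")
print(config["one_as_string"])  # expected: "1"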
This means that any object you want to use must be referred to together with its module name - which may simply be the name of the Python file it is defined in. For example, suppose you want to use a function foo defined in the file utils.py, which sits in the same directory as app.py and app.yaml, so that the file tree looks as follows:
.
├── app.py
├── app.yaml
└── utils.py
you would refer to it as !utils.foo.
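For instance, utils.py could contain a trivial helper (foo here is hypothetical, shown only to illustrate the referencing rule):

# utils.py - foo is a hypothetical helper used only to show how
# file-local definitions are referenced from the YAML as !utils.foo.
def foo(value: str) -> str:
    return value.upper()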
Let's now look at a couple of examples of how to use this in your pipelines.
Altering metadata
Say you want to add some metadata that is not created by the input connector. For example, your use case is about running RAG on data about funds, and each file in the dataset has a name in the format {ISIN number}_{Fund Name}_{Currency}.pdf. While the filenames are part of the metadata, you may wish to take advantage of this specific format and include the ISIN and currency as separate metadata fields to make filtering easier - something the connector cannot do automatically.
First, write a function in utils.py that adds those fields to the metadata.
import pathway as pw

@pw.udf
def add_isin_and_currency(metadata: pw.Json) -> dict:
    # The filename has the format {ISIN}_{Fund Name}_{Currency}.pdf.
    metadata_dict = metadata.as_dict()
    path = metadata_dict["path"]
    filename = path.split("/")[-1]
    isin = filename.split("_")[0]
    currency = filename.split("_")[-1][:-4]  # strip the ".pdf" extension
    metadata_dict["isin"] = isin
    metadata_dict["currency"] = currency
    return metadata_dict

def augment_metadata(sources: list[pw.Table]) -> list[pw.Table]:
    # Replace the _metadata column of each input table with the augmented version.
    return [t.with_columns(_metadata=add_isin_and_currency(pw.this._metadata)) for t in sources]
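The string manipulation is easy to verify in isolation. A quick sanity check of the parsing logic on a made-up filename:

# Sanity check of the filename parsing (the filename is a made-up example).
filename = "XX000EXAMPLE_Sample Fund_EUR.pdf"
assert filename.split("_")[0] == "XX000EXAMPLE"  # ISIN
assert filename.split("_")[-1][:-4] == "EUR"     # currency, ".pdf" stripped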
Then, in the app.yaml from demo-question-answering, apply this function to $sources to obtain the changed tables, which are then given to the $document_store.
$sources:
  # File System connector, reading data locally.
  - !pw.io.fs.read
    path: data
    format: binary
    with_metadata: true

$sources_with_metadata: !utils.augment_metadata
  sources: $sources

$document_store: !pw.xpacks.llm.document_store.DocumentStore
  docs: $sources_with_metadata
  parser: $parser
  splitter: $splitter
  retriever_factory: $retriever_factory
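With the store built on the augmented metadata, the new fields can be used for filtering at query time. As a hedged sketch - the DocumentStore documentation describes metadata filters written in JMESPath, but how the filter string is passed (REST parameter, client argument) depends on how you expose the store:

# A JMESPath-style filter over the augmented metadata, e.g. restricting
# retrieval to USD-denominated funds. Check the DocumentStore docs for
# where to pass it in your setup.
metadata_filter = "currency == 'USD'"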
Parser for JSONs
For another example, consider needing a specific parser that is not part of Pathway's LLM xpack. Suppose the data comes from a Kafka stream, and each message is a dict with a key "content" holding the text to be indexed. You can then replace the parser in the DocumentStore with a version that extracts this string from each dict.
# json_parser.py - placed next to app.yaml so the YAML can refer to !json_parser.JsonParser
import json

import pathway as pw

class JsonParser(pw.UDF):
    def __wrapped__(self, message: str) -> list[tuple[str, dict]]:
        # Parse the JSON message and return the text under "content",
        # paired with an (empty) metadata dict, as the DocumentStore expects.
        message_dict = json.loads(message)
        return [(message_dict["content"], {})]
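Since __wrapped__ is a plain method, the parsing logic can be exercised without building a pipeline. A minimal sketch, assuming the default pw.UDF constructor:

# Call the underlying method directly on a sample message.
parser = JsonParser()
result = parser.__wrapped__('{"content": "Text to be indexed."}')
assert result == [("Text to be indexed.", {})]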
$sources:
  - !pw.io.kafka.read
    # Kafka configuration, check https://pathway.com/developers/api-docs/pathway-io/kafka for the needed arguments
    format: plaintext

$parser: !json_parser.JsonParser {}

$document_store: !pw.xpacks.llm.document_store.DocumentStore
  docs: $sources
  parser: $parser
  splitter: $splitter
  retriever_factory: $retriever_factory
Importing objects from external libraries
You can, of course, also use objects from external libraries. If you do so, remember to use the full name of the library rather than a commonly used abbreviation, e.g. use the pandas module rather than pd - the only abbreviation recognized by our YAML parser is pw for pathway. When you use components from external libraries, you do not need to import them anywhere; pw.load_yaml, used to parse the YAML configuration files, takes care of that.
$dataframe: !pandas.DataFrame
  data: ["foo", "bar", "baz"]

$sources:
  - !pw.debug.table_from_pandas
    df: $dataframe
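For reference, the YAML above corresponds to roughly the following Python - a sketch of what the loader builds:

import pandas
import pathway as pw

# What the YAML resolves to: a pandas DataFrame passed to the debug connector.
dataframe = pandas.DataFrame(data=["foo", "bar", "baz"])
sources = [pw.debug.table_from_pandas(df=dataframe)]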