Handling JSON in Pathway

JSON is a widely used format for data interchange due to its simplicity and readability. This article shows how to load JSON data into Pathway, access its fields, convert values to simple types, flatten arrays, and process JSON in user-defined functions.

As an example, we'll use JSON objects loaded directly from a Python list. However, JSON data can come from any source that supports this format, such as Kafka or an HTTP connector.

rows = [
    (
        1,
        {
            "author": {"id": 1, "name": "Haruki Murakami"},
            "books": [
                {"title": "Norwegian Wood", "year": 1987},
                {
                    "title": "Kafka on the Shore",
                    "year": 2002,
                    "category": "Literary Fiction",
                },
            ],
        },
    ),
    (
        2,
        {
            "author": {"id": 2, "name": "Stanisław Lem"},
            "books": [
                {"title": "Solaris", "year": 1961, "category": "Science Fiction"},
                {"title": "The Cyberiad", "year": 1967, "category": "Science Fiction"},
            ],
        },
    ),
    (
        3,
        {
            "author": {"id": 3, "name": "William Shakespeare"},
            "books": [
                {"title": "Hamlet", "year": 1603, "category": "Tragedy"},
                {"title": "Macbeth", "year": 1623, "category": "Tragedy"},
            ],
        },
    ),
]

Each JSON object carries information about an author and their associated books. To load it, define a schema that reflects the data's structure and then load the data into a table.

import pathway as pw



class InputSchema(pw.Schema):
    key: int
    data: pw.Json


table = pw.debug.table_from_rows(schema=InputSchema, rows=rows)

table
            | key | data
^X1MXHYY... | 1   | {"author": {"id": 1, "name": "Haruki Murakami"}, "books": [{"title": "Norwegian Wood", "year": 1987}, {"category": "Literary Fiction", "title": "Kafka on the Shore", "year": 2002}]}
^YYY4HAB... | 2   | {"author": {"id": 2, "name": "Stanis\u0142aw Lem"}, "books": [{"category": "Science Fiction", "title": "Solaris", "year": 1961}, {"category": "Science Fiction", "title": "The Cyberiad", "year": 1967}]}
^Z3QWT29... | 3   | {"author": {"id": 3, "name": "William Shakespeare"}, "books": [{"category": "Tragedy", "title": "Hamlet", "year": 1603}, {"category": "Tragedy", "title": "Macbeth", "year": 1623}]}

Pathway enables manipulation of JSON from two perspectives: expressions and user-defined functions. Let's examine each one separately.

Working with JSONs using expressions

Accessing JSON fields

A column of type pw.Json enables access to its attributes using the index operator ([]). This operator accepts an index in the form of a string for JSON objects, an integer for JSON arrays, or an expression evaluating to one of these types. If there's no element at the index or if the value is pw.Json.NULL, it returns pw.Json.NULL, making this operator convenient for chaining.

books = table.select(author=pw.this.data["author"]["name"], books=pw.this.data["books"])
books
            | author                | books
^X1MXHYY... | "Haruki Murakami"     | [{"title": "Norwegian Wood", "year": 1987}, {"category": "Literary Fiction", "title": "Kafka on the Shore", "year": 2002}]
^YYY4HAB... | "Stanis\u0142aw Lem"  | [{"category": "Science Fiction", "title": "Solaris", "year": 1961}, {"category": "Science Fiction", "title": "The Cyberiad", "year": 1967}]
^Z3QWT29... | "William Shakespeare" | [{"category": "Tragedy", "title": "Hamlet", "year": 1603}, {"category": "Tragedy", "title": "Macbeth", "year": 1623}]
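
To illustrate why returning pw.Json.NULL makes chaining convenient, here is a minimal plain-Python model of the lookup rule described above. It does not use Pathway: `NULL` and `json_get` are hypothetical stand-ins introduced only for this sketch, and the handling of scalars and negative positions is an assumption of the model, not a statement about Pathway's behavior.

```python
from typing import Any, Union

NULL = None  # hypothetical stand-in for pw.Json.NULL in this plain-Python model


def json_get(value: Any, index: Union[str, int]) -> Any:
    """Look up index in value; return NULL instead of raising when nothing is there."""
    if isinstance(value, dict) and isinstance(index, str):
        return value.get(index, NULL)
    if isinstance(value, list) and isinstance(index, int):
        # Only non-negative positions are modeled here.
        return value[index] if 0 <= index < len(value) else NULL
    # In this model, indexing NULL or a scalar also yields NULL.
    return NULL


doc = {"author": {"id": 1, "name": "Haruki Murakami"}, "books": []}

name = json_get(json_get(doc, "author"), "name")
# "books" is empty, so indexing position 0 yields NULL,
# and the following "title" lookup stays NULL instead of failing.
first_title = json_get(json_get(json_get(doc, "books"), 0), "title")
```

Because every failed step yields NULL rather than an error, a long chain needs no intermediate checks; only the final result has to be tested.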

Alternatively, a get() method can be used to access JSON attributes. This method allows defining a custom default value.

sample = table.select(
    author=pw.this.data["author"]["name"],
    title=pw.this.data["books"][0]["title"],
    category=pw.this.data["books"][0].get("category", default=pw.Json("Uncategorized")),
)
sample
            | author                | title            | category
^X1MXHYY... | "Haruki Murakami"     | "Norwegian Wood" | "Uncategorized"
^YYY4HAB... | "Stanis\u0142aw Lem"  | "Solaris"        | "Science Fiction"
^Z3QWT29... | "William Shakespeare" | "Hamlet"         | "Tragedy"

Converting to simple types

A JSON column can be converted into Optional[T], where T is one of the simple types, using the methods as_int(), as_str(), as_float(), and as_bool().

books.select(author=pw.unwrap(pw.this.author.as_str()).str.upper())
            | author
^X1MXHYY... | HARUKI MURAKAMI
^YYY4HAB... | STANISŁAW LEM
^Z3QWT29... | WILLIAM SHAKESPEARE

Flatten

The flatten() operator can be applied to columns that contain JSON arrays. It produces one row per array element, which is useful when working with nested JSON structures.

flat_list = books.flatten(pw.this.books, pw.this.author)
flat_list
            | books                                                                         | author
^0YJ7ZF8... | {"category": "Literary Fiction", "title": "Kafka on the Shore", "year": 2002} | "Haruki Murakami"
^3A8A47A... | {"category": "Science Fiction", "title": "Solaris", "year": 1961}             | "Stanis\u0142aw Lem"
^FPHH8MS... | {"category": "Science Fiction", "title": "The Cyberiad", "year": 1967}        | "Stanis\u0142aw Lem"
^D3DVC46... | {"category": "Tragedy", "title": "Hamlet", "year": 1603}                      | "William Shakespeare"
^D49HRW2... | {"category": "Tragedy", "title": "Macbeth", "year": 1623}                     | "William Shakespeare"
^2EQ0YPB... | {"title": "Norwegian Wood", "year": 1987}                                     | "Haruki Murakami"
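
The effect of flattening can be sketched in plain Python, without Pathway. The helper `flatten_rows` below is hypothetical and only models the row expansion shown in the table above: one output row per array element, with the remaining fields copied. Unlike the Pathway output, this sketch keeps the input order.

```python
from typing import Any


def flatten_rows(rows: list[dict[str, Any]], column: str) -> list[dict[str, Any]]:
    """Emit one row per element of row[column], copying the other fields."""
    flat = []
    for row in rows:
        for element in row[column]:
            # Replace the array with a single element; keep everything else.
            flat.append({**row, column: element})
    return flat


books = [
    {
        "author": "Haruki Murakami",
        "books": [
            {"title": "Norwegian Wood", "year": 1987},
            {"title": "Kafka on the Shore", "year": 2002, "category": "Literary Fiction"},
        ],
    },
    {
        "author": "Stanisław Lem",
        "books": [{"title": "Solaris", "year": 1961, "category": "Science Fiction"}],
    },
]

flat = flatten_rows(books, "books")
titles = [row["books"]["title"] for row in flat]
```

Each entry of `flat` pairs one book with its author, which is the same shape as the rows of `flat_list`.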

JSON in UDFs

Pathway also enables manipulation of JSON inside user-defined functions (UDFs). Just like with expressions, the index operator ([]) and the type-conversion methods are available. Note that this conversion is strict: attempting to convert incompatible data raises an exception.

@pw.udf
def transform(data: pw.Json) -> pw.Json:
    return {"century": (data["year"].as_int()) // 100 + 1, **data.as_dict()}


flat_list.select(title=pw.this.books["title"], metadata=transform(pw.this.books))
            | title                | metadata
^D3DVC46... | "Hamlet"             | {"category": "Tragedy", "century": 17, "title": "Hamlet", "year": 1603}
^0YJ7ZF8... | "Kafka on the Shore" | {"category": "Literary Fiction", "century": 21, "title": "Kafka on the Shore", "year": 2002}
^D49HRW2... | "Macbeth"            | {"category": "Tragedy", "century": 17, "title": "Macbeth", "year": 1623}
^2EQ0YPB... | "Norwegian Wood"     | {"century": 20, "title": "Norwegian Wood", "year": 1987}
^3A8A47A... | "Solaris"            | {"category": "Science Fiction", "century": 20, "title": "Solaris", "year": 1961}
^FPHH8MS... | "The Cyberiad"       | {"category": "Science Fiction", "century": 20, "title": "The Cyberiad", "year": 1967}
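
One possible way to keep such logic easy to test is to write it as a plain function over dictionaries and call it from the UDF on data.as_dict(). The sketch below is plain Python and does not use Pathway; `add_century` is a hypothetical helper, and its explicit type check only mimics the strict conversion described above.

```python
def add_century(book: dict) -> dict:
    """Return a copy of book with a 'century' field derived from 'year'."""
    year = book["year"]
    # Mimic strict conversion: refuse anything that is not a plain integer.
    if not isinstance(year, int) or isinstance(year, bool):
        raise ValueError(f"'year' must be an integer, got {type(year).__name__}")
    return {"century": year // 100 + 1, **book}


hamlet = add_century({"title": "Hamlet", "year": 1603, "category": "Tragedy"})
norwegian_wood = add_century({"title": "Norwegian Wood", "year": 1987})

try:
    add_century({"title": "Solaris", "year": "1961"})
    strict_error = None
except ValueError as exc:
    strict_error = str(exc)
```

The results match the `century` values in the table above, and the string year is rejected rather than silently converted.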

Further details about pw.Json functionality are available in the dedicated API documentation.
