Handling JSON in Pathway
JSON is a widely used format for data interchange due to its simplicity and readability. This article shows how to load, query, and transform JSON data in Pathway.
As an example, we'll use JSON objects loaded directly from a Python list. However, JSON data can come from any source that supports this format, such as Kafka or an HTTP connector.
rows = [
    (
        1,
        {
            "author": {"id": 1, "name": "Haruki Murakami"},
            "books": [
                {"title": "Norwegian Wood", "year": 1987},
                {
                    "title": "Kafka on the Shore",
                    "year": 2002,
                    "category": "Literary Fiction",
                },
            ],
        },
    ),
    (
        2,
        {
            "author": {"id": 2, "name": "Stanisław Lem"},
            "books": [
                {"title": "Solaris", "year": 1961, "category": "Science Fiction"},
                {"title": "The Cyberiad", "year": 1967, "category": "Science Fiction"},
            ],
        },
    ),
    (
        3,
        {
            "author": {"id": 3, "name": "William Shakespeare"},
            "books": [
                {"title": "Hamlet", "year": 1603, "category": "Tragedy"},
                {"title": "Macbeth", "year": 1623, "category": "Tragedy"},
            ],
        },
    ),
]
Each JSON object carries information about an author and their associated books. To load the objects, define a schema that reflects the structure of the data, then load the rows into a table.
import pathway as pw

class InputSchema(pw.Schema):
    key: int
    data: pw.Json

table = pw.debug.table_from_rows(schema=InputSchema, rows=rows)
table
| key | data
^X1MXHYY... | 1 | {"author": {"id": 1, "name": "Haruki Murakami"}, "books": [{"title": "Norwegian Wood", "year": 1987}, {"title": "Kafka on the Shore", "year": 2002, "category": "Literary Fiction"}]}
^YYY4HAB... | 2 | {"author": {"id": 2, "name": "Stanis\u0142aw Lem"}, "books": [{"title": "Solaris", "year": 1961, "category": "Science Fiction"}, {"title": "The Cyberiad", "year": 1967, "category": "Science Fiction"}]}
^Z3QWT29... | 3 | {"author": {"id": 3, "name": "William Shakespeare"}, "books": [{"title": "Hamlet", "year": 1603, "category": "Tragedy"}, {"title": "Macbeth", "year": 1623, "category": "Tragedy"}]}
Pathway enables manipulation of JSON from two perspectives: expressions and user-defined functions. Let's examine each one separately.
Working with JSON using expressions
Accessing JSON fields
A column of type pw.Json enables access to its attributes using the index operator ([]). This operator accepts an index in the form of a string for JSON objects, an integer for JSON arrays, or an expression evaluating to one of these types. If there's no element at the index or if the value is pw.Json.NULL, it returns pw.Json.NULL, making this operator convenient for chaining.
books = table.select(author=pw.this.data["author"]["name"], books=pw.this.data["books"])
books
| author | books
^X1MXHYY... | "Haruki Murakami" | [{"title": "Norwegian Wood", "year": 1987}, {"title": "Kafka on the Shore", "year": 2002, "category": "Literary Fiction"}]
^YYY4HAB... | "Stanis\u0142aw Lem" | [{"title": "Solaris", "year": 1961, "category": "Science Fiction"}, {"title": "The Cyberiad", "year": 1967, "category": "Science Fiction"}]
^Z3QWT29... | "William Shakespeare" | [{"title": "Hamlet", "year": 1603, "category": "Tragedy"}, {"title": "Macbeth", "year": 1623, "category": "Tragedy"}]
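To make the chaining behaviour concrete, here is a plain-Python model of the semantics described above. It is only an illustration, not Pathway's implementation: the helper json_index and the sentinel NULL are hypothetical names standing in for the [] operator and pw.Json.NULL, and negative indices are not modelled.

```python
# Plain-Python model of null-propagating indexing (illustrative only).
# NULL and json_index are hypothetical stand-ins for pw.Json.NULL and the [] operator.
NULL = None


def json_index(value, index):
    """Return value[index], or NULL when the element is missing or value is NULL."""
    if isinstance(value, dict) and isinstance(index, str):
        return value.get(index, NULL)
    if isinstance(value, list) and isinstance(index, int) and not isinstance(index, bool):
        # Only non-negative positions inside the array are modelled here.
        return value[index] if 0 <= index < len(value) else NULL
    # NULL, scalars, or a mismatched index type: nothing to look up.
    return NULL


record = {"author": {"id": 1, "name": "Haruki Murakami"}, "books": []}

# A chain over existing keys returns the nested value.
name = json_index(json_index(record, "author"), "name")

# The array is empty, so index 0 yields NULL; indexing NULL yields NULL again,
# which is why the whole chain completes without raising.
missing = json_index(json_index(json_index(record, "books"), 0), "title")
```

Because every step returns NULL instead of failing, a long path such as data["books"][0]["title"] can be written without guarding each level separately.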
Alternatively, the get() method can be used to access JSON attributes. It accepts a custom default value, which is returned when the attribute is missing.
sample = table.select(
    author=pw.this.data["author"]["name"],
    title=pw.this.data["books"][0]["title"],
    category=pw.this.data["books"][0].get("category", default=pw.Json("Uncategorized")),
)
sample
| author | title | category
^X1MXHYY... | "Haruki Murakami" | "Norwegian Wood" | "Uncategorized"
^YYY4HAB... | "Stanis\u0142aw Lem" | "Solaris" | "Science Fiction"
^Z3QWT29... | "William Shakespeare" | "Hamlet" | "Tragedy"
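The difference between [] and get() comes down to what is returned for a missing key. The sketch below mirrors that with ordinary Python dictionaries; first_book_category is a hypothetical helper, and the real get() operates on pw.Json values inside Pathway expressions rather than on dicts.

```python
# Plain-Python analogue of get() with a default (illustrative; not Pathway code).
books_by_author = {
    "Haruki Murakami": [{"title": "Norwegian Wood", "year": 1987}],
    "Stanisław Lem": [{"title": "Solaris", "year": 1961, "category": "Science Fiction"}],
}


def first_book_category(books, default="Uncategorized"):
    """Return the category of the first book, or the default when the key is absent."""
    return books[0].get("category", default)


# Map each author to the category of their first book.
categories = {
    author: first_book_category(books) for author, books in books_by_author.items()
}
```

Only the entry without a "category" key falls back to the default, which matches the "Uncategorized" value shown for "Norwegian Wood" in the table above.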
Converting to simple types
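A JSON column can be converted into Optional[T], where T is one of the simple types, using the methods as_int(), as_str(), as_float(), and as_bool(). In the example below, pw.unwrap() removes the Optional wrapper, turning Optional[str] into str, before the string method is applied.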
books.select(author=pw.unwrap(pw.this.author.as_str()).str.upper())
| author
^X1MXHYY... | HARUKI MURAKAMI
^YYY4HAB... | STANISŁAW LEM
^Z3QWT29... | WILLIAM SHAKESPEARE
Flatten
The flatten() operator can be applied to columns that contain JSON arrays. It produces one row per array element, which is useful when working with nested JSON structures.
flat_list = books.flatten(pw.this.books)
flat_list
| books | author
^Z3QVC46... | {"title": "Hamlet", "year": 1603, "category": "Tragedy"} | "William Shakespeare"
^X1MQZF8... | {"title": "Kafka on the Shore", "year": 2002, "category": "Literary Fiction"} | "Haruki Murakami"
^Z3QHRW2... | {"title": "Macbeth", "year": 1623, "category": "Tragedy"} | "William Shakespeare"
^X1MGYPB... | {"title": "Norwegian Wood", "year": 1987} | "Haruki Murakami"
^YYYA47A... | {"title": "Solaris", "year": 1961, "category": "Science Fiction"} | "Stanis\u0142aw Lem"
^YYY18MS... | {"title": "The Cyberiad", "year": 1967, "category": "Science Fiction"} | "Stanis\u0142aw Lem"
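As a rough mental model of what happened above, flattening emits one output row per array element and copies the remaining columns alongside it. The plain-Python sketch below illustrates this; flatten_rows and author_rows are hypothetical names, and unlike this list-based model, the output above does not follow the order of the input rows.

```python
# Plain-Python model of flattening a column of arrays (illustrative only).
author_rows = [
    {
        "author": "Haruki Murakami",
        "books": [{"title": "Norwegian Wood"}, {"title": "Kafka on the Shore"}],
    },
    {"author": "Stanisław Lem", "books": [{"title": "Solaris"}]},
]


def flatten_rows(rows, column):
    """Emit one row per array element, copying the remaining columns unchanged."""
    flat = []
    for row in rows:
        for element in row[column]:
            # Replace the array with a single element; keep every other column.
            flat.append({**row, column: element})
    return flat


flat = flatten_rows(author_rows, "books")
```

Two input rows holding three books in total become three output rows, each pairing one book with its author.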
JSON in UDFs
Pathway enables manipulation of JSON using user-defined functions. Just like with expressions, the index operator ([]) and methods allowing conversion into specific types are available. Note that this conversion is strict: attempting to convert incompatible data will result in an exception.
@pw.udf
def transform(data: pw.Json) -> pw.Json:
    # Prepend the century derived from the year to the book's existing fields.
    return {"century": data["year"].as_int() // 100 + 1, **data.as_dict()}
flat_list.select(title=pw.this.books["title"], metadata=transform(pw.this.books))
| title | metadata
^Z3QVC46... | "Hamlet" | {"century": 17, "title": "Hamlet", "year": 1603, "category": "Tragedy"}
^X1MQZF8... | "Kafka on the Shore" | {"century": 21, "title": "Kafka on the Shore", "year": 2002, "category": "Literary Fiction"}
^Z3QHRW2... | "Macbeth" | {"century": 17, "title": "Macbeth", "year": 1623, "category": "Tragedy"}
^X1MGYPB... | "Norwegian Wood" | {"century": 20, "title": "Norwegian Wood", "year": 1987}
^YYYA47A... | "Solaris" | {"century": 20, "title": "Solaris", "year": 1961, "category": "Science Fiction"}
^YYY18MS... | "The Cyberiad" | {"century": 20, "title": "The Cyberiad", "year": 1967, "category": "Science Fiction"}
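The arithmetic inside transform can be checked without Pathway. The sketch below applies the same year // 100 + 1 formula to a plain dictionary; add_century is a hypothetical name used only for this illustration. Note that the formula assigns a year ending in 00 to the following century (2000 maps to 21), which does not affect any of the sample books.

```python
# Plain-Python check of the century arithmetic used in the UDF (illustrative only).
def add_century(book):
    """Return a copy of the book with a 'century' field placed first."""
    return {"century": book["year"] // 100 + 1, **book}


hamlet = add_century({"title": "Hamlet", "year": 1603, "category": "Tragedy"})
norwegian_wood = add_century({"title": "Norwegian Wood", "year": 1987})
```

Unpacking the original fields after the new key keeps them intact, so a book without a "category" simply stays without one.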
Further details about pw.Json functionality are available in the dedicated API documentation.