JSON Lines connectors

Pathway provides connectors to read and write data streamings using JSON Lines files.

JSON Lines, also called newline-delimited JSON, is a format for structured data, following three requirements:

  1. UTF-8 Encoding.
  2. Each Line is a valid JSON Value.
  3. The line separator is \n.

The suggested extension for JSON Lines files is .jsonl.

This is a correct JSON Lines entry:

{"key": 1, "recipient": "Bill H.", "sender": "Nancy R."}
{"key": 2, "recipient": "Harry P.", "sender": "Hermione G."}
{"key": 3, "recipient": "Julian S.", "sender": "Dick F."}

⚠️ JSON lines connectors work both in streaming and static modes. Be careful as the use of connectors differs depending on the chosen mode: see the differences.

Input connector

To read a file or a directory, use the pw.io.jsonlines.read function. It takes several parameters, including:

  • path: the path of the directory or the file to read.
  • schema (optional): the schema of the resulting table.
  • mode (optional): the mode in which the connector is used, streaming or static. Defaults to streaming.

The connector can be used to read a directory:

table = pw.io.jsonlines.read("./input_directory/", schema=InputSchema)

In this case, all the files within this directory are ingested into the table.

You can choose to read a single file:

table = pw.io.jsonlines.read("./input_file.jsonl", schema=InputSchema)

Any file modification would be reflected in the table read: if you delete a part of a file, the respective data will be deleted from the table.

Let's consider the following example, which reads a JSON Lines file and outputs it in a CSV file using the CSV output connector:

import pathway as pw


class InputSchema(pw.Schema):
    key: int = pw.column_definition(primary_key=True)
    recipient: str
    sender: str


table = pw.io.jsonlines.read("./input_file.jsonl", schema=InputSchema)
pw.io.csv.write(table, "./output.csv")
pw.run()

With the input_file file containing the correct example defined in the introduction, the output is:

key,recipient,sender,time,diff
1,"Bill H.","Nancy R.",1707985402732,1
2,"Harry P.","Hermione G.",1707985402732,1
3,"Julian S.","Dick F.",1707985402732,1

Adding this line {"key": 4, "recipient": "Juliet", "sender": "Romeo"} will update the output:

key,recipient,sender,time,diff
1,"Bill H.","Nancy R.",1707985402732,1
2,"Harry P.","Hermione G.",1707985402732,1
3,"Julian S.","Dick F.",1707985402732,1
4,"Juliet","Romeo",1707985410232,1

The removal of the line is also passed to the output:

key,recipient,sender,time,diff
1,"Bill H.","Nancy R.",1707985402732,1
2,"Harry P.","Hermione G.",1707985402732,1
3,"Julian S.","Dick F.",1707985402732,1
4,"Juliet","Romeo",1707985410232,1
4,"Juliet","Romeo",1707985423732,-1

The diff value is set to -1, representing a removal.

Static case

The JSON Lines connector also supports the static mode: it will read all the data at once and then closes the connection. To activate it, you must set the mode parameter to "static":

table = pw.io.jsonlines.read("./input_file.jsonl", schema=InputSchema, mode="static")
pw.debug.compute_and_print(table)
[2024-05-21T05:05:30]:INFO:Preparing Pathway computation


[2024-05-21T05:05:30]:INFO:FilesystemReader-0: 0 entries (1 minibatch(es)) have been sent to the engine


[2024-05-21T05:05:30]:INFO:FilesystemReader-0: 3 entries (2 minibatch(es)) have been sent to the engine


[2024-05-21T05:05:30]:WARNING:FilesystemReader-0: Closing the data source


            | key | recipient | sender
^YYY4HAB... | 1   | Bill H.   | Nancy R.
^Z3QWT29... | 2   | Harry P.  | Hermione G.
^3CZ78B4... | 3   | Julian S. | Dick F.

Output connector

To output a table in the JSON Lines format, you should use pw.io.jsonlines.write. It takes two parameters:

  • the table to output
  • the filename.
pw.io.jsonlines.write(table, "output_file.jsonl")

⚠️ The JSON Lines output connector only works in streaming mode.

Complete example

import pathway as pw


class InputSchema(pw.Schema):
    key: int = pw.column_definition(primary_key=True)
    recipient: str
    sender: str


table = pw.io.jsonlines.read("./input_file.jsonl", schema=InputSchema)
pw.io.jsonlines.write(table, "./output_file.jsonl")
pw.run()

By doing the same operations as before (adding the line and removing it), you obtain the following results:

{"key":1,"recipient":"Bill H.","sender":"Nancy R.","diff":1,"time":1707987230734}
{"key":2,"recipient":"Harry P.","sender":"Hermione G.","diff":1,"time":1707987230734}
{"key":3,"recipient":"Julian S.","sender":"Dick F.","diff":1,"time":1707987230734}
{"key":4,"recipient":"Juliet","sender":"Romeo","diff":1,"time":1707987260732}
{"key":4,"recipient":"Juliet","sender":"Romeo","diff":-1,"time":1707987269732}

This example simply copies the streams of updates. To do something more complicated, you can check our tutorial about the basic table operations Pathway supports.

connectorJSON LinesJSON