File System Connectors

This guide explains the fs connectors that connect Pathway to your file system to read and write data with the following basic formats: binary, plaintext, CSV, and JSON.

The first part of this guide focuses on defining the source of the data for our connector (using plaintext data format to keep things simple). The second part explains additional configuration that can (or needs to) be defined for all simple data formats. In particular we show the input connectors (pw.io.fs.read) reading data in:

The output connectors (pw.io.fs.write) write data in:

File system connectors work both in streaming and static modes. Be careful as the use of connectors differs depending on the chosen mode: see the differences. For simplicity, all the examples below are in the "static" mode but can easily be changed to "streaming" mode by changing the mode parameter.

Location of files and filter.

The code snippets below prepares the basic file structure that is used in the later part of this article. To keep tings simple, all examples work with data of type str, to see more on schemas and types in other places.

! mkdir -p plain_input
! mkdir -p plain_output
! echo -e "test1\ndata1" > plain_input/in1.txt
! echo -e "test2\ndata2" > plain_input/in2.txt

Specify the input and output with path and filename.

Below, you can find the simplest examples of input (pw.io.fs.read) and output (pw.io.fs.write) connectors. Both examples use plaintext as the input format (more on that later). The path parameter can point either to a directory or a particular file. If it point so a directory, it reads all files that are inside. Otherwise it reads only the file that is specified (and as such it makes sense only in the static mode).

%%capture
import pathway as pw
test1 = pw.io.fs.read(path = "./plain_input/", format = "plaintext", mode="static")
pw.io.fs.write(test1, filename="./plain_output/out1.txt", format="json")

test2 = pw.io.fs.read(path = "./plain_input/in1.txt", format = "plaintext", mode="static")
pw.io.fs.write(test2, filename="./plain_output/out2.txt", format="json")

pw.run()

The output can be found in the plain_output directory.

! echo "out1:"
! cat plain_output/out1.txt
! echo "out2:"
! cat plain_output/out2.txt
out1:


{"data":"test1","diff":1,"time":1713548857868}
{"data":"data2","diff":1,"time":1713548857868}
{"data":"data1","diff":1,"time":1713548857868}
{"data":"test2","diff":1,"time":1713548857868}


out2:


{"data":"test1","diff":1,"time":1713548857868}
{"data":"data1","diff":1,"time":1713548857868}

As you can see, the first example read the data from both in1.txt and in2.txt, while the second read only the data from in1.txt.

Filter the files to read with object_pattern

In case you want to specify a directory as the source of your data, but read only some of its contents, you can specify filter pattern and pass it using the object_pattern parameter.

%%capture
test3 = pw.io.fs.read("./plain_input/", format = "plaintext", mode="static", object_pattern = "*2*")
pw.io.fs.write(test3, "./plain_output/output3.txt", "json")
pw.run()

The output can be found in the plain_output directory. As you can see, out3.txt contains data only from in2.txt, as it is the only file in the input directory that matches the *2* pattern:

! echo "out3:"
! cat plain_output/output3.txt
out3:


{"data":"data2","diff":1,"time":1713548858486}
{"data":"test2","diff":1,"time":1713548858486}

Data formats

CSV

For the CSV format, each file on the input needs to have defined headers.

! mkdir -p csv_input
! mkdir -p csv_output
! echo -e "header1;header2\ndata1;data2\n\ndata3;data4" > csv_input/csv_in1.txt
! echo -e "header1;header2\ndata5;data6\n\ndata7;data8" > csv_input/csv_in2.txt
! echo -e "csv_in1.txt:"
! cat csv_input/csv_in1.txt
! echo -e "csv_in2.txt:"
! cat csv_input/csv_in2.txt
csv_in1.txt:


header1;header2
data1;data2

data3;data4


csv_in2.txt:


header1;header2
data5;data6

data7;data8

In most cases, in order to read the data, you need to define its schema and pass it to the connector. Furthermore, for the csv format, you can use CSVParserSettings to accommodate for a nonstandard formatting of the input file. In the example below, it is configured to use ; as a delimiter.

%%capture
class csv_schema(pw.Schema):
    header1: str
    header2: str

csv_settings = pw.io.CsvParserSettings(delimiter=";")
csv_data = pw.io.fs.read(path = "./csv_input/", format="csv", schema=csv_schema, csv_settings=csv_settings,mode="static")
pw.io.fs.write(table=csv_data, filename="./csv_output/csv_out1.txt", format="csv")
pw.run()
! cat ./csv_output/csv_out1.txt
header1,header2,time,diff
"data7","data8",1713548860000,1
"data1","data2",1713548860000,1
"data3","data4",1713548860000,1
"data5","data6",1713548860000,1

You can also use the dedicated CSV connector.

JSON

You can use the JSON format by setting the parameter format to json.

! mkdir -p json_input
! mkdir -p json_output
! echo -e '{"header1":data1",\n"header2":"data2"}\n{"header1":"data3","header2":"data4"}\n' > json_input/json_in1.txt
! echo -e '{"header1":"data5","header2":"data6"}\n{"header1":"data7","header2":"data8"}\n' > json_input/json_in2.txt
! echo -e "json_in1.txt:"
! cat json_input/json_in1.txt
! echo -e "json_in2.txt:"
! cat json_input/json_in2.txt
json_in1.txt:


{"header1":data1",
"header2":"data2"}
{"header1":"data3","header2":"data4"}



json_in2.txt:


{"header1":"data5","header2":"data6"}
{"header1":"data7","header2":"data8"}

As in most cases, in order to read the data, you need to define a schema and pass it to the connector. Each input file needs to be a sequence of properly formatted JSON objects.

%%capture
class json_schema(pw.Schema):
    header1: str
    header2: str

json_data = pw.io.fs.read(path = "./json_input/", format="json", schema=json_schema, mode="static")
pw.io.fs.write(table=json_data, filename="./json_output/json_out1.txt", format="json")
pw.run()
! cat ./json_output/json_out1.txt
{"header1":"data7","header2":"data8","diff":1,"time":1713548861362}
{"header1":"data3","header2":"data4","diff":1,"time":1713548861362}
{"header1":"data5","header2":"data6","diff":1,"time":1713548861362}

Unstructured data

Pathway allows you to read unstructured data using three formats: plaintext, plaintext_by_file, and binary. binary and plaintext considers each line as a separate row that will be stored in the column data, and the format plaintext_by_file treats each file as a single row.

! mkdir -p unstructured_output
%%capture
plaintext_data = pw.io.fs.read(path = "./plain_input", format = "plaintext", mode="static")
pw.io.fs.write(plaintext_data,"./unstructured_output/output1.txt", "csv")

plaintext_by_file_data = pw.io.fs.read(path = "./plain_input", format = "plaintext_by_file", mode="static")
pw.io.fs.write(plaintext_by_file_data,"./unstructured_output/output2.txt", "csv")

binary_data = pw.io.fs.read(path = "./plain_input", format = "binary", mode="static")
pw.io.fs.write(binary_data,"./unstructured_output/output3.txt", "csv")

pw.run()
! echo "plaintext"
! cat ./unstructured_output/output1.txt
! echo "plaintext by file"
! cat ./unstructured_output/output2.txt
! echo "binary"
! cat ./unstructured_output/output3.txt
plaintext


data,time,diff
"test1",1713548861704,1
"data2",1713548861704,1
"data1",1713548861704,1
"test2",1713548861704,1


plaintext by file


data,time,diff
"test2\ndata2",1713548861704,1
"test1\ndata1",1713548861704,1


binary


data,time,diff
[116, 101, 115, 116, 50, 10, 100, 97, 116, 97, 50, 10],1713548861704,1
[116, 101, 115, 116, 49, 10, 100, 97, 116, 97, 49, 10],1713548861704,1
connectorfile systemcsvjson