pathway.io.csv package


pathway.io.csv.read(path, value_columns, id_columns=None, csv_settings=None, poll_new_objects=False, debug_data=None)

Reads a table from one or several files with delimiter-separated values.

If a folder is passed to the engine, the files in that folder are processed in ascending order of their modification time.

  • Parameters
    • path (str) – Path to the file or to the folder with files.
    • value_columns (List[str]) – Names of the columns to be extracted from the files.
    • id_columns (Optional[List[str]]) – If the table's primary key should be generated from a subset of its columns, that subset should be specified in this field. Otherwise, the primary key is generated as uuid4.
    • csv_settings (Optional[CsvParserSettings]) – Settings for the CSV parser.
    • poll_new_objects (Optional[bool]) – If set to true, the engine will wait for new input files in the directory.
    • debug_data – Static data replacing the original one when debug mode is active.
  • Returns
    The table read.
  • Return type
    Table

Example:

Suppose we want to read a dataset stored in the filesystem in standard CSV format. The dataset contains data about pets and their owners.

For the sake of demonstration, we can prepare a small dataset by creating a CSV file via a Unix command-line tool:

printf "id,owner,pet\n1,Alice,dog\n2,Bob,dog\n3,Alice,cat\n4,Bob,dog" > dataset.csv

To read it into a Pathway table, we first import Pathway and then use the pw.csv.read method:

>>> import pathway as pw
>>> t = pw.csv.read("dataset.csv", ["owner", "pet"])

Then, we can print the table to check that it was read correctly:

>>> pw.debug.compute_and_print(t, include_id=False)
owner pet
Alice dog
  Bob dog
Alice cat
  Bob dog
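
If we prefer a primary key derived from the data instead of an auto-generated uuid4, the id_columns parameter described above can be used. Below is a minimal sketch, assuming the id column of dataset.csv may be referenced here even though it is not listed in value_columns:

>>> t = pw.csv.read("dataset.csv", ["owner", "pet"], id_columns=["id"])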

Now let’s try something different. Suppose we have site access logs stored in several files in a separate folder. For the sake of simplicity, a log entry contains an access ID, an IP address, and the login of the user.

A dataset in this format can be generated with the following set of Unix commands:

mkdir logs
printf "id,ip,login\n1,127.0.0.1,alice\n2,8.8.8.8,alice" > logs/part_1.csv
printf "id,ip,login\n3,8.8.8.8,bob\n4,127.0.0.1,alice" > logs/part_2.csv

Now, let’s see how to use the connector to read the contents of this directory into a table:

>>> t = pw.csv.read("logs/", ["ip", "login"])

The only difference from the previous example is that we specified the name of the directory instead of a file name. It’s that simple!

But what if we are working with a real-time system that generates logs continuously? The logs are written and, after a while, land in the log directory (this is also called “log rotation”). Suppose we need to keep fetching new files from this logs directory as they arrive. Would Pathway handle that? Sure!

The only difference is the use of the poll_new_objects flag. The code snippet looks as follows:

>>> t = pw.csv.read("logs/", ["ip", "login"], poll_new_objects=True)

With this method, we obtain a table that is updated dynamically. Changes in the logs propagate to the Business-Intelligence (BI)-ready data, namely to the tables we would like to output. To see how these changes are reported by Pathway, please have a look at the “Stream of Updates and Snapshots” article.
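
For instance, such a dynamically updated table can be forwarded straight to the output connector documented below. The following sketch is only illustrative: the output path is hypothetical, and depending on your Pathway version the computation may need to be started explicitly (for example, with pw.run()):

>>> t = pw.csv.read("logs/", ["ip", "login"], poll_new_objects=True)
>>> pw.csv.write(t, "logs_stream.csv")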


pathway.io.csv.write(table, filename)

Writes the table’s stream of updates to a file in delimiter-separated values format.

  • Parameters
    • table (Table) – Table to be written.
    • filename (str) – Path to the target output file.
  • Returns
    None

Example:

In this simple example we can see how table output works. First, we import Pathway and create a table:

>>> import pathway as pw
>>> t = pw.debug.parse_to_table("age owner pet \n 1 10 Alice dog \n 2 9 Bob cat \n 3 8 Alice cat")

Suppose we want to output the stream of changes of this table. To do that, we simply write:

>>> pw.csv.write(t, "table.csv")

Now, let’s see what we have on the output:

cat table.csv
age,owner,pet,time,diff
10,"Alice","dog",0,1
9,"Bob","cat",0,1
8,"Alice","cat",0,1

The first three columns are the data columns of the table. The time column represents the minibatch of operations in which each row was read. In this example the data is static, so it is simply 0 everywhere. The diff column is another element of this stream of updates: here it is 1 because all three rows were additions read from the input. All in all, the extra information in the time and diff columns shows us that in the initial minibatch (time = 0) we read three rows, and all of them were added to the collection (diff = 1).
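
For intuition, here is a purely hypothetical continuation, not produced by the snippet above: if the first row were later deleted from the input, the stream of updates would gain a retraction line for it in some subsequent minibatch, along the lines of

10,"Alice","dog",1,-1

where diff = -1 marks a removal and the time value identifies the later minibatch in which it happened.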