Using CSV connectors

Pathway provides connectors to read and write data streams using CSV files.

Comma-separated values (CSV) is one of the most common formats for tables. In a CSV file, each line is a data record made of the values of its fields, separated by commas. For example:

key,recipient,sender
1,Bill H.,Nancy R.
2,Harry P.,Hermione G.
3,Julian S.,Dick F.
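To make the record/field structure concrete, here is how a sample like the one above can be parsed with Python's standard csv module. This is a plain-Python illustration, independent of Pathway: the first line is used as the header and every following line becomes one record.

```python
import csv
import io

# The sample table, as it would appear inside a .csv file.
SAMPLE = """key,recipient,sender
1,Bill H.,Nancy R.
2,Harry P.,Hermione G.
3,Julian S.,Dick F.
"""

# DictReader takes the field names from the header line and turns every
# following line into one record (a dict mapping column name -> string value).
rows = list(csv.DictReader(io.StringIO(SAMPLE)))

for row in rows:
    print(row["key"], row["recipient"], "<-", row["sender"])
```

Note that all values come back as strings: converting them (for example to integers) is up to the consumer.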

⚠️ CSV connectors work in both streaming and static mode. Be careful: the way you use the connectors differs depending on the chosen mode (see the "Special case: static mode" sections below).

Short example

Consider a simple scenario: new CSV files are added to a directory ./input_stream_dir/, each file containing a table with a single column named value. We want to compute the sum of these values and store the resulting output stream in an output_stream.csv file. You can do it as follows in Pathway:

import pathway as pw

# We use the CSV input connector to connect to the directory.
t = pw.csv.read('./input_stream_dir/', ["value"], poll_new_objects=True)
# We compute the sum (this part is independent of the connectors).
t = t.select(value=pw.apply(lambda x: int(x), t.value))
t = t.reduce(sum=pw.reducers.sum(t.value))
# We use a CSV output connector to write the results in an output file.
pw.csv.write(t, "output_stream.csv")
# We launch the computation.
pw.run()
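To see what this pipeline computes, here is a rough plain-Python analogy (not Pathway code): each new file contributes its values and the sum is updated. The function name running_sums is hypothetical, and the sketch reads files in sorted name order, which is an assumption made for illustration; it does not reproduce how Pathway orders or batches updates.

```python
import csv
import os
import tempfile
from typing import Iterator, List


def running_sums(directory: str) -> Iterator[int]:
    """Yield the sum of the `value` column after each CSV file is read.

    Files are taken in sorted name order; a streaming engine would instead
    process them as they arrive.
    """
    total = 0
    for name in sorted(os.listdir(directory)):
        if not name.endswith(".csv"):
            continue
        with open(os.path.join(directory, name), newline="") as f:
            for row in csv.DictReader(f):
                # Values are read as strings, hence the int() conversion,
                # as in the select step of the Pathway example.
                total += int(row["value"])
        yield total


# Usage: three single-value files, each with a "value" header line.
with tempfile.TemporaryDirectory() as d:
    for i, v in enumerate([3, 4, 5]):
        with open(os.path.join(d, f"input_{i}.csv"), "w") as f:
            f.write(f"value\n{v}\n")
    sums: List[int] = list(running_sums(d))  # [3, 7, 12]
```

The last element of sums corresponds to the single row held by the reduced table once all files have been processed.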

Input connector

Data stream: we consider a stream in the form of CSV files: each new update is contained in its own CSV file. The CSV connector pw.csv.read takes several arguments:

  • path: the path to the folder in which the new CSV files are expected.
  • value_columns: the list of the columns of the table.
  • csv_settings: settings for the CSV parser.
  • poll_new_objects: flag indicating whether the engine has to wait for new CSV files. It has to be set to True in streaming mode.
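Conceptually, waiting for new CSV files means repeatedly checking the folder and picking up only the files that have not been seen yet. The following plain-Python sketch shows that idea with a simple polling loop. It is not how Pathway implements its connector; find_new_files, poll, interval and max_polls are hypothetical names chosen for illustration.

```python
import os
import tempfile
import time
from typing import Callable, List, Optional, Set


def find_new_files(directory: str, seen: Set[str]) -> List[str]:
    """Return the CSV files of `directory` not reported yet, and remember them."""
    new = sorted(
        name
        for name in os.listdir(directory)
        if name.endswith(".csv") and name not in seen
    )
    seen.update(new)
    return new


def poll(directory: str, on_new_file: Callable[[str], None],
         interval: float = 1.0, max_polls: Optional[int] = None) -> None:
    """Check `directory` every `interval` seconds and report each new file once.

    With max_polls=None the loop never ends, as in streaming mode.
    """
    seen: Set[str] = set()
    polls = 0
    while max_polls is None or polls < max_polls:
        for name in find_new_files(directory, seen):
            on_new_file(os.path.join(directory, name))
        polls += 1
        time.sleep(interval)


# Usage: a file is reported only the first time it is seen.
with tempfile.TemporaryDirectory() as d:
    seen: Set[str] = set()
    open(os.path.join(d, "a.csv"), "w").close()
    first = find_new_files(d, seen)   # ["a.csv"]
    open(os.path.join(d, "b.csv"), "w").close()
    second = find_new_files(d, seen)  # ["b.csv"]

    collected: List[str] = []
    poll(d, collected.append, interval=0, max_polls=1)
```

Tracking seen file names is what makes each file count once, which matches the assumption that every update arrives in its own new file.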

⚠️ Each CSV file should start with a header containing the column names, in the correct order, and separated by commas.

colA,colB
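Since the header must list the column names in the right order, a quick pre-check of incoming files can save some debugging. The helper below, has_expected_header, is a hypothetical plain-Python utility and not part of Pathway.

```python
import csv
import io
from typing import List, TextIO


def has_expected_header(f: TextIO, expected: List[str]) -> bool:
    """Return True if the first CSV line is exactly `expected`, in order."""
    header = next(csv.reader(f), None)  # None for an empty file
    return header == expected


ok = has_expected_header(io.StringIO("colA,colB\n1,2\n"), ["colA", "colB"])
swapped = has_expected_header(io.StringIO("colB,colA\n2,1\n"), ["colA", "colB"])
# A space after the comma becomes part of the name: " colB" != "colB".
spaced = has_expected_header(io.StringIO("colA, colB\n1,2\n"), ["colA", "colB"])
```

Here ok is True, while swapped and spaced are both False.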

Usage: to read a CSV stream in a directory ./dir/ with columns colA and colB, do the following:

t = pw.csv.read('./dir/', ["colA", "colB"], poll_new_objects=True)

The resulting Pathway table t will contain all the values contained in the CSV files, and will be automatically updated whenever a new CSV file is added.

Special case: static mode

CSV connectors can be used in both streaming and static modes. There are two ways to set up the CSV input connector in static mode:

  • specify a CSV file in the path: t = pw.csv.read('./input_static_file.csv', ["value"]),
  • set poll_new_objects to False: t = pw.csv.read('./input_stream_dir/', ["value"], poll_new_objects=False).

In both cases, the connector will read all the available data at once when the computation is triggered (by pw.debug.compute_and_print, for example).

Because the two setups look almost identical, make sure the input CSV connector is configured for the mode you actually intend.
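In static mode, the available input is read once and the result is final. As a plain-Python analogy (not Pathway code), reading either a single file or every file currently present in a folder and summing the value column looks like this. The function static_sum is hypothetical; files added to the folder after the call are simply ignored, mirroring the one-shot behaviour described above.

```python
import csv
import glob
import os
import tempfile
from typing import List


def static_sum(path: str) -> int:
    """Sum the `value` column of one CSV file, or of all CSV files in a folder."""
    if os.path.isdir(path):
        # Snapshot of the folder at call time: later files are not seen.
        files: List[str] = sorted(glob.glob(os.path.join(path, "*.csv")))
    else:
        files = [path]
    total = 0
    for name in files:
        with open(name, newline="") as f:
            total += sum(int(row["value"]) for row in csv.DictReader(f))
    return total


# Usage: the same function covers both static setups (a file or a folder).
with tempfile.TemporaryDirectory() as d:
    for i, v in enumerate([1, 2, 3]):
        with open(os.path.join(d, f"t{i}.csv"), "w") as f:
            f.write(f"value\n{v}\n")
    dir_total = static_sum(d)                           # 6
    file_total = static_sum(os.path.join(d, "t0.csv"))  # 1
```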

Output connector

For the output connector, we just need to specify the table we want to output and the path of the CSV file in which the output will be written:

pw.csv.write(t, "output_stream.csv")

Every time the table t is updated, the changes will be automatically appended to output_stream.csv.
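To picture what "appended" means, here is a minimal plain-Python sketch that adds one line to an output CSV each time a running sum changes, without ever rewriting earlier lines. The function append_updates and the single sum column are assumptions for illustration; the exact layout Pathway writes may differ and may include additional bookkeeping columns.

```python
import csv
import os
import tempfile
from typing import Iterable, List


def append_updates(values: Iterable[int], out_path: str) -> None:
    """Append the new `sum` to `out_path` every time a value arrives.

    The header is written only when the file does not exist yet; existing
    rows are never modified, so the file grows as an append-only log.
    """
    total = 0
    for value in values:
        total += value
        is_new = not os.path.exists(out_path)
        with open(out_path, "a", newline="") as f:
            writer = csv.writer(f)
            if is_new:
                writer.writerow(["sum"])
            writer.writerow([total])


# Usage: three incoming values produce three appended rows.
with tempfile.TemporaryDirectory() as d:
    out = os.path.join(d, "output_stream.csv")
    append_updates([1, 2, 3], out)
    with open(out, newline="") as f:
        lines: List[str] = f.read().splitlines()  # ["sum", "1", "3", "6"]
```

Reading such a file afterwards shows the whole history of the result, not only its latest value.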

Special case: static mode

Similarly to its input counterpart, the CSV output connector can be used in static mode. Nothing changes here: you can still use pw.csv.write(t, "output_static_file.csv"). However, the static data will be written to the file all at once, with no further updates.

Full example

Let's go back to our example of computing the sum of the values of the column value across CSV files. The final version of our project contains two files: realtime_sum.py, which processes the stream using Pathway, and generating_stream.sh, which generates the stream.

Here is realtime_sum.py:

import pathway as pw

t = pw.csv.read('./sum_input_data/', ["value"], poll_new_objects=True)
t = t.select(value=pw.apply(lambda x: int(x), t.value))
t = t.reduce(sum=pw.reducers.sum(t.value))
pw.csv.write(t, "output_stream.csv")
pw.run()

Don't forget to run the computation with pw.run(), otherwise nothing will happen. Once pw.run() is called, the computation runs forever until it is killed. If you need a reminder on Pathway operations, don't hesitate to take a look at our survival guide.

To have a streaming setup, we need to periodically insert new CSV files into the input directory. This can easily be done with a bash script, generating_stream.sh, which writes a new value every second:

#!/bin/bash
src="./sum_input_data/"
# Start from an empty directory (-f: no error if it does not exist yet).
rm -rf "$src"
mkdir -p "$src"
sleep 10
for LOOP_ID in {1..30}
do
    filename=$src"input_table_$LOOP_ID.csv"
    printf "value\n$LOOP_ID\n" >> "$filename"
    sleep 1
done

The input connector requires the input CSV directory to exist, so the script generating the stream should be launched first. The input connector then picks up every new CSV file added to the directory, and the output connector updates the output CSV file automatically and in real time.
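For readers who would rather stay in Python, generating_stream.sh can be sketched as follows. This is a port for illustration, not part of the original project: generate_stream and its parameters (n_files, initial_delay, interval) are hypothetical names, and calling generate_stream() with its defaults reproduces the script's timing (a 10-second pause, then 30 files one second apart).

```python
import os
import shutil
import time


def generate_stream(src: str = "./sum_input_data/", n_files: int = 30,
                    initial_delay: float = 10.0, interval: float = 1.0) -> None:
    """Write one single-value CSV file into `src` every `interval` seconds."""
    # Start from an empty directory, as the rm/mkdir lines of the script do.
    shutil.rmtree(src, ignore_errors=True)
    os.makedirs(src, exist_ok=True)
    # Pause before the first file, mirroring `sleep 10` in the script.
    time.sleep(initial_delay)
    for loop_id in range(1, n_files + 1):
        path = os.path.join(src, f"input_table_{loop_id}.csv")
        with open(path, "w") as f:
            f.write(f"value\n{loop_id}\n")  # header line, then the value
        time.sleep(interval)
```

Like the bash version, this writes each file in place; writing to a temporary name first and then renaming it with os.replace may help ensure a reader never sees a half-written file.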

Olivier Ruas

Algorithm and Data Processing Magician