Using CSV connectors

Pathway provides a pw.io.csv module with connectors to read and write data streams using CSV files.

Comma-separated values (CSV) is one of the most common formats for tables. In a CSV file, each line is a data record, represented by the values of the different fields, separated by commas.

key,recipient,sender
1,Bill H.,Nancy R.
2,Harry P.,Hermione G.
3,Julian S.,Dick F.
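
As a quick illustration of the format, independent of Pathway, the sample table above can be parsed with Python's standard csv module. This is only a sketch: the helper name parse_records is hypothetical and is not part of any Pathway API.

```python
import csv
import io

# The sample table shown above, as the text of a CSV file.
SAMPLE = """key,recipient,sender
1,Bill H.,Nancy R.
2,Harry P.,Hermione G.
3,Julian S.,Dick F.
"""


def parse_records(text):
    """Return one dict per data line, keyed by the column names of the header."""
    return list(csv.DictReader(io.StringIO(text)))


records = parse_records(SAMPLE)
for record in records:
    # Each record maps a column name to the value found on that line.
    print(record["key"], record["recipient"], record["sender"])
```

The first line is consumed as the header, and every following line becomes one record.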

In Pathway, you can read and write CSV files using pw.io.csv.read and pw.io.csv.write.

⚠️ CSV connectors work in both streaming and static modes. Be careful: the way the connectors are used differs depending on the chosen mode (see the differences).

Short version

Consider a simple scenario: new CSV files are added into a directory ./input_stream_dir/, each file containing a table with a single column value. Each entry in value is an integer, and you want to compute the sum of these values and store the resulting output stream in an output_stream.csv file. You can do it as follows in Pathway:

realtime_sum.py
import pathway as pw

# We define a schema for the table.
# It sets all the columns and their types.
class InputSchema(pw.Schema):
  value: int

# We use the CSV input connector to connect to the directory.
t = pw.io.csv.read(
  './input_stream_dir/',
  schema=InputSchema,
  mode="streaming"
)

# We compute the sum (this part is independent of the connectors).
t = t.reduce(sum=pw.reducers.sum(t.value))

# We use a CSV output connector to write the results in an output file.
pw.io.csv.write(t, "output_stream.csv")

# We launch the computation.
pw.run()

Input connector

Data stream: Consider a stream in the form of CSV files: each new update is contained in its own CSV file. The CSV connector pw.io.csv.read takes several arguments:

  • path: the path to the folder in which the new CSV files are expected.
  • schema: the schema of the table. It defines the columns' names and their types.
  • csv_settings: settings for the CSV parser.
  • mode: indicates whether the engine has to wait for new CSV files. Possible values are streaming and static. The default value is streaming.

⚠️ Each CSV file should start with a header containing the column names, in the correct order, and separated by commas. For example:

colA,colB

Note that the CSV connector does not recursively read the files in the subdirectories of path.
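
Since a file with a wrong header is easy to drop into the directory by mistake, it can help to check the headers before starting the computation. The following is a minimal sketch using only the Python standard library; find_bad_headers is a hypothetical helper, not a Pathway function, and it applies a strict check (same names, same order). Like the connector, it only looks at files directly inside the directory.

```python
import csv
from pathlib import Path


def find_bad_headers(directory, expected_columns):
    """Return the names of the CSV files whose first line is not the expected header.

    Only files directly inside `directory` are checked: Path.glob("*.csv")
    does not descend into subdirectories.
    """
    bad = []
    for path in sorted(Path(directory).glob("*.csv")):
        with path.open(newline="") as f:
            header = next(csv.reader(f), None)  # None for an empty file
        if header != list(expected_columns):
            bad.append(path.name)
    return bad
```

For instance, find_bad_headers('./dir/', ['colA', 'colB']) returns an empty list when every file in ./dir/ starts with the header colA,colB.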

Usage: to read a CSV stream in a directory ./dir/ with columns colA and colB, you have to do the following:

from typing import Any

class InputSchemaTwoColumns(pw.Schema):
  colA: Any
  colB: Any

t = pw.io.csv.read('./dir/', schema=InputSchemaTwoColumns, mode="streaming")

The resulting Pathway table t will contain all the values contained inside the CSV files, and will be automatically updated whenever a new CSV file is added.

Special case: static mode

CSV connectors can be used for both streaming and static modes. There are two ways to set up the CSV connector to be static:

  • specify a CSV file in the path: t = pw.io.csv.read('./input_static_file.csv', schema=InputSchema),
  • set mode to static: t = pw.io.csv.read('./input_stream_dir/', schema=InputSchema, mode="static").

In both cases, the connector will read all the available data at once when the computation is triggered (by pw.debug.compute_and_print for example).

Be careful to use the input CSV connector in the mode you intend: the streaming and static calls look very similar.

Output connector

The CSV output connector pw.io.csv.write has two parameters: the table you want to output and the path of the CSV file in which the output will be written:

pw.io.csv.write(t, "output_stream.csv")

Every time the table t is updated, the changes are automatically appended to output_stream.csv.
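
Because the file is a log of changes rather than a snapshot, reading the current state of the table means replaying it. The sketch below does so with the standard csv module. It rests on an assumption not stated in this guide: that each line carries, besides the table's columns, a time column and a diff column, where diff is 1 for an added row and -1 for a removed row. Check the header of your own output file before relying on it. The name latest_rows is hypothetical.

```python
import csv


def latest_rows(output_path):
    """Replay the change log in `output_path` and return the rows currently present.

    ASSUMPTION: besides the table's columns, each line has a `time` column and
    a `diff` column, where diff=1 adds a row and diff=-1 removes it.
    """
    counts = {}
    with open(output_path, newline="") as f:
        for line in csv.DictReader(f):
            diff = int(line.pop("diff"))
            line.pop("time")  # the timestamp is not needed to rebuild the state
            row = tuple(sorted(line.items()))
            counts[row] = counts.get(row, 0) + diff
    # Each distinct row still present is reported once; values are kept as strings.
    return [dict(row) for row, count in counts.items() if count > 0]
```

Under that assumption, latest_rows("output_stream.csv") on the sum example would return a single row holding the latest value of sum.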

Special case: static mode

Like its input counterpart, the CSV output connector can be used in static mode. Nothing changes here: you can still use pw.io.csv.write(t, "output_static_file.csv"). However, the static data is written to the file all at once, without any further update.

Complete example

Let's go back to our example on how to compute a sum over the values of the column value inside CSV files. The final version of our project contains two files: realtime_sum.py, which processes the stream using Pathway, and generating_stream.sh, which generates the stream.

Here is realtime_sum.py:

realtime_sum.py
import pathway as pw

class InputSchema(pw.Schema):
  value: int


t = pw.io.csv.read(
  './input_stream_dir/',
  schema=InputSchema,
  mode="streaming"
)
t = t.reduce(sum=pw.reducers.sum(t.value))
pw.io.csv.write(t, "output_stream.csv")
pw.run()

Don't forget to run the computation with pw.run(), otherwise nothing will happen. Once pw.run() is called, the computation runs until the process is killed. If you need some reminders on Pathway operations, don't hesitate to take a look at our first-steps guide.

To have a streaming setup, you need to periodically insert new CSV files into a given directory. This can easily be done with a bash script generating_stream.sh, which writes a new value to a new file every second:

generating_stream.sh
#!/bin/bash
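# Must match the directory read by realtime_sum.py.
src="./input_stream_dir/"
rm -rf "$src"
mkdir -p "$src"

# Wait before writing the first file (e.g. to start realtime_sum.py in the meantime).
sleep 10

for LOOP_ID in {1..30}
do
    filename="${src}input_table_$LOOP_ID.csv"
    printf "value\n%s\n" "$LOOP_ID" >> "$filename"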
    sleep 1
done

The input connector requires the input directory to exist, so the script generating the stream should be launched first. The input connector then watches the directory: every time a new CSV file is added, the result is updated and the output connector writes the change to the output CSV file in real time.
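
If bash is not available, the same stream can be produced from Python. The following is a rough equivalent of generating_stream.sh using only the standard library; generate_stream and its parameters are hypothetical names chosen for this sketch. Unlike the bash script, it neither empties the directory first nor waits before writing the first file.

```python
import time
from pathlib import Path


def generate_stream(directory, n_files=30, delay=1.0):
    """Write `n_files` single-value CSV files into `directory`, one every `delay` seconds."""
    target = Path(directory)
    target.mkdir(parents=True, exist_ok=True)
    written = []
    for i in range(1, n_files + 1):
        path = target / f"input_table_{i}.csv"
        # Each file holds the header line followed by a single integer value.
        path.write_text(f"value\n{i}\n")
        written.append(path)
        time.sleep(delay)
    return written
```

Calling generate_stream("./input_stream_dir/") writes the values 1 to 30 into the directory read by realtime_sum.py. Once all thirty files have been processed, the sum should reach 465.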