pw.io.mysql

This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.

Most internal Pathway types can be stored in MySQL. The table below explains how Pathway engine data is serialized. Note that you can convert unsupported types to supported ones yourself (for example, serialize an array to JSON) using user-defined functions if needed.
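As an illustration of the conversion mentioned above, here is a minimal sketch that serializes a sequence column to a JSON string before writing. The helper names sequence_to_json and with_json_column are hypothetical, and the sketch assumes the column holds tuples, lists, or NumPy arrays of JSON-compatible values.

```python
import json


def sequence_to_json(values) -> str:
    """Serialize a tuple, list, or NumPy array into a JSON array string."""
    # NumPy arrays expose tolist(); plain tuples and lists do not.
    if hasattr(values, "tolist"):
        values = values.tolist()
    return json.dumps(list(values))


def with_json_column(table, column_name: str):
    """Replace `column_name` in a Pathway table with its JSON string form.

    Hypothetical helper: the resulting str column is stored as TEXT.
    """
    # Imported lazily so sequence_to_json stays usable without Pathway.
    import pathway as pw

    serialized = pw.apply_with_type(sequence_to_json, str, table[column_name])
    # with_columns overrides the existing column of the same name.
    return table.with_columns(**{column_name: serialized})
```

The converted table can then be passed to pw.io.mysql.write like any other table with supported column types.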

Pathway types conversion into MySQL

Pathway type | MySQL type
bool | TINYINT
int | BIGINT. If the field type corresponds to a different integral type, the connector will also attempt to cast the value accordingly.
float | DOUBLE. If the field type is FLOAT, the connector will also attempt to cast the value accordingly.
pointer | TEXT. If the type is VARCHAR, the connector will cast the value accordingly.
str | TEXT. If the type is VARCHAR, the connector will cast the value accordingly.
bytes | BLOB
Naive DateTime | DATETIME(6), serialized with microsecond precision.
UTC DateTime | DATETIME(6), serialized with microsecond precision. The value is converted to the UTC time zone.
Duration | TIME(6), serialized with microsecond precision.
JSON | JSON
np.ndarray | Not supported, as there are no array types in MySQL.
tuple | Not supported, as there are no array types in MySQL.
list | Not supported, as there are no array types in MySQL.
pw.PyObjectWrapper | BLOB

write(table, connection_string, table_name, *, max_batch_size=None, init_mode='default', output_table_type='stream_of_changes', primary_key=None, name=None, sort_by=None)

Writes table to a MySQL table.

The connector works in two modes: snapshot and stream of changes. In snapshot mode, the target table holds the current snapshot of the data. In stream of changes mode, the target table contains the log of all data updates and must also have two integer columns, time and diff: time stores the transactional minibatch time, and diff is 1 for a row insertion or -1 for a row deletion.
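To make the relationship between the two modes concrete, the following sketch replays a stream of changes into the snapshot it describes. It only illustrates the time and diff semantics and is not the connector's implementation; the function name replay_changes and the sample rows are hypothetical.

```python
from collections import Counter


def replay_changes(changes):
    """Fold (key, value, time, diff) rows into a {key: value} snapshot.

    A row is present in the snapshot when its insertions (diff = 1)
    outnumber its deletions (diff = -1).
    """
    counts = Counter()
    for key, value, _time, diff in changes:
        counts[(key, value)] += diff
    return {key: value for (key, value), count in counts.items() if count > 0}


# Hypothetical log: key 1 is updated from "Hello" to "Hi" at time 4,
# which appears as a deletion followed by an insertion.
log = [
    (1, "Hello", 2, 1),
    (2, "World", 2, 1),
    (1, "Hello", 4, -1),
    (1, "Hi", 4, 1),
]
snapshot = replay_changes(log)
```

Here snapshot contains one row per key with its latest value, which is what a snapshot-mode table would hold after the same updates.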

  • Parameters
    • table (Table) – Table to be written.
    • connection_string (str) – Connection string for MySQL database.
    • table_name (str) – Name of the target table.
    • max_batch_size (int | None) – Maximum number of entries allowed to be committed within a single transaction.
    • init_mode (Literal['default', 'create_if_not_exists', 'replace']) – "default": the default initialization mode; "create_if_not_exists": initializes the SQL writer by creating the necessary table if it does not already exist; "replace": initializes the SQL writer by replacing any existing table.
    • output_table_type (Literal['stream_of_changes', 'snapshot']) – Defines how the output table manages its data. If set to "stream_of_changes" (the default), the system outputs a stream of modifications to the target table. This stream includes two additional integer columns: time, representing the computation minibatch, and diff, indicating the type of change (1 for row addition and -1 for row deletion). If set to "snapshot", the table maintains the current state of the data, updated atomically with each minibatch and ensuring that no partial minibatch updates are visible.
    • primary_key (list[ColumnReference] | None) – When using snapshot mode, one or more columns that form the primary key in the target MySQL table.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
  • Returns
    None
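The ordering that sort_by describes can be sketched in plain Python as follows. This is only an illustration of the lexicographic comparison within one minibatch, not the connector's code; the function name sort_minibatch and the sample rows are hypothetical.

```python
def sort_minibatch(rows, sort_by):
    """Sort the rows of one minibatch in ascending order.

    Rows are compared by the tuple of their values in the `sort_by`
    columns, so ties on the first column are broken by the second,
    and so on.
    """
    return sorted(rows, key=lambda row: tuple(row[column] for column in sort_by))


# Hypothetical minibatch with two rows sharing the same key.
batch = [
    {"key": 2, "value": "a"},
    {"key": 1, "value": "b"},
    {"key": 1, "value": "a"},
]
ordered = sort_minibatch(batch, ["key", "value"])
```

In an actual pipeline the equivalent request would be made by passing column references, for example sort_by=[table.key, table.value], to pw.io.mysql.write.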

Example:

To test this connector locally, you will need a MySQL instance. The easiest way to do this is to run a Docker image. For example, you can use the mysql image. Pull and run it as follows:

docker pull mysql:8.0
docker run \
    --name mysql-container \
    -e MYSQL_ROOT_PASSWORD=rootpass \
    -e MYSQL_DATABASE=testdb \
    -e MYSQL_USER=testuser \
    -e MYSQL_PASSWORD=testpass \
    -p 3306:3306 \
    mysql:8.0

The first command pulls the image from the Docker repository while the second runs it, setting the credentials for the user and the database name.

The database is now created and you can use the connector to write data to a table. First, you will need a Pathway table for testing. You can create it as follows:

import pathway as pw
table = pw.debug.table_from_markdown('''
   key | value
     1 | Hello
     2 | World
''')

You can write this table using:

pw.io.mysql.write(
    table,
    "mysql://testuser:testpass@localhost:3306/testdb",
    table_name="test",
    init_mode="create_if_not_exists",
)

The init_mode parameter set to "create_if_not_exists" ensures that the table is created by the framework if it does not already exist.

You can run this pipeline with pw.run().

After the pipeline completes, you can connect to the database from the command line:

docker exec -it mysql-container mysql -u testuser -p testdb

Enter the password set when creating the database. In this example, it is testpass.

You can check the data in the table with a simple command:

select * from test;

Note that if you run the code again, it will append data to this table. To overwrite the entire table, use init_mode set to "replace".

Now suppose that the table is dynamic and you need to maintain a snapshot of the data. In this case, use snapshot mode, which requires you to choose the columns that form the primary key. In this example, it is the key column. If the output table is called test_snapshot, the code looks as follows:

pw.io.mysql.write(
    table,
    "mysql://testuser:testpass@localhost:3306/testdb",
    table_name="test_snapshot",
    init_mode="create_if_not_exists",
    output_table_type="snapshot",
    primary_key=[table.key],
)

Note: Pathway can create the table in MySQL when init_mode is set to "replace" or "create_if_not_exists". When it creates a table for snapshot mode, Pathway also defines the primary key. This is a limitation for string and binary keys: Pathway maps str to TEXT and bytes to BLOB, and MySQL does not accept TEXT or BLOB columns as a primary key without a key length. Use a bounded type instead, for example a VARCHAR with a length limit that suits your use case. Therefore, if you plan to use strings or blobs as primary keys, create the table manually with the correct schema in advance.
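As a sketch of the manual schema described in the note, the helper below builds a CREATE TABLE statement with a VARCHAR primary key. Everything specific here is an assumption made for illustration: the helper name snapshot_table_ddl, a table with a string key column and a string value column, and the default key length of 64.

```python
def snapshot_table_ddl(table_name: str, key_length: int = 64) -> str:
    """Build DDL for a snapshot table whose primary key is a string column.

    Assumes hypothetical columns `key` (str) and `value` (str). The key is
    declared as VARCHAR because TEXT cannot be a primary key without a
    key length.
    """
    return (
        f"CREATE TABLE IF NOT EXISTS `{table_name}` (\n"
        f"    `key` VARCHAR({key_length}) NOT NULL,\n"
        "    `value` TEXT,\n"
        "    PRIMARY KEY (`key`)\n"
        ")"
    )


# Print the statement so it can be pasted into a MySQL session.
ddl = snapshot_table_ddl("test_snapshot_str")
print(ddl + ";")
```

Once the table exists, the snapshot write can target it by name, leaving table creation out of Pathway's hands.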