pw.io.mysql

This module is available when using one of the following licenses only: Pathway Live Data Framework Scale, Pathway Live Data Framework Enterprise.

The Pathway Live Data Framework provides both Input and Output connectors for MySQL. See the read and write documentation below for the modes, requirements, and options specific to each. The type conversions for both directions are given in the tables below.

Type Conversion (Input Connector)

The table below describes how MySQL column types are parsed into Pathway Live Data Framework values, and which type to declare in your pw.Schema to receive them correctly. Every type produced by the output connector round-trips back through the input connector when the original schema type is declared, except for durations outside MySQL's TIME range.

MySQL types parsed by the input connector

MySQL type | Pathway Live Data Framework schema type and notes
TINYINT(1) / BOOLEAN | bool
TINYINT / SMALLINT / MEDIUMINT / INT / BIGINT (incl. UNSIGNED) | int. May also be declared as float. An UNSIGNED BIGINT value above the signed 64-bit maximum (9223372036854775807) is reported as a per-row conversion error.
DECIMAL / NUMERIC | float. The value is parsed from its textual representation, so precision loss is possible beyond ~15 significant digits. A scale-0 column may instead be declared as int.
FLOAT | float
DOUBLE | float
CHAR / VARCHAR / TINYTEXT / TEXT / MEDIUMTEXT / LONGTEXT / ENUM / SET | str
VARCHAR / TEXT | pw.Json. The string is parsed as a JSON literal. This is what the output connector produces for pw.Json columns stored in a non-JSON column.
JSON | pw.Json
VARCHAR / TEXT | pw.Pointer. The string is decoded as a Pathway Live Data Framework pointer and must have been produced by the output connector for a pw.Pointer column.
BINARY / VARBINARY / TINYBLOB / BLOB / MEDIUMBLOB / LONGBLOB | bytes
BLOB | pw.PyObjectWrapper. The binary payload is deserialized with bincode and must have been written by the output connector for a pw.PyObjectWrapper column.
DATE | pw.DateTimeNaive, set to midnight (00:00:00) on the given date.
DATETIME / TIMESTAMP | pw.DateTimeNaive
DATETIME | pw.DateTimeUtc. The stored wall-clock value is interpreted as UTC, which matches how the output connector stores pw.DateTimeUtc.
TIME | pw.Duration. MySQL's TIME range is limited to ±838:59:59, so durations outside that range do not round-trip.
YEAR | int
Any nullable column | Declare the field as optional in the schema (T | None, where T is the type from this table). A NULL value is parsed as None; any other value is parsed as T. If the field is not declared optional and a NULL is received, a per-row conversion error is raised.
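The two numeric caveats in the table can be reproduced with plain Python. The sketch below is not the connector's code; it assumes only that a Pathway float is a 64-bit IEEE 754 double (like Python's float) and that a Pathway int is a signed 64-bit integer. The helper names are hypothetical.

```python
from decimal import Decimal

# Assumption: a Pathway int is a signed 64-bit integer, so this is its maximum.
INT64_MAX = 2**63 - 1  # 9223372036854775807


def fits_in_signed_int64(value: int) -> bool:
    """Return True if a (possibly UNSIGNED) BIGINT value fits in a signed 64-bit int."""
    return value <= INT64_MAX


def decimal_text_to_float(text: str) -> float:
    """Parse a DECIMAL's textual representation into a 64-bit float."""
    return float(text)


# Largest UNSIGNED BIGINT: it does not fit, so it would be a per-row conversion error.
print(fits_in_signed_int64(18446744073709551615))  # False

# A DECIMAL(20, 2) value with 19 significant digits loses its last digits as a float.
text = "12345678901234567.89"
print(Decimal(decimal_text_to_float(text)) == Decimal(text))  # False
```

If exact decimal values matter, keep them within about 15 significant digits or store them in MySQL as strings and declare the column as str.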

Type Conversion (Output Connector)

The table below explains how Pathway Live Data Framework engine data is serialized into MySQL. You can convert unsupported types to supported ones yourself (for example, serialize an array to JSON) using user-defined functions if needed.
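As a minimal sketch of such a conversion, the hypothetical helper below serializes a list or tuple into a JSON string, which the output connector then writes as str (TEXT). In a pipeline you would wrap a function like this with @pw.udf and apply it to the column; for an np.ndarray, call .tolist() first so that NumPy scalars become plain Python numbers that json can encode.

```python
import json


def array_to_json_text(values) -> str:
    """Serialize a list or tuple of JSON-compatible values to a JSON string."""
    return json.dumps(list(values))


def json_text_to_list(text: str) -> list:
    """Inverse conversion, for example after reading the column back as str."""
    return json.loads(text)


encoded = array_to_json_text((1.5, 2.0, 3.25))
print(encoded)  # [1.5, 2.0, 3.25]
print(json_text_to_list(encoded))  # [1.5, 2.0, 3.25]
```

Note that the round trip returns a list even if the original value was a tuple, since JSON has only one array type.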

Pathway Live Data Framework types conversion into MySQL

Pathway Live Data Framework type | MySQL type
bool | BOOLEAN (TINYINT(1))
int | BIGINT. If the target column has a different integer type, the connector also attempts to cast the value to it.
float | DOUBLE. If the target column is FLOAT, the connector also attempts to cast the value to it.
pointer | TEXT. If the target column is VARCHAR, the connector casts the value accordingly. Declare the column as pw.Pointer on read to restore the original value.
str | TEXT. If the target column is VARCHAR, the connector casts the value accordingly.
bytes | BLOB
Naive DateTime | DATETIME(6), serialized with microsecond precision.
UTC DateTime | DATETIME(6), serialized with microsecond precision after conversion to the UTC time zone. Declare the column as pw.DateTimeUtc on read to restore the original value.
Duration | TIME(6), serialized with microsecond precision. MySQL's TIME range is limited to ±838:59:59.
JSON | JSON
np.ndarray | Not supported, as there are no array types in MySQL.
tuple | Not supported, as there are no array types in MySQL.
list | Not supported, as there are no array types in MySQL.
pw.PyObjectWrapper | BLOB, serialized with bincode. Declare the column as pw.PyObjectWrapper on read to restore the original value.
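The sketch below illustrates the two temporal rules from the tables above. It uses the standard library's datetime and timedelta as stand-ins for pw.DateTimeUtc and pw.Duration (an assumption made to keep it self-contained): a UTC datetime is stored as its UTC wall-clock value, and a duration fits in a TIME column only within ±838:59:59. The helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# MySQL TIME columns accept values from -838:59:59 to 838:59:59.
MYSQL_TIME_LIMIT = timedelta(hours=838, minutes=59, seconds=59)


def fits_mysql_time(duration: timedelta) -> bool:
    """Return True if the duration lies within MySQL's TIME range."""
    return -MYSQL_TIME_LIMIT <= duration <= MYSQL_TIME_LIMIT


def to_utc_wall_clock(moment: datetime) -> datetime:
    """Convert an aware datetime to the naive UTC value a DATETIME(6) column holds."""
    return moment.astimezone(timezone.utc).replace(tzinfo=None)


def from_utc_wall_clock(stored: datetime) -> datetime:
    """Interpret a naive stored DATETIME value as UTC."""
    return stored.replace(tzinfo=timezone.utc)


cet = timezone(timedelta(hours=1))
original = datetime(2024, 1, 1, 12, 0, tzinfo=cet)
stored = to_utc_wall_clock(original)
print(stored)  # 2024-01-01 11:00:00
print(from_utc_wall_clock(stored) == original)  # True: same instant
print(fits_mysql_time(timedelta(days=35)))  # False: 840 hours
```

The original offset (+01:00 here) is not stored; only the instant survives the round trip.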

read(connection_string, table_name, schema, *, mode='streaming', server_id=None, autocommit_duration_ms=1500, name=None, max_backlog_size=None, debug_data=None)

Reads a table from a MySQL database.

In "static" mode, the connector issues a plain SELECT against the table, emits all rows, and terminates. No special server configuration is required beyond read access to the table.

In "streaming" mode (the default), the connector performs Change Data Capture by reading MySQL’s binary log (binlog). It first reads a snapshot of the table, then continuously tails the binary log, delivering every insert, update, and delete as it happens. This requires the MySQL server to be configured for row-based binary logging:

  • log_bin enabled (start mysqld with --log-bin; on by default in MySQL 8.0+),
  • binlog_format=ROW (the default in MySQL 8.0+),
  • binlog_row_image=FULL (the default),

and the connecting user to hold the REPLICATION SLAVE and REPLICATION CLIENT privileges (GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO <user>).
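Before starting a streaming read, you can check these settings yourself. The following sketch assumes you have already fetched the result of SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image') into a Python dict; check_binlog_settings is a hypothetical helper, not part of pw.io.mysql.

```python
# Server settings required by "streaming" mode, as listed above.
REQUIRED_SETTINGS = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def check_binlog_settings(variables: dict) -> list:
    """Return human-readable problems found in a {Variable_name: Value} mapping."""
    problems = []
    for name, expected in REQUIRED_SETTINGS.items():
        actual = variables.get(name)
        if actual is None or str(actual).upper() != expected:
            problems.append(f"{name} is {actual!r}, expected {expected!r}")
    return problems


# Example input: binary logging is on, but statement-based logging is configured.
print(check_binlog_settings(
    {"log_bin": "ON", "binlog_format": "STATEMENT", "binlog_row_image": "FULL"}
))  # ["binlog_format is 'STATEMENT', expected 'ROW'"]
```

An empty list means the server side is ready; the replication privileges still have to be granted separately.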

Primary key: the schema must declare at least one primary-key column via pw.column_definition(primary_key=True). These columns identify each row so the connector can correlate snapshot rows with subsequent binlog inserts, updates, and deletes.
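Conceptually, the primary key lets the reader treat the table as a map from key to row: the snapshot fills the map, and each binlog event replaces or removes the entry for its key. The sketch below models that bookkeeping in plain Python with a hypothetical id key column; it illustrates the idea and is not the connector's implementation.

```python
from typing import Dict, Optional, Tuple

Row = Dict[str, object]
# Hypothetical schema: "id" is the column marked primary_key=True.
PRIMARY_KEY = ("id",)


def key_of(row: Row) -> Tuple:
    """Extract the primary-key tuple that identifies a row."""
    return tuple(row[column] for column in PRIMARY_KEY)


def apply_event(state: Dict[Tuple, Row], kind: str,
                before: Optional[Row] = None, after: Optional[Row] = None) -> None:
    """Apply one binlog-style change to a key -> row view of the table."""
    if kind == "insert":
        state[key_of(after)] = after
    elif kind == "update":
        # The before image identifies the old row, even if the key itself changed.
        del state[key_of(before)]
        state[key_of(after)] = after
    elif kind == "delete":
        del state[key_of(before)]
    else:
        raise ValueError(f"unknown event kind: {kind!r}")


# The snapshot fills the view; later events are matched to it by key.
snapshot = [{"id": 1, "name": "a", "value": 1.0}, {"id": 2, "name": "b", "value": 2.0}]
state = {key_of(row): row for row in snapshot}
apply_event(state, "update", before=snapshot[0], after={"id": 1, "name": "a", "value": 5.0})
apply_event(state, "delete", before=snapshot[1])
print(sorted(state))  # [(1,)]
```

Without a declared key there would be no reliable way to tell which snapshot row an update or delete refers to when two rows look alike.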

No server-side footprint: unlike PostgreSQL logical replication, reading the MySQL binary log creates no persistent state on the server, so there is no replication slot to leave behind. A disconnected or idle reader therefore cannot cause the server's disk to fill up: binary-log retention is governed solely by the server's own settings (binlog_expire_logs_seconds, with max_binlog_size controlling when files rotate), independently of any reader.

Persistence: when persistence is enabled (by passing a persistence_config to pw.run), the streaming connector saves the binary-log coordinates (file name and position) of the last processed change as its offset. On restart, it skips the snapshot and resumes tailing from that position.

If the saved binary-log file has since been purged by the server's normal log expiry, which happens only if Pathway was offline longer than the server's binary-log retention period, the connector raises a clear error at startup asking you to clear the persistence directory and re-snapshot, rather than silently losing data. Choose a binary-log retention period long enough to cover your longest expected downtime.

Binary-log coordinates are specific to one MySQL server: if you restore the server from a backup or fail over to a different host, clear the persistence directory. Persistence applies only to "streaming" mode; in "static" mode, every run re-reads the full table.
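The resume decision described above can be sketched as follows. BinlogOffset and resume_point are hypothetical names chosen for illustration, and the list of files still on the server is assumed to come from the Log_name column of SHOW BINARY LOGS.

```python
from typing import List, NamedTuple


class BinlogOffset(NamedTuple):
    file: str      # binary-log file name, e.g. "binlog.000042"
    position: int  # position of the last processed change within that file


def resume_point(saved: BinlogOffset, available_files: List[str]) -> BinlogOffset:
    """Return where to resume tailing, or fail if the saved file has been purged."""
    if saved.file not in available_files:
        raise RuntimeError(
            f"binary log {saved.file} is no longer available on the server; "
            "clear the persistence directory to re-snapshot the table"
        )
    return saved


# The file names would come from the Log_name column of SHOW BINARY LOGS.
print(resume_point(BinlogOffset("binlog.000042", 1234), ["binlog.000042", "binlog.000043"]))
# BinlogOffset(file='binlog.000042', position=1234)
```

Failing loudly here is the point: resuming from a later file would skip every change recorded in the purged one.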

  • Parameters
    • connection_string (str) – Connection string for the MySQL database. It must include the database name, e.g. "mysql://user:password@localhost:3306/mydb".
    • table_name (str) – Name of the table to read from.
    • schema (type[Schema]) – Schema of the resulting table.
    • mode (Literal['static', 'streaming']) – "streaming" (the default) reads a snapshot and then tails the binary log for live changes; requires row-based binary logging. "static" reads the full table once as a snapshot, then terminates; no binary-log configuration is needed.
    • server_id (int | None) – The replica server id used when registering for the binary-log stream. It must be unique among everything replicating from the source server. If omitted, a random value is chosen on each run; set it explicitly if you run several readers against the same server and want stable identifiers.
    • autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • max_backlog_size (int | None) – Limit on the number of entries read from the input source and kept in processing at any moment.
    • debug_data (Any) – Static data to use instead of the external source (for testing).
  • Returns
    Table – The table read.
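Because the connection string must name the database, a quick pre-flight check can catch a missing one before the pipeline starts. This sketch uses urllib.parse from the standard library; the connector's own parser may accept or reject other forms, so treat database_name as a hypothetical convenience helper.

```python
from urllib.parse import urlsplit


def database_name(connection_string: str) -> str:
    """Return the database name from a mysql:// URL, raising if it is missing."""
    parts = urlsplit(connection_string)
    if parts.scheme != "mysql":
        raise ValueError(f"expected a mysql:// URL, got scheme {parts.scheme!r}")
    name = parts.path.lstrip("/")
    if not name:
        raise ValueError("the connection string must include a database name")
    return name


print(database_name("mysql://user:password@localhost:3306/mydb"))  # mydb
```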

Example:

To test this connector locally, run a MySQL instance with binary logging enabled (it is on by default in the mysql:8.0 image):

docker run --name mysql-container \
    -e MYSQL_ROOT_PASSWORD=rootpass \
    -e MYSQL_DATABASE=testdb \
    -e MYSQL_USER=testuser \
    -e MYSQL_PASSWORD=testpass \
    -p 3306:3306 \
    mysql:8.0

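Grant the replication privileges the binary-log reader needs. Run this once as the root user, for example in a session opened with docker exec -it mysql-container mysql -u root -p: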

GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'testuser'@'%';

For a one-off static read (no binary-log setup required):

import pathway as pw
class MySchema(pw.Schema):
    id: int = pw.column_definition(primary_key=True)
    name: str
    value: float
table = pw.io.mysql.read(  
    "mysql://testuser:testpass@localhost:3306/testdb",
    table_name="my_table",
    schema=MySchema,
    mode="static",
)

For streaming change data capture from the binary log, simply use the default mode:

table = pw.io.mysql.read(  
    "mysql://testuser:testpass@localhost:3306/testdb",
    table_name="my_table",
    schema=MySchema,
)

The resulting table can be transformed with the usual Pathway operators and written to any sink. For example, to mirror the table into another MySQL table in real time:

pw.io.mysql.write(  
    table,
    "mysql://testuser:testpass@localhost:3306/testdb",
    table_name="my_table_copy",
    init_mode="create_if_not_exists",
    output_table_type="snapshot",
    primary_key=[table.id],
)
pw.run()

To enable persistence so a restarted pipeline resumes from where it left off, pass a persistence_config to pw.run:

persistence_config = pw.persistence.Config(  
    backend=pw.persistence.Backend.filesystem("./PStorage")
)
table = pw.io.mysql.read(  
    "mysql://testuser:testpass@localhost:3306/testdb",
    table_name="my_table",
    schema=MySchema,
    name="my_mysql_source",
)
pw.io.jsonlines.write(table, "output.jsonl")  
pw.run(persistence_config=persistence_config)

write(table, connection_string, table_name, *, max_batch_size=None, init_mode='default', output_table_type='stream_of_changes', primary_key=None, name=None, sort_by=None)

Writes a table to a MySQL table.

The connector works in two modes: snapshot mode and stream of changes. In snapshot mode, the target table maintains the current snapshot of the data. In stream-of-changes mode, the target table contains the log of all data updates and needs two additional integer columns, time and diff: time stores the transactional minibatch time, and diff is 1 for a row insertion or -1 for a row deletion.
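To make the time and diff columns concrete, the sketch below replays a stream-of-changes log to recover the current contents of the table. It assumes that an update appears as the deletion of the old row plus the insertion of the new one at the same minibatch time; the replay helper and the sample rows are illustrative.

```python
from collections import Counter


def replay(changes):
    """Rebuild the current rows from (time, diff, row) entries of a change log."""
    counts = Counter()
    for _time, diff, row in changes:
        counts[row] += diff
    # Unary plus keeps only rows with a positive multiplicity.
    return +counts


log = [
    (2, 1, (1, "Hello")),
    (2, 1, (2, "World")),
    # An update of key 1 at a later minibatch: retract the old row, insert the new one.
    (4, -1, (1, "Hello")),
    (4, 1, (1, "Hi")),
]
print(sorted(replay(log)))  # [(1, 'Hi'), (2, 'World')]
```

Summing diffs does not depend on order, so the time column is only needed to reconstruct the table as of an intermediate minibatch (filter on time before summing).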

  • Parameters
    • table (Table) – Table to be written.
    • connection_string (str) – Connection string for the MySQL database.
    • table_name (str) – Name of the target table.
    • max_batch_size (int | None) – Maximum number of entries allowed to be committed within a single transaction.
    • init_mode (Literal['default', 'create_if_not_exists', 'replace']) – "default": the default initialization mode; "create_if_not_exists": initializes the SQL writer by creating the target table if it does not already exist; "replace": initializes the SQL writer by replacing any existing table.
    • output_table_type (Literal['stream_of_changes', 'snapshot']) – Defines how the output table manages its data. If set to "stream_of_changes" (the default), the system outputs a stream of modifications to the target table. This stream includes two additional integer columns: time, representing the computation minibatch, and diff, indicating the type of change (1 for row addition and -1 for row deletion). If set to "snapshot", the table maintains the current state of the data, updated atomically with each minibatch and ensuring that no partial minibatch updates are visible.
    • primary_key (list[ColumnReference] | None) – When using snapshot mode, one or more columns that form the primary key in the target MySQL table.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
  • Returns
    None
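The sort_by ordering can be pictured with Python's tuple comparison, which is also lexicographic. In this sketch, sort_minibatch is a hypothetical helper that orders the rows of a single minibatch, matching the per-minibatch scope described above; the column names are made up for the example.

```python
def sort_minibatch(rows, sort_by):
    """Order the rows of one minibatch ascending by the given columns."""
    # Tuples compare lexicographically: first column first, ties broken by the next.
    return sorted(rows, key=lambda row: tuple(row[column] for column in sort_by))


minibatch = [
    {"city": "Paris", "temp": 21},
    {"city": "Berlin", "temp": 18},
    {"city": "Paris", "temp": 17},
]
for row in sort_minibatch(minibatch, ["city", "temp"]):
    print(row["city"], row["temp"])
# Berlin 18
# Paris 17
# Paris 21
```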

Example:

To test this connector locally, you will need a MySQL instance. The easiest way to do this is to run a Docker image. For example, you can use the mysql image. Pull and run it as follows:

docker pull mysql:8.0
docker run \
    --name mysql-container \
    -e MYSQL_ROOT_PASSWORD=rootpass \
    -e MYSQL_DATABASE=testdb \
    -e MYSQL_USER=testuser \
    -e MYSQL_PASSWORD=testpass \
    -p 3306:3306 \
    mysql:8.0

The first command pulls the image from the Docker repository while the second runs it, setting the credentials for the user and the database name.

The database is now created and you can use the connector to write data to a table. First, you will need a Pathway Live Data Framework table for testing. You can create it as follows:

import pathway as pw
table = pw.debug.table_from_markdown('''
   key | value
     1 | Hello
     2 | World
''')

You can write this table using:

pw.io.mysql.write(
    table,
    "mysql://testuser:testpass@localhost:3306/testdb",
    table_name="test",
    init_mode="create_if_not_exists",
)

The init_mode parameter set to "create_if_not_exists" ensures that the table is created by the framework if it does not already exist.

You can run this pipeline with pw.run().

After the pipeline completes, you can connect to the database from the command line:

docker exec -it mysql-container mysql -u testuser -p

Enter the password set when creating the container; in this example, it is testpass.

You can check the data in the table with a simple command:

select * from test;

Note that if you run the code again, it will append data to this table. To overwrite the entire table, use init_mode set to "replace".

Now suppose that the table is dynamic and you need to maintain a snapshot of the data. In this case it makes sense to use snapshot mode. For snapshot mode you need to choose which columns will form the primary key. For this example suppose it is the key column. If the output table is called test_snapshot the code will look as follows:

pw.io.mysql.write(
    table,
    "mysql://testuser:testpass@localhost:3306/testdb",
    table_name="test_snapshot",
    init_mode="create_if_not_exists",
    output_table_type="snapshot",
    primary_key=[table.key],
)

Note: The Pathway Live Data Framework can create the table in MySQL when init_mode is set to "replace" or "create_if_not_exists", and when it creates a table for snapshot mode, it also defines the primary key. This is a problem for string or binary primary-key columns: they are created as TEXT or BLOB, and MySQL cannot use TEXT or BLOB columns as a primary key without a prefix length. Use a bounded type instead, for example VARCHAR (or VARBINARY for binary data) with a length suited to your use case. Therefore, if you plan to use strings or blobs as primary keys, create the table with the correct schema manually in advance.
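To pick a VARCHAR length for a string primary key, the deciding factor is the index key limit. The arithmetic below assumes InnoDB with the DYNAMIC row format (the MySQL 8.0 default) and 16 KB pages, where an index key may be at most 3072 bytes, and counts the worst-case bytes per character of the column's character set; max_varchar_key_length is a hypothetical helper.

```python
# Assumption: InnoDB, DYNAMIC row format, 16 KB pages -> 3072-byte index key limit.
INNODB_MAX_KEY_BYTES = 3072

# Worst-case bytes per character for some common character sets.
BYTES_PER_CHAR = {"utf8mb4": 4, "utf8mb3": 3, "latin1": 1}


def max_varchar_key_length(charset: str = "utf8mb4", other_key_bytes: int = 0) -> int:
    """Largest N such that a VARCHAR(N) key column fits next to other_key_bytes of key."""
    return (INNODB_MAX_KEY_BYTES - other_key_bytes) // BYTES_PER_CHAR[charset]


print(max_varchar_key_length())  # 768
print(max_varchar_key_length("utf8mb4", other_key_bytes=8))  # 766, next to a BIGINT key column
```

For example, with utf8mb4 a single-column key can be at most VARCHAR(768); a shorter length such as VARCHAR(255) leaves room for additional key columns.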