pw.io.mysql
This module is available only with one of the following licenses: Pathway Live Data Framework Scale or Pathway Live Data Framework Enterprise.
The Pathway Live Data Framework provides both input and output connectors for MySQL. The
read and write documentation below describes the modes, requirements, and
options specific to each connector, and the tables below give the type conversions
in both directions.
Type Conversion (Input Connector)
The table below describes how MySQL column types are parsed into Pathway Live Data Framework values,
and what type to declare in your pw.Schema to receive them correctly. Every
type produced by the output connector round-trips back through the input
connector when the original schema type is specified.
MySQL types parsed by the input connector
| MySQL type | Pathway Live Data Framework schema type and notes |
|---|---|
| TINYINT(1) / BOOLEAN | bool |
| TINYINT / SMALLINT / MEDIUMINT / INT / BIGINT (incl. UNSIGNED) | int. May also be declared as float. An UNSIGNED BIGINT value above the signed 64-bit maximum (9223372036854775807) is reported as a per-row conversion error. |
| DECIMAL / NUMERIC | float, parsed from the textual representation; precision loss is possible beyond ~15 significant digits. A scale-0 column may instead be declared as int. |
| FLOAT | float |
| DOUBLE | float |
| CHAR / VARCHAR / TINYTEXT / TEXT / MEDIUMTEXT / LONGTEXT / ENUM / SET | str |
| VARCHAR / TEXT | pw.Json: the string is parsed as a JSON literal. This is what the output connector produces for pw.Json columns stored in a non-JSON column. |
| JSON | pw.Json |
| VARCHAR / TEXT | pw.Pointer: the string is decoded as a Pathway Live Data Framework pointer. The string must have been produced by the output connector for a pw.Pointer column. |
| BINARY / VARBINARY / TINYBLOB / BLOB / MEDIUMBLOB / LONGBLOB | bytes |
| BLOB | pw.PyObjectWrapper: the binary payload is deserialized with bincode. The field must have been written by the output connector for a pw.PyObjectWrapper column. |
| DATE | pw.DateTimeNaive, set to midnight (00:00:00) on the given date. |
| DATETIME / TIMESTAMP | pw.DateTimeNaive |
| DATETIME | pw.DateTimeUtc: the stored wall-clock value is interpreted as UTC. This matches how the output connector stores pw.DateTimeUtc. |
| TIME | pw.Duration. MySQL's TIME range is limited to ±838:59:59, so durations outside that range do not round-trip. |
| YEAR | int |
| Any nullable column | Declare the field as optional in the schema, i.e. Optional[T], where T is the type from the matching row above. It is parsed as None if the MySQL value is NULL; otherwise the value is parsed as T. If the field is not declared optional and a NULL is received, a per-row conversion error is raised. |
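The per-row rules for integer columns, nullable fields, and DECIMAL text can be modelled in plain Python. This is a minimal sketch, not Pathway's implementation: parse_int_cell, parse_decimal_text, and ConversionError are hypothetical names used only to illustrate the table above. A NULL in a non-optional field and an UNSIGNED BIGINT above the signed 64-bit maximum are both rejected, and DECIMAL text parsed as a float keeps only about 15 to 17 significant digits.

```python
from typing import Optional

# Largest signed 64-bit integer; larger UNSIGNED BIGINT values are rejected.
I64_MAX = 9_223_372_036_854_775_807


class ConversionError(ValueError):
    """Hypothetical stand-in for a per-row conversion error."""


def parse_int_cell(raw: Optional[int], optional: bool) -> Optional[int]:
    """Apply the integer and NULL rules from the table to one MySQL cell."""
    if raw is None:
        if optional:
            return None  # NULL maps to None only for optional fields
        raise ConversionError("NULL received for a non-optional field")
    if raw > I64_MAX:
        # Only possible for UNSIGNED BIGINT columns.
        raise ConversionError(f"{raw} exceeds the signed 64-bit maximum")
    return raw


def parse_decimal_text(text: str) -> float:
    """DECIMAL values arrive as text and are parsed into a 64-bit float."""
    return float(text)


print(parse_int_cell(None, optional=True))         # None
print(parse_int_cell(2**63 - 1, optional=False))   # 9223372036854775807
print(parse_decimal_text("12345678901234567.89"))  # trailing digits are lost
```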
Type Conversion (Output Connector)
The table below explains how Pathway Live Data Framework engine data is serialized into MySQL. You can convert unsupported types to supported ones yourself (for example, serialize an array to JSON) using user-defined functions if needed.
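For example, a tuple, list, or np.ndarray value could be turned into a JSON string before writing. The function below is a plain-Python sketch; in a pipeline it would be wrapped with the pw.udf decorator and applied inside table.select(...). The name to_json_text is hypothetical.

```python
import json
from typing import Any


def to_json_text(values: Any) -> str:
    """Serialize an array-like value into a JSON array string.

    np.ndarray values expose .tolist() and are converted with it first;
    tuples and lists, including nested ones, become JSON arrays.
    """
    if hasattr(values, "tolist"):
        values = values.tolist()
    return json.dumps(list(values))


print(to_json_text((1, 2, 3)))                  # [1, 2, 3]
print(to_json_text([(1.5, 2.0), (3.0, 4.5)]))  # [[1.5, 2.0], [3.0, 4.5]]
```

The resulting column has type str, which the table below maps to TEXT.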
Pathway Live Data Framework types conversion into MySQL
| Pathway Live Data Framework type | MySQL type |
|---|---|
| bool | BOOLEAN (TINYINT(1)) |
| int | BIGINT. If the target column has a different integer type, the connector also attempts to cast the value to it. |
| float | DOUBLE. If the target column is FLOAT, the connector also attempts to cast the value to it. |
| pointer | TEXT. If the target column is VARCHAR, the connector casts the value accordingly. Declare the column as pw.Pointer on read to restore the original value. |
| str | TEXT. If the target column is VARCHAR, the connector casts the value accordingly. |
| bytes | BLOB |
| Naive DateTime | DATETIME(6), serialized with microsecond precision. |
| UTC DateTime | DATETIME(6), serialized with microsecond precision after conversion to the UTC time zone. Declare the column as pw.DateTimeUtc on read to restore the original value. |
| Duration | TIME(6), serialized with microsecond precision. MySQL's TIME range is limited to ±838:59:59. |
| JSON | JSON |
| np.ndarray | Not supported, as MySQL has no array types. |
| tuple | Not supported, as MySQL has no array types. |
| list | Not supported, as MySQL has no array types. |
| pw.PyObjectWrapper | BLOB, serialized with bincode. Declare the column as pw.PyObjectWrapper on read to restore the original value. |
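The two time-related rules can be illustrated with the standard datetime module. This is a sketch, not the connector's code: to_datetime6_utc and fits_mysql_time are hypothetical helpers showing how a timezone-aware datetime maps to a DATETIME(6) literal in UTC, and which durations fit MySQL's TIME range.

```python
from datetime import datetime, timedelta, timezone

# MySQL's TIME type accepts values from -838:59:59 to 838:59:59.
MYSQL_TIME_LIMIT = timedelta(hours=838, minutes=59, seconds=59)


def to_datetime6_utc(dt: datetime) -> str:
    """Convert an aware datetime to UTC and format it with microseconds."""
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")


def fits_mysql_time(duration: timedelta) -> bool:
    """Return True if the duration can be stored in a TIME(6) column."""
    return -MYSQL_TIME_LIMIT <= duration <= MYSQL_TIME_LIMIT


utc_plus_2 = timezone(timedelta(hours=2))
print(to_datetime6_utc(datetime(2024, 7, 1, 12, 30, 0, 250000, tzinfo=utc_plus_2)))
# 2024-07-01 10:30:00.250000
print(fits_mysql_time(timedelta(days=34)))  # True (816 hours)
print(fits_mysql_time(timedelta(days=35)))  # False (840 hours)
```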
read(connection_string, table_name, schema, *, mode='streaming', server_id=None, autocommit_duration_ms=1500, name=None, max_backlog_size=None, debug_data=None)
Reads a table from a MySQL database.
In "static" mode, the connector issues a plain SELECT against the
table, emits all rows, and terminates. No special server configuration is
required beyond read access to the table.
In "streaming" mode (the default), the connector performs Change Data
Capture by reading MySQL’s binary log (binlog). It first reads a snapshot
of the table, then continuously tails the binary log, delivering every
insert, update, and delete as it happens. This requires the MySQL server to
be configured for row-based binary logging:
- log_bin enabled (start mysqld with --log-bin; on by default in MySQL 8.0+),
- binlog_format=ROW (the default in MySQL 8.0+),
- binlog_row_image=FULL (the default).
The connecting user must also hold the REPLICATION SLAVE and
REPLICATION CLIENT privileges
(GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO <user>).
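Before starting a streaming read, you can compare the server's settings against these requirements. Any MySQL client can fetch them with SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image'); the sketch below assumes that result has already been loaded into a dict and only checks the values. check_binlog_settings is a hypothetical helper, not part of Pathway.

```python
from typing import Dict, List

# Server variables required for row-based binary-log reading.
REQUIRED_SETTINGS = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def check_binlog_settings(variables: Dict[str, str]) -> List[str]:
    """Return a description of every required setting that does not match."""
    problems = []
    for name, expected in REQUIRED_SETTINGS.items():
        actual = variables.get(name)
        if actual is None or actual.upper() != expected:
            problems.append(f"{name} is {actual!r}, expected {expected!r}")
    return problems


print(check_binlog_settings(
    {"log_bin": "ON", "binlog_format": "MIXED", "binlog_row_image": "FULL"}
))
# ["binlog_format is 'MIXED', expected 'ROW'"]
```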
Primary key: the schema must declare at least one primary-key column via
pw.column_definition(primary_key=True). These columns identify each row
so the connector can correlate snapshot rows with subsequent binlog inserts,
updates, and deletes.
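To see why a primary key is needed, consider how change events could be matched against the snapshot. The sketch below keeps the table as a dict keyed by the primary-key tuple and applies insert, update, and delete events to it; updates and deletes are located through the row's before image, which binlog_row_image=FULL fills with every column. apply_event and the event names are hypothetical, chosen for illustration only; they do not describe Pathway's internals.

```python
from typing import Any, Dict, Optional, Sequence, Tuple

Row = Dict[str, Any]
State = Dict[Tuple[Any, ...], Row]


def primary_key(row: Row, pk_columns: Sequence[str]) -> Tuple[Any, ...]:
    """Extract the primary-key tuple that identifies a row."""
    return tuple(row[column] for column in pk_columns)


def apply_event(
    state: State,
    pk_columns: Sequence[str],
    kind: str,
    before: Optional[Row] = None,
    after: Optional[Row] = None,
) -> None:
    """Apply one insert, update, or delete event to the keyed table state."""
    if kind in ("update", "delete"):
        # The before image identifies the row being changed or removed.
        state.pop(primary_key(before, pk_columns), None)
    if kind in ("insert", "update"):
        state[primary_key(after, pk_columns)] = after


state: State = {}
pk = ("id",)
# Snapshot rows are treated as inserts.
apply_event(state, pk, "insert", after={"id": 1, "name": "a", "value": 1.0})
apply_event(state, pk, "insert", after={"id": 2, "name": "b", "value": 2.0})
# Binlog events that arrive afterwards.
apply_event(state, pk, "update",
            before={"id": 1, "name": "a", "value": 1.0},
            after={"id": 1, "name": "a", "value": 1.5})
apply_event(state, pk, "delete", before={"id": 2, "name": "b", "value": 2.0})
print(state)  # {(1,): {'id': 1, 'name': 'a', 'value': 1.5}}
```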
No server-side footprint: unlike PostgreSQL logical replication, reading
the MySQL binary log creates no persistent state on the server — there is
no replication slot to leave behind. A disconnected or idle reader therefore
cannot cause the server’s disk to fill up: binary-log retention is governed
solely by the server’s own binlog_expire_logs_seconds /
max_binlog_size settings, independently of any reader.
Persistence: when persistence is enabled (by passing a
persistence_config to pw.run), the streaming connector saves the
binary-log coordinates (file name and position) of the last processed change
as its offset. On restart it skips the snapshot and resumes tailing from that
position. If the saved binary-log file has since been purged by the server’s
normal log expiry — which happens only if Pathway was offline longer than the
server’s binary-log retention — the connector raises a clear error at
startup asking you to clear the persistence directory and re-snapshot, rather
than silently losing data. Choose a binary-log retention long enough to cover
your longest expected downtime. Binary-log coordinates are specific to one
MySQL server: if you restore the server from a backup or fail over to a
different host, clear the persistence directory. Persistence applies to
"streaming" mode; in "static" mode every run re-reads the full table.
- Parameters
  - connection_string (str) – Connection string for the MySQL database. It must include the database name, e.g. "mysql://user:password@localhost:3306/mydb".
  - table_name (str) – Name of the table to read from.
  - schema (type[Schema]) – Schema of the resulting table.
  - mode (Literal['static', 'streaming']) – "streaming" (the default) reads a snapshot and then tails the binary log for live changes; it requires row-based binary logging. "static" reads the full table once as a snapshot, then terminates; no binary-log configuration is needed.
  - server_id (int | None) – The replica server id used when registering for the binary-log stream. It must be unique among everything replicating from the source server. If omitted, a random value is chosen on each run; set it explicitly if you run several readers against the same server and want stable identifiers.
  - autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway's computation graph.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  - max_backlog_size (int | None) – Limit on the number of entries read from the input source and kept in processing at any moment.
  - debug_data (Any) – Static data to use instead of the external source (for testing).
- Returns
Table – The table read.
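Because the connection string must name the database, it can be worth validating it before building the pipeline. The sketch below uses urllib.parse from the Python standard library; describe_connection_string is a hypothetical helper that falls back to MySQL's default port 3306 and deliberately leaves the password out of its result.

```python
from typing import Dict, Optional, Union
from urllib.parse import urlparse


def describe_connection_string(url: str) -> Dict[str, Optional[Union[str, int]]]:
    """Split a mysql:// URL into its parts and require a database name."""
    parts = urlparse(url)
    if parts.scheme != "mysql":
        raise ValueError(f"unexpected scheme {parts.scheme!r}")
    database = parts.path.lstrip("/")
    if not database:
        raise ValueError("the connection string must include the database name")
    return {
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port or 3306,  # MySQL's default port
        "database": database,
    }


print(describe_connection_string("mysql://testuser:testpass@localhost:3306/testdb"))
# {'user': 'testuser', 'host': 'localhost', 'port': 3306, 'database': 'testdb'}
```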
Example:
To test this connector locally, run a MySQL instance with binary logging
enabled (it is on by default in the mysql:8.0 image):
docker run --name mysql-container \
-e MYSQL_ROOT_PASSWORD=rootpass \
-e MYSQL_DATABASE=testdb \
-e MYSQL_USER=testuser \
-e MYSQL_PASSWORD=testpass \
-p 3306:3306 \
mysql:8.0
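Grant the replication privileges the binary-log reader needs. Run this once as the root user, for example after connecting with docker exec -it mysql-container mysql -u root -p: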
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'testuser'@'%';
For a one-off static read (no binary-log setup required):
import pathway as pw
class MySchema(pw.Schema):
id: int = pw.column_definition(primary_key=True)
name: str
value: float
table = pw.io.mysql.read(
"mysql://testuser:testpass@localhost:3306/testdb",
table_name="my_table",
schema=MySchema,
mode="static",
)
For streaming change data capture from the binary log, simply use the default mode:
table = pw.io.mysql.read(
"mysql://testuser:testpass@localhost:3306/testdb",
table_name="my_table",
schema=MySchema,
)
The resulting table can be transformed with the usual Pathway operators and written to any sink. For example, to mirror the table into another MySQL table in real time:
pw.io.mysql.write(
table,
"mysql://testuser:testpass@localhost:3306/testdb",
table_name="my_table_copy",
init_mode="create_if_not_exists",
output_table_type="snapshot",
primary_key=[table.id],
)
pw.run()
To enable persistence so a restarted pipeline resumes from where it left off,
pass a persistence_config to pw.run:
persistence_config = pw.persistence.Config(
backend=pw.persistence.Backend.filesystem("./PStorage")
)
table = pw.io.mysql.read(
"mysql://testuser:testpass@localhost:3306/testdb",
table_name="my_table",
schema=MySchema,
name="my_mysql_source",
)
pw.io.jsonlines.write(table, "output.jsonl")
pw.run(persistence_config=persistence_config)
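When a pipeline like this restarts, the connector decides whether it can resume from the saved binary-log coordinates. The check can be pictured as follows: the saved offset is a (file, position) pair, and resuming is possible only while that file is still listed by the server, for example in the output of SHOW BINARY LOGS. This is a hypothetical sketch of the behavior described in the persistence notes above; BinlogOffset and resume_from are illustrative names, not Pathway APIs.

```python
from typing import List, NamedTuple


class BinlogOffset(NamedTuple):
    """Binary-log coordinates saved as the connector's offset."""
    file: str
    position: int


def resume_from(saved: BinlogOffset, available_files: List[str]) -> BinlogOffset:
    """Return the offset to resume from, or fail if its file was purged."""
    if saved.file not in available_files:
        raise RuntimeError(
            f"binary-log file {saved.file} is no longer on the server; "
            "clear the persistence directory to take a fresh snapshot"
        )
    return saved


server_files = ["binlog.000041", "binlog.000042"]
print(resume_from(BinlogOffset("binlog.000042", 1234), server_files))
# BinlogOffset(file='binlog.000042', position=1234)
```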
write(table, connection_string, table_name, *, max_batch_size=None, init_mode='default', output_table_type='stream_of_changes', primary_key=None, name=None, sort_by=None)
Writes a table to a MySQL table.
The connector works in two modes: snapshot and stream of changes.
In snapshot mode, the target table maintains the current snapshot of the data.
In stream-of-changes mode, the target table contains the log of all data updates, and it
needs two additional integer columns, time and diff: time stores the minibatch time of
the transaction, and diff is 1 for a row insertion or -1 for a row deletion.
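Replaying such a log recovers the current contents of the table: summing diff per row leaves a positive count for every row that is still present. An update typically appears as a -1 for the old row and a +1 for the new row with the same time. The sketch below uses hypothetical in-memory rows rather than a real MySQL table; replay is an illustrative helper.

```python
from collections import Counter
from typing import Hashable, Iterable, List, Tuple

# Each change is (row, time, diff), mirroring the data columns plus time and diff.
Change = Tuple[Hashable, int, int]


def replay(changes: Iterable[Change]) -> List[Hashable]:
    """Rebuild the current contents of a table from its stream of changes.

    Diffs are summed per row, so the order of the changes does not
    affect the final result.
    """
    counts: Counter = Counter()
    for row, _time, diff in changes:
        counts[row] += diff
    # Unary + drops rows whose net count is zero or negative (deleted rows).
    return sorted((+counts).elements())


log = [
    ((1, "Hello"), 0, 1),
    ((2, "World"), 0, 1),
    ((2, "World"), 2, -1),   # old version of row 2 retracted...
    ((2, "Pathway"), 2, 1),  # ...and the new version inserted at the same time
]
print(replay(log))  # [(1, 'Hello'), (2, 'Pathway')]
```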
- Parameters
  - table (Table) – Table to be written.
  - connection_string (str) – Connection string for the MySQL database.
  - table_name (str) – Name of the target table.
  - max_batch_size (int | None) – Maximum number of entries allowed to be committed within a single transaction.
  - init_mode (Literal['default', 'create_if_not_exists', 'replace']) – "default": the default initialization mode; "create_if_not_exists": initializes the SQL writer by creating the necessary table if it does not already exist; "replace": initializes the SQL writer by replacing any existing table.
  - output_table_type (Literal['stream_of_changes', 'snapshot']) – Defines how the output table manages its data. If set to "stream_of_changes" (the default), the system outputs a stream of modifications to the target table. This stream includes two additional integer columns: time, representing the computation minibatch, and diff, indicating the type of change (1 for row addition and -1 for row deletion). If set to "snapshot", the table maintains the current state of the data, updated atomically with each minibatch so that no partial minibatch updates are visible.
  - primary_key (list[ColumnReference] | None) – When using snapshot mode, one or more columns that form the primary key in the target MySQL table.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  - sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output is sorted in ascending order by the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples are compared lexicographically.
- Returns
None
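The effect of sort_by and max_batch_size can be illustrated on plain Python rows. The sketch shows lexicographic ordering of a minibatch by several columns, and splitting a list of entries into chunks no larger than max_batch_size. sort_minibatch and split_into_transactions are hypothetical helpers; they are independent illustrations of each parameter, not the connector's code.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List, Optional, Sequence

Row = Dict[str, Any]


def sort_minibatch(rows: Iterable[Row], columns: Sequence[str]) -> List[Row]:
    """Sort ascending by the given columns, comparing value tuples lexicographically."""
    return sorted(rows, key=lambda row: tuple(row[column] for column in columns))


def split_into_transactions(
    entries: Sequence[Any], max_batch_size: Optional[int]
) -> Iterator[List[Any]]:
    """Yield chunks of at most max_batch_size entries (a single chunk if None)."""
    if max_batch_size is None:
        if entries:
            yield list(entries)
        return
    iterator = iter(entries)
    while batch := list(islice(iterator, max_batch_size)):
        yield batch


minibatch = [{"a": 2, "b": 1}, {"a": 1, "b": 5}, {"a": 1, "b": 2}]
print(sort_minibatch(minibatch, ["a", "b"]))
# [{'a': 1, 'b': 2}, {'a': 1, 'b': 5}, {'a': 2, 'b': 1}]
print(list(split_into_transactions(list(range(5)), 2)))
# [[0, 1], [2, 3], [4]]
```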
Example:
To test this connector locally, you will need a MySQL instance. The easiest way to do this is to run a Docker image. For example, you can use the mysql image. Pull and run it as follows:
docker pull mysql:8.0
docker run --name mysql-container -e MYSQL_ROOT_PASSWORD=rootpass -e MYSQL_DATABASE=testdb -e MYSQL_USER=testuser -e MYSQL_PASSWORD=testpass -p 3306:3306 mysql:8.0
The first command pulls the image from the Docker repository while the second runs it, setting the credentials for the user and the database name.
The database is now created and you can use the connector to write data to a table. First, you will need a Pathway Live Data Framework table for testing. You can create it as follows:
import pathway as pw
table = pw.debug.table_from_markdown('''
key | value
1 | Hello
2 | World
''')
You can write this table using:
pw.io.mysql.write(
table,
"mysql://testuser:testpass@localhost:3306/testdb",
table_name="test",
init_mode="create_if_not_exists",
)
The init_mode parameter set to "create_if_not_exists" ensures that the table
is created by the framework if it does not already exist.
You can run this pipeline with pw.run().
After the pipeline completes, you can connect to the database from the command line:
docker exec -it mysql-container mysql -u testuser -p
Enter the password set when creating the container; in this example, it is testpass.
You can check the data in the table with a simple command:
select * from test;
Note that if you run the code again, it will append data to this table. To overwrite
the entire table, set init_mode to "replace".
Now suppose that the table is dynamic and you need to maintain a snapshot of the data.
In this case, snapshot mode is the better fit. Snapshot mode requires choosing the columns
that form the primary key; in this example, it is the key column. If the output table
is called test_snapshot, the code looks as follows:
pw.io.mysql.write(
table,
"mysql://testuser:testpass@localhost:3306/testdb",
table_name="test_snapshot",
init_mode="create_if_not_exists",
output_table_type="snapshot",
primary_key=[table.key],
)
Note: The Pathway Live Data Framework can create the table in MySQL when init_mode is set to
"replace" or "create_if_not_exists", and when it creates a table for snapshot mode, it also
defines the primary key. This does not work for string or binary primary keys: str and bytes
columns are created as TEXT and BLOB, and MySQL cannot use a TEXT or BLOB column as a primary
key without a prefix length. Use a bounded type instead, for example a VARCHAR whose length
limit suits your data. Therefore, if you plan to use strings or binary values as primary keys,
create the table with the correct schema manually in advance.
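A minimal sketch of such a manually created table, assuming the key column from the example held strings instead of integers and the table needs only the two data columns. The helper build_create_table is hypothetical; it only assembles the CREATE TABLE statement, which you would run with any MySQL client before starting the pipeline, without init_mode="replace" (which would replace the table). Identifiers are quoted with backticks because KEY is a reserved word in MySQL.

```python
from typing import Dict, Sequence


def build_create_table(
    table_name: str, columns: Dict[str, str], primary_key: Sequence[str]
) -> str:
    """Assemble a CREATE TABLE statement with backtick-quoted identifiers."""
    column_lines = [f"    `{name}` {sql_type}" for name, sql_type in columns.items()]
    key_list = ", ".join(f"`{name}`" for name in primary_key)
    column_lines.append(f"    PRIMARY KEY ({key_list})")
    return (
        f"CREATE TABLE IF NOT EXISTS `{table_name}` (\n"
        + ",\n".join(column_lines)
        + "\n)"
    )


ddl = build_create_table(
    "test_snapshot",
    {"key": "VARCHAR(64) NOT NULL", "value": "TEXT"},  # bounded string key
    ["key"],
)
print(ddl)
```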