The MySQL CDC connector reads both snapshot data and incremental data from MySQL databases. This document describes how to set up the MySQL CDC connector to run SQL queries against MySQL databases.
The following table provides the dependency information needed to set up the MySQL CDC connector, both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.
Download `flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar` and put it under `<FLINK_HOME>/lib/`.
Every MySQL database client that reads the binlog should have a unique id, called the server id. The MySQL server uses this id to maintain the network connection and the binlog position. Therefore, if different jobs share the same server id, they may read from the wrong binlog position.
Thus, it is recommended to set a different server id for each reader via SQL Hints.
For example, assuming the source parallelism is 4, we can use `SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;` to assign a unique server id to each of the 4 source readers.
When an initial consistent snapshot is made for large databases, your established connection could time out while the tables are being read. You can prevent this by configuring `interactive_timeout` and `wait_timeout` in your MySQL configuration file.
- `interactive_timeout`: The number of seconds the server waits for activity on an interactive connection before closing it. See the MySQL documentation.
- `wait_timeout`: The number of seconds the server waits for activity on a noninteractive connection before closing it. See the MySQL documentation.
<td>Database name of the MySQL server to monitor. The database-name also supports regular expressions to monitor multiple tables that match the regular expression.</td>
<td>Table name of the MySQL database to monitor. The table-name also supports regular expressions to monitor multiple tables that match the regular expression.</td>
During a snapshot operation, the connector will query each included table to produce a read event for all rows in that table. This parameter determines whether the MySQL connection will pull all results for a table into memory (which is fast but requires large amounts of memory), or whether the results will instead be streamed (can be slower, but will work for very large tables). The value specifies the minimum number of rows a table must contain before the connector will stream results, and defaults to 1,000. Set this parameter to '0' to skip all table size checks and always stream all results during a snapshot.</td>
See more about <a href="">Debezium's MySQL Connector properties</a>.</td>
<td>Name of the database that contains the row.</td>
<td>The time at which the change was made in the database. <br>If the record is read from the snapshot of the table instead of the binlog, the value is always 0.</td>
Incremental snapshot reading is a new mechanism for reading the snapshot of a table. Compared to the old snapshot mechanism, incremental snapshot reading has many advantages, including:
* (1) MySQL CDC Source can read in parallel during snapshot reading
* (2) MySQL CDC Source can perform checkpoints at chunk granularity during snapshot reading
* (3) MySQL CDC Source doesn't need to acquire a global read lock (FLUSH TABLES WITH READ LOCK) before snapshot reading
If you would like the source to run in parallel, each parallel reader should have a unique server id, so 'server-id' must be a range like '5400-6400',
and the range must contain at least as many ids as the parallelism.
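The range requirement can be illustrated with a minimal Python sketch. This is not the connector's code: `parse_server_id_range` and `assign_server_ids` are hypothetical helper names, and the rule that reader `i` takes the `i`-th id of the range is an assumption made only for illustration.

```python
from typing import List


def parse_server_id_range(spec: str) -> range:
    """Parse a 'server-id' value such as '5400' or '5400-6400' (both ends inclusive)."""
    if "-" in spec:
        low_text, high_text = spec.split("-", 1)
        low, high = int(low_text), int(high_text)
    else:
        low = high = int(spec)
    if low > high:
        raise ValueError(f"invalid server-id range: {spec!r}")
    return range(low, high + 1)


def assign_server_ids(spec: str, parallelism: int) -> List[int]:
    """Give each parallel reader its own server id, or fail if the range is too small."""
    ids = parse_server_id_range(spec)
    if len(ids) < parallelism:
        raise ValueError(
            f"server-id range {spec!r} holds {len(ids)} ids, "
            f"but the source parallelism is {parallelism}"
        )
    # Assumed rule (illustration only): reader i uses the i-th id of the range.
    return [ids[i] for i in range(parallelism)]
```

With the range `'5401-5404'` and a parallelism of 4, every reader gets a distinct id; a range with fewer ids than readers is rejected before any reader starts.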
During incremental snapshot reading, the MySQL CDC Source first splits the table into snapshot chunks (splits) by its primary key,
and then assigns the chunks to multiple readers, each of which reads the data of its snapshot chunks.
#### Controlling Parallelism
Incremental snapshot reading provides the ability to read snapshot data in parallel.
You can control the source parallelism by setting the job parallelism `parallelism.default`. For example, in SQL CLI:
Flink SQL> SET 'parallelism.default' = 8;
#### Checkpoint
Incremental snapshot reading provides the ability to perform checkpoints at chunk level. This resolves the checkpoint timeout problem that occurred in previous versions with the old snapshot reading mechanism.
#### Lock-free
The MySQL CDC source uses the **incremental snapshot algorithm**, which avoids acquiring a global read lock (FLUSH TABLES WITH READ LOCK) and thus doesn't need the `RELOAD` permission.
The ```mysql-cdc``` connector supports highly available MySQL clusters by using GTID information. To obtain high availability, the MySQL cluster needs to enable GTID mode; your MySQL config file should contain the following settings:
gtid_mode = on
enforce_gtid_consistency = on
If the monitored MySQL server address includes a slave instance, you need to add the following settings to the MySQL conf file. The setting ```log-slave-updates = 1``` makes the slave instance also write the data synchronized from the master to its own binlog, which ensures that the ```mysql-cdc``` connector can consume the entire data from the slave instance.
gtid_mode = on
enforce_gtid_consistency = on
log-slave-updates = 1
If the monitored server in the MySQL cluster fails, you only need to change the monitored server address to another available server and then restart the job from the latest checkpoint/savepoint. The job will restore from the checkpoint/savepoint and won't miss any records.
It's recommended to configure a DNS (Domain Name Service) name or VIP (Virtual IP Address) for your MySQL cluster and to use that DNS or VIP address for the ```mysql-cdc``` connector. The DNS or VIP automatically routes network requests to the active MySQL server, so you no longer need to modify the address and restart your pipeline.
When the MySQL CDC source is started, it reads the snapshot of the table in parallel and then reads the binlog of the table with a single parallelism.
In the snapshot phase, the snapshot is cut into multiple snapshot chunks according to the primary key of the table and the size of the table rows.
Snapshot chunks are assigned to multiple snapshot readers. Each snapshot reader reads its assigned chunks with the [chunk reading algorithm](#snapshot-chunk-reading) and sends the read data downstream.
The source tracks the processing status (finished or not) of each chunk, so the snapshot phase supports checkpoints at chunk level.
If a failure happens, the source can be restored and continue reading from the last finished chunks.
After all snapshot chunks have finished, the source continues to read the binlog in a single task.
To guarantee the global order of snapshot records and binlog records, the binlog reader does not start reading data
until a complete checkpoint has been taken after the snapshot chunks finish, which ensures that all snapshot data has been consumed by downstream.
The binlog reader tracks the consumed binlog position in state, so the binlog phase supports checkpoints at row level.
Flink performs checkpoints for the source periodically. In case of failover, the job restarts and restores from the last successful checkpoint state, which guarantees exactly-once semantics.
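The chunk-level bookkeeping described above can be pictured with a small Python sketch. It is only an illustration under assumptions: `SnapshotProgress`, its methods, and the JSON state format are hypothetical and do not reflect the connector's real state layout.

```python
import json
from typing import Iterable, List


class SnapshotProgress:
    """Tracks which snapshot chunks are finished so a restart can skip them."""

    def __init__(self, all_chunks: Iterable[str], finished: Iterable[str] = ()):
        self.all_chunks = list(all_chunks)
        self.finished = set(finished)

    def mark_finished(self, chunk_id: str) -> None:
        if chunk_id not in self.all_chunks:
            raise KeyError(f"unknown chunk: {chunk_id}")
        self.finished.add(chunk_id)

    def checkpoint(self) -> str:
        # Hypothetical state format: the sorted list of finished chunk ids.
        return json.dumps(sorted(self.finished))

    @classmethod
    def restore(cls, all_chunks: Iterable[str], state: str) -> "SnapshotProgress":
        return cls(all_chunks, json.loads(state))

    def pending(self) -> List[str]:
        """Chunks that still have to be read, in their original order."""
        return [c for c in self.all_chunks if c not in self.finished]

    def snapshot_done(self) -> bool:
        # Only once this is true would the single binlog reader take over.
        return not self.pending()
```

In this sketch, a chunk that finished after the last checkpoint is simply read again after a restore, because only the checkpointed set of finished chunks survives the failure.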
##### Snapshot Chunk Splitting
When performing incremental snapshot reading, the MySQL CDC source needs a criterion for splitting the table.
The MySQL CDC Source uses a splitting column to split the table into multiple splits (chunks). By default, it identifies the primary key of the table and uses the first column of the primary key as the splitting column.
If the table has no primary key, incremental snapshot reading will fail; you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism.
For a numeric, auto-incremental splitting column, the MySQL CDC Source efficiently splits chunks by a fixed step length.
For example, if you had a table whose primary key column `id` is of auto-incremental BIGINT type, with a minimum value of `0` and a maximum value of `100`,
and the table option `scan.incremental.snapshot.chunk.size` were set to `25`, the table would be split into the following chunks:
(-∞, 25),
[25, 50),
[50, 75),
[75, 100),
[100, +∞)
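The fixed-step split above can be reproduced with a short Python sketch. The function name `split_evenly` and the use of `None` for an unbounded end are assumptions for illustration; the connector's own implementation may differ in detail.

```python
from typing import List, Optional, Tuple

# A chunk is the half-open interval [low, high); None means unbounded.
Chunk = Tuple[Optional[int], Optional[int]]


def split_evenly(min_value: int, max_value: int, chunk_size: int) -> List[Chunk]:
    """Split a numeric key range into chunks of a fixed step length."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if min_value > max_value:
        raise ValueError("min_value must not exceed max_value")

    chunks: List[Chunk] = []
    low: Optional[int] = None          # the first chunk is open towards -infinity
    high = min_value + chunk_size
    while high <= max_value:
        chunks.append((low, high))
        low = high
        high += chunk_size
    chunks.append((low, None))         # the last chunk is open towards +infinity
    return chunks
```

Calling `split_evenly(0, 100, 25)` yields the five chunks listed above, with `None` standing in for -∞ and +∞. Leaving the first and last chunks unbounded means rows inserted outside the observed minimum and maximum still fall into some chunk.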
For other primary key column types, the MySQL CDC Source executes a statement of the form `SELECT MAX(STR_ID) AS chunk_high FROM (SELECT * FROM TestTable WHERE STR_ID > 'uuid-001' limit 25)` to get the low and high values of each chunk.
The resulting set of chunks would look like:
(-∞, 'uuid-001'),
['uuid-001', 'uuid-009'),
['uuid-009', 'uuid-abc'),
['uuid-abc', 'uuid-def'),
['uuid-def', +∞).
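The query-driven split can be emulated in Python over an in-memory list of keys. This is a sketch under stated assumptions: `next_chunk_high` stands in for the `SELECT MAX(...)` statement (with an ordering by key assumed), the first boundary is taken to be the smallest key so that the result has the same shape as the example, and Python string comparison is used in place of the MySQL collation.

```python
from bisect import bisect_right
from typing import List, Optional, Tuple

# A chunk is the half-open interval [low, high); None means unbounded.
Chunk = Tuple[Optional[str], Optional[str]]


def next_chunk_high(sorted_keys: List[str], low: str, chunk_size: int) -> Optional[str]:
    """Emulates: SELECT MAX(k) FROM (SELECT k FROM t WHERE k > low ORDER BY k LIMIT chunk_size)."""
    start = bisect_right(sorted_keys, low)
    window = sorted_keys[start:start + chunk_size]
    return window[-1] if window else None


def split_by_query(keys: List[str], chunk_size: int) -> List[Chunk]:
    """Split non-numeric keys by repeatedly asking for the next chunk boundary."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    sorted_keys = sorted(set(keys))
    if not sorted_keys:
        return [(None, None)]

    chunks: List[Chunk] = []
    low: Optional[str] = None
    # Assumption: the first boundary is the smallest key in the table.
    high: Optional[str] = sorted_keys[0]
    while high is not None:
        chunks.append((low, high))
        low = high
        high = next_chunk_high(sorted_keys, low, chunk_size)
    chunks.append((low, None))
    return chunks
```

Each boundary costs one query against the table, which is why the fixed-step split is preferred whenever the splitting column is numeric and auto-incremental.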
##### Chunk Reading Algorithm
For the above example `MyTable`, if the MySQL CDC Source parallelism were set to 4, the MySQL CDC Source would run 4 readers, each of which executes the **Offset Signal Algorithm** to
get a final consistent output of its snapshot chunk. The **Offset Signal Algorithm** can be described simply as follows:
* (1) Record current binlog position as `LOW` offset
* (2) Read and buffer the snapshot chunk records by executing statement `SELECT * FROM MyTable WHERE id >= chunk_low AND id < chunk_high`
* (3) Record current binlog position as `HIGH` offset
* (4) Read the binlog records that belong to the snapshot chunk from the `LOW` offset to the `HIGH` offset
* (5) Upsert the read binlog records into the buffered chunk records, and emit all records in the buffer as the final output (all as INSERT records) of the snapshot chunk
* (6) Continue to read and emit the binlog records that belong to the chunk after the `HIGH` offset in the *single binlog reader*.
The algorithm is inspired by the DBLog paper; please refer to it for more detail.
**Note:** If the actual values of the primary key are not uniformly distributed across its range, the tasks may be unbalanced during incremental snapshot reading.
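Steps (1) to (5) can be simulated in Python with an in-memory table and binlog. This is a simplified sketch, not the connector's implementation: `BinlogEvent`, `in_chunk` and `read_snapshot_chunk` are hypothetical names, offsets are plain integers, and treating the offset window as `LOW <= offset < HIGH` with half-open chunk bounds is an assumption.

```python
from typing import Dict, List, NamedTuple, Optional, Tuple


class BinlogEvent(NamedTuple):
    offset: int          # position of the event in the simulated binlog
    op: str              # "insert", "update" or "delete"
    key: int             # primary key of the affected row
    row: Optional[str]   # new row value; None for deletes


def in_chunk(key: int, chunk_low: Optional[int], chunk_high: Optional[int]) -> bool:
    """Chunk bounds are [chunk_low, chunk_high); None means unbounded."""
    return (chunk_low is None or key >= chunk_low) and (
        chunk_high is None or key < chunk_high
    )


def read_snapshot_chunk(
    snapshot_rows: Dict[int, str],
    binlog: List[BinlogEvent],
    low_offset: int,
    high_offset: int,
    chunk_low: Optional[int],
    chunk_high: Optional[int],
) -> List[Tuple[str, int, str]]:
    # Step (2): buffer the rows returned by the chunk's SELECT statement.
    buffer = {
        key: row
        for key, row in snapshot_rows.items()
        if in_chunk(key, chunk_low, chunk_high)
    }
    # Steps (4) and (5): replay the chunk's binlog events between LOW and HIGH
    # and upsert them into the buffer.
    for event in binlog:
        if not (low_offset <= event.offset < high_offset):
            continue
        if not in_chunk(event.key, chunk_low, chunk_high):
            continue
        if event.op == "delete":
            buffer.pop(event.key, None)
        else:
            buffer[event.key] = event.row
    # Emit everything that is left in the buffer as INSERT records.
    return [("INSERT", key, buffer[key]) for key in sorted(buffer)]
```

The emitted records describe the chunk as of the `HIGH` offset, regardless of the exact moment at which the `SELECT` ran between `LOW` and `HIGH`. Step (6), handing later changes to the single binlog reader, is outside this sketch.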
The config option `scan.startup.mode` specifies the startup mode for the MySQL CDC consumer. The valid enumerations are:
- `initial` (default): Performs an initial snapshot of the monitored database tables upon first startup, and then continues to read the latest binlog.
- `latest-offset`: Never performs a snapshot of the monitored database tables upon first startup; it just reads from
the end of the binlog, which means it only receives the changes made since the connector was started.
_Note: the `scan.startup.mode` option relies on Debezium's `snapshot.mode` configuration, so please do not use them together. If you specify both the `scan.startup.mode` and `debezium.snapshot.mode` options in the table DDL, `scan.startup.mode` may not work._
The `table-name` option supports regular expressions, so it can monitor multiple tables that match the regular expression. For example, you can set `table-name` to `user_.*` to monitor all tables with the `user_` prefix. The same applies to the `database-name` option. Note that the sharded tables must all have the same schema.
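To check which tables a pattern would select before deploying a job, a small Python sketch can help. It rests on two assumptions: that the pattern has to match the whole table name, and that Python's `re` module behaves like the connector's regular expression engine for simple patterns such as `user_.*`. The helper name `matching_tables` is hypothetical.

```python
import re
from typing import List


def matching_tables(pattern: str, tables: List[str]) -> List[str]:
    """Return the tables whose full name matches the given regular expression."""
    compiled = re.compile(pattern)
    # Assumption: the whole name must match, not just a prefix or a substring.
    return [table for table in tables if compiled.fullmatch(table)]
```

For the candidate names `user_1`, `user_profile`, `orders` and `app_user_1`, the pattern `user_.*` selects only `user_1` and `user_profile` under these assumptions.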
#### Q3: ConnectException: Received DML '...' for processing, binlog probably contains events generated with statement or mixed based replication format
If you see the above exception, please check that `binlog_format` is `ROW`; you can check this by running `show variables like '%binlog_format%'` in a MySQL client. Please note that even if the `binlog_format` configuration of your database is `ROW`, it can be changed by other sessions, for example, `SET SESSION binlog_format='MIXED'; SET SESSION tx_isolation='REPEATABLE-READ'; COMMIT;`. Please also make sure that no other sessions are changing this configuration.
#### Q4: MySQL 8.0: Public Key Retrieval is not allowed?
This is because the MySQL user account uses `sha256_password` authentication, which requires the password to be transported under protection such as TLS. A simple workaround is to make the MySQL user account use native password authentication.
-- MySQL
ALTER USER 'username'@'localhost' IDENTIFIED WITH mysql_native_password BY 'password';
#### Q5: How to configure the `tableList` option when building a MySQL CDC source with the DataStream API?
The `tableList` option requires table names qualified with the database name, rather than bare table names, in the DataStream API. For the MySQL CDC source, the `tableList` option value should look like 'my_db.my_table'.
#### Q6: How to configure the MySQL server timezone in the DataStream API?
The timezone of the MySQL server influences the value of TIMESTAMP columns in its binlog file, so we need to take the MySQL server timezone into account when handling records that contain TIMESTAMP columns. You can refer to `com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema` as an example when defining a custom deserializer that handles binlog data correctly.