diff --git a/docs/content/connectors/mysql-cdc.md b/docs/content/connectors/mysql-cdc.md
index 3c900a4ce..95a2be527 100644
--- a/docs/content/connectors/mysql-cdc.md
+++ b/docs/content/connectors/mysql-cdc.md
@@ -35,8 +35,9 @@ mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
2. Grant the required permissions to the user:
```sql
-mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
+mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
```
+**Note:** The RELOAD permission is no longer required when `scan.incremental.snapshot.enabled` is enabled (it is enabled by default).
3. Finalize the user’s permissions:
@@ -50,31 +51,12 @@ See more about the [permission explanation](https://debezium.io/documentation/re
Notes
----------------
-### How MySQL CDC source works
+### Set a different SERVER ID for each reader
-When the MySQL CDC source is started, it grabs a global read lock (`FLUSH TABLES WITH READ LOCK`) that blocks writes by other database. Then it reads current binlog position and the schema of the databases and tables. After that, the global read lock is released. Then it scans the database tables and reads binlog from the previous recorded position. Flink will periodically perform checkpoints to record the binlog position. In case of failover, the job will restart and restore from the checkpointed binlog position. Consequently, it guarantees the exactly once semantic.
+Every MySQL database client that reads the binlog should have a unique id, called the server id. The MySQL server uses this id to maintain the network connection and the binlog position. Therefore, if different jobs share the same server id, they may read from the wrong binlog position.
+Thus, it is recommended to set a different server id for each reader via the [SQL Hints](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/hints.html),
+e.g. assuming the source parallelism is 4, we can use `SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;` to assign a unique server id to each of the 4 source readers.
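+
+Alternatively, the server id range can be set in the table DDL's `WITH` clause. A minimal sketch, assuming illustrative connection options and table name:
+
+```sql
+CREATE TABLE source_table (
+  id INT,
+  name STRING,
+  PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+  'connector' = 'mysql-cdc',
+  'hostname' = 'localhost',
+  'port' = '3306',
+  'username' = 'user',
+  'password' = 'password',
+  'database-name' = 'mydb',
+  'table-name' = 'source_table',
+  -- one server id per reader; the range must cover the source parallelism
+  'server-id' = '5401-5404'
+);
+```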
-### Grant RELOAD permission for MySQL user
-If the MySQL user is not granted the RELOAD permission, the MySQL CDC source will use table-level locks instead and performs a snapshot with this method. This blocks write for a longer time.
-
-### The global read lock (FLUSH TABLES WITH READ LOCK)
-The global read lock is hold during reading binlog position and schema. It may takes seconds which is depending on the numbers of tables. The global read lock blocks writes, so it may still affect online business. If you want to skip the read lock, and can tolerate at-least-once semantic, you can add `'debezium.snapshot.locking.mode' = 'none'` option to skip the lock.
-
-### Set a differnet SERVER ID for each job
-Every MySQL database client for reading binlog should have an unique id, called server id. MySQL server will use this id to maintain network connection and the binlog position. Therefore, if different jobs share a same server id, it may result to reading from wrong binlog position.
-Thus, it is recommended to set different server id for each job via the [SQL Hints](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/hints.html), e.g. `SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;`.
-
-Tip: By default the server id is randomed when the TaskManager is started up. If TaskManager is failed down, it may have a different server id when starting up again. But this shouldn't happen frequently (job exception doesn't restart TaskManager), and shouldn't affect MySQL server too much.
-
-### Can't perform checkpoint during scaning database tables
-During scanning tables, since there is no recoverable position, we can't perform checkpoints. In order to not perform checkpoints, MySQL CDC source will keep the checkpoint waiting to timeout. The timemout checkpoint will be recognized as failed checkpoint, by default, this will trigger a failover for the Flink job. So if the database table is large, it is recommended to add following Flink configurations to avoid failover because of the timeout checkpoints:
-
-```
-execution.checkpointing.interval: 10min
-execution.checkpointing.tolerable-failed-checkpoints: 100
-restart-strategy: fixed-delay
-restart-strategy.fixed-delay.attempts: 2147483647
-```
### Setting up MySQL session timeouts
@@ -83,7 +65,6 @@ When an initial consistent snapshot is made for large databases, your establishe
- `wait_timeout`: The number of seconds the server waits for activity on a noninteractive connection before closing it. See [MySQL documentation](https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_wait_timeout).
-
How to create a MySQL CDC table
----------------
@@ -105,8 +86,7 @@ CREATE TABLE orders (
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
- 'table-name' = 'orders'
-);
+ 'table-name' = 'orders');
-- read snapshot and binlogs from orders table
SELECT * FROM orders;
@@ -181,10 +161,42 @@ Connector Options
optional |
(none) |
Integer |
- A numeric ID of this database client, which must be unique across all currently-running database processes in the MySQL cluster.
- This connector joins the MySQL database cluster as another server (with this unique ID) so it can read the binlog.
- By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value. |
-
+ A numeric ID or a numeric ID range of this database client. The numeric ID syntax is like '5400',
+ and the numeric ID range syntax is like '5400-5408'. The numeric ID range syntax is recommended when 'scan.incremental.snapshot.enabled' is enabled.
+ Every ID must be unique across all currently-running database processes in the MySQL cluster. This connector joins the MySQL cluster
+ as another server (with this unique ID) so it can read the binlog. By default, a random number between 5400 and 6400 is generated,
+ though we recommend setting an explicit value.
+ |
+
+
+ scan.incremental.snapshot.enabled |
+ optional |
+ true |
+ Boolean |
+ Incremental snapshot is a new mechanism to read the snapshot of a table. Compared to the old snapshot mechanism,
+ incremental snapshot has many advantages, including:
+ (1) the source can read in parallel during snapshot reading,
+ (2) the source can perform checkpoints at the chunk granularity during snapshot reading,
+ (3) the source doesn't need to acquire a global read lock (FLUSH TABLES WITH READ LOCK) before snapshot reading.
+ If you would like the source to run in parallel, each parallel reader should have a unique server id, so
+ the 'server-id' must be a range like '5400-6400', and the range must be larger than the parallelism.
+ Please see the Incremental Snapshot Reading section for more detailed information.
+ |
+
+
+ scan.incremental.snapshot.chunk.size |
+ optional |
+ 8096 |
+ Integer |
+ The chunk size (number of rows) of the table snapshot; captured tables are split into multiple chunks when reading the snapshot of the table. |
+
+
+ scan.snapshot.fetch.size |
+ optional |
+ 1024 |
+ Integer |
+ The maximum fetch size per poll when reading the table snapshot. |
+
scan.startup.mode |
optional |
@@ -213,13 +225,12 @@ Connector Options
During a snapshot operation, the connector will query each included table to produce a read event for all rows in that table. This parameter determines whether the MySQL connection will pull all results for a table into memory (which is fast but requires large amounts of memory), or whether the results will instead be streamed (can be slower, but will work for very large tables). The value specifies the minimum number of rows a table must contain before the connector will stream results, and defaults to 1,000. Set this parameter to '0' to skip all table size checks and always stream all results during a snapshot.
- debezium.snapshot.
- fetch.size |
- optional |
- (none) |
- Integer |
- Specifies the maximum number of rows that should be read in one go from each table while taking a snapshot. The connector will read the table contents in multiple batches of this size. |
-
+ connect.timeout |
+ optional |
+ 30s |
+ Duration |
+ The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. |
+
debezium.* |
optional |
@@ -236,9 +247,101 @@ During a snapshot operation, the connector will query each included table to pro
Features
--------
+### Incremental Snapshot Reading
+
+Incremental snapshot reading is a new mechanism to read the snapshot of a table. Compared to the old snapshot mechanism, incremental snapshot reading has many advantages, including:
+* (1) the MySQL CDC source can read in parallel during snapshot reading
+* (2) the MySQL CDC source can perform checkpoints at the chunk granularity during snapshot reading
+* (3) the MySQL CDC source doesn't need to acquire a global read lock (FLUSH TABLES WITH READ LOCK) before snapshot reading
+
+If you would like the source to run in parallel, each parallel reader should have a unique server id, so the 'server-id' must be a range like '5400-6400',
+and the range must be larger than the parallelism.
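+
+For example, the following SQL CLI session (a sketch; `source_table` is illustrative) matches a range of 4 server ids to a source parallelism of 4:
+
+```sql
+-- run the source with 4 parallel snapshot readers
+SET 'parallelism.default' = 4;
+
+-- the range '5401-5404' provides one unique server id per reader
+SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */;
+```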
+
+During incremental snapshot reading, the MySQL CDC source first splits the table into snapshot chunks (splits) by the primary key of the table,
+and then assigns the chunks to multiple readers to read the data of each snapshot chunk.
+
+#### Controlling Parallelism
+
+Incremental snapshot reading provides the ability to read snapshot data in parallel.
+You can control the source parallelism by setting the job parallelism `parallelism.default`. For example, in SQL CLI:
+
+```sql
+Flink SQL> SET 'parallelism.default' = 8;
+```
+
+#### Checkpoint
+
+Incremental snapshot reading provides the ability to perform checkpoints at the chunk level. It resolves the checkpoint timeout problem of the old snapshot reading mechanism in previous versions.
+
+#### Lock-free
+
+The MySQL CDC source uses the **incremental snapshot algorithm**, which avoids acquiring the global read lock (FLUSH TABLES WITH READ LOCK) and thus doesn't need the `RELOAD` permission.
+
+#### How Incremental Snapshot Reading works
+
+When the MySQL CDC source is started, it reads the snapshot of the table in parallel and then reads the binlog of the table with a single parallelism.
+
+In the snapshot phase, the snapshot is cut into multiple snapshot chunks according to the primary key of the table and the size of the table rows.
+Snapshot chunks are assigned to multiple snapshot readers. Each snapshot reader reads its received chunks with the [chunk reading algorithm](#chunk-reading-algorithm) and sends the read data downstream.
+The source manages the process status (finished or not) of the chunks, thus the source of the snapshot phase can support checkpoints at the chunk level.
+If a failure happens, the source can be restored and continue to read chunks from the last finished chunks.
+
+After all snapshot chunks are finished, the source will continue to read the binlog in a single task.
+In order to guarantee the global data order of snapshot records and binlog records, the binlog reader will not start to read data
+until there is a complete checkpoint after the snapshot chunks finish, which makes sure all snapshot data has been consumed by downstream.
+The binlog reader tracks the consumed binlog position in state, thus the source of the binlog phase can support checkpoints at the row level.
+
+Flink performs checkpoints for the source periodically. In case of failover, the job will restart and restore from the last successful checkpoint state, which guarantees the exactly-once semantic.
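+
+Since recovery relies on checkpoints, make sure periodic checkpointing is enabled for the job. A minimal sketch in SQL CLI (the interval value is illustrative):
+
+```sql
+-- enable periodic checkpoints so the source can restore from
+-- the last finished chunks and the recorded binlog position
+SET 'execution.checkpointing.interval' = '3s';
+```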
+
+##### Snapshot Chunk Splitting
+
+When performing incremental snapshot reading, the MySQL CDC source needs a criterion to split the table.
+The MySQL CDC source uses a splitting column to split the table into multiple splits (chunks). By default, the MySQL CDC source will identify the primary key column of the table and use the first column in the primary key as the splitting column.
+If there is no primary key in the table, incremental snapshot reading will fail; you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism.
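+
+For example, a table without a primary key can fall back to the old snapshot mechanism by disabling the option in the DDL. A sketch, assuming illustrative connection options and table name:
+
+```sql
+CREATE TABLE no_pk_table (
+  item_id INT,
+  item_name STRING
+) WITH (
+  'connector' = 'mysql-cdc',
+  'hostname' = 'localhost',
+  'port' = '3306',
+  'username' = 'user',
+  'password' = 'password',
+  'database-name' = 'mydb',
+  'table-name' = 'no_pk_table',
+  -- fall back to the old snapshot mechanism since the table has no primary key
+  'scan.incremental.snapshot.enabled' = 'false'
+);
+```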
+
+For a numeric and auto-incremental splitting column, the MySQL CDC source efficiently splits chunks by a fixed step length.
+For example, if you had a table with a primary key column `id` of auto-incremental BIGINT type, whose minimum value was `0` and maximum value was `100`,
+and the table option `scan.incremental.snapshot.chunk.size` value is `25`, the table would be split into the following chunks:
+
+```
+ (-∞, 25),
+ [25, 50),
+ [50, 75),
+ [75, 100),
+ [100, +∞)
+```
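+
+Each chunk then corresponds to a bounded range query over the splitting column. A sketch of the queries a reader would issue for the chunks above, assuming half-open chunk ranges and the example table name `MyTable` used below:
+
+```sql
+-- first chunk: (-∞, 25)
+SELECT * FROM MyTable WHERE id < 25;
+-- a middle chunk, e.g. [25, 50)
+SELECT * FROM MyTable WHERE id >= 25 AND id < 50;
+-- last chunk: [100, +∞)
+SELECT * FROM MyTable WHERE id >= 100;
+```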
+
+For other primary key column types, the MySQL CDC source executes a statement in the form of `SELECT MAX(STR_ID) AS chunk_high FROM (SELECT * FROM TestTable WHERE STR_ID > 'uuid-001' limit 25)` to get the high value of each chunk (the low value is the high value of the previous chunk),
+and the resulting set of chunks would look like:
+
+ ```
+ (-∞, 'uuid-001'),
+ ['uuid-001', 'uuid-009'),
+ ['uuid-009', 'uuid-abc'),
+ ['uuid-abc', 'uuid-def'),
+ [uuid-def, +∞).
+```
+
+##### Chunk Reading Algorithm
+
+For the above example table `MyTable`, if the MySQL CDC source parallelism was set to 4, the MySQL CDC source would run 4 readers, each executing the **Offset Signal Algorithm** to
+get a final consistent output of the snapshot chunk. The **Offset Signal Algorithm** can be briefly described as follows:
+
+ * (1) Record current binlog position as `LOW` offset
+ * (2) Read and buffer the snapshot chunk records by executing statement `SELECT * FROM MyTable WHERE id > chunk_low AND id <= chunk_high`
+ * (3) Record current binlog position as `HIGH` offset
+ * (4) Read the binlog records that belong to the snapshot chunk from `LOW` offset to `HIGH` offset
+ * (5) Upsert the read binlog records into the buffered chunk records, and emit all records in the buffer as final output (all as INSERT records) of the snapshot chunk
+ * (6) Continue to read and emit binlog records that belong to the chunk after the `HIGH` offset in the *single binlog reader*.
+
+The algorithm is inspired by the [DBLog Paper](https://arxiv.org/pdf/2010.12597v1.pdf), please refer to it for more details.
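+
+The steps above can be sketched as the statements one reader would run for a single chunk. This is a sketch only: `SHOW MASTER STATUS` is the standard MySQL way to observe the current binlog position, while the connector's actual offset bookkeeping is internal:
+
+```sql
+-- (1) record the current binlog position as the LOW offset
+SHOW MASTER STATUS;
+-- (2) read and buffer the records of the snapshot chunk
+SELECT * FROM MyTable WHERE id > 25 AND id <= 50;
+-- (3) record the current binlog position again as the HIGH offset
+SHOW MASTER STATUS;
+-- (4)(5) binlog records between LOW and HIGH that touch this chunk are
+-- upserted into the buffer, then the buffer is emitted as INSERT records
+```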
+
+**Note:** If the actual values of the primary key are not uniformly distributed across its range, this may lead to unbalanced tasks during incremental snapshot reading.
+
### Exactly-Once Processing
-The MySQL CDC connector is a Flink Source connector which will read database snapshot first and then continues to read binlogs with **exactly-once processing** even failures happen. Please read [How the connector performs database snapshot](https://debezium.io/documentation/reference/1.2/connectors/mysql.html#how-the-mysql-connector-performs-database-snapshots_debezium).
+The MySQL CDC connector is a Flink Source connector which will read table snapshot chunks first and then continue to read the binlog.
+In both the snapshot phase and the binlog phase, the MySQL CDC connector reads with **exactly-once processing** even if failures happen.
### Startup Reading Position
@@ -250,19 +353,16 @@ the end of the binlog which means only have the changes since the connector was
_Note: the mechanism of the `scan.startup.mode` option relies on Debezium's `snapshot.mode` configuration, so please do not use them together. If you specify both the `scan.startup.mode` and `debezium.snapshot.mode` options in the table DDL, `scan.startup.mode` may not work._
-### Single Thread Reading
-
-The MySQL CDC source can't work in parallel reading, because there is only one task can receive binlog events.
### DataStream Source
-The MySQL CDC connector can also be a DataStream source. You can create a SourceFunction as the following shows:
+The Incremental Snapshot Reading feature of the MySQL CDC source is currently only exposed in SQL. If you're using the DataStream API, please use the legacy MySQL source:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
-import com.ververica.cdc.connectors.mysql.MySQLSource;
+import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
+import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
public class MySqlBinlogSourceExample {
public static void main(String[] args) throws Exception {
@@ -286,7 +386,6 @@ public class MySqlBinlogSourceExample {
}
```
-
Data Type Mapping
----------------