[3.0][docs] Add documents for MySql pipeline connector (#2827)

pull/2833/head
Jiabao Sun 1 year ago committed by GitHub
parent f2c2e64491
commit 2088f89ed8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -282,7 +282,8 @@ upstart 流需要一个唯一的密钥,所以我们必须声明 `_id` 作为
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。</td>
<td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。<br>
若 flink 版本大于等于 1.15'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。</td>
</tr>
<tr>
<td>scan.cursor.no-timeout</td>

@ -288,7 +288,11 @@ Connector Options
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to close idle readers at the end of the snapshot phase. The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true.</td>
<td>Whether to close idle readers at the end of the snapshot phase. <br>
The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true.<br>
If the flink version is greater than or equal to 1.15, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true,
so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
</td>
</tr>
<tr>
<td>scan.cursor.no-timeout</td>

@ -314,7 +314,7 @@ Flink SQL> SELECT * FROM orders;
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕获数据更改。
For example: <code>'debezium.snapshot.mode' = 'never'</code>.
例如: <code>'debezium.snapshot.mode' = 'never'</code>.
查看更多关于 <a href="https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-connector-properties"> Debezium 的 MySQL 连接器属性</a></td>
</tr>
<tr>
@ -322,7 +322,8 @@ Flink SQL> SELECT * FROM orders;
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。</td>
<td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。<br>
若 flink 版本大于等于 1.15'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。</td>
</tr>
<tr>
<td>debezium.binary.handling.mode</td>
@ -908,7 +909,7 @@ CREATE TABLE products (
<td>
BIT(n)
</td>
<td>BINARY(⌈n/8⌉)</td>
<td>BINARY(⌈(n + 7) / 8⌉)</td>
<td></td>
</tr>
<tr>

@ -327,7 +327,11 @@ During a snapshot operation, the connector will query each included table to pro
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to close idle readers at the end of the snapshot phase. The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true.</td>
<td>Whether to close idle readers at the end of the snapshot phase. <br>
The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true.<br>
If the flink version is greater than or equal to 1.15, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true,
so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
</td>
</tr>
<tr>
<td>debezium.binary.handling.mode</td>
@ -920,7 +924,7 @@ Data Type Mapping
<td>
BIT(n)
</td>
<td>BINARY(⌈n/8⌉)</td>
<td>BINARY(⌈(n + 7) / 8⌉)</td>
<td></td>
</tr>
<tr>

@ -614,7 +614,7 @@ Data Type Mapping
</tr>
<tr>
<td>BIT(n)</td>
<td>BINARY(⌈n/8⌉)</td>
<td>BINARY(⌈(n + 7) / 8⌉)</td>
<td></td>
</tr>
<tr>

@ -362,7 +362,11 @@ Connector Options
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to close idle readers at the end of the snapshot phase. The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true.</td>
<td>Whether to close idle readers at the end of the snapshot phase. <br>
The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true.<br>
If the flink version is greater than or equal to 1.15, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true,
so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
</td>
</tr>
</tbody>
</table>

@ -206,6 +206,17 @@ Connector Options
Please see <a href="#incremental-snapshot-reading ">Incremental Snapshot Reading</a>section for more detailed information.
</td>
</tr>
<tr>
<td>scan.incremental.close-idle-reader.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to close idle readers at the end of the snapshot phase. <br>
The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true.<br>
If the flink version is greater than or equal to 1.15, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true,
so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
</td>
</tr>
</tbody>
</table>
</div>

@ -205,7 +205,11 @@ Connector Options
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to close idle readers at the end of the snapshot phase. The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true.</td>
<td>Whether to close idle readers at the end of the snapshot phase. <br>
The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true.<br>
If the flink version is greater than or equal to 1.15, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true,
so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
</td>
</tr>
</tbody>
</table>

@ -0,0 +1,8 @@
# Pipeline Connectors
```{toctree}
:maxdepth: 2
mysql-pipeline
mysql-pipeline(ZH)
```

@ -0,0 +1,540 @@
# MySQL CDC Pipeline 连接器
MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。
本文描述了如何设置 MySQL CDC Pipeline 连接器。
如何创建 Pipeline
----------------
从 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:
```yaml
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: pass
pipeline:
name: MySQL to Doris Pipeline
parallelism: 4
```
Pipeline 连接器选项
----------------
<div class="highlight">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width: 10%">Option</th>
<th class="text-left" style="width: 8%">Required</th>
<th class="text-left" style="width: 7%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 65%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>hostname</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td> MySQL 数据库服务器的 IP 地址或主机名。</td>
</tr>
<tr>
<td>port</td>
<td>optional</td>
<td style="word-wrap: break-word;">3306</td>
<td>Integer</td>
<td>MySQL 数据库服务器的整数端口号。</td>
</tr>
<tr>
<td>username</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称。</td>
</tr>
<tr>
<td>password</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>连接 MySQL 数据库服务器时使用的密码。</td>
</tr>
<tr>
<td>tables</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>需要监视的 MySQL 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。<br>
需要注意的是,点号(.)被视为数据库和表名的分隔符。 如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。<br>
例如db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*</td>
</tr>
<tr>
<td>schema-change.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>是否发送模式更改事件,下游 sink 可以响应模式变更事件实现表结构同步默认为true。</td>
</tr>
<tr>
<td>server-id</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>读取数据使用的 server idserver id 可以是个整数或者一个整数范围,比如 '5400' 或 '5400-5408',
建议在 'scan.incremental.snapshot.enabled' 参数为启用时,配置成整数范围。因为在当前 MySQL 集群中运行的所有 slave 节点,标记每个 salve 节点的 id 都必须是唯一的。 所以当连接器加入 MySQL 集群作为另一个 slave 节点(并且具有唯一 id 的情况下),它就可以读取 binlog。 默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是我们建议用户明确指定 Server id。
</td>
</tr>
<tr>
<td>scan.incremental.snapshot.chunk.size</td>
<td>optional</td>
<td style="word-wrap: break-word;">8096</td>
<td>Integer</td>
<td>表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。</td>
</tr>
<tr>
<td>scan.snapshot.fetch.size</td>
<td>optional</td>
<td style="word-wrap: break-word;">1024</td>
<td>Integer</td>
<td>读取表快照时每次读取数据的最大条数。</td>
</tr>
<tr>
<td>scan.startup.mode</td>
<td>optional</td>
<td style="word-wrap: break-word;">initial</td>
<td>String</td>
<td> MySQL CDC 消费者可选的启动模式,
合法的模式为 "initial""earliest-offset""latest-offset""specific-offset" 和 "timestamp"。
请查阅 <a href="#a-name-id-002-a">启动模式</a> 章节了解更多详细信息。</td>
</tr>
<tr>
<td>scan.startup.specific-offset.file</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>在 "specific-offset" 启动模式下,启动位点的 binlog 文件名。</td>
</tr>
<tr>
<td>scan.startup.specific-offset.pos</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置。</td>
</tr>
<tr>
<td>scan.startup.specific-offset.gtid-set</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>在 "specific-offset" 启动模式下,启动位点的 GTID 集合。</td>
</tr>
<tr>
<td>scan.startup.specific-offset.skip-events</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>在指定的启动位点后需要跳过的事件数量。</td>
</tr>
<tr>
<td>scan.startup.specific-offset.skip-rows</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>在指定的启动位点后需要跳过的数据行数量。</td>
</tr>
<tr>
<td>connect.timeout</td>
<td>optional</td>
<td style="word-wrap: break-word;">30s</td>
<td>Duration</td>
<td>连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。</td>
</tr>
<tr>
<td>connect.max-retries</td>
<td>optional</td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
<td>连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。</td>
</tr>
<tr>
<td>connection.pool.size</td>
<td>optional</td>
<td style="word-wrap: break-word;">20</td>
<td>Integer</td>
<td>连接池大小。</td>
</tr>
<tr>
<td>jdbc.properties.*</td>
<td>optional</td>
<td style="word-wrap: break-word;">20</td>
<td>String</td>
<td>传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'.</td>
</tr>
<tr>
<td>heartbeat.interval</td>
<td>optional</td>
<td style="word-wrap: break-word;">30s</td>
<td>Duration</td>
<td>用于跟踪最新可用 binlog 偏移的发送心跳事件的间隔。</td>
</tr>
<tr>
<td>debezium.*</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕获数据更改。
例如: <code>'debezium.snapshot.mode' = 'never'</code>.
查看更多关于 <a href="https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-connector-properties"> Debezium 的 MySQL 连接器属性</a></td>
</tr>
<tr>
<td>scan.incremental.close-idle-reader.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。<br>
若 flink 版本大于等于 1.15'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。</td>
</tr>
</tbody>
</table>
</div>
启动模式
--------
配置选项```scan.startup.mode```指定 MySQL CDC 使用者的启动模式。有效枚举包括:
- `initial` (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
- `earliest-offset`:跳过快照阶段,从可读取的最早 binlog 位点开始读取
- `latest-offset`:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
- `specific-offset`:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
- `timestamp`:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
数据类型映射
----------------
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width:30%;">MySQL type<a href="https://dev.mysql.com/doc/man/8.0/en/data-types.html"></a></th>
<th class="text-left" style="width:10%;">CDC type</th>
<th class="text-left" style="width:60%;">NOTE</th>
</tr>
</thead>
<tbody>
<tr>
<td>TINYINT(n)</td>
<td>TINYINT</td>
<td></td>
</tr>
<tr>
<td>
SMALLINT<br>
TINYINT UNSIGNED<br>
TINYINT UNSIGNED ZEROFILL
</td>
<td>SMALLINT</td>
<td></td>
</tr>
<tr>
<td>
INT<br>
YEAR<br>
MEDIUMINT<br>
MEDIUMINT UNSIGNED<br>
MEDIUMINT UNSIGNED ZEROFILL<br>
SMALLINT UNSIGNED<br>
SMALLINT UNSIGNED ZEROFILL
</td>
<td>INT</td>
<td></td>
</tr>
<tr>
<td>
BIGINT<br>
INT UNSIGNED<br>
INT UNSIGNED ZEROFILL
</td>
<td>BIGINT</td>
<td></td>
</tr>
<tr>
<td>
BIGINT UNSIGNED<br>
BIGINT UNSIGNED ZEROFILL<br>
SERIAL
</td>
<td>DECIMAL(20, 0)</td>
<td></td>
</tr>
<tr>
<td>
FLOAT<br>
FLOAT UNSIGNED<br>
FLOAT UNSIGNED ZEROFILL
</td>
<td>FLOAT</td>
<td></td>
</tr>
<tr>
<td>
REAL<br>
REAL UNSIGNED<br>
REAL UNSIGNED ZEROFILL<br>
DOUBLE<br>
DOUBLE UNSIGNED<br>
DOUBLE UNSIGNED ZEROFILL<br>
DOUBLE PRECISION<br>
DOUBLE PRECISION UNSIGNED<br>
DOUBLE PRECISION UNSIGNED ZEROFILL
</td>
<td>DOUBLE</td>
<td></td>
</tr>
<tr>
<td>
NUMERIC(p, s)<br>
NUMERIC(p, s) UNSIGNED<br>
NUMERIC(p, s) UNSIGNED ZEROFILL<br>
DECIMAL(p, s)<br>
DECIMAL(p, s) UNSIGNED<br>
DECIMAL(p, s) UNSIGNED ZEROFILL<br>
FIXED(p, s)<br>
FIXED(p, s) UNSIGNED<br>
FIXED(p, s) UNSIGNED ZEROFILL<br>
where p <= 38<br>
</td>
<td>DECIMAL(p, s)</td>
<td></td>
</tr>
<tr>
<td>
NUMERIC(p, s)<br>
NUMERIC(p, s) UNSIGNED<br>
NUMERIC(p, s) UNSIGNED ZEROFILL<br>
DECIMAL(p, s)<br>
DECIMAL(p, s) UNSIGNED<br>
DECIMAL(p, s) UNSIGNED ZEROFILL<br>
FIXED(p, s)<br>
FIXED(p, s) UNSIGNED<br>
FIXED(p, s) UNSIGNED ZEROFILL<br>
where 38 < p <= 65<br>
</td>
<td>STRING</td>
<td>在 MySQL 中,十进制数据类型的精度高达 65但在 Flink 中,十进制数据类型的精度仅限于 38。所以如果定义精度大于 38 的十进制列,则应将其映射到字符串以避免精度损失。</td>
</tr>
<tr>
<td>
BOOLEAN<br>
TINYINT(1)<br>
BIT(1)
</td>
<td>BOOLEAN</td>
<td></td>
</tr>
<tr>
<td>DATE</td>
<td>DATE</td>
<td></td>
</tr>
<tr>
<td>TIME [(p)]</td>
<td>TIME [(p)]</td>
<td></td>
</tr>
<tr>
<td>TIMESTAMP [(p)]
</td>
<td>TIMESTAMP_LTZ [(p)]
</td>
<td></td>
</tr>
<tr>
<td>DATETIME [(p)]
</td>
<td>TIMESTAMP [(p)]
</td>
<td></td>
</tr>
<tr>
<td>
CHAR(n)
</td>
<td>CHAR(n)</td>
<td></td>
</tr>
<tr>
<td>
VARCHAR(n)
</td>
<td>VARCHAR(n)</td>
<td></td>
</tr>
<tr>
<td>
BIT(n)
</td>
<td>BINARY(⌈(n + 7) / 8⌉)</td>
<td></td>
</tr>
<tr>
<td>
BINARY(n)
</td>
<td>BINARY(n)</td>
<td></td>
</tr>
<tr>
<td>
VARBINARY(N)
</td>
<td>VARBINARY(N)</td>
<td></td>
</tr>
<tr>
<td>
TINYTEXT<br>
TEXT<br>
MEDIUMTEXT<br>
LONGTEXT<br>
</td>
<td>STRING</td>
<td></td>
</tr>
<tr>
<td>
TINYBLOB<br>
BLOB<br>
MEDIUMBLOB<br>
LONGBLOB<br>
</td>
<td>BYTES</td>
<td>目前,对于 MySQL 中的 BLOB 数据类型,仅支持长度不大于 21474836472**31-1的 blob。 </td>
</tr>
<tr>
<td>
ENUM
</td>
<td>STRING</td>
<td></td>
</tr>
<tr>
<td>
JSON
</td>
<td>STRING</td>
<td> JSON 数据类型将在 Flink 中转换为 JSON 格式的字符串。</td>
</tr>
<tr>
<td>
SET
</td>
<td>-</td>
<td> 暂不支持 </td>
</tr>
<tr>
<td>
GEOMETRY<br>
POINT<br>
LINESTRING<br>
POLYGON<br>
MULTIPOINT<br>
MULTILINESTRING<br>
MULTIPOLYGON<br>
GEOMETRYCOLLECTION<br>
</td>
<td>
STRING
</td>
<td>
MySQL 中的空间数据类型将转换为具有固定 Json 格式的字符串。
请参考 MySQL <a href="#a-name-id-003-a">空间数据类型映射</a> 章节了解更多详细信息。
</td>
</tr>
</tbody>
</table>
</div>
### 空间数据类型映射<a name="空间数据类型映射" id="003"></a>
MySQL中除`GEOMETRYCOLLECTION`之外的空间数据类型都会转换为 Json 字符串,格式固定,如:<br>
```json
{"srid": 0 , "type": "xxx", "coordinates": [0, 0]}
```
字段`srid`标识定义几何体的 SRS如果未指定 SRID则 SRID 0 是新几何体值的默认值。
由于 MySQL 8+ 在定义空间数据类型时只支持特定的 SRID因此在版本较低的MySQL中字段`srid`将始终为 0。
字段`type`标识空间数据类型,例如`POINT`/`LINESTRING`/`POLYGON`。
字段`coordinates`表示空间数据的`坐标`。
对于`GEOMETRYCOLLECTION`,它将转换为 Json 字符串,格式固定,如:<br>
```json
{"srid": 0 , "type": "GeometryCollection", "geometries": [{"type":"Point","coordinates":[10,10]}]}
```
`Geometrics`字段是一个包含所有空间数据的数组。
不同空间数据类型映射的示例如下:
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">Spatial data in MySQL</th>
<th class="text-left">Json String converted in Flink</th>
</tr>
</thead>
<tbody>
<tr>
<td>POINT(1 1)</td>
<td>{"coordinates":[1,1],"type":"Point","srid":0}</td>
</tr>
<tr>
<td>LINESTRING(3 0, 3 3, 3 5)</td>
<td>{"coordinates":[[3,0],[3,3],[3,5]],"type":"LineString","srid":0}</td>
</tr>
<tr>
<td>POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))</td>
<td>{"coordinates":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],"type":"Polygon","srid":0}</td>
</tr>
<tr>
<td>MULTIPOINT((1 1),(2 2))</td>
<td>{"coordinates":[[1,1],[2,2]],"type":"MultiPoint","srid":0}</td>
</tr>
<tr>
<td>MultiLineString((1 1,2 2,3 3),(4 4,5 5))</td>
<td>{"coordinates":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],"type":"MultiLineString","srid":0}</td>
</tr>
<tr>
<td>MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5)))</td>
<td>{"coordinates":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],"type":"MultiPolygon","srid":0}</td>
</tr>
<tr>
<td>GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))</td>
<td>{"geometries":[{"type":"Point","coordinates":[10,10]},{"type":"Point","coordinates":[30,30]},{"type":"LineString","coordinates":[[15,15],[20,20]]}],"type":"GeometryCollection","srid":0}</td>
</tr>
</tbody>
</table>
</div>
常见问题
--------
* [FAQ(English)](https://github.com/ververica/flink-cdc-connectors/wiki/FAQ)
* [FAQ(中文)](https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH))

@ -0,0 +1,547 @@
# MySQL CDC Pipeline Connector
The MySQL CDC Pipeline Connector allows for reading snapshot data and incremental data from MySQL database and provides end-to-end full-database data synchronization capabilities.
This document describes how to setup the MySQL CDC Pipeline connector.
How to create Pipeline
----------------
The pipeline for reading data from MySQL and sink to Doris can be defined as follows:
```yaml
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: pass
pipeline:
name: MySQL to Doris Pipeline
parallelism: 4
```
Pipeline Connector Options
----------------
<div class="highlight">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width: 10%">Option</th>
<th class="text-left" style="width: 8%">Required</th>
<th class="text-left" style="width: 7%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 65%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>hostname</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>IP address or hostname of the MySQL database server.</td>
</tr>
<tr>
<td>port</td>
<td>optional</td>
<td style="word-wrap: break-word;">3306</td>
<td>Integer</td>
<td>Integer port number of the MySQL database server.</td>
</tr>
<tr>
<td>username</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Name of the MySQL database to use when connecting to the MySQL database server.</td>
</tr>
<tr>
<td>password</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Password to use when connecting to the MySQL database server.</td>
</tr>
<tr>
<td>tables</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Table name of the MySQL database to monitor. The table-name also supports regular expressions to monitor multiple tables that satisfy the regular expressions. <br>
It is important to note that the dot (.) is treated as a delimiter for database and table names.
If there is a need to use a dot (.) in a regular expression to match any character, it is necessary to escape the dot with a backslash.<br>
eg. db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*</td>
</tr>
<tr>
<td>schema-change.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to send schema change events, so that downstream sinks can respond to schema changes and achieve table structure synchronization.</td>
</tr>
<tr>
<td>server-id</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>A numeric ID or a numeric ID range of this database client, The numeric ID syntax is like '5400',
the numeric ID range syntax is like '5400-5408', The numeric ID range syntax is recommended when 'scan.incremental.snapshot.enabled' enabled.
Every ID must be unique across all currently-running database processes in the MySQL cluster. This connector joins the MySQL cluster
as another server (with this unique ID) so it can read the binlog. By default, a random number is generated between 5400 and 6400,
though we recommend setting an explicit value. </td>
</tr>
<tr>
<td>scan.incremental.snapshot.chunk.size</td>
<td>optional</td>
<td style="word-wrap: break-word;">8096</td>
<td>Integer</td>
<td>The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table.</td>
</tr>
<tr>
<td>scan.snapshot.fetch.size</td>
<td>optional</td>
<td style="word-wrap: break-word;">1024</td>
<td>Integer</td>
<td>The maximum fetch size for per poll when read table snapshot.</td>
</tr>
<tr>
<td>scan.startup.mode</td>
<td>optional</td>
<td style="word-wrap: break-word;">initial</td>
<td>String</td>
<td>Optional startup mode for MySQL CDC consumer, valid enumerations are "initial", "earliest-offset", "latest-offset", "specific-offset" and "timestamp".
Please see <a href="#startup-reading-position">Startup Reading Position</a> section for more detailed information.</td>
</tr>
<tr>
<td>scan.startup.specific-offset.file</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Optional binlog file name used in case of "specific-offset" startup mode</td>
</tr>
<tr>
<td>scan.startup.specific-offset.pos</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>Optional binlog file position used in case of "specific-offset" startup mode</td>
</tr>
<tr>
<td>scan.startup.specific-offset.gtid-set</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Optional GTID set used in case of "specific-offset" startup mode</td>
</tr>
<tr>
<td>scan.startup.specific-offset.skip-events</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>Optional number of events to skip after the specific starting offset</td>
</tr>
<tr>
<td>scan.startup.specific-offset.skip-rows</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>Optional number of rows to skip after the specific starting offset</td>
</tr>
<tr>
<td>connect.timeout</td>
<td>optional</td>
<td style="word-wrap: break-word;">30s</td>
<td>Duration</td>
<td>The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.</td>
</tr>
<tr>
<td>connect.max-retries</td>
<td>optional</td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
<td>The max retry times that the connector should retry to build MySQL database server connection.</td>
</tr>
<tr>
<td>connection.pool.size</td>
<td>optional</td>
<td style="word-wrap: break-word;">20</td>
<td>Integer</td>
<td>The connection pool size.</td>
</tr>
<tr>
<td>jdbc.properties.*</td>
<td>optional</td>
<td style="word-wrap: break-word;">20</td>
<td>String</td>
<td>Option to pass custom JDBC URL properties. User can pass custom properties like 'jdbc.properties.useSSL' = 'false'.</td>
</tr>
<tr>
<td>heartbeat.interval</td>
<td>optional</td>
<td style="word-wrap: break-word;">30s</td>
<td>Duration</td>
<td>The interval of sending heartbeat event for tracing the latest available binlog offsets.</td>
</tr>
<tr>
<td>debezium.*</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from MySQL server.
For example: <code>'debezium.snapshot.mode' = 'never'</code>.
See more about the <a href="https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-connector-properties">Debezium's MySQL Connector properties</a></td>
</tr>
<tr>
<td>scan.incremental.close-idle-reader.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to close idle readers at the end of the snapshot phase. <br>
The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true.<br>
If the flink version is greater than or equal to 1.15, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true,
so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
</td>
</tr>
</tbody>
</table>
</div>
Startup Reading Position
--------
The config option `scan.startup.mode` specifies the startup mode for MySQL CDC consumer. The valid enumerations are:
- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
- `earliest-offset`: Skip snapshot phase and start reading binlog events from the earliest accessible binlog offset.
- `latest-offset`: Never to perform snapshot on the monitored database tables upon first startup, just read from
the end of the binlog which means only have the changes since the connector was started.
- `specific-offset`: Skip snapshot phase and start reading binlog events from a specific offset. The offset could be
specified with binlog filename and position, or a GTID set if GTID is enabled on server.
- `timestamp`: Skip snapshot phase and start reading binlog events from a specific timestamp.
Data Type Mapping
----------------
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width:30%;">MySQL type<a href="https://dev.mysql.com/doc/man/8.0/en/data-types.html"></a></th>
<th class="text-left" style="width:10%;">CDC type</th>
<th class="text-left" style="width:60%;">NOTE</th>
</tr>
</thead>
<tbody>
<tr>
<td>TINYINT(n)</td>
<td>TINYINT</td>
<td></td>
</tr>
<tr>
<td>
SMALLINT<br>
TINYINT UNSIGNED<br>
TINYINT UNSIGNED ZEROFILL
</td>
<td>SMALLINT</td>
<td></td>
</tr>
<tr>
<td>
INT<br>
YEAR<br>
MEDIUMINT<br>
MEDIUMINT UNSIGNED<br>
MEDIUMINT UNSIGNED ZEROFILL<br>
SMALLINT UNSIGNED<br>
SMALLINT UNSIGNED ZEROFILL
</td>
<td>INT</td>
<td></td>
</tr>
<tr>
<td>
BIGINT<br>
INT UNSIGNED<br>
INT UNSIGNED ZEROFILL
</td>
<td>BIGINT</td>
<td></td>
</tr>
<tr>
<td>
BIGINT UNSIGNED<br>
BIGINT UNSIGNED ZEROFILL<br>
SERIAL
</td>
<td>DECIMAL(20, 0)</td>
<td></td>
</tr>
<tr>
<td>
FLOAT<br>
FLOAT UNSIGNED<br>
FLOAT UNSIGNED ZEROFILL
</td>
<td>FLOAT</td>
<td></td>
</tr>
<tr>
<td>
REAL<br>
REAL UNSIGNED<br>
REAL UNSIGNED ZEROFILL<br>
DOUBLE<br>
DOUBLE UNSIGNED<br>
DOUBLE UNSIGNED ZEROFILL<br>
DOUBLE PRECISION<br>
DOUBLE PRECISION UNSIGNED<br>
DOUBLE PRECISION UNSIGNED ZEROFILL
</td>
<td>DOUBLE</td>
<td></td>
</tr>
<tr>
<td>
NUMERIC(p, s)<br>
NUMERIC(p, s) UNSIGNED<br>
NUMERIC(p, s) UNSIGNED ZEROFILL<br>
DECIMAL(p, s)<br>
DECIMAL(p, s) UNSIGNED<br>
DECIMAL(p, s) UNSIGNED ZEROFILL<br>
FIXED(p, s)<br>
FIXED(p, s) UNSIGNED<br>
FIXED(p, s) UNSIGNED ZEROFILL<br>
where p <= 38<br>
</td>
<td>DECIMAL(p, s)</td>
<td></td>
</tr>
<tr>
<td>
NUMERIC(p, s)<br>
NUMERIC(p, s) UNSIGNED<br>
NUMERIC(p, s) UNSIGNED ZEROFILL<br>
DECIMAL(p, s)<br>
DECIMAL(p, s) UNSIGNED<br>
DECIMAL(p, s) UNSIGNED ZEROFILL<br>
FIXED(p, s)<br>
FIXED(p, s) UNSIGNED<br>
FIXED(p, s) UNSIGNED ZEROFILL<br>
where 38 < p <= 65<br>
</td>
<td>STRING</td>
<td>The precision for DECIMAL data type is up to 65 in MySQL, but the precision for DECIMAL is limited to 38 in Flink.
So if you define a decimal column whose precision is greater than 38, you should map it to STRING to avoid precision loss.</td>
</tr>
<tr>
<td>
BOOLEAN<br>
TINYINT(1)<br>
BIT(1)
</td>
<td>BOOLEAN</td>
<td></td>
</tr>
<tr>
<td>DATE</td>
<td>DATE</td>
<td></td>
</tr>
<tr>
<td>TIME [(p)]</td>
<td>TIME [(p)]</td>
<td></td>
</tr>
<tr>
<td>TIMESTAMP [(p)]
</td>
<td>TIMESTAMP_LTZ [(p)]
</td>
<td></td>
</tr>
<tr>
<td>DATETIME [(p)]
</td>
<td>TIMESTAMP [(p)]
</td>
<td></td>
</tr>
<tr>
<td>
CHAR(n)
</td>
<td>CHAR(n)</td>
<td></td>
</tr>
<tr>
<td>
VARCHAR(n)
</td>
<td>VARCHAR(n)</td>
<td></td>
</tr>
<tr>
<td>
BIT(n)
</td>
<td>BINARY(⌈(n + 7) / 8⌉)</td>
<td></td>
</tr>
<tr>
<td>
BINARY(n)
</td>
<td>BINARY(n)</td>
<td></td>
</tr>
<tr>
<td>
VARBINARY(N)
</td>
<td>VARBINARY(N)</td>
<td></td>
</tr>
<tr>
<td>
TINYTEXT<br>
TEXT<br>
MEDIUMTEXT<br>
LONGTEXT<br>
</td>
<td>STRING</td>
<td></td>
</tr>
<tr>
<td>
TINYBLOB<br>
BLOB<br>
MEDIUMBLOB<br>
LONGBLOB<br>
</td>
<td>BYTES</td>
<td>Currently, for BLOB data type in MySQL, only the blob whose length isn't greater than 2,147,483,647(2 ** 31 - 1) is supported. </td>
</tr>
<tr>
<td>
ENUM
</td>
<td>STRING</td>
<td></td>
</tr>
<tr>
<td>
JSON
</td>
<td>STRING</td>
<td>The JSON data type will be converted into STRING with JSON format in Flink.</td>
</tr>
<tr>
<td>
SET
</td>
<td>-</td>
<td>Not supported yet.</td>
</tr>
<tr>
<td>
GEOMETRY<br>
POINT<br>
LINESTRING<br>
POLYGON<br>
MULTIPOINT<br>
MULTILINESTRING<br>
MULTIPOLYGON<br>
GEOMETRYCOLLECTION<br>
</td>
<td>
STRING
</td>
<td>
The spatial data types in MySQL will be converted into STRING with a fixed Json format.
Please see <a href="#mysql-spatial-data-types-mapping ">MySQL Spatial Data Types Mapping</a> section for more detailed information.
</td>
</tr>
</tbody>
</table>
</div>
### MySQL Spatial Data Types Mapping
The spatial data types except for `GEOMETRYCOLLECTION` in MySQL will be converted into Json String with a fixed format like:<br>
```json
{"srid": 0 , "type": "xxx", "coordinates": [0, 0]}
```
The field `srid` identifies the SRS in which the geometry is defined, SRID 0 is the default for new geometry values if no SRID is specified.
As only MySQL 8+ support to specific SRID when define spatial data type, the field `srid` will always be 0 in MySQL with a lower version.
The field `type` identifies the spatial data type, such as `POINT`/`LINESTRING`/`POLYGON`.
The field `coordinates` represents the `coordinates` of the spatial data.
For `GEOMETRYCOLLECTION`, it will be converted into Json String with a fixed format like:<br>
```json
{"srid": 0 , "type": "GeometryCollection", "geometries": [{"type":"Point","coordinates":[10,10]}]}
```
The field `geometries` is an array contains all spatial data.
The example for different spatial data types mapping is as follows:
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">Spatial data in MySQL</th>
<th class="text-left">Json String converted in Flink</th>
</tr>
</thead>
<tbody>
<tr>
<td>POINT(1 1)</td>
<td>{"coordinates":[1,1],"type":"Point","srid":0}</td>
</tr>
<tr>
<td>LINESTRING(3 0, 3 3, 3 5)</td>
<td>{"coordinates":[[3,0],[3,3],[3,5]],"type":"LineString","srid":0}</td>
</tr>
<tr>
<td>POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))</td>
<td>{"coordinates":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],"type":"Polygon","srid":0}</td>
</tr>
<tr>
<td>MULTIPOINT((1 1),(2 2))</td>
<td>{"coordinates":[[1,1],[2,2]],"type":"MultiPoint","srid":0}</td>
</tr>
<tr>
<td>MultiLineString((1 1,2 2,3 3),(4 4,5 5))</td>
<td>{"coordinates":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],"type":"MultiLineString","srid":0}</td>
</tr>
<tr>
<td>MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5)))</td>
<td>{"coordinates":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],"type":"MultiPolygon","srid":0}</td>
</tr>
<tr>
<td>GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))</td>
<td>{"geometries":[{"type":"Point","coordinates":[10,10]},{"type":"Point","coordinates":[30,30]},{"type":"LineString","coordinates":[[15,15],[20,20]]}],"type":"GeometryCollection","srid":0}</td>
</tr>
</tbody>
</table>
</div>
FAQ
--------
* [FAQ(English)](https://github.com/ververica/flink-cdc-connectors/wiki/FAQ)
* [FAQ(中文)](https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH))

@ -7,6 +7,7 @@ content/overview/index
content/quickstart/index
content/快速上手/index
content/connectors/index
content/pipelines/index
content/formats/index
content/downloads
content/githublink

Loading…
Cancel
Save