diff --git a/docs/content/connectors/mysql-cdc.md b/docs/content/connectors/mysql-cdc.md index 9c26f8341..6504f1273 100644 --- a/docs/content/connectors/mysql-cdc.md +++ b/docs/content/connectors/mysql-cdc.md @@ -71,25 +71,28 @@ How to create a MySQL CDC table The MySQL CDC table can be defined as following: ```sql --- register a MySQL table 'orders' in Flink SQL -CREATE TABLE orders ( - order_id INT, - order_date TIMESTAMP(0), - customer_name STRING, - price DECIMAL(10, 5), - product_id INT, - order_status BOOLEAN -) WITH ( - 'connector' = 'mysql-cdc', - 'hostname' = 'localhost', - 'port' = '3306', - 'username' = 'root', - 'password' = '123456', - 'database-name' = 'mydb', - 'table-name' = 'orders'); +-- checkpoint every 3000 milliseconds +Flink SQL> SET 'execution.checkpointing.interval' = '3s'; +-- register a MySQL table 'orders' in Flink SQL +Flink SQL> CREATE TABLE orders ( + order_id INT, + order_date TIMESTAMP(0), + customer_name STRING, + price DECIMAL(10, 5), + product_id INT, + order_status BOOLEAN + WITH ( + 'connector' = 'mysql-cdc', + 'hostname' = 'localhost', + 'port' = '3306', + 'username' = 'root', + 'password' = '123456', + 'database-name' = 'mydb', + 'table-name' = 'orders'); + -- read snapshot and binlogs from orders table -SELECT * FROM orders; +Flink SQL> SELECT * FROM orders; ``` Connector Options @@ -376,7 +379,8 @@ public class MySqlBinlogSourceExample { .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - + + env.enableCheckpointing(3000); // checkpoint every 3000 milliseconds env .addSource(sourceFunction) .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering