[docs] Add checkpoint config in document example

pull/328/head
Leonard Xu 4 years ago
parent ee6c345cbb
commit 30474d8c3b

@ -71,15 +71,18 @@ How to create a MySQL CDC table
The MySQL CDC table can be defined as following: The MySQL CDC table can be defined as following:
```sql ```sql
-- checkpoint every 3000 milliseconds
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
-- register a MySQL table 'orders' in Flink SQL -- register a MySQL table 'orders' in Flink SQL
CREATE TABLE orders ( Flink SQL> CREATE TABLE orders (
order_id INT, order_id INT,
order_date TIMESTAMP(0), order_date TIMESTAMP(0),
customer_name STRING, customer_name STRING,
price DECIMAL(10, 5), price DECIMAL(10, 5),
product_id INT, product_id INT,
order_status BOOLEAN order_status BOOLEAN
) WITH ( WITH (
'connector' = 'mysql-cdc', 'connector' = 'mysql-cdc',
'hostname' = 'localhost', 'hostname' = 'localhost',
'port' = '3306', 'port' = '3306',
@ -89,7 +92,7 @@ CREATE TABLE orders (
'table-name' = 'orders'); 'table-name' = 'orders');
-- read snapshot and binlogs from orders table -- read snapshot and binlogs from orders table
SELECT * FROM orders; Flink SQL> SELECT * FROM orders;
``` ```
Connector Options Connector Options
@ -377,6 +380,7 @@ public class MySqlBinlogSourceExample {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000); // checkpoint every 3000 milliseconds
env env
.addSource(sourceFunction) .addSource(sourceFunction)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

Loading…
Cancel
Save