|
|
|
@ -71,25 +71,28 @@ How to create a MySQL CDC table
|
|
|
|
|
The MySQL CDC table can be defined as following:
|
|
|
|
|
|
|
|
|
|
```sql
|
|
|
|
|
-- register a MySQL table 'orders' in Flink SQL
|
|
|
|
|
CREATE TABLE orders (
|
|
|
|
|
order_id INT,
|
|
|
|
|
order_date TIMESTAMP(0),
|
|
|
|
|
customer_name STRING,
|
|
|
|
|
price DECIMAL(10, 5),
|
|
|
|
|
product_id INT,
|
|
|
|
|
order_status BOOLEAN
|
|
|
|
|
) WITH (
|
|
|
|
|
'connector' = 'mysql-cdc',
|
|
|
|
|
'hostname' = 'localhost',
|
|
|
|
|
'port' = '3306',
|
|
|
|
|
'username' = 'root',
|
|
|
|
|
'password' = '123456',
|
|
|
|
|
'database-name' = 'mydb',
|
|
|
|
|
'table-name' = 'orders');
|
|
|
|
|
-- checkpoint every 3000 milliseconds
|
|
|
|
|
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
|
|
|
|
|
|
|
|
|
|
-- register a MySQL table 'orders' in Flink SQL
|
|
|
|
|
Flink SQL> CREATE TABLE orders (
|
|
|
|
|
order_id INT,
|
|
|
|
|
order_date TIMESTAMP(0),
|
|
|
|
|
customer_name STRING,
|
|
|
|
|
price DECIMAL(10, 5),
|
|
|
|
|
product_id INT,
|
|
|
|
|
order_status BOOLEAN
|
|
|
|
|
WITH (
|
|
|
|
|
'connector' = 'mysql-cdc',
|
|
|
|
|
'hostname' = 'localhost',
|
|
|
|
|
'port' = '3306',
|
|
|
|
|
'username' = 'root',
|
|
|
|
|
'password' = '123456',
|
|
|
|
|
'database-name' = 'mydb',
|
|
|
|
|
'table-name' = 'orders');
|
|
|
|
|
|
|
|
|
|
-- read snapshot and binlogs from orders table
|
|
|
|
|
SELECT * FROM orders;
|
|
|
|
|
Flink SQL> SELECT * FROM orders;
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
Connector Options
|
|
|
|
@ -376,7 +379,8 @@ public class MySqlBinlogSourceExample {
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
env.enableCheckpointing(3000); // checkpoint every 3000 milliseconds
|
|
|
|
|
env
|
|
|
|
|
.addSource(sourceFunction)
|
|
|
|
|
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
|
|
|
|
|