|
|
|
@ -81,8 +81,9 @@ Flink SQL> CREATE TABLE orders (
|
|
|
|
|
customer_name STRING,
|
|
|
|
|
price DECIMAL(10, 5),
|
|
|
|
|
product_id INT,
|
|
|
|
|
order_status BOOLEAN
|
|
|
|
|
WITH (
|
|
|
|
|
order_status BOOLEAN,
|
|
|
|
|
PRIMARY KEY(order_id) NOT ENFORCED
|
|
|
|
|
) WITH (
|
|
|
|
|
'connector' = 'mysql-cdc',
|
|
|
|
|
'hostname' = 'localhost',
|
|
|
|
|
'port' = '3306',
|
|
|
|
@ -365,11 +366,11 @@ The Incremental Snapshot Reading feature of MySQL CDC Source only exposes in SQL
|
|
|
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
|
|
|
|
import org.apache.flink.streaming.api.functions.source.SourceFunction;
|
|
|
|
|
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
|
|
|
|
|
import com.ververica.cdc.connectors.mysql.MySQLSource;
|
|
|
|
|
import com.ververica.cdc.connectors.mysql.MySqlSource;
|
|
|
|
|
|
|
|
|
|
public class MySqlBinlogSourceExample {
|
|
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
|
|
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
|
|
|
|
|
SourceFunction<String> sourceFunction = MySqlSource.<String>builder()
|
|
|
|
|
.hostname("localhost")
|
|
|
|
|
.port(3306)
|
|
|
|
|
.databaseList("inventory") // monitor all tables under inventory database
|
|
|
|
|