diff --git a/docs/content/tutorials/tutorial-zh.md b/docs/content/tutorials/tutorial-zh.md
index 02a8fe91c..57cb2925b 100644
--- a/docs/content/tutorials/tutorial-zh.md
+++ b/docs/content/tutorials/tutorial-zh.md
@@ -136,87 +136,92 @@ VALUES (default,10001,'Beijing','Shanghai',false),
 4. Download the following JAR packages to `/lib/`:
 
-- [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.11.1/flink-sql-connector-elasticsearch7_2.11-1.11.1.jar)
-- [flink-sql-connector-mysql-cdc-1.0.0.jar](https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.0.0/flink-sql-connector-mysql-cdc-1.0.0.jar)
-- [flink-sql-connector-postgres-cdc-1.0.0.jar](https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-postgres-cdc/1.0.0/flink-sql-connector-postgres-cdc-1.0.0.jar)
+- [flink-sql-connector-elasticsearch7_2.11-1.13.2.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar)
+- [flink-sql-connector-mysql-cdc-2.0.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.0.0/flink-sql-connector-mysql-cdc-2.0.0.jar)
+- [flink-sql-connector-postgres-cdc-2.0.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.0.0/flink-sql-connector-postgres-cdc-2.0.0.jar)
 
 5. Then launch the Flink cluster and start the SQL CLI.
 
 ```sql
---FlinkSQL
-CREATE TABLE products (
-    id INT,
-    name STRING,
-    description STRING
-) WITH (
-    'connector' = 'mysql-cdc',
-    'hostname' = 'localhost',
-    'port' = '3306',
-    'username' = 'root',
-    'password' = '123456',
-    'database-name' = 'mydb',
-    'table-name' = 'products'
-);
-
-CREATE TABLE orders (
-    order_id INT,
-    order_date TIMESTAMP(0),
-    customer_name STRING,
-    price DECIMAL(10, 5),
-    product_id INT,
-    order_status BOOLEAN
-) WITH (
-    'connector' = 'mysql-cdc',
-    'hostname' = 'localhost',
-    'port' = '3306',
-    'username' = 'root',
-    'password' = '123456',
-    'database-name' = 'mydb',
-    'table-name' = 'orders'
-);
-
-CREATE TABLE shipments (
-    shipment_id INT,
-    order_id INT,
-    origin STRING,
-    destination STRING,
-    is_arrived BOOLEAN
-) WITH (
-    'connector' = 'postgres-cdc',
-    'hostname' = 'localhost',
-    'port' = '5432',
-    'username' = 'postgres',
-    'password' = 'postgres',
-    'database-name' = 'postgres',
-    'schema-name' = 'public',
-    'table-name' = 'shipments'
-);
-
-CREATE TABLE enriched_orders (
-    order_id INT,
-    order_date TIMESTAMP(0),
-    customer_name STRING,
-    price DECIMAL(10, 5),
-    product_id INT,
-    order_status BOOLEAN,
-    product_name STRING,
-    product_description STRING,
-    shipment_id INT,
-    origin STRING,
-    destination STRING,
-    is_arrived BOOLEAN,
-    PRIMARY KEY (order_id) NOT ENFORCED
-) WITH (
-    'connector' = 'elasticsearch-7',
-    'hosts' = 'http://localhost:9200',
-    'index' = 'enriched_orders'
-);
-
-INSERT INTO enriched_orders
-SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
-FROM orders AS o
-LEFT JOIN products AS p ON o.product_id = p.id
-LEFT JOIN shipments AS s ON o.order_id = s.order_id;
+--Flink SQL
+-- set the checkpoint interval to 3 seconds
+Flink SQL> SET 'execution.checkpointing.interval' = '3s';
+Flink SQL> CREATE TABLE products (
+    id INT,
+    name STRING,
+    description STRING,
+    PRIMARY KEY (id) NOT ENFORCED
+  ) WITH (
+    'connector' = 'mysql-cdc',
+    'hostname' = 'localhost',
+    'port' = '3306',
+    'username' = 'root',
+    'password' = '123456',
+    'database-name' = 'mydb',
+    'table-name' = 'products'
+  );
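+
+-- NOTE: the PRIMARY KEY ... NOT ENFORCED clauses declare the unique key of
+-- each changelog stream; Flink cannot enforce a key on data it does not own,
+-- so the constraint is metadata used for upsert/changelog semantics.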
+
+Flink SQL> CREATE TABLE orders (
+    order_id INT,
+    order_date TIMESTAMP(0),
+    customer_name STRING,
+    price DECIMAL(10, 5),
+    product_id INT,
+    order_status BOOLEAN,
+    PRIMARY KEY (order_id) NOT ENFORCED
+  ) WITH (
+    'connector' = 'mysql-cdc',
+    'hostname' = 'localhost',
+    'port' = '3306',
+    'username' = 'root',
+    'password' = '123456',
+    'database-name' = 'mydb',
+    'table-name' = 'orders'
+  );
+
+Flink SQL> CREATE TABLE shipments (
+    shipment_id INT,
+    order_id INT,
+    origin STRING,
+    destination STRING,
+    is_arrived BOOLEAN,
+    PRIMARY KEY (shipment_id) NOT ENFORCED
+  ) WITH (
+    'connector' = 'postgres-cdc',
+    'hostname' = 'localhost',
+    'port' = '5432',
+    'username' = 'postgres',
+    'password' = 'postgres',
+    'database-name' = 'postgres',
+    'schema-name' = 'public',
+    'table-name' = 'shipments'
+  );
+
+Flink SQL> CREATE TABLE enriched_orders (
+    order_id INT,
+    order_date TIMESTAMP(0),
+    customer_name STRING,
+    price DECIMAL(10, 5),
+    product_id INT,
+    order_status BOOLEAN,
+    product_name STRING,
+    product_description STRING,
+    shipment_id INT,
+    origin STRING,
+    destination STRING,
+    is_arrived BOOLEAN,
+    PRIMARY KEY (order_id) NOT ENFORCED
+  ) WITH (
+    'connector' = 'elasticsearch-7',
+    'hosts' = 'http://localhost:9200',
+    'index' = 'enriched_orders'
+  );
+
+Flink SQL> INSERT INTO enriched_orders
+  SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
+  FROM orders AS o
+  LEFT JOIN products AS p ON o.product_id = p.id
+  LEFT JOIN shipments AS s ON o.order_id = s.order_id;
 ```
 
 6. Modify the data in MySQL and Postgres, then observe the results in Elasticsearch.
@@ -246,25 +251,25 @@ DELETE FROM orders WHERE order_id = 10004;
 
 ```sql
 --Flink SQL
-CREATE TABLE kafka_gmv (
-    day_str STRING,
-    gmv DECIMAL(10, 5)
-) WITH (
-    'connector' = 'kafka',
-    'topic' = 'kafka_gmv',
-    'scan.startup.mode' = 'earliest-offset',
-    'properties.bootstrap.servers' = 'localhost:9092',
-    'format' = 'changelog-json'
-);
-
-INSERT INTO kafka_gmv
-SELECT DATE_FORMAT(order_date, 'yyyy-MM-dd') as day_str, SUM(price) as gmv
-FROM orders
-WHERE order_status = true
-GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');
+Flink SQL> CREATE TABLE kafka_gmv (
+    day_str STRING,
+    gmv DECIMAL(10, 5)
+  ) WITH (
+    'connector' = 'kafka',
+    'topic' = 'kafka_gmv',
+    'scan.startup.mode' = 'earliest-offset',
+    'properties.bootstrap.servers' = 'localhost:9092',
+    'format' = 'changelog-json'
+  );
+
+Flink SQL> INSERT INTO kafka_gmv
+  SELECT DATE_FORMAT(order_date, 'yyyy-MM-dd') as day_str, SUM(price) as gmv
+  FROM orders
+  WHERE order_status = true
+  GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');
 
 -- read the changelog data from Kafka and observe the materialized result
-SELECT * FROM kafka_gmv;
+Flink SQL> SELECT * FROM kafka_gmv;
 ```
 
 Observe the output in Kafka:
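Step 6 above leaves the concrete data changes to the reader. A minimal sketch of the kind of DML it has in mind, run in the MySQL and Postgres clients rather than in Flink (the values are illustrative assumptions; only `order_id = 10004` is confirmed by the hunk context above):

```sql
-- MySQL: insert a new order, then mark an existing one as paid
-- (product_id 104 is assumed to exist in the tutorial's products table)
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
UPDATE orders SET order_status = true WHERE order_id = 10004;

-- Postgres: add a shipment so the LEFT JOIN in enriched_orders picks it up
INSERT INTO shipments
VALUES (default, 10004, 'Shanghai', 'Beijing', false);
```

Each statement should surface as an update to the `enriched_orders` index in Elasticsearch within a few seconds.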
diff --git a/docs/content/tutorials/tutorial.md b/docs/content/tutorials/tutorial.md
index e81f3c978..337f1853f 100644
--- a/docs/content/tutorials/tutorial.md
+++ b/docs/content/tutorials/tutorial.md
@@ -136,87 +136,92 @@ VALUES (default,10001,'Beijing','Shanghai',false),
 4. Download the following JAR packages to `/lib/`:
 
-- [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.11.1/flink-sql-connector-elasticsearch7_2.11-1.11.1.jar)
-- [flink-sql-connector-mysql-cdc-1.0.0.jar](https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.0.0/flink-sql-connector-mysql-cdc-1.0.0.jar)
-- [flink-sql-connector-postgres-cdc-1.0.0.jar](https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-postgres-cdc/1.0.0/flink-sql-connector-postgres-cdc-1.0.0.jar)
+- [flink-sql-connector-elasticsearch7_2.11-1.13.2.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar)
+- [flink-sql-connector-mysql-cdc-2.0.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.0.0/flink-sql-connector-mysql-cdc-2.0.0.jar)
+- [flink-sql-connector-postgres-cdc-2.0.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.0.0/flink-sql-connector-postgres-cdc-2.0.0.jar)
 
 5. Launch a Flink cluster, then start a Flink SQL CLI and execute the following SQL statements inside:
 
 ```sql
 -- Flink SQL
-CREATE TABLE products (
-    id INT,
-    name STRING,
-    description STRING
-) WITH (
-    'connector' = 'mysql-cdc',
-    'hostname' = 'localhost',
-    'port' = '3306',
-    'username' = 'root',
-    'password' = '123456',
-    'database-name' = 'mydb',
-    'table-name' = 'products'
-);
-
-CREATE TABLE orders (
-    order_id INT,
-    order_date TIMESTAMP(0),
-    customer_name STRING,
-    price DECIMAL(10, 5),
-    product_id INT,
-    order_status BOOLEAN
-) WITH (
-    'connector' = 'mysql-cdc',
-    'hostname' = 'localhost',
-    'port' = '3306',
-    'username' = 'root',
-    'password' = '123456',
-    'database-name' = 'mydb',
-    'table-name' = 'orders'
-);
-
-CREATE TABLE shipments (
-    shipment_id INT,
-    order_id INT,
-    origin STRING,
-    destination STRING,
-    is_arrived BOOLEAN
-) WITH (
-    'connector' = 'postgres-cdc',
-    'hostname' = 'localhost',
-    'port' = '5432',
-    'username' = 'postgres',
-    'password' = 'postgres',
-    'database-name' = 'postgres',
-    'schema-name' = 'public',
-    'table-name' = 'shipments'
-);
-
-CREATE TABLE enriched_orders (
-    order_id INT,
-    order_date TIMESTAMP(0),
-    customer_name STRING,
-    price DECIMAL(10, 5),
-    product_id INT,
-    order_status BOOLEAN,
-    product_name STRING,
-    product_description STRING,
-    shipment_id INT,
-    origin STRING,
-    destination STRING,
-    is_arrived BOOLEAN,
-    PRIMARY KEY (order_id) NOT ENFORCED
-) WITH (
-    'connector' = 'elasticsearch-7',
-    'hosts' = 'http://localhost:9200',
-    'index' = 'enriched_orders'
-);
-
-INSERT INTO enriched_orders
-SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
-FROM orders AS o
-LEFT JOIN products AS p ON o.product_id = p.id
-LEFT JOIN shipments AS s ON o.order_id = s.order_id;
+-- checkpoint every 3000 milliseconds
+Flink SQL> SET 'execution.checkpointing.interval' = '3s';
+Flink SQL> CREATE TABLE products (
+    id INT,
+    name STRING,
+    description STRING,
+    PRIMARY KEY (id) NOT ENFORCED
+  ) WITH (
+    'connector' = 'mysql-cdc',
+    'hostname' = 'localhost',
+    'port' = '3306',
+    'username' = 'root',
+    'password' = '123456',
+    'database-name' = 'mydb',
+    'table-name' = 'products'
+  );
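+
+-- NOTE: the 2.0.0 mysql-cdc source reads an initial snapshot of the table and
+-- then switches to the binlog; the 2.0 incremental-snapshot framework does
+-- this without taking a global read lock.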
+
+Flink SQL> CREATE TABLE orders (
+    order_id INT,
+    order_date TIMESTAMP(0),
+    customer_name STRING,
+    price DECIMAL(10, 5),
+    product_id INT,
+    order_status BOOLEAN,
+    PRIMARY KEY (order_id) NOT ENFORCED
+  ) WITH (
+    'connector' = 'mysql-cdc',
+    'hostname' = 'localhost',
+    'port' = '3306',
+    'username' = 'root',
+    'password' = '123456',
+    'database-name' = 'mydb',
+    'table-name' = 'orders'
+  );
+
+Flink SQL> CREATE TABLE shipments (
+    shipment_id INT,
+    order_id INT,
+    origin STRING,
+    destination STRING,
+    is_arrived BOOLEAN,
+    PRIMARY KEY (shipment_id) NOT ENFORCED
+  ) WITH (
+    'connector' = 'postgres-cdc',
+    'hostname' = 'localhost',
+    'port' = '5432',
+    'username' = 'postgres',
+    'password' = 'postgres',
+    'database-name' = 'postgres',
+    'schema-name' = 'public',
+    'table-name' = 'shipments'
+  );
+
+Flink SQL> CREATE TABLE enriched_orders (
+    order_id INT,
+    order_date TIMESTAMP(0),
+    customer_name STRING,
+    price DECIMAL(10, 5),
+    product_id INT,
+    order_status BOOLEAN,
+    product_name STRING,
+    product_description STRING,
+    shipment_id INT,
+    origin STRING,
+    destination STRING,
+    is_arrived BOOLEAN,
+    PRIMARY KEY (order_id) NOT ENFORCED
+  ) WITH (
+    'connector' = 'elasticsearch-7',
+    'hosts' = 'http://localhost:9200',
+    'index' = 'enriched_orders'
+  );
+
+Flink SQL> INSERT INTO enriched_orders
+  SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
+  FROM orders AS o
+  LEFT JOIN products AS p ON o.product_id = p.id
+  LEFT JOIN shipments AS s ON o.order_id = s.order_id;
 ```
 
 6. Make some changes in MySQL and Postgres, then check the result in Elasticsearch:
@@ -246,25 +251,25 @@ Execute following SQL in Flink SQL CLI:
 
 ```sql
 -- Flink SQL
-CREATE TABLE kafka_gmv (
-    day_str STRING,
-    gmv DECIMAL(10, 5)
-) WITH (
-    'connector' = 'kafka',
-    'topic' = 'kafka_gmv',
-    'scan.startup.mode' = 'earliest-offset',
-    'properties.bootstrap.servers' = 'localhost:9092',
-    'format' = 'changelog-json'
-);
-
-INSERT INTO kafka_gmv
-SELECT DATE_FORMAT(order_date, 'yyyy-MM-dd') as day_str, SUM(price) as gmv
-FROM orders
-WHERE order_status = true
-GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');
+Flink SQL> CREATE TABLE kafka_gmv (
+    day_str STRING,
+    gmv DECIMAL(10, 5)
+  ) WITH (
+    'connector' = 'kafka',
+    'topic' = 'kafka_gmv',
+    'scan.startup.mode' = 'earliest-offset',
+    'properties.bootstrap.servers' = 'localhost:9092',
+    'format' = 'changelog-json'
+  );
+
+Flink SQL> INSERT INTO kafka_gmv
+  SELECT DATE_FORMAT(order_date, 'yyyy-MM-dd') as day_str, SUM(price) as gmv
+  FROM orders
+  WHERE order_status = true
+  GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');
 
 -- Consume changelog data from Kafka, and check the result of the materialized view:
-SELECT * FROM kafka_gmv;
+Flink SQL> SELECT * FROM kafka_gmv;
 ```
 
 To consume records from Kafka using `kafka-console-consumer`:
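A minimal sketch of that consumer invocation, assuming Kafka is reachable at `localhost:9092` as in the `kafka_gmv` DDL above (depending on the distribution the script may be named `kafka-console-consumer` without the `.sh`, and in the tutorial's docker-compose setup it would be run inside the Kafka container):

```shell
# tail the kafka_gmv topic from the earliest offset
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic kafka_gmv \
  --from-beginning
```

Each row the `INSERT INTO kafka_gmv` job emits arrives as one `changelog-json` record on the topic.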