4. Download the following JAR packages to `<FLINK_HOME>/lib/`:

- [flink-sql-connector-elasticsearch7_2.11-1.13.2.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar)
- [flink-sql-connector-mysql-cdc-2.0.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.0.0/flink-sql-connector-mysql-cdc-2.0.0.jar)
- [flink-sql-connector-postgres-cdc-2.0.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.0.0/flink-sql-connector-postgres-cdc-2.0.0.jar)

5. Launch a Flink cluster, then start the Flink SQL CLI and execute the following SQL statements in it:

```sql
-- Flink SQL
-- checkpoint every 3000 milliseconds
Flink SQL> SET 'execution.checkpointing.interval' = '3s';

Flink SQL> CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'products'
  );

Flink SQL> CREATE TABLE orders (
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'orders'
  );

Flink SQL> CREATE TABLE shipments (
    shipment_id INT,
    order_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN,
    PRIMARY KEY (shipment_id) NOT ENFORCED
  ) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'postgres',
    'database-name' = 'postgres',
    'schema-name' = 'public',
    'table-name' = 'shipments'
  );

Flink SQL> CREATE TABLE enriched_orders (
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    product_name STRING,
    product_description STRING,
    shipment_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
  ) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'enriched_orders'
  );

Flink SQL> INSERT INTO enriched_orders
  SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
  FROM orders AS o
  LEFT JOIN products AS p ON o.product_id = p.id
  LEFT JOIN shipments AS s ON o.order_id = s.order_id;
```

6. Make some changes in MySQL and Postgres, then check the result in Elasticsearch:
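
   For example (illustrative values; pick ids and a `product_id` that match the rows seeded earlier in the tutorial), run statements like these against MySQL and Postgres and watch the `enriched_orders` index update after each one:

```sql
-- MySQL: add a new order (product_id, here 104, must reference an existing row in products)
INSERT INTO orders VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

-- Postgres: add the matching shipment for that order
INSERT INTO shipments VALUES (default, 10004, 'Shanghai', 'Beijing', false);

-- MySQL: mark the order as completed
UPDATE orders SET order_status = true WHERE order_id = 10004;

-- Postgres: mark the shipment as arrived
UPDATE shipments SET is_arrived = true WHERE order_id = 10004;

-- MySQL: delete the order; the corresponding document should be removed from the index
DELETE FROM orders WHERE order_id = 10004;
```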

To write an aggregated changelog to Kafka, execute the following SQL in the Flink SQL CLI:

```sql
-- Flink SQL
Flink SQL> CREATE TABLE kafka_gmv (
    day_str STRING,
    gmv DECIMAL(10, 5)
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'kafka_gmv',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'changelog-json'
  );

Flink SQL> INSERT INTO kafka_gmv
  SELECT DATE_FORMAT(order_date, 'yyyy-MM-dd') as day_str, SUM(price) as gmv
  FROM orders
  WHERE order_status = true
  GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');

-- Consume the changelog data from Kafka, and check the result of the materialized view:
Flink SQL> SELECT * FROM kafka_gmv;
```

To consume records in Kafka, you can use `kafka-console-consumer`.
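A minimal sketch, assuming the Kafka command-line tools are on the broker host's PATH (adjust the invocation if Kafka runs inside a container):

```shell
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka_gmv --from-beginning
```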