[docs] Restructure the "tutorials" to "quickstart"

pull/526/head
Jark Wu 3 years ago
parent c861418f63
commit 239ef3e6c7

@ -1,4 +1,4 @@
# About Flink CDC
# What's Flink CDC
Flink CDC Connectors is a set of source connectors for <a href="https://flink.apache.org/">Apache Flink</a>, ingesting changes from different databases using change data capture (CDC).
Flink CDC Connectors integrates Debezium as the engine to capture data changes, so it can fully leverage the abilities of Debezium. See [Debezium](https://github.com/debezium/debezium) for more details.
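For illustration, a CDC source table is declared in Flink SQL and can then be queried or joined like any regular table. The following is a minimal sketch; the connection options are placeholder values:

```sql
-- Minimal sketch: expose a MySQL table as a CDC source in Flink SQL.
-- Hostname, port and credentials below are placeholder values.
CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'products'
);
```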

@ -0,0 +1,9 @@
# Getting Started
```{toctree}
:maxdepth: 2
mysql-postgres-tutorial
mongodb-tutorial
oracle-tutorial
```

@ -1,4 +1,4 @@
# Streaming ETL from MongoDB to Elasticsearch
# Demo: MongoDB CDC to Elasticsearch
1. Create a `docker-compose.yml` file with the following contents:

@ -1,6 +1,6 @@
# Streaming ETL from Oracle to Elasticsearch
# Demo: Oracle CDC to Elasticsearch
1. Create a `docker-compose.yml` file with the following contents:
**1. Create a `docker-compose.yml` file with the following contents:**
```
version: '2.1'
@ -49,13 +49,18 @@ Don't forget to run the following command to stop all containers after you fin
```shell
docker-compose down
```
2. Download the following JAR packages to `<FLINK_HOME>/lib`:
```Download links are available only for stable releases.```
**2. Download the following JAR packages to `<FLINK_HOME>/lib`:**
*Download links are available only for stable releases.*
- [flink-sql-connector-elasticsearch7_2.11-1.13.2.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar)
- [flink-sql-connector-oracle-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.1-SNAPSHOT/flink-sql-connector-oracle-cdc-2.1-SNAPSHOT.jar)
3. Launch a Flink cluster, then start a Flink SQL CLI and execute the following SQL statements inside:
**3. Launch a Flink cluster and start a Flink SQL CLI**
Execute the following SQL statements in the Flink SQL CLI:
```sql
-- Flink SQL
-- checkpoint every 3000 milliseconds
@ -117,9 +122,14 @@ Flink SQL> INSERT INTO enriched_orders
LEFT JOIN products AS p ON o.PRODUCT_ID = p.ID;
```
4. Check the data has been written to Elasticsearch successfully, you can visit [Kibana](http://localhost:8081/#/overview) to see the data.
**4. Check the result in Elasticsearch**
Check that the data has been written to Elasticsearch successfully; you can visit [Kibana](http://localhost:8081/#/overview) to see the data.
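As an optional sanity check (not part of the original steps), you can also query the CDC source table directly in the Flink SQL CLI to confirm that changes are being captured; `orders` below refers to the source table created in step 3:

```sql
-- Flink SQL (optional): preview the rows captured from Oracle.
SELECT * FROM orders;
```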
**5. Make changes in Oracle and watch the result in Elasticsearch**
Enter Oracle's container to make some changes in Oracle; the result in Elasticsearch will change after each SQL statement is executed:
5. Enter Oracle's container to make some changes in Oracle, then you can see the result in Elasticsearch will change after executing every SQL statement:
```shell
docker-compose exec sqlplus flinkuser/flinkpw
```
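For example, changes along the following lines can be executed inside sqlplus. This is a hypothetical sketch: the exact columns of the `orders` table are defined in the part of the demo not shown in this diff, so adjust names and values accordingly.

```sql
-- Hypothetical example changes; the column list and values are assumptions
-- and must match the orders table defined earlier in this demo.
INSERT INTO orders
VALUES (10004, TO_DATE('2020-07-30 15:22:00', 'YYYY-MM-DD HH24:MI:SS'), 'Jark', 29.71, 104, 0);
UPDATE orders SET order_status = 1 WHERE order_id = 10004;
DELETE FROM orders WHERE order_id = 10004;
```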

@ -1,296 +0,0 @@
# Streaming ETL from MySQL and Postgres to Elasticsearch (Chinese)
1. Download `docker-compose.yml`
```
version: '2.1'
services:
postgres:
image: debezium/example-postgres:1.1
ports:
- "5432:5432"
environment:
- POSTGRES_DB=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
mysql:
image: debezium/example-mysql:1.1
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
elasticsearch:
image: elastic/elasticsearch:7.6.0
environment:
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- discovery.type=single-node
ports:
- "9200:9200"
- "9300:9300"
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
kibana:
image: elastic/kibana:7.6.0
ports:
- "5601:5601"
zookeeper:
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.12-2.2.1
ports:
- "9092:9092"
- "9094:9094"
depends_on:
- zookeeper
environment:
- KAFKA_ADVERTISED_LISTENERS=INSIDE://:9094,OUTSIDE://localhost:9092
- KAFKA_LISTENERS=INSIDE://:9094,OUTSIDE://:9092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CREATE_TOPICS="user_behavior:1:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
```
2. Enter the mysql container and initialize data:
```
docker-compose exec mysql mysql -uroot -p123456
```
```sql
-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
```
3. Enter the postgres container and initialize data:
```
docker-compose exec postgres psql -h localhost -U postgres
```
```sql
-- PG
CREATE TABLE shipments (
shipment_id SERIAL NOT NULL PRIMARY KEY,
order_id SERIAL NOT NULL,
origin VARCHAR(255) NOT NULL,
destination VARCHAR(255) NOT NULL,
is_arrived BOOLEAN NOT NULL
);
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
ALTER TABLE public.shipments REPLICA IDENTITY FULL;
INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),
(default,10002,'Hangzhou','Shanghai',false),
(default,10003,'Shanghai','Hangzhou',false);
```
4. Download the following JAR packages to `<FLINK_HOME>/lib/`:
*Download links are available only for stable releases.*
- [flink-sql-connector-elasticsearch7_2.11-1.13.2.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar)
- [flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1-SNAPSHOT/flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar)
- [flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.1-SNAPSHOT/flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar)
5. Then launch a Flink cluster and start the SQL CLI.
```sql
--Flink SQL
-- set a checkpoint interval of 3 seconds
Flink SQL> SET execution.checkpointing.interval = 3s;
Flink SQL> CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'products'
);
Flink SQL> CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);
Flink SQL> CREATE TABLE shipments (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'public',
'table-name' = 'shipments'
);
Flink SQL> CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
shipment_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'enriched_orders'
);
Flink SQL> INSERT INTO enriched_orders
SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;
```
6. Make some changes in MySQL and Postgres, then observe the results in Elasticsearch.
```sql
--MySQL
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
--PG
INSERT INTO shipments
VALUES (default,10004,'Shanghai','Beijing',false);
--MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;
--PG
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
--MySQL
DELETE FROM orders WHERE order_id = 10004;
```
7. Kafka changelog json format
In the SQL CLI:
```sql
--Flink SQL
Flink SQL> CREATE TABLE kafka_gmv (
day_str STRING,
gmv DECIMAL(10, 5)
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_gmv',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'changelog-json'
);
Flink SQL> INSERT INTO kafka_gmv
SELECT DATE_FORMAT(order_date, 'yyyy-MM-dd') as day_str, SUM(price) as gmv
FROM orders
WHERE order_status = true
GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');
-- read the changelog data from Kafka and observe the materialized result
Flink SQL> SELECT * FROM kafka_gmv;
```
Observe the output in Kafka:
```
docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic kafka_gmv --bootstrap-server kafka:9094 --from-beginning'
```
Update the orders data and observe the output in the SQL CLI and the Kafka console:
```sql
-- MySQL
UPDATE orders SET order_status = true WHERE order_id = 10001;
UPDATE orders SET order_status = true WHERE order_id = 10002;
UPDATE orders SET order_status = true WHERE order_id = 10003;
INSERT INTO orders
VALUES (default, '2020-07-30 17:33:00', 'Timo', 50.00, 104, true);
UPDATE orders SET price = 40.00 WHERE order_id = 10005;
DELETE FROM orders WHERE order_id = 10005;
```

@ -1,12 +1,9 @@
# Tutorials
# Getting Started
```{toctree}
:maxdepth: 2
mysql-postgres-tutorial
mysql-postgres-tutorial-zh
mongodb-tutorial
mongodb-tutorial-zh
oracle-tutorial
oracle-tutorial-zh
```

@ -1,4 +1,4 @@
# Streaming ETL from MongoDB to Elasticsearch (Chinese)
# Demo: MongoDB CDC to Elasticsearch
1. Download `docker-compose.yml`

@ -0,0 +1,319 @@
# Streaming ETL for MySQL and Postgres based on Flink CDC
This tutorial shows how to quickly build a streaming ETL job for MySQL and Postgres with Flink CDC.
Assume we are running an e-commerce business. The product and order data are stored in MySQL, and the shipment data related to the orders is stored in Postgres.
We want to enrich the orders using the product and shipment tables, and then load the enriched orders into Elasticsearch in real time.
In the following sections, we will describe how to use the Flink MySQL/Postgres CDC connectors to implement this.
All exercises in this tutorial are performed in the Flink SQL CLI, and the entire process uses standard SQL syntax, without a single line of Java/Scala code or IDE installation.
The overview of the architecture is as follows:
![Flink CDC Streaming ETL](/_static/fig/mysql-postgress-tutorial/flink-cdc-streaming-etl.png "Flink CDC Streaming ETL")
## Preparation
Prepare a Linux or MacOS computer with Docker installed.
### Starting the required components
The components required in this demo are all managed in containers, so we will use `docker-compose` to start them.
Create a `docker-compose.yml` file with the following contents:
```
version: '2.1'
services:
postgres:
image: debezium/example-postgres:1.1
ports:
- "5432:5432"
environment:
- POSTGRES_DB=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
mysql:
image: debezium/example-mysql:1.1
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
elasticsearch:
image: elastic/elasticsearch:7.6.0
environment:
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- discovery.type=single-node
ports:
- "9200:9200"
- "9300:9300"
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
kibana:
image: elastic/kibana:7.6.0
ports:
- "5601:5601"
```
The Docker Compose environment consists of the following containers:
- MySQL: the `products` and `orders` tables will be stored in this database. They will be joined with data in Postgres to enrich the orders.
- Postgres: the `shipments` table will be stored in this database.
- Elasticsearch: mainly used as a data sink to store enriched orders.
- Kibana: used to visualize the data in Elasticsearch.
To start all containers, run the following command in the directory that contains the `docker-compose.yml` file.
```shell
docker-compose up -d
```
This command automatically starts all the containers defined in the Docker Compose configuration in detached mode. Run `docker ps` to check whether these containers are running properly.
We can also visit [http://localhost:5601/](http://localhost:5601/) to see if Kibana is running normally.
Don't forget to run the following command to stop all containers after finishing the tutorial:
```shell
docker-compose down
```
### Preparing Flink and the required JAR packages
1. Download [Flink 1.13.2](https://downloads.apache.org/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.11.tgz) and unzip it to the directory `flink-1.13.2`
2. Download the following required JAR packages and put them under `flink-1.13.2/lib/`:
**Download links are available only for stable releases.**
- [flink-sql-connector-elasticsearch7_2.11-1.13.2.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar)
- [flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1-SNAPSHOT/flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar)
- [flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.1-SNAPSHOT/flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar)
### Preparing data in databases
#### Preparing data in MySQL
1. Enter mysql's container:
```shell
docker-compose exec mysql mysql -uroot -p123456
```
2. Create tables and populate data:
```sql
-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
```
#### Preparing data in Postgres
1. Enter postgres's container:
```shell
docker-compose exec postgres psql -h localhost -U postgres
```
2. Create tables and populate data:
```sql
-- PG
CREATE TABLE shipments (
shipment_id SERIAL NOT NULL PRIMARY KEY,
order_id SERIAL NOT NULL,
origin VARCHAR(255) NOT NULL,
destination VARCHAR(255) NOT NULL,
is_arrived BOOLEAN NOT NULL
);
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
ALTER TABLE public.shipments REPLICA IDENTITY FULL;
INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),
(default,10002,'Hangzhou','Shanghai',false),
(default,10003,'Shanghai','Hangzhou',false);
```
## Starting the Flink cluster and the Flink SQL CLI
1. Use the following command to change to the Flink directory:
```
cd flink-1.13.2
```
2. Change the value of `taskmanager.numberOfTaskSlots` to 2 in `conf/flink-conf.yaml`, because we will run two tasks at the same time.
3. Use the following command to start a Flink cluster:
```shell
./bin/start-cluster.sh
```
Then we can visit [http://localhost:8081/](http://localhost:8081/) to see if Flink is running normally, and the web page should look like this:
![Flink UI](/_static/fig/mysql-postgress-tutorial/flink-ui.png "Flink UI")
Don't forget to run the following command to stop the Flink cluster after finishing the tutorial:
```shell
./bin/stop-cluster.sh
```
4. Use the following command to start a Flink SQL CLI:
```shell
./bin/sql-client.sh
```
We should see the welcome screen of the CLI client.
![Flink SQL Client](/_static/fig/mysql-postgress-tutorial/flink-sql-client.png "Flink SQL Client")
## Creating tables using Flink DDL in Flink SQL CLI
First, enable checkpoints every 3000 milliseconds:
```sql
-- Flink SQL
Flink SQL> SET execution.checkpointing.interval = 3s;
```
Then, create tables that capture the change data from the corresponding database tables.
```sql
-- Flink SQL
Flink SQL> CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'products'
);
Flink SQL> CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);
Flink SQL> CREATE TABLE shipments (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'public',
'table-name' = 'shipments'
);
```
Finally, create an `enriched_orders` table that is used to load the data into Elasticsearch.
```sql
-- Flink SQL
Flink SQL> CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
shipment_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'enriched_orders'
);
```
## Enriching orders and loading to Elasticsearch
Use Flink SQL to join the `orders` table with the `products` and `shipments` tables to enrich the orders and write them to Elasticsearch.
```sql
-- Flink SQL
Flink SQL> INSERT INTO enriched_orders
SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;
```
Now, the enriched orders should be shown in Kibana.
Visit [http://localhost:5601/app/kibana#/management/kibana/index_pattern](http://localhost:5601/app/kibana#/management/kibana/index_pattern) to create an index pattern `enriched_orders`.
![Create Index Pattern](/_static/fig/mysql-postgress-tutorial/kibana-create-index-pattern.png "Create Index Pattern")
Visit [http://localhost:5601/app/kibana#/discover](http://localhost:5601/app/kibana#/discover) to find the enriched orders.
![Find enriched Orders](/_static/fig/mysql-postgress-tutorial/kibana-detailed-orders.png "Find enriched Orders")
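Optionally (this check is not part of the original tutorial), the CDC source tables can also be queried directly in the Flink SQL CLI to confirm that changes from the databases are being captured, for example:

```sql
-- Flink SQL (optional): stream the rows captured from the Postgres source.
SELECT * FROM shipments;
```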
Next, make some changes in the databases, and the enriched orders shown in Kibana will be updated after each step in real time.
1. Insert a new order in MySQL
```sql
--MySQL
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
```
2. Insert a shipment in Postgres
```sql
--PG
INSERT INTO shipments
VALUES (default,10004,'Shanghai','Beijing',false);
```
3. Update order status in MySQL
```sql
--MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;
```
4. Update the shipment status in Postgres
```sql
--PG
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
```
5. Delete the order in MySQL
```sql
--MySQL
DELETE FROM orders WHERE order_id = 10004;
```
The changes of enriched orders in Kibana are as follows:
![Enriched Orders Changes](/_static/fig/mysql-postgress-tutorial/kibana-detailed-orders-changes.gif "Enriched Orders Changes")

@ -1,6 +1,7 @@
# Streaming ETL from Oracle to Elasticsearch (Chinese)
# Demo: Oracle CDC to Elasticsearch
**1. Create a `docker-compose.yml` file with the following contents:**
1. Create a `docker-compose.yml` file with the following contents:
```
version: '2.1'
services:
@ -44,17 +45,20 @@ docker-compose up -d
This command automatically starts all the containers defined in the Docker Compose configuration in detached mode.
You can run `docker ps` to check whether these containers started properly, and visit http://localhost:5601/ to check whether Kibana is running normally.
You can also stop all the containers with the following command:
```shell
docker-compose down
```
2. Download the following JAR packages to `<FLINK_HOME>/lib/`:
```Download links are available only for stable releases.```
**2. Download the following JAR packages to `<FLINK_HOME>/lib/`:**
*Download links are available only for stable releases.*
- [flink-sql-connector-elasticsearch7_2.11-1.13.2.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar)
- [flink-sql-connector-oracle-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.1-SNAPSHOT/flink-sql-connector-oracle-cdc-2.1-SNAPSHOT.jar)
3. Then launch a Flink cluster and start the SQL CLI:
**3. Then launch a Flink cluster and start the SQL CLI:**
```sql
-- Flink SQL
-- checkpoint every 3000 milliseconds
@ -115,9 +119,15 @@ Flink SQL> INSERT INTO enriched_orders
FROM orders AS o
LEFT JOIN products AS p ON o.PRODUCT_ID = p.ID;
```
4. Check whether the final result has been written to Elasticsearch; you can see the data in Elasticsearch via [Kibana](http://localhost:8081/#/overview).
5. Enter the Oracle container and make some changes to the Oracle database with the SQL statements below; you can then see the data in Elasticsearch update in real time after each SQL statement is executed.
**4. Check the result in Elasticsearch**
Check whether the final result has been written to Elasticsearch; you can see the data in Elasticsearch via [Kibana](http://localhost:8081/#/overview).
**5. Make some changes in Oracle and watch the result in Elasticsearch**
Enter the Oracle container and make some changes to the Oracle database with the SQL statements below; you can then see the data in Elasticsearch update in real time after each SQL statement is executed.
```shell
docker-compose exec sqlplus flinkuser/flinkpw
```

@ -4,9 +4,10 @@
:maxdepth: 2
:caption: Contents
content/about
content/quickstart/index
content/快速上手/index
content/connectors/index
content/formats/index
content/tutorials/index
content/downloads
```
