diff --git a/docs/content/about.md b/docs/content/about.md
index ab47a2509..71cb88fc1 100644
--- a/docs/content/about.md
+++ b/docs/content/about.md
@@ -1,4 +1,4 @@
-# About Flink CDC
+# What's Flink CDC
Flink CDC Connectors is a set of source connectors for Apache Flink, ingesting changes from different databases using change data capture (CDC).
The Flink CDC Connectors integrates Debezium as the engine to capture data changes. So it can fully leverage the ability of Debezium. See more about what is [Debezium](https://github.com/debezium/debezium).
diff --git a/docs/content/quickstart/index.md b/docs/content/quickstart/index.md
new file mode 100644
index 000000000..597930fec
--- /dev/null
+++ b/docs/content/quickstart/index.md
@@ -0,0 +1,9 @@
+# Getting Started
+
+```{toctree}
+:maxdepth: 2
+
+mysql-postgres-tutorial
+mongodb-tutorial
+oracle-tutorial
+```
\ No newline at end of file
diff --git a/docs/content/tutorials/mongodb-tutorial.md b/docs/content/quickstart/mongodb-tutorial.md
similarity index 99%
rename from docs/content/tutorials/mongodb-tutorial.md
rename to docs/content/quickstart/mongodb-tutorial.md
index 14aed48db..28cb63bd5 100644
--- a/docs/content/tutorials/mongodb-tutorial.md
+++ b/docs/content/quickstart/mongodb-tutorial.md
@@ -1,4 +1,4 @@
-# Streaming ETL from MongoDB to Elasticsearch
+# Demo: MongoDB CDC to Elasticsearch
1. Create `docker-compose.yml` file using following contents:
diff --git a/docs/content/tutorials/mysql-postgres-tutorial.md b/docs/content/quickstart/mysql-postgres-tutorial.md
similarity index 100%
rename from docs/content/tutorials/mysql-postgres-tutorial.md
rename to docs/content/quickstart/mysql-postgres-tutorial.md
diff --git a/docs/content/tutorials/oracle-tutorial.md b/docs/content/quickstart/oracle-tutorial.md
similarity index 83%
rename from docs/content/tutorials/oracle-tutorial.md
rename to docs/content/quickstart/oracle-tutorial.md
index de531b7d2..fe195bfa2 100644
--- a/docs/content/tutorials/oracle-tutorial.md
+++ b/docs/content/quickstart/oracle-tutorial.md
@@ -1,6 +1,6 @@
-# Streaming ETL from Oracle to Elasticsearch
+# Demo: Oracle CDC to Elasticsearch
-1. Create `docker-compose.yml` file using following contents:
+**1. Create `docker-compose.yml` file using following contents:**
```
version: '2.1'
@@ -49,13 +49,18 @@ Don’t forget to run the following command to stop all containers after you fin
```shell
docker-compose down
```
-2. Download following JAR package to `/lib`:
- ```Download links are available only for stable releases.```
+
+**2. Download following JAR package to `/lib`**
+
+*Download links are available only for stable releases.*
- [flink-sql-connector-elasticsearch7_2.11-1.13.2.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar)
- [flink-sql-connector-oracle-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.1-SNAPSHOT/flink-sql-connector-oracle-cdc-2.1-SNAPSHOT.jar)
-3. Launch a Flink cluster, then start a Flink SQL CLI and execute following SQL statements inside:
+**3. Launch a Flink cluster and start a Flink SQL CLI**
+
+Execute following SQL statements in the Flink SQL CLI:
+
```sql
-- Flink SQL
-- checkpoint every 3000 milliseconds
@@ -117,9 +122,14 @@ Flink SQL> INSERT INTO enriched_orders
LEFT JOIN products AS p ON o.PRODUCT_ID = p.ID;
```
-4. Check the data has been written to Elasticsearch successfully, you can visit [Kibana](http://localhost:8081/#/overview) to see the data.
+**4. Check result in Elasticsearch**
+
+Check the data has been written to Elasticsearch successfully, you can visit [Kibana](http://localhost:8081/#/overview) to see the data.
+
+**5. Make changes in Oracle and watch result in Elasticsearch**
+
+Enter Oracle's container to make some changes in Oracle, then you can see the result in Elasticsearch will change after executing every SQL statement:
-5. Enter Oracle's container to make some changes in Oracle, then you can see the result in Elasticsearch will change after executing every SQL statement:
```shell
docker-compose exec sqlplus flinkuser/flinkpw
```
diff --git a/docs/content/tutorials/mysql-postgres-tutorial-zh.md b/docs/content/tutorials/mysql-postgres-tutorial-zh.md
deleted file mode 100644
index d78e55687..000000000
--- a/docs/content/tutorials/mysql-postgres-tutorial-zh.md
+++ /dev/null
@@ -1,296 +0,0 @@
-# Streaming ETL from MySQL and Postgres to Elasticsearch(中文)
-
-1. 下载 `docker-compose.yml`
-
-```
-version: '2.1'
-services:
- postgres:
- image: debezium/example-postgres:1.1
- ports:
- - "5432:5432"
- environment:
- - POSTGRES_PASSWORD=1234
- - POSTGRES_DB=postgres
- - POSTGRES_USER=postgres
- - POSTGRES_PASSWORD=postgres
- mysql:
- image: debezium/example-mysql:1.1
- ports:
- - "3306:3306"
- environment:
- - MYSQL_ROOT_PASSWORD=123456
- - MYSQL_USER=mysqluser
- - MYSQL_PASSWORD=mysqlpw
- elasticsearch:
- image: elastic/elasticsearch:7.6.0
- environment:
- - cluster.name=docker-cluster
- - bootstrap.memory_lock=true
- - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- - discovery.type=single-node
- ports:
- - "9200:9200"
- - "9300:9300"
- ulimits:
- memlock:
- soft: -1
- hard: -1
- nofile:
- soft: 65536
- hard: 65536
- kibana:
- image: elastic/kibana:7.6.0
- ports:
- - "5601:5601"
- zookeeper:
- image: wurstmeister/zookeeper:3.4.6
- ports:
- - "2181:2181"
- kafka:
- image: wurstmeister/kafka:2.12-2.2.1
- ports:
- - "9092:9092"
- - "9094:9094"
- depends_on:
- - zookeeper
- environment:
- - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9094,OUTSIDE://localhost:9092
- - KAFKA_LISTENERS=INSIDE://:9094,OUTSIDE://:9092
- - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
- - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
- - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- - KAFKA_CREATE_TOPICS="user_behavior:1:1"
- volumes:
- - /var/run/docker.sock:/var/run/docker.sock
-```
-
-2. 进入 mysql 容器,初始化数据:
-
-```
-docker-compose exec mysql mysql -uroot -p123456
-```
-
-```sql
--- MySQL
-CREATE DATABASE mydb;
-USE mydb;
-CREATE TABLE products (
- id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
- name VARCHAR(255) NOT NULL,
- description VARCHAR(512)
-);
-ALTER TABLE products AUTO_INCREMENT = 101;
-
-INSERT INTO products
-VALUES (default,"scooter","Small 2-wheel scooter"),
- (default,"car battery","12V car battery"),
- (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
- (default,"hammer","12oz carpenter's hammer"),
- (default,"hammer","14oz carpenter's hammer"),
- (default,"hammer","16oz carpenter's hammer"),
- (default,"rocks","box of assorted rocks"),
- (default,"jacket","water resistent black wind breaker"),
- (default,"spare tire","24 inch spare tire");
-
-CREATE TABLE orders (
- order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
- order_date DATETIME NOT NULL,
- customer_name VARCHAR(255) NOT NULL,
- price DECIMAL(10, 5) NOT NULL,
- product_id INTEGER NOT NULL,
- order_status BOOLEAN NOT NULL -- 是否下单
-) AUTO_INCREMENT = 10001;
-
-INSERT INTO orders
-VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
- (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
- (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
-
-```
-
-3. 进入postgres 容器,初始化数据:
-
-```
-docker-compose exec postgres psql -h localhost -U postgres
-```
-
-```sql
--- PG
-CREATE TABLE shipments (
- shipment_id SERIAL NOT NULL PRIMARY KEY,
- order_id SERIAL NOT NULL,
- origin VARCHAR(255) NOT NULL,
- destination VARCHAR(255) NOT NULL,
- is_arrived BOOLEAN NOT NULL
-);
-ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
-ALTER TABLE public.shipments REPLICA IDENTITY FULL;
-
-INSERT INTO shipments
-VALUES (default,10001,'Beijing','Shanghai',false),
- (default,10002,'Hangzhou','Shanghai',false),
- (default,10003,'Shanghai','Hangzhou',false);
-```
-
-
-
-4. 下载以下 jar 包到 `/lib/`:
-
-```下载链接只在已发布的版本上可用```
-
-- [flink-sql-connector-elasticsearch7_2.11-1.13.2.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar)
-- [flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1-SNAPSHOT/flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar)
-- [flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.1-SNAPSHOT/flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar)
-
-5. 然后启动 Flink 集群,再启动 SQL CLI.
-
-```sql
---Flink SQL
--- 设置 checkpoint 间隔为 3 秒
-Flink SQL> SET execution.checkpointing.interval = 3s;
-Flink SQL> CREATE TABLE products (
- id INT,
- name STRING,
- description STRING,
- PRIMARY KEY (id) NOT ENFORCED
- ) WITH (
- 'connector' = 'mysql-cdc',
- 'hostname' = 'localhost',
- 'port' = '3306',
- 'username' = 'root',
- 'password' = '123456',
- 'database-name' = 'mydb',
- 'table-name' = 'products'
- );
-
-Flink SQL> CREATE TABLE orders (
- order_id INT,
- order_date TIMESTAMP(0),
- customer_name STRING,
- price DECIMAL(10, 5),
- product_id INT,
- order_status BOOLEAN,
- PRIMARY KEY (order_id) NOT ENFORCED
- ) WITH (
- 'connector' = 'mysql-cdc',
- 'hostname' = 'localhost',
- 'port' = '3306',
- 'username' = 'root',
- 'password' = '123456',
- 'database-name' = 'mydb',
- 'table-name' = 'orders'
- );
-
-Flink SQL> CREATE TABLE shipments (
- shipment_id INT,
- order_id INT,
- origin STRING,
- destination STRING,
- is_arrived BOOLEAN,
- PRIMARY KEY (shipment_id) NOT ENFORCED
- ) WITH (
- 'connector' = 'postgres-cdc',
- 'hostname' = 'localhost',
- 'port' = '5432',
- 'username' = 'postgres',
- 'password' = 'postgres',
- 'database-name' = 'postgres',
- 'schema-name' = 'public',
- 'table-name' = 'shipments'
- );
-
-Flink SQL> CREATE TABLE enriched_orders (
- order_id INT,
- order_date TIMESTAMP(0),
- customer_name STRING,
- price DECIMAL(10, 5),
- product_id INT,
- order_status BOOLEAN,
- product_name STRING,
- product_description STRING,
- shipment_id INT,
- origin STRING,
- destination STRING,
- is_arrived BOOLEAN,
- PRIMARY KEY (order_id) NOT ENFORCED
- ) WITH (
- 'connector' = 'elasticsearch-7',
- 'hosts' = 'http://localhost:9200',
- 'index' = 'enriched_orders'
- );
-
-Flink SQL> INSERT INTO enriched_orders
- SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
- FROM orders AS o
- LEFT JOIN products AS p ON o.product_id = p.id
- LEFT JOIN shipments AS s ON o.order_id = s.order_id;
-```
-
-6. 修改 mysql 和 postgres 里面的数据,观察 elasticsearch 里的结果。
-
-```sql
---MySQL
-INSERT INTO orders
-VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
-
---PG
-INSERT INTO shipments
-VALUES (default,10004,'Shanghai','Beijing',false);
-
---MySQL
-UPDATE orders SET order_status = true WHERE order_id = 10004;
-
---PG
-UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
-
---MySQL
-DELETE FROM orders WHERE order_id = 10004;
-```
-
-7. Kafka changelog json format
-
-在 SQL CLI 中:
-
-```sql
---Flink SQL
-Flink SQL> CREATE TABLE kafka_gmv (
- day_str STRING,
- gmv DECIMAL(10, 5)
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'kafka_gmv',
- 'scan.startup.mode' = 'earliest-offset',
- 'properties.bootstrap.servers' = 'localhost:9092',
- 'format' = 'changelog-json'
- );
-
-Flink SQL> INSERT INTO kafka_gmv
- SELECT DATE_FORMAT(order_date, 'yyyy-MM-dd') as day_str, SUM(price) as gmv
- FROM orders
- WHERE order_status = true
- GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');
-
--- 读取 Kafka 的 changelog 数据,观察 materialize 后的结果
-Flink SQL> SELECT * FROM kafka_gmv;
-```
-观察 kafka 的输出:
-
-```
-docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic kafka_gmv --bootstrap-server kafka:9094 --from-beginning'
-```
-
-更新 orders 数据,观察SQL CLI 和 kafka console 的输出
-```sql
--- MySQL
-UPDATE orders SET order_status = true WHERE order_id = 10001;
-UPDATE orders SET order_status = true WHERE order_id = 10002;
-UPDATE orders SET order_status = true WHERE order_id = 10003;
-
-INSERT INTO orders
-VALUES (default, '2020-07-30 17:33:00', 'Timo', 50.00, 104, true);
-
-UPDATE orders SET price = 40.00 WHERE order_id = 10005;
-
-DELETE FROM orders WHERE order_id = 10005;
-```
\ No newline at end of file
diff --git a/docs/content/tutorials/index.md b/docs/content/快速上手/index.md
similarity index 56%
rename from docs/content/tutorials/index.md
rename to docs/content/快速上手/index.md
index 3d3d40ad9..dc115714a 100644
--- a/docs/content/tutorials/index.md
+++ b/docs/content/快速上手/index.md
@@ -1,12 +1,9 @@
-# Tutorials
+# 快速上手
```{toctree}
:maxdepth: 2
-mysql-postgres-tutorial
mysql-postgres-tutorial-zh
-mongodb-tutorial
mongodb-tutorial-zh
-oracle-tutorial
oracle-tutorial-zh
```
\ No newline at end of file
diff --git a/docs/content/tutorials/mongodb-tutorial-zh.md b/docs/content/快速上手/mongodb-tutorial-zh.md
similarity index 98%
rename from docs/content/tutorials/mongodb-tutorial-zh.md
rename to docs/content/快速上手/mongodb-tutorial-zh.md
index bbdfd6b46..0100c8ca8 100644
--- a/docs/content/tutorials/mongodb-tutorial-zh.md
+++ b/docs/content/快速上手/mongodb-tutorial-zh.md
@@ -1,4 +1,4 @@
-# Streaming ETL from MongoDB to Elasticsearch(中文)
+# 演示: MongoDB CDC 导入 Elasticsearch
1. 下载 `docker-compose.yml`
diff --git a/docs/content/快速上手/mysql-postgres-tutorial-zh.md b/docs/content/快速上手/mysql-postgres-tutorial-zh.md
new file mode 100644
index 000000000..87afc185b
--- /dev/null
+++ b/docs/content/快速上手/mysql-postgres-tutorial-zh.md
@@ -0,0 +1,319 @@
+# 基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL
+
+This tutorial is to show how to quickly build streaming ETL for MySQL and Postgres with Flink CDC.
+
+Assuming we are running an e-commerce business. The product and order data stored in MySQL, the shipment data related to the order is stored in Postgres.
+We want to enrich the orders using the product and shipment table, and then load the enriched orders to ElasticSearch in real time.
+
+In the following sections, we will describe how to use Flink Mysql/Postgres CDC to implement it.
+All exercises in this tutorial are performed in the Flink SQL CLI, and the entire process uses standard SQL syntax, without a single line of Java/Scala code or IDE installation.
+
+The overview of the architecture is as follows:
+
+
+## Preparation
+Prepare a Linux or MacOS computer with Docker installed.
+
+### Starting components required
+The components required in this demo are all managed in containers, so we will use `docker-compose` to start them.
+
+Create `docker-compose.yml` file using following contents:
+```
+version: '2.1'
+services:
+ postgres:
+ image: debezium/example-postgres:1.1
+ ports:
+ - "5432:5432"
+ environment:
+ - POSTGRES_PASSWORD=1234
+ - POSTGRES_DB=postgres
+ - POSTGRES_USER=postgres
+ - POSTGRES_PASSWORD=postgres
+ mysql:
+ image: debezium/example-mysql:1.1
+ ports:
+ - "3306:3306"
+ environment:
+ - MYSQL_ROOT_PASSWORD=123456
+ - MYSQL_USER=mysqluser
+ - MYSQL_PASSWORD=mysqlpw
+ elasticsearch:
+ image: elastic/elasticsearch:7.6.0
+ environment:
+ - cluster.name=docker-cluster
+ - bootstrap.memory_lock=true
+ - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+ - discovery.type=single-node
+ ports:
+ - "9200:9200"
+ - "9300:9300"
+ ulimits:
+ memlock:
+ soft: -1
+ hard: -1
+ nofile:
+ soft: 65536
+ hard: 65536
+ kibana:
+ image: elastic/kibana:7.6.0
+ ports:
+ - "5601:5601"
+```
+The Docker Compose environment consists of the following containers:
+- MySQL: the `products`,`orders` tables will be store in the database. They will be joined with data in Postgres to enrich the orders.
+- Postgres: the `shipments` table will be store in the database.
+- Elasticsearch: mainly used as a data sink to store enriched orders.
+- Kibana: used to visualize the data in Elasticsearch.
+
+To start all containers, run the following command in the directory that contains the `docker-compose.yml` file.
+```shell
+docker-compose up -d
+```
+This command automatically starts all the containers defined in the Docker Compose configuration in a detached mode. Run docker ps to check whether these containers are running properly.
+We can also visit [http://localhost:5601/](http://localhost:5601/) to see if Kibana is running normally.
+
+Don’t forget to run the following command to stop all containers after finishing the tutorial:
+```shell
+docker-compose down
+```
+
+### Preparing Flink and JAR package required
+1. Download [Flink 1.13.2](https://downloads.apache.org/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.11.tgz) and unzip it to the directory `flink-1.13.2`
+2. Download following JAR package required and put them under `flink-1.13.2/lib/`:
+
+ **Download links are available only for stable releases.**
+ - [flink-sql-connector-elasticsearch7_2.11-1.13.2.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar)
+ - [flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1-SNAPSHOT/flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar)
+ - [flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.1-SNAPSHOT/flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar)
+
+### Preparing data in databases
+#### Preparing data in MySQL
+1. Enter mysql's container:
+ ```shell
+ docker-compose exec mysql mysql -uroot -p123456
+ ```
+2. Create tables and populate data:
+ ```sql
+ -- MySQL
+ CREATE DATABASE mydb;
+ USE mydb;
+ CREATE TABLE products (
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ name VARCHAR(255) NOT NULL,
+ description VARCHAR(512)
+ );
+ ALTER TABLE products AUTO_INCREMENT = 101;
+
+ INSERT INTO products
+ VALUES (default,"scooter","Small 2-wheel scooter"),
+ (default,"car battery","12V car battery"),
+ (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
+ (default,"hammer","12oz carpenter's hammer"),
+ (default,"hammer","14oz carpenter's hammer"),
+ (default,"hammer","16oz carpenter's hammer"),
+ (default,"rocks","box of assorted rocks"),
+ (default,"jacket","water resistent black wind breaker"),
+ (default,"spare tire","24 inch spare tire");
+
+ CREATE TABLE orders (
+ order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ order_date DATETIME NOT NULL,
+ customer_name VARCHAR(255) NOT NULL,
+ price DECIMAL(10, 5) NOT NULL,
+ product_id INTEGER NOT NULL,
+ order_status BOOLEAN NOT NULL -- Whether order has been placed
+ ) AUTO_INCREMENT = 10001;
+
+ INSERT INTO orders
+ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
+ (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
+ (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
+ ```
+#### Preparing data in Postgres
+1. Enter postgres's container:
+ ```shell
+ docker-compose exec postgres psql -h localhost -U postgres
+ ```
+2. Create tables and populate data
+ ```sql
+ -- PG
+ CREATE TABLE shipments (
+ shipment_id SERIAL NOT NULL PRIMARY KEY,
+ order_id SERIAL NOT NULL,
+ origin VARCHAR(255) NOT NULL,
+ destination VARCHAR(255) NOT NULL,
+ is_arrived BOOLEAN NOT NULL
+ );
+ ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
+ ALTER TABLE public.shipments REPLICA IDENTITY FULL;
+ INSERT INTO shipments
+ VALUES (default,10001,'Beijing','Shanghai',false),
+ (default,10002,'Hangzhou','Shanghai',false),
+ (default,10003,'Shanghai','Hangzhou',false);
+ ```
+
+## Starting Flink cluster and Flink SQL CLI
+
+1. Use the following command to change to the Flink directory:
+ ```
+ cd flink-1.13.2
+ ```
+2. Change the value of `taskmanager.numberOfTaskSlots` to 2 in `conf/flink-conf.yaml` for we will run two tasks at the same time.
+
+3. Use the following command to start a Flink cluster:
+ ```shell
+ ./bin/start-cluster.sh
+ ```
+ Then we can visit [http://localhost:8081/](http://localhost:8081/) to see if Flink is running normally, and the web page looks like:
+
+ 
+
+ Don’t forget to run the following command to stop the Flink cluster after finishing the tutorial:
+ ```shell
+ ./bin/stop-cluster.sh
+ ```
+
+4. Use the following command to start a Flink SQL CLI:
+ ```shell
+ ./bin/sql-client.sh
+ ```
+ We should see the welcome screen of the CLI client.
+
+ 
+
+## Creating tables using Flink DDL in Flink SQL CLI
+First, enable checkpoints every 3000 milliseconds
+```sql
+-- Flink SQL
+Flink SQL> SET execution.checkpointing.interval = 3s;
+```
+
+Then, create tables that capture the change data from the corresponding database tables.
+```sql
+-- Flink SQL
+Flink SQL> CREATE TABLE products (
+ id INT,
+ name STRING,
+ description STRING,
+ PRIMARY KEY (id) NOT ENFORCED
+ ) WITH (
+ 'connector' = 'mysql-cdc',
+ 'hostname' = 'localhost',
+ 'port' = '3306',
+ 'username' = 'root',
+ 'password' = '123456',
+ 'database-name' = 'mydb',
+ 'table-name' = 'products'
+ );
+
+Flink SQL> CREATE TABLE orders (
+ order_id INT,
+ order_date TIMESTAMP(0),
+ customer_name STRING,
+ price DECIMAL(10, 5),
+ product_id INT,
+ order_status BOOLEAN,
+ PRIMARY KEY (order_id) NOT ENFORCED
+ ) WITH (
+ 'connector' = 'mysql-cdc',
+ 'hostname' = 'localhost',
+ 'port' = '3306',
+ 'username' = 'root',
+ 'password' = '123456',
+ 'database-name' = 'mydb',
+ 'table-name' = 'orders'
+ );
+
+Flink SQL> CREATE TABLE shipments (
+ shipment_id INT,
+ order_id INT,
+ origin STRING,
+ destination STRING,
+ is_arrived BOOLEAN,
+ PRIMARY KEY (shipment_id) NOT ENFORCED
+ ) WITH (
+ 'connector' = 'postgres-cdc',
+ 'hostname' = 'localhost',
+ 'port' = '5432',
+ 'username' = 'postgres',
+ 'password' = 'postgres',
+ 'database-name' = 'postgres',
+ 'schema-name' = 'public',
+ 'table-name' = 'shipments'
+ );
+```
+
+Finally, create `enriched_orders` table that is used to load data to the Elasticsearch.
+```sql
+-- Flink SQL
+Flink SQL> CREATE TABLE enriched_orders (
+ order_id INT,
+ order_date TIMESTAMP(0),
+ customer_name STRING,
+ price DECIMAL(10, 5),
+ product_id INT,
+ order_status BOOLEAN,
+ product_name STRING,
+ product_description STRING,
+ shipment_id INT,
+ origin STRING,
+ destination STRING,
+ is_arrived BOOLEAN,
+ PRIMARY KEY (order_id) NOT ENFORCED
+ ) WITH (
+ 'connector' = 'elasticsearch-7',
+ 'hosts' = 'http://localhost:9200',
+ 'index' = 'enriched_orders'
+ );
+```
+
+## Enriching orders and load to ElasticSearch
+Use Flink SQL to join the `order` table with the `products` and `shipments` table to enrich orders and write to the Elasticsearch.
+```sql
+-- Flink SQL
+Flink SQL> INSERT INTO enriched_orders
+ SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
+ FROM orders AS o
+ LEFT JOIN products AS p ON o.product_id = p.id
+ LEFT JOIN shipments AS s ON o.order_id = s.order_id;
+```
+Now, the enriched orders should be shown in Kibana.
+Visit [http://localhost:5601/app/kibana#/management/kibana/index_pattern](http://localhost:5601/app/kibana#/management/kibana/index_pattern) to create an index pattern `enriched_orders`.
+
+
+
+Visit [http://localhost:5601/app/kibana#/discover](http://localhost:5601/app/kibana#/discover) to find the enriched orders.
+
+
+
+Next, do some change in the databases, and then the enriched orders shown in Kibana will be updated after each step in real time.
+1. Insert a new order in MySQL
+ ```sql
+ --MySQL
+ INSERT INTO orders
+ VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
+ ```
+2. Insert a shipment in Postgres
+ ```sql
+ --PG
+ INSERT INTO shipments
+ VALUES (default,10004,'Shanghai','Beijing',false);
+ ```
+3. Update order status in MySQL
+ ```sql
+ --MySQL
+ UPDATE orders SET order_status = true WHERE order_id = 10004;
+ ```
+4. Update the shipment status in Postgres
+ ```sql
+ --PG
+ UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
+ ```
+5. Delete the shipment in Postgres
+ ```sql
+ --MySQL
+ DELETE FROM orders WHERE order_id = 10004;
+ ```
+ The changes of enriched orders in Kibana are as follows:
+ 
\ No newline at end of file
diff --git a/docs/content/tutorials/oracle-tutorial-zh.md b/docs/content/快速上手/oracle-tutorial-zh.md
similarity index 83%
rename from docs/content/tutorials/oracle-tutorial-zh.md
rename to docs/content/快速上手/oracle-tutorial-zh.md
index 46a34dac0..e188ecfed 100644
--- a/docs/content/tutorials/oracle-tutorial-zh.md
+++ b/docs/content/快速上手/oracle-tutorial-zh.md
@@ -1,6 +1,7 @@
-# Streaming ETL from Oracle to Elasticsearch(中文)
+# 演示: Oracle CDC 导入 Elasticsearch
+
+**1. 创建`docker-compose.yml`文件,内容如下所示:**
-1. 创建`docker-compose.yml`文件,内容如下所示:
```
version: '2.1'
services:
@@ -44,17 +45,20 @@ docker-compose up -d
该命令会以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。
你可以通过 docker ps 来观察上述的容器是否正常启动了。 也可以访问 http://localhost:5601/ 来查看 Kibana 是否运行正常。
另外可以通过如下命令停止所有的容器:
+
```shell
docker-compose down
````
-2. 下载以下 jar 包到 `/lib/`:
- ```下载链接只在已发布的版本上可用```
+**2. 下载以下 jar 包到 `/lib/`:**
+
+*下载链接只在已发布的版本上可用*
- [flink-sql-connector-elasticsearch7_2.11-1.13.2.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar)
- [flink-sql-connector-oracle-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.1-SNAPSHOT/flink-sql-connector-oracle-cdc-2.1-SNAPSHOT.jar)
-3. 然后启动 Flink 集群,再启动 SQL CLI:
+**3. 然后启动 Flink 集群,再启动 SQL CLI:**
+
```sql
-- Flink SQL
-- checkpoint every 3000 milliseconds
@@ -115,9 +119,15 @@ Flink SQL> INSERT INTO enriched_orders
FROM orders AS o
LEFT JOIN products AS p ON o.PRODUCT_ID = p.ID;
```
-4. 检查最终的结果是否写入ElasticSearch中, 可以在[Kibana](http://localhost:8081/#/overview)看到ElasticSearch中的数据
-5. 进入Oracle容器中并通过如下的SQL语句对Oracle数据库进行一些修改, 然后就可以看到每执行一条SQL语句,Elasticsearch中的数据都会实时更新
+**4. 检查 ElasticSearch 中的结果**
+
+检查最终的结果是否写入ElasticSearch中, 可以在[Kibana](http://localhost:8081/#/overview)看到ElasticSearch中的数据
+
+**5. 在 Oracle 制造一些变更,观察 ElasticSearch 中的结果**
+
+进入Oracle容器中并通过如下的SQL语句对Oracle数据库进行一些修改, 然后就可以看到每执行一条SQL语句,Elasticsearch中的数据都会实时更新。
+
```shell
docker-compose exec sqlplus flinkuser/flinkpw
```
diff --git a/docs/index.md b/docs/index.md
index 74dc98346..a0b7b2cca 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -4,9 +4,10 @@
:maxdepth: 2
:caption: Contents
content/about
+content/quickstart/index
+content/快速上手/index
content/connectors/index
content/formats/index
-content/tutorials/index
content/downloads
```