[docs] Translate "Streaming ETL for MySQL and Postgres with Flink CDC" into Chinese (#550)

pull/564/head
luoyuxia 3 years ago committed by GitHub
parent 7ef6424ee3
commit 02fd0c9c5b

@ -73,11 +73,6 @@ docker-compose up -d
This command automatically starts all the containers defined in the Docker Compose configuration in detached mode. Run `docker ps` to check whether these containers are running properly.
We can also visit [http://localhost:5601/](http://localhost:5601/) to see if Kibana is running normally.
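As a quick sanity check, something like the following can be used (a sketch; the exact container names depend on the Compose project name):
```shell
# All services defined in docker-compose.yml (mysql, postgres,
# elasticsearch, kibana) should be listed with an "Up" status.
docker ps
```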
Don't forget to run the following command to stop all containers after finishing the tutorial:
```shell
docker-compose down
```
### Preparing Flink and the required JAR packages
1. Download [Flink 1.13.2](https://downloads.apache.org/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.11.tgz) and unzip it to the directory `flink-1.13.2`
2. Download the required JAR packages listed below and put them under `flink-1.13.2/lib/`:
@ -159,9 +154,8 @@ docker-compose down
```
cd flink-1.13.2
```
2. Change the value of `taskmanager.numberOfTaskSlots` to 2 in `conf/flink-conf.yaml`, because we will run two tasks at the same time.
3. Use the following command to start a Flink cluster:
2. Use the following command to start a Flink cluster:
```shell
./bin/start-cluster.sh
```
@ -169,12 +163,7 @@ docker-compose down
![Flink UI](/_static/fig/mysql-postgress-tutorial/flink-ui.png "Flink UI")
Don't forget to run the following command to stop the Flink cluster after finishing the tutorial:
```shell
./bin/stop-cluster.sh
```
4. Use the following command to start a Flink SQL CLI:
3. Use the following command to start a Flink SQL CLI:
```shell
./bin/sql-client.sh
```
@ -183,7 +172,7 @@ docker-compose down
![Flink SQL Client](/_static/fig/mysql-postgress-tutorial/flink-sql-client.png "Flink SQL Client")
## Creating tables using Flink DDL in Flink SQL CLI
First, enable checkpoints every 3000 milliseconds
First, enable checkpoints every 3 seconds
```sql
-- Flink SQL
Flink SQL> SET execution.checkpointing.interval = 3s;
@ -300,7 +289,7 @@ Next, do some change in the databases, and then the enriched orders shown in Kib
INSERT INTO shipments
VALUES (default,10004,'Shanghai','Beijing',false);
```
3. Update order status in MySQL
3. Update the order status in MySQL
```sql
--MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;
@ -310,10 +299,20 @@ Next, do some change in the databases, and then the enriched orders shown in Kib
--PG
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
```
5. Delete the shipment in Postgres
5. Delete the order in MySQL
```sql
--MySQL
DELETE FROM orders WHERE order_id = 10004;
```
The changes of enriched orders in Kibana are as follows:
![Enriched Orders Changes](/_static/fig/mysql-postgress-tutorial/kibana-detailed-orders-changes.gif "Enriched Orders Changes")
![Enriched Orders Changes](/_static/fig/mysql-postgress-tutorial/kibana-detailed-orders-changes.gif "Enriched Orders Changes")
## Clean up
After finishing the tutorial, run the following command in the directory containing `docker-compose.yml` to stop all containers:
```shell
docker-compose down
```
In the Flink directory `flink-1.13.2`, run the following command to stop the Flink cluster:
```shell
./bin/stop-cluster.sh
```

@ -1,23 +1,20 @@
# Streaming ETL for MySQL and Postgres with Flink CDC
This tutorial shows how to quickly build streaming ETL for MySQL and Postgres with Flink CDC.
This tutorial shows how to quickly build streaming ETL for MySQL and Postgres with Flink CDC. All demonstrations in this tutorial are performed in the Flink SQL CLI and involve only SQL, with no Java/Scala code and no IDE installation required.
Assume we are running an e-commerce business. The product and order data are stored in MySQL, and the shipment data related to the orders is stored in Postgres.
We want to enrich the orders using the product and shipment tables, and then load the enriched orders to ElasticSearch in real time.
Suppose we are running an e-commerce business: the product and order data is stored in MySQL, and the shipment information for the orders is stored in Postgres.
To make the order table easier to analyze, we want to join it with its corresponding product and shipment information into a wide table, and write that table to ElasticSearch in real time.
In the following sections, we will describe how to use Flink MySQL/Postgres CDC to implement it.
All exercises in this tutorial are performed in the Flink SQL CLI, and the entire process uses standard SQL syntax, without a single line of Java/Scala code or IDE installation.
The overview of the architecture is as follows:
The following sections describe how to use Flink MySQL/Postgres CDC to implement this requirement. The overall architecture of the system is shown in the figure below:
![Flink CDC Streaming ETL](/_static/fig/mysql-postgress-tutorial/flink-cdc-streaming-etl.png "Flink CDC Streaming ETL")
## Preparation
Prepare a Linux or MacOS computer with Docker installed.
## Preparation
Prepare a Linux or MacOS computer with Docker installed.
### Starting the required components
The components required in this demo are all managed in containers, so we will use `docker-compose` to start them.
### Preparing the components required for this tutorial
The following tutorial prepares the required components using `docker-compose`.
Create a `docker-compose.yml` file with the following contents:
Create a `docker-compose.yml` file with the following contents:
```
version: '2.1'
services:
@ -60,40 +57,34 @@ services:
ports:
- "5601:5601"
```
The Docker Compose environment consists of the following containers:
- MySQL: the `products` and `orders` tables will be stored in this database. They will be joined with data in Postgres to enrich the orders.
- Postgres: the `shipments` table will be stored in this database.
- Elasticsearch: mainly used as a data sink to store enriched orders.
- Kibana: used to visualize the data in Elasticsearch.
The containers included in this Docker Compose file are:
- MySQL: the product table `products` and the order table `orders` will be stored in this database. These two tables will be joined with the shipment table `shipments` in the Postgres database to produce an enriched order table `enriched_orders`
- Postgres: the shipment table `shipments` will be stored in this database
- Elasticsearch: the final order table `enriched_orders` will be written to Elasticsearch
- Kibana: used to visualize the data in ElasticSearch
To start all containers, run the following command in the directory that contains the `docker-compose.yml` file.
In the directory containing `docker-compose.yml`, run the following command to start the components needed for this tutorial:
```shell
docker-compose up -d
```
This command automatically starts all the containers defined in the Docker Compose configuration in detached mode. Run `docker ps` to check whether these containers are running properly.
We can also visit [http://localhost:5601/](http://localhost:5601/) to see if Kibana is running normally.
Don't forget to run the following command to stop all containers after finishing the tutorial:
```shell
docker-compose down
```
This command automatically starts all the containers defined in the Docker Compose configuration in detached mode. You can run `docker ps` to check whether these containers started properly, or visit [http://localhost:5601/](http://localhost:5601/) to check whether Kibana is running normally.
### Preparing Flink and the required JAR packages
1. Download [Flink 1.13.2](https://downloads.apache.org/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.11.tgz) and unzip it to the directory `flink-1.13.2`
2. Download the required JAR packages listed below and put them under `flink-1.13.2/lib/`:
### Downloading Flink and the required dependencies
1. Download [Flink 1.13.2](https://downloads.apache.org/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.11.tgz) and extract it to the directory `flink-1.13.2`
2. Download the dependency packages listed below and put them under the directory `flink-1.13.2/lib/` (a download sketch follows the list below):
**Download links are available only for stable releases.**
**Download links are only available for released versions.**
- [flink-sql-connector-elasticsearch7_2.11-1.13.2.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar)
- [flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1-SNAPSHOT/flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar)
- [flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.1-SNAPSHOT/flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar)
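For example, the stable Elasticsearch connector above can be fetched with a sketch like the following (the SNAPSHOT connector JARs may need to be built from source if the snapshot repository no longer hosts them):
```shell
# Download the Elasticsearch connector straight into Flink's lib directory
# (URL taken from the list above).
wget -P flink-1.13.2/lib \
  https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar
```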
### Preparing data in databases
#### Preparing data in MySQL
1. Enter the MySQL container:
### Preparing data
#### Preparing data in the MySQL database
1. Enter the MySQL container:
```shell
docker-compose exec mysql mysql -uroot -p123456
```
2. Create tables and populate data:
2. Create a database and the tables `products` and `orders`, and insert data:
```sql
-- MySQL
CREATE DATABASE mydb;
@ -130,12 +121,12 @@ docker-compose down
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
```
#### Preparing data in Postgres
1. Enter the Postgres container:
#### Preparing data in the Postgres database
1. Enter the Postgres container:
```shell
docker-compose exec postgres psql -h localhost -U postgres
```
2. Create tables and populate data:
2. Create the table `shipments`, and insert data:
```sql
-- PG
CREATE TABLE shipments (
@ -153,43 +144,37 @@ docker-compose down
(default,10003,'Shanghai','Hangzhou',false);
```
## Starting Flink cluster and Flink SQL CLI
## Starting the Flink cluster and the Flink SQL CLI
1. Use the following command to change to the Flink directory:
1. Use the following command to change to the Flink directory:
```
cd flink-1.13.2
```
2. Change the value of `taskmanager.numberOfTaskSlots` to 2 in `conf/flink-conf.yaml`, because we will run two tasks at the same time.
3. Use the following command to start a Flink cluster:
2. Use the following command to start the Flink cluster:
```shell
./bin/start-cluster.sh
```
Then we can visit [http://localhost:8081/](http://localhost:8081/) to see if Flink is running normally, and the web page looks like:
If the startup succeeds, you can access the Flink Web UI at [http://localhost:8081/](http://localhost:8081/), as shown below:
![Flink UI](/_static/fig/mysql-postgress-tutorial/flink-ui.png "Flink UI")
Don't forget to run the following command to stop the Flink cluster after finishing the tutorial:
```shell
./bin/stop-cluster.sh
```
4. Use the following command to start a Flink SQL CLI:
3. Use the following command to start the Flink SQL CLI:
```shell
./bin/sql-client.sh
```
We should see the welcome screen of the CLI client.
After it starts successfully, you should see the following welcome page:
![Flink SQL Client](/_static/fig/mysql-postgress-tutorial/flink-sql-client.png "Flink SQL Client")
![Flink SQL Client](/_static/fig/mysql-postgress-tutorial/flink-sql-client.png "Flink SQL Client")
## Creating tables using Flink DDL in Flink SQL CLI
First, enable checkpoints every 3000 milliseconds
## Creating tables using Flink DDL in Flink SQL CLI
First, enable checkpointing, with a checkpoint every 3 seconds:
```sql
-- Flink SQL
Flink SQL> SET execution.checkpointing.interval = 3s;
```
Then, create tables that capture the change data from the corresponding database tables.
Then, for the database tables `products`, `orders`, and `shipments`, use the Flink SQL CLI to create corresponding tables that synchronize the data of these underlying database tables:
```sql
-- Flink SQL
Flink SQL> CREATE TABLE products (
@ -244,7 +229,7 @@ Flink SQL> CREATE TABLE shipments (
);
```
Finally, create the `enriched_orders` table, which is used to load data into Elasticsearch.
Finally, create the `enriched_orders` table, used to write the joined order data into Elasticsearch:
```sql
-- Flink SQL
Flink SQL> CREATE TABLE enriched_orders (
@ -268,8 +253,8 @@ Flink SQL> CREATE TABLE enriched_orders (
);
```
## Enriching orders and loading them into ElasticSearch
Use Flink SQL to join the `orders` table with the `products` and `shipments` tables to enrich the orders, and write the result to Elasticsearch.
## Joining the order data and writing it into Elasticsearch
Use Flink SQL to join the order table `orders` with the product table `products` and the shipment table `shipments`, and write the enriched orders into Elasticsearch:
```sql
-- Flink SQL
Flink SQL> INSERT INTO enriched_orders
@ -278,42 +263,55 @@ Flink SQL> INSERT INTO enriched_orders
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;
```
Now, the enriched orders should be shown in Kibana.
Visit [http://localhost:5601/app/kibana#/management/kibana/index_pattern](http://localhost:5601/app/kibana#/management/kibana/index_pattern) to create an index pattern `enriched_orders`.
Now you can see the order data enriched with product and shipment information in Kibana.
First, visit [http://localhost:5601/app/kibana#/management/kibana/index_pattern](http://localhost:5601/app/kibana#/management/kibana/index_pattern) to create an index pattern `enriched_orders`.
![Create Index Pattern](/_static/fig/mysql-postgress-tutorial/kibana-create-index-pattern.png "Create Index Pattern")
Visit [http://localhost:5601/app/kibana#/discover](http://localhost:5601/app/kibana#/discover) to find the enriched orders.
Then you can see the written data at [http://localhost:5601/app/kibana#/discover](http://localhost:5601/app/kibana#/discover).
![Find enriched Orders](/_static/fig/mysql-postgress-tutorial/kibana-detailed-orders.png "Find enriched Orders")
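You can also query Elasticsearch directly from the command line (a sketch, assuming the Compose file exposes Elasticsearch on its default port 9200):
```shell
# Each enriched order appears as one document in the sink index.
curl "http://localhost:9200/enriched_orders/_search?pretty"
```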
Next, make some changes in the databases, and the enriched orders shown in Kibana will be updated in real time after each step.
1. Insert a new order in MySQL
Next, modify the data in the MySQL and Postgres tables, and the order data shown in Kibana will update in real time:
1. Insert a row into the `orders` table in MySQL
```sql
--MySQL
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
```
2. Insert a shipment in Postgres
2. Insert a row into the `shipments` table in Postgres
```sql
--PG
INSERT INTO shipments
VALUES (default,10004,'Shanghai','Beijing',false);
```
3. Update order status in MySQL
3. Update the order status in the `orders` table in MySQL
```sql
--MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;
```
4. Update the shipment status in Postgres
4. Update the shipment status in the `shipments` table in Postgres
```sql
--PG
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
```
5. Delete the shipment in Postgres
5. Delete a row from the `orders` table in MySQL
```sql
--MySQL
DELETE FROM orders WHERE order_id = 10004;
```
The changes of enriched orders in Kibana are as follows:
![Enriched Orders Changes](/_static/fig/mysql-postgress-tutorial/kibana-detailed-orders-changes.gif "Enriched Orders Changes")
Refresh Kibana after each step, and you can see that the order data shown in Kibana updates in real time, as shown below:
![Enriched Orders Changes](/_static/fig/mysql-postgress-tutorial/kibana-detailed-orders-changes.gif "Enriched Orders Changes")
## Cleaning up the environment
After finishing the tutorial, run the following command in the directory containing `docker-compose.yml` to stop all containers:
```shell
docker-compose down
```
In the Flink directory `flink-1.13.2`, run the following command to stop the Flink cluster:
```shell
./bin/stop-cluster.sh
```
