[FLINK-34741][cdc][docs] Translate get-started page for Flink CDC doc to Chinese

This closes #3175.
Hongshun Wang 1 year ago committed by GitHub
parent bacb6a3b60
commit 43a5887cd7

@@ -24,28 +24,23 @@ specific language governing permissions and limitations
under the License.
-->
# Welcome to Flink CDC 🎉
Flink CDC is a streaming data integration tool that aims to provide users with
a more robust API. It allows users to elegantly describe their ETL pipeline logic
via YAML and helps them automatically generate customized Flink operators and
submit jobs. Flink CDC prioritizes optimizing the task submission process and
offers enhanced functionalities such as schema evolution, data transformation,
full database synchronization, and exactly-once semantics.
Deeply integrated with and powered by Apache Flink, Flink CDC provides:
* ✅ End-to-end data integration framework
* ✅ API for data integration users to build jobs easily
* ✅ Multi-table support in Source / Sink
* ✅ Synchronization of entire databases
* ✅ Schema evolution capability
## How to Use Flink CDC
Flink CDC provides a YAML-formatted user API that is better suited for data
integration scenarios. Here's an example YAML file defining a data pipeline that
ingests real-time changes from MySQL and synchronizes them to Apache Doris:
```yaml
source:
@@ -71,27 +66,23 @@ pipeline:
parallelism: 2
```
By submitting the YAML file with `flink-cdc.sh`, a Flink job will be compiled
and deployed to a designated Flink cluster. Please refer to [Core Concept]({{<
ref "docs/core-concept/data-pipeline" >}}) for full documentation of all
supported functionalities of a pipeline.
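For example, if the pipeline definition above were saved as `mysql-to-doris.yaml` (the file name is just an example), submission from the extracted Flink CDC distribution is a single command:
```shell
# Compile the YAML pipeline definition into a Flink job and submit it
# to the cluster pointed to by FLINK_HOME.
bash bin/flink-cdc.sh mysql-to-doris.yaml
```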
## Write Your First Flink CDC Pipeline
Explore the Flink CDC documentation to get hands-on with your first real-time
data integration pipeline:
### Quickstart
Check out the quickstart guide to learn how to establish a Flink CDC pipeline:
- [MySQL to Apache Doris]({{< ref "docs/get-started/quickstart/mysql-to-doris" >}})
- [MySQL to StarRocks]({{< ref "docs/get-started/quickstart/mysql-to-starrocks" >}})
### Understand Core Concepts
Get familiar with core concepts we introduced in Flink CDC and try to build
more complex pipelines:
- [Data Pipeline]({{< ref "docs/core-concept/data-pipeline" >}})
- [Data Source]({{< ref "docs/core-concept/data-source" >}})
@@ -100,21 +91,19 @@ more complex pipelines:
- [Transform]({{< ref "docs/core-concept/transform" >}})
- [Route]({{< ref "docs/core-concept/route" >}})
### Submit Pipeline to Flink Cluster
Learn how to submit a pipeline to a Flink cluster running in different
deployment modes:
- [standalone]({{< ref "docs/deployment/standalone" >}})
- [Kubernetes]({{< ref "docs/deployment/kubernetes" >}})
- [YARN]({{< ref "docs/deployment/yarn" >}})
## Development and Contribution
If you want to connect Flink CDC to your customized external system, or
contribute to the framework itself, these sections could be helpful:
- Understand [Flink CDC APIs]({{< ref "docs/developer-guide/understand-flink-cdc-api" >}})
  to develop your own Flink CDC connector
- Learn how to [contribute to Flink CDC]({{< ref "docs/developer-guide/contribute-to-flink-cdc" >}})
- Check out the [licenses]({{< ref "docs/developer-guide/licenses" >}}) used by Flink CDC

@@ -24,66 +24,63 @@ specific language governing permissions and limitations
under the License.
-->
# Streaming ELT from MySQL to Doris
This tutorial shows how to quickly build a streaming ELT job from MySQL to Doris using Flink CDC, covering synchronization of all tables in one database, schema change evolution, and merging sharding tables into one table.
All exercises in this tutorial are performed in the Flink CDC CLI, and the entire process uses standard SQL syntax, without a single line of Java/Scala code or IDE installation.
## Preparation
Prepare a Linux or MacOS computer with Docker installed.
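Before starting, it may help to verify the Docker installation; a minimal check could look like this (output will vary by version):
```shell
# Confirm the Docker CLI, Compose, and a reachable daemon.
docker --version
docker-compose --version
docker info > /dev/null && echo "Docker daemon is running"
```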
### Prepare Flink Standalone cluster
1. Download [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz) and unzip it to get the flink-1.18.0 directory.
   Use the following command to navigate to the Flink directory and set FLINK_HOME to where flink-1.18.0 is located.
```shell
cd flink-1.18.0
```
2. Enable checkpointing by appending the following parameter to the conf/flink-conf.yaml configuration file to perform a checkpoint every 3 seconds.
```yaml
execution.checkpointing.interval: 3000
```
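   If you prefer doing this from the shell, the parameter can be appended in one line (a sketch, assuming you are inside the flink-1.18.0 directory):
   ```shell
   # Append the checkpoint interval (in milliseconds) to the Flink configuration.
   echo "execution.checkpointing.interval: 3000" >> conf/flink-conf.yaml
   ```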
3. Start the Flink cluster using the following command.
```shell
./bin/start-cluster.sh
```
If successfully started, you can access the Flink Web UI at [http://localhost:8081/](http://localhost:8081/), as shown below.
{{< img src="/fig/mysql-doris-tutorial/flink-ui.png" alt="Flink UI" >}}
Executing `start-cluster.sh` multiple times can start multiple `TaskManager`s.
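As an alternative sketch (using the standard scripts shipped with the Flink distribution), individual TaskManagers can also be started and stopped on their own:
```shell
# Start one additional TaskManager for the running standalone cluster.
./bin/taskmanager.sh start

# Stop the most recently started TaskManager.
./bin/taskmanager.sh stop
```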
### Prepare docker compose
The following tutorial will prepare the required components using `docker-compose`.
1. Host machine configuration
   Since `Doris` requires memory mapping support for operation, execute the following command on the host machine:
```shell
sysctl -w vm.max_map_count=2000000
```
   Due to the different way containers are implemented internally on MacOS, it may not be possible to modify the value of max_map_count directly on the host during deployment. You need to create the following container first:
```shell
docker run -it --privileged --pid=host --name=change_count debian nsenter -t 1 -m -u -n -i sh
```
   Once the container is created successfully, execute the following command inside it:
```shell
sysctl -w vm.max_map_count=2000000
```
   Then run `exit` to leave the container, and go on to create the Doris Docker cluster.
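   To double-check that the change took effect, read the value back (works on a Linux host and inside the helper container above):
   ```shell
   # Should print: vm.max_map_count = 2000000
   sysctl vm.max_map_count
   ```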
2. Start docker compose
   Create a `docker-compose.yml` file using the content provided below:
```yaml
version: '2.1'
@@ -104,76 +101,77 @@ Then `exit` exits and creates the Doris Docker cluster.
- MYSQL_PASSWORD=mysqlpw
```
The Docker Compose file includes the following services (containers):
- MySQL: includes a database named `app_db`
- Doris: to store tables synchronized from MySQL
To start all containers, run the following command in the directory that contains the `docker-compose.yml` file.
```shell
docker-compose up -d
```
This command automatically starts all the containers defined in the Docker Compose configuration in detached mode. Run `docker ps` to check whether these containers are running properly. You can also visit [http://localhost:8030/](http://localhost:8030/) to check whether Doris is running.
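A minimal check might look like the following (container names are derived from the compose project and may differ on your machine):
```shell
# List services managed by this compose file.
docker-compose ps

# Or show container names and status via docker directly.
docker ps --format "table {{.Names}}\t{{.Status}}"
```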
#### Prepare records for MySQL
1. Enter MySQL container
```shell
docker-compose exec mysql mysql -uroot -p123456
```
2. Create the `app_db` database and the `orders`, `products`, `shipments` tables, then insert records
```sql
-- create database
CREATE DATABASE app_db;
USE app_db;

-- create orders table
CREATE TABLE `orders` (
  `id` INT NOT NULL,
  `price` DECIMAL(10,2) NOT NULL,
  PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);

-- create shipments table
CREATE TABLE `shipments` (
  `id` INT NOT NULL,
  `city` VARCHAR(255) NOT NULL,
  PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');

-- create products table
CREATE TABLE `products` (
  `id` INT NOT NULL,
  `product` VARCHAR(255) NOT NULL,
  PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
```
#### Create database in Doris
The `Doris` connector currently does not support automatic database creation, so you need to first create the database corresponding to the sink tables.
1. Enter the Doris Web UI.
[http://localhost:8030/](http://localhost:8030/)
The default username is `root`, and the default password is empty.
{{< img src="/fig/mysql-doris-tutorial/doris-ui.png" alt="Doris UI" >}}
2. Create `app_db` database through Web UI.
```sql
create database app_db;
@@ -181,18 +179,18 @@ This command automatically starts all the containers defined in the Docker Compo
{{< img src="/fig/mysql-doris-tutorial/doris-create-table.png" alt="Doris create table" >}}
## Submit job using Flink CDC CLI
1. Download the binary compressed package listed below and extract it to get the `flink-cdc-3.0.0` directory:
   [flink-cdc-3.0.0-bin.tar.gz](https://github.com/ververica/flink-cdc-connectors/releases/download/release-3.0.0/flink-cdc-3.0.0-bin.tar.gz)
   The flink-cdc-3.0.0 directory contains four subdirectories: `bin`, `lib`, `log`, and `conf`.
2. Download the connector packages listed below and move them to the `lib` directory.
   **Download links are only available for stable releases; SNAPSHOT dependencies need to be built from the master or release branches by yourself.**
   - [MySQL pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar)
   - [Apache Doris pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.0/flink-cdc-pipeline-connector-doris-3.0.0.jar)
3. Write the task configuration YAML file.
   Here is an example file for synchronizing the entire database, `mysql-to-doris.yaml`:
```yaml
################################################################################
@@ -221,70 +219,69 @@ This command automatically starts all the containers defined in the Docker Compo
parallelism: 2
```
   Notice that:
   * `tables: app_db.\.*` in the source synchronizes all tables under `app_db` through regular expression matching.
   * `table.create.properties.replication_num` is set in the sink because there is only one Doris BE node in the Docker image.
4. Finally, submit the job to the Flink Standalone cluster using the CLI:
```shell
bash bin/flink-cdc.sh mysql-to-doris.yaml
```
   After successful submission, the returned information is as follows:
```shell
Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris
```
Through the Flink Web UI, we can find a job named `Sync MySQL Database to Doris` running.
{{< img src="/fig/mysql-doris-tutorial/mysql-to-doris.png" alt="MySQL-to-Doris" >}}
Through the Doris Web UI, we can see that the tables have been created and the data has been written successfully.
{{< img src="/fig/mysql-doris-tutorial/doris-display-data.png" alt="Doris display data" >}}
### Synchronize Schema and Data changes
Enter MySQL container
```shell
docker-compose exec mysql mysql -uroot -p123456
```
Then, modify the schema and the records in MySQL, and the corresponding tables in Doris will change in real time:
1. Insert one record into `orders` in MySQL:
```sql
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
```
2. Add one column to `orders` in MySQL:
```sql
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
```
3. Update one record in `orders` in MySQL:
```sql
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
```
4. Delete one record from `orders` in MySQL:
```sql
DELETE FROM app_db.orders WHERE id=2;
```
Refresh the Doris Web UI after each step, and you can see that the `orders` table displayed in Doris is updated in real time, as shown below:
{{< img src="/fig/mysql-doris-tutorial/doris-display-result.png" alt="Doris display result" >}}
Similarly, by modifying the `shipments` and `products` tables, you can see the results of the synchronized changes in Doris in real time.
### Route the changes
Flink CDC provides configuration to route the table structure/data of a source table to other table names.
With this ability, we can achieve functions such as table name or database name replacement, and whole database synchronization.
Here is an example file using the `route` feature:
```yaml
################################################################################
# Description: Sync MySQL all tables to Doris
@@ -321,8 +318,8 @@ Here is an example file for using `route` feature:
parallelism: 2
```
Using the `route` configuration above, the table schema and data of `app_db.orders` are synchronized to `ods_db.ods_orders`, achieving the function of database migration.
In particular, `source-table` supports regular expression matching of multiple tables, which enables synchronizing sharding databases and tables, as in the following configuration:
```yaml
route:
@@ -330,16 +327,14 @@ Specifically, `source-table` support regular expression matching with multiple t
sink-table: ods_db.ods_orders
```
In this way, we can synchronize sharding tables like `app_db.order01`, `app_db.order02`, `app_db.order03` into one `ods_db.ods_orders` table.
Note that scenarios where the same primary key data exists in multiple tables are not yet supported; this will be supported in future versions.
## Clean up
After finishing the tutorial, run the following command in the directory of `docker-compose.yml` to stop all containers:
```shell
docker-compose down
```
Run the following command in the Flink directory `flink-1.18.0` to stop the Flink cluster:
```shell
./bin/stop-cluster.sh

@@ -24,138 +24,135 @@ specific language governing permissions and limitations
under the License.
-->
# Streaming ELT from MySQL to StarRocks
This tutorial shows how to quickly build a streaming ELT job from MySQL to StarRocks using Flink CDC, covering synchronization of all tables in one database, schema change evolution, and merging sharding tables into one table.
All exercises in this tutorial are performed in the Flink CDC CLI, and the entire process uses standard SQL syntax, without a single line of Java/Scala code or IDE installation.
## Preparation
Prepare a Linux or MacOS computer with Docker installed.
### Prepare Flink Standalone cluster
1. Download [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz) and unzip it to get the flink-1.18.0 directory.
   Use the following command to navigate to the Flink directory and set FLINK_HOME to where flink-1.18.0 is located.
```shell
cd flink-1.18.0
```
2. Enable checkpointing by appending the following parameter to the conf/flink-conf.yaml configuration file to perform a checkpoint every 3 seconds.
```yaml
execution.checkpointing.interval: 3000
```
3. Start the Flink cluster using the following command.
```shell
./bin/start-cluster.sh
```
If successfully started, you can access the Flink Web UI at [http://localhost:8081/](http://localhost:8081/), as shown below.
{{< img src="/fig/mysql-starrocks-tutorial/flink-ui.png" alt="Flink UI" >}}
Executing `start-cluster.sh` multiple times can start multiple `TaskManager`s.
### Prepare docker compose
The following tutorial will prepare the required components using `docker-compose`.
Create a `docker-compose.yml` file using the content provided below:
```yaml
version: '2.1'
services:
  StarRocks:
    image: registry.starrocks.io/starrocks/allin1-ubuntu
    ports:
      - "8030:8030"
      - "8040:8040"
      - "9030:9030"
  MySQL:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
```
The Docker Compose file includes the following services (containers):
- MySQL: includes a database named `app_db`
- StarRocks: to store tables synchronized from MySQL
To start all containers, run the following command in the directory that contains the `docker-compose.yml` file.
```shell
docker-compose up -d
```
This command automatically starts all the containers defined in the Docker Compose configuration in detached mode. Run `docker ps` to check whether these containers are running properly. You can also visit [http://localhost:8030/](http://localhost:8030/) to check whether StarRocks is running.
#### Prepare records for MySQL
1. Enter MySQL container
```shell
docker-compose exec mysql mysql -uroot -p123456
```
2. Create the `app_db` database and the `orders`, `products`, `shipments` tables, then insert records
```sql
-- create database
CREATE DATABASE app_db;
USE app_db;

-- create orders table
CREATE TABLE `orders` (
  `id` INT NOT NULL,
  `price` DECIMAL(10,2) NOT NULL,
  PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);

-- create shipments table
CREATE TABLE `shipments` (
  `id` INT NOT NULL,
  `city` VARCHAR(255) NOT NULL,
  PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');

-- create products table
CREATE TABLE `products` (
  `id` INT NOT NULL,
  `product` VARCHAR(255) NOT NULL,
  PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
```
## Submit job using Flink CDC CLI
1. Download the binary compressed package listed below and extract it to get the `flink-cdc-3.0.0` directory:
   [flink-cdc-3.0.0-bin.tar.gz](https://github.com/ververica/flink-cdc-connectors/releases/download/release-3.0.0/flink-cdc-3.0.0-bin.tar.gz)
   The flink-cdc-3.0.0 directory contains four subdirectories: `bin`, `lib`, `log`, and `conf`.
2. Download the connector packages listed below and move them to the `lib` directory.
   **Download links are only available for stable releases; SNAPSHOT dependencies need to be built from the master or release branches by yourself.**
   - [MySQL pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar)
   - [StarRocks pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-starrocks/3.0.0/flink-cdc-pipeline-connector-starrocks-3.0.0.jar)
3. Write the task configuration YAML file.
   Here is an example file for synchronizing the entire database, `mysql-to-starrocks.yaml`:
```yaml
################################################################################
@@ -186,17 +183,17 @@ This command automatically starts all the containers defined in the Docker Compo
```
   Notice that:
   * `tables: app_db.\.*` in the source synchronizes all tables under `app_db` through regular expression matching.
   * `table.create.properties.replication_num` is set in the sink because there is only one StarRocks BE node in the Docker image.
4. Finally, submit the job to the Flink Standalone cluster using the CLI:
```shell
bash bin/flink-cdc.sh mysql-to-starrocks.yaml
```
   After successful submission, the returned information is as follows:
```shell
Pipeline has been submitted to cluster.
@@ -204,71 +201,71 @@ After successful submission, the return information is as follows
Job Description: Sync MySQL Database to StarRocks
```
Through the Flink Web UI, we can find a job named `Sync MySQL Database to StarRocks` running.
{{< img src="/fig/mysql-starrocks-tutorial/mysql-to-starrocks.png" alt="MySQL-to-StarRocks" >}}
Connect through a database connection tool such as DBeaver using `jdbc:mysql://127.0.0.1:9030`, and you can view the data written into the three tables in StarRocks.
{{< img src="/fig/mysql-starrocks-tutorial/starrocks-display-data.png" alt="StarRocks-display-data" >}}
### Synchronize Schema and Data changes
Enter MySQL container
```shell
docker-compose exec mysql mysql -uroot -p123456
```
Then, modify the schema and the records in MySQL, and the corresponding tables in StarRocks will change in real time:
1. Insert one record into `orders` in MySQL:
```sql
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
```
2. Add one column to `orders` in MySQL:
```sql
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
```
3. Update one record in `orders` in MySQL:
```sql
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
```
4. Delete one record from `orders` in MySQL:
```sql
DELETE FROM app_db.orders WHERE id=2;
```
Refresh DBeaver after each step, and you can see that the `orders` table displayed in StarRocks is updated in real time, as shown below:
{{< img src="/fig/mysql-starrocks-tutorial/starrocks-display-result.png" alt="StarRocks-display-result" >}}
Similarly, by modifying the `shipments` and `products` tables, you can see the results of the synchronized changes in StarRocks in real time.
### Route the changes
Flink CDC provides configuration to route the table structure/data of a source table to other table names.
With this ability, we can achieve functions such as table name or database name replacement, and whole database synchronization.
Here is an example file using the `route` feature:
```yaml
################################################################################
# Description: Sync MySQL all tables to StarRocks
################################################################################
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app_db.\.*
  server-id: 5400-5404
  server-time-zone: UTC
sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
@@ -276,38 +273,37 @@ Here is an example file for using `route` feature:
  table.create.properties.replication_num: 1
route:
  - source-table: app_db.orders
    sink-table: ods_db.ods_orders
  - source-table: app_db.shipments
    sink-table: ods_db.ods_shipments
  - source-table: app_db.products
    sink-table: ods_db.ods_products
pipeline:
  name: Sync MySQL Database to StarRocks
  parallelism: 2
```
Using the `route` configuration above, the table schema and data of `app_db.orders` are synchronized to `ods_db.ods_orders`, achieving the function of database migration.
In particular, `source-table` supports regular expression matching of multiple tables, which enables synchronizing sharding databases and tables, as in the following configuration:
```yaml
route:
  - source-table: app_db.order\.*
    sink-table: ods_db.ods_orders
```
In this way, we can synchronize sharding tables like `app_db.order01`, `app_db.order02`, `app_db.order03` into one `ods_db.ods_orders` table.
Note that scenarios where the same primary key data exists in multiple tables are not yet supported; this will be supported in future versions.
## Clean up
After finishing the tutorial, run the following command in the directory of `docker-compose.yml` to stop all containers:
```shell
docker-compose down
```
Run the following command in the Flink directory `flink-1.18.0` to stop the Flink cluster:
```shell
./bin/stop-cluster.sh

@@ -183,13 +183,13 @@ This command automatically starts all the containers defined in the Docker Compo
## Submit job using Flink CDC CLI
1. Download the binary compressed package listed below and extract it to get the `flink-cdc-3.0.0` directory:
   [flink-cdc-3.0.0-bin.tar.gz](https://github.com/ververica/flink-cdc-connectors/releases/download/release-3.0.0/flink-cdc-3.0.0-bin.tar.gz)
   The flink-cdc-3.0.0 directory contains four subdirectories: `bin`, `lib`, `log`, and `conf`.
2. Download the connector packages listed below and move them to the `lib` directory.
   **Download links are only available for stable releases; SNAPSHOT dependencies need to be built from the master or release branches by yourself.**
   - [MySQL pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar)
   - [Apache Doris pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.0/flink-cdc-pipeline-connector-doris-3.0.0.jar)
3. Write the task configuration YAML file.
   Here is an example file for synchronizing the entire database, `mysql-to-doris.yaml`:

@@ -146,13 +146,13 @@ This command automatically starts all the containers defined in the Docker Compo
## Submit job using Flink CDC CLI
1. Download the binary compressed package listed below and extract it to get the `flink-cdc-3.0.0` directory:
   [flink-cdc-3.0.0-bin.tar.gz](https://github.com/ververica/flink-cdc-connectors/releases/download/release-3.0.0/flink-cdc-3.0.0-bin.tar.gz)
   The flink-cdc-3.0.0 directory contains four subdirectories: `bin`, `lib`, `log`, and `conf`.
2. Download the connector packages listed below and move them to the `lib` directory.
   **Download links are only available for stable releases; SNAPSHOT dependencies need to be built from the master or release branches by yourself.**
   - [MySQL pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar)
   - [StarRocks pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-starrocks/3.0.0/flink-cdc-pipeline-connector-starrocks-3.0.0.jar)
3. Write the task configuration YAML file.
   Here is an example file for synchronizing the entire database, `mysql-to-starrocks.yaml`:
