You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
flink-cdc-connectors/docs/content/connectors/oceanbase-cdc(ZH).md

238 lines
12 KiB
Markdown

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# OceanBase CDC 连接器
OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。本文介绍了如何设置 OceanBase CDC 连接器以对 OceanBase 进行 SQL 查询。
## 依赖
为了使用 OceanBase CDC 连接器,您必须提供相关的依赖信息。以下依赖信息适用于使用自动构建工具(如 Maven 或 SBT构建的项目和带有 SQL JAR 包的 SQL 客户端。
```xml
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oceanbase-cdc</artifactId>
<!-- 请使用已发布的版本依赖snapshot 版本的依赖需要本地自行编译。 -->
<version>2.3.0</version>
</dependency>
```
## 下载 SQL 客户端 JAR 包
点击 [flink-sql-connector-oceanbase-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oceanbase-cdc/2.3.0/flink-sql-connector-oceanbase-cdc-2.3.0.jar) 下载 JAR 包至 `<FLINK_HOME>/lib/`.
> **说明:**
>
> 下载链接仅适用于稳定发行版本。
`flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT` 快照版本与开发分支的版本对应。要使用快照版本,您必须自行下载并编译源代码。推荐使用稳定发行版本,例如 `flink-sql-connector-oceanbase-cdc-2.3.0.jar`。您可以在 Maven 中央仓库中找到使用稳定发行版本。
### 配置 OceanBase 数据库和 oblogproxy 服务
1. 按照 [部署文档](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.1/deploy-the-distributed-oceanbase-cluster) 配置 OceanBase 集群。
2. 在 sys 租户中,为 oblogproxy 创建一个带密码的用户。更多信息,参考 [用户管理文档](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.1/create-user-3)。
```bash
mysql -h${host} -P${port} -uroot
mysql> SHOW TENANT;
mysql> CREATE USER ${sys_username} IDENTIFIED BY '${sys_password}';
mysql> GRANT ALL PRIVILEGES ON *.* TO ${sys_username} WITH GRANT OPTION;
```
3. 为你想要监控的租户创建一个用户,这个用户用来读取快照数据和变化事件数据。
4. 使用以下命令,获取 `rootservice_list` 的值。
```bash
mysql> SHOW PARAMETERS LIKE 'rootservice_list';
```
5. 按照 [快速入门 文档](https://github.com/oceanbase/oblogproxy#quick-start) 配置 oblogproxy。
## 创建 OceanBase CDC 表
使用以下命令,创建 OceanBase CDC 表:
```sql
-- 每 3 秒做一次 checkpoint用于测试生产配置建议 5 到 10 分钟
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
-- 在 Flink SQL 中创建 OceanBase 表 `orders`
Flink SQL> CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = 'user@test_tenant',
'password' = 'pswd',
'tenant-name' = 'test_tenant',
'database-name' = 'test_db',
'table-name' = 'orders',
'hostname' = '127.0.0.1',
'port' = '2881',
'rootserver-list' = '127.0.0.1:2882:2881',
'logproxy.host' = '127.0.0.1',
'logproxy.port' = '2983');
-- 从表 orders 中读取快照数据和 binlog 数据
Flink SQL> SELECT * FROM orders;
```
您也可以访问 Flink CDC 官网文档,快速体验将数据从 OceanBase 导入到 Elasticsearch。更多信息参考 [Flink CDC 官网文档](https://ververica.github.io/flink-cdc-connectors/release-2.2/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/oceanbase-tutorial-zh.html)。
## OceanBase CDC 连接器选项
配置项 | 是否必选 | 默认值 | 类型 | 描述
---- | ----- | ------ | ----- | ----
connector | 是 | 无 | String | 指定要使用的连接器。此处为 `oceanbase-cdc`
scan.startup.mode | 是 | 无 | String | 指定 OceanBase CDC 消费者的启动模式。可取值为 `initial`、`latest-offset` 或`timestamp`。
scan.startup.timestamp | 否 | 无 | Long | 起始点的时间戳,单位为秒。仅在 `scan.startup.mode` 的值为 `timestamp` 时适用。
username | 是 | 无 | String | 连接 OceanBase 数据库的用户的名称。
password | 是 | 无 | String | 连接 OceanBase 数据库时使用的密码。
tenant-name | 是 | 无 | String | 待监控 OceanBase 数据库的租户名,应该填入精确值。
database-name | 是 | 无 | String | 待监控 OceanBase 数据库的数据库名。
table-name | 是 | 无 | String | 待监控 OceanBase 数据库的表名。
hostname | 否 | 无 | String | OceanBase 数据库或 OceanBbase 代理 ODP 的 IP 地址或主机名。
port | 否 | 无 | String | Integer | OceanBase 数据库服务器的整数端口号。可以是 OceanBase 服务器的 SQL 端口号(默认值为 `2881`)或 ODP 的端口号(默认值为 `2883`)。
connect.timeout | 否 | 30s | Duration | 连接器在尝试连接到 OceanBase 数据库服务器超时前的最长时间。
server-time-zone | 否 | UTC | String | 数据库服务器中的会话时区,例如 `"Asia/Shanghai"`。此选项控制 OceanBase 数据库中的 `TIMESTAMP` 类型在快照读取时如何转换为`STRING`。确保此选项与 `oblogproxy` 的时区设置相同。
rootserver-list | 是 | 无 | String | OceanBase root 服务器列表,服务器格式为 `ip:rpc_port:sql_port`,多个服务器地址使用英文分号 `;` 隔开。
logproxy.host | 是 | 无 | String | oblogproxy 的 IP 地址或主机名。
logproxy.port | 是 | 无 | Integer | oblogproxy 的端口号。
## 支持的元数据
在创建表时您可以使用以下格式的元数据作为只读列VIRTUAL
列名 | 数据类型 | 描述
----- | ----- | -----
tenant_name | STRING NOT NULL | 当前记录所属的租户名称。
database_name | STRING NOT NULL | 当前记录所属的数据库名称。
table_name | STRING NOT NULL | 当前记录所属的表名称。
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 该值表示此修改在数据库中发生的时间。如果这条记录是该表在快照阶段读取的记录,则该值返回 0。
如下 SQL 展示了如何在表中使用这些元数据列:
```sql
CREATE TABLE products (
tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = 'user@test_tenant',
'password' = 'pswd',
'tenant-name' = 'test_tenant',
'database-name' = 'test_db',
'table-name' = 'orders',
'hostname' = '127.0.0.1',
'port' = '2881',
'rootserver-list' = '127.0.0.1:2882:2881',
'logproxy.host' = '127.0.0.1',
'logproxy.port' = '2983');
```
## 特性
### At-Least-Once 处理
OceanBase CDC 连接器是一个 Flink Source 连接器。它将首先读取数据库快照,然后再读取变化事件,并进行 **At-Least-Once 处理**
OceanBase 数据库是一个分布式数据库,它的日志也分散在不同的服务器上。由于没有类似 MySQL binlog 偏移量的位置信息OceanBase 数据库用时间戳作为位置标记。为确保读取完整的数据liboblog读取 OceanBase 日志记录的 C++ 库可能会在给定的时间戳之前读取一些日志数据。因此OceanBase 数据库可能会读到起始点附近时间戳的重复数据,可保证 **At-Least-Once 处理**
### 启动模式
配置选项 `scan.startup.mode` 指定 OceanBase CDC 连接器的启动模式。可用取值包括:
- `initial`(默认):在首次启动时对受监视的数据库表执行初始快照,并继续读取最新的提交日志。
- `latest-offset`:首次启动时,不对受监视的数据库表执行快照,仅从连接器启动时读取提交日志。
- `timestamp`:在首次启动时不对受监视的数据库表执行初始快照,仅从指定的 `scan.startup.timestamp` 读取最新的提交日志。
### 消费提交日志
OceanBase CDC 连接器使用 [oblogclient](https://github.com/oceanbase/oblogclient) 消费 oblogproxy 中的事务日志。
### DataStream Source
OceanBase CDC 连接器也可以作为 DataStream Source 使用。您可以按照如下创建一个 SourceFunction
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseTableSourceFactory;
import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
public class OceanBaseSourceExample {
public static void main(String[] args) throws Exception {
SourceFunction<String> oceanBaseSource =
OceanBaseSource.<String>builder()
.rsList("127.0.0.1:2882:2881") // 设置 rs 列表
.startupMode(StartupMode.INITIAL) // 设置 startup 模式
.username("user@test_tenant") // 设置集群用户名
.password("pswd") // 设置集群密码
.tenantName("test_tenant") // 设置捕获租户名,不支持正则表达式
.databaseName("test_db") // 设置捕获数据库,支持正则表达式
.tableName("test_table") // 设置捕获表,支持正则表达式
.hostname("127.0.0.1") // 设置 OceanBase 服务器或代理的 hostname
.port(2881) // 设置 OceanBase 服务器或代理的 SQL 端口
.logProxyHost("127.0.0.1") // 设置 log proxy 的 hostname
.logProxyPort(2983) // 设置 log proxy 的商品
.deserializer(new JsonDebeziumDeserializationSchema()) // 把 SourceRecord 转化成 JSON 字符串
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
env.addSource(oceanBaseSource).print().setParallelism(1);
env.execute("Print OceanBase Snapshot + Commit Log");
}
}
```
## 数据类型映射
当启动模式不是 `INITIAL` 时,连接器无法获得一个列的精度和比例。为兼容不同的启动模式,连接器不会将一个不同精度的 OceanBase 类型映射到不同的FLink 类型。例如,`BOOLEAN`、`TINYINT(1)` 或 `BIT(1)` 均可以转换成 `BOOLEAN`。在 OceanBase 数据库中,`BOOLEAN` 等同于 `TINYINT(1)`,所以 `BOOLEAN``TINYINT` 类型的列在 Flink 中会被映射为 `TINYINT`,而 `BIT(1)` 在 Flink 中会被映射为 `BINARY(1)`
OceanBase 数据类型 | Flink SQL 类型 | 描述
----- | ----- | ------
BOOLEAN <br>TINYINT | TINYINT |
SMALLINT<br>TINYINT <br>UNSIGNED | SMALLINT |
INT<br>MEDIUMINT<br>SMALLINT<br>UNSIGNED | INT |
BIGINT<br>INT UNSIGNED | BIGINT |
BIGINT UNSIGNED | DECIMAL(20, 0) |
REAL FLOAT | FLOAT |
DOUBLE | DOUBLE |
NUMERIC(p, s)<br>DECIMAL(p, s)<br>where p <= 38 | DECIMAL(p, s) |
NUMERIC(p, s)<br>DECIMAL(p, s)<br>where 38 < p <=65 | STRING | DECIMAL 等同于 NUMERIC。在 OceanBase 数据库中,DECIMAL 数据类型的精度最高为 65。但在 Flink 中,DECIMAL 的最高精度为 38。因此,如果你定义了一个精度大于 38 DECIMAL 列,你应当将其映射为 STRING,以避免精度损失。 |
DATE | DATE |
TIME [(p)] | TIME [(p)] |
TIMESTAMP [(p)]<br>DATETIME [(p)] | TIMESTAMP [(p)] |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈n/8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
TINYTEXT<br>TEXT<br>MEDIUMTEXT<br>LONGTEXT | STRING |
TINYBLOB<br>BLOB<br>MEDIUMBLOB<br>LONGBLOB | BYTES |
YEAR | INT |
ENUM | STRING |
SET | STRING