diff --git a/docs/content/connectors/oceanbase-cdc(ZH).md b/docs/content/connectors/oceanbase-cdc(ZH).md index 6266e433d..7cb220f64 100644 --- a/docs/content/connectors/oceanbase-cdc(ZH).md +++ b/docs/content/connectors/oceanbase-cdc(ZH).md @@ -17,13 +17,11 @@ OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。 ## 下载 SQL 客户端 JAR 包 -点击 [flink-sql-connector-oceanbase-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oceanbase-cdc/2.4-SNAPSHOT/flink-sql-connector-oceanbase-cdc-2.4-SNAPSHOT.jar) 下载 JAR 包至 `/lib/`. +```下载链接仅在已发布版本可用,请在文档网站左下角选择浏览已发布的版本。``` -> **说明:** -> -> 下载链接仅适用于稳定发行版本。 +下载[flink-sql-connector-oceanbase-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oceanbase-cdc/2.4-SNAPSHOT/flink-sql-connector-oceanbase-cdc-2.4-SNAPSHOT.jar) 到 `/lib/` 目录下。 -`flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT` 快照版本与开发分支的版本对应。要使用快照版本,您必须自行下载并编译源代码。推荐使用稳定发行版本,例如 `flink-sql-connector-oceanbase-cdc-2.4-SNAPSHOT.jar`。您可以在 Maven 中央仓库中找到使用稳定发行版本。 +**注意:** flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT 版本是开发分支`release-XXX`对应的快照版本,快照版本用户需要下载源代码并编译相应的 jar。用户应使用已经发布的版本,例如 [flink-sql-connector-oceanbase-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-oceanbase-cdc) 当前已发布的所有版本都可以在 Maven 中央仓库获取。 ### 配置 OceanBase 数据库和 oblogproxy 服务 @@ -38,11 +36,16 @@ OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。 ``` 3. 为你想要监控的租户创建一个用户,这个用户用来读取快照数据和变化事件数据。 -4. 使用以下命令,获取 `rootservice_list` 的值。 +4. OceanBase 社区版用户需要获取`rootserver-list`,可以使用以下命令获取: ```bash mysql> SHOW PARAMETERS LIKE 'rootservice_list'; ``` + OceanBase 企业版用户需要获取 `config-url`,可以使用以下命令获取: + + ```shell + mysql> show parameters like 'obconfig_url'; + ``` 5. 按照 [快速入门 文档](https://github.com/oceanbase/oblogproxy#quick-start) 配置 oblogproxy。 @@ -75,7 +78,9 @@ Flink SQL> CREATE TABLE orders ( 'port' = '2881', 'rootserver-list' = '127.0.0.1:2882:2881', 'logproxy.host' = '127.0.0.1', - 'logproxy.port' = '2983'); + 'logproxy.port' = '2983', + 'working-mode' = 'memory' +); -- 从表 orders 中读取快照数据和 binlog 数据 Flink SQL> SELECT * FROM orders; @@ -85,6 +90,13 @@ Flink SQL> SELECT * FROM orders; ## OceanBase CDC 连接器选项 +OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表所示。 + +*注意*:连接器支持两种方式来指定需要监听的表,两种方式同时使用时会监听两种方式匹配的所有表。 +1. 使用 `database-name` 和 `table-name` 匹配正则表达式中的数据库和表名。 由于`obcdc`(以前的`liboblog`)现在只支持`fnmatch`匹配,我们不能直接使用正则过滤 changelog 事件,所以通过两个选项去匹配去指定监听表只能在`initial`启动模式下使用。 +2. 使用 `table-list` 去匹配数据库名和表名的准确列表。 + + 配置项 | 是否必选 | 默认值 | 类型 | 描述 ---- | ----- | ------ | ----- | ---- connector | 是 | 无 | String | 指定要使用的连接器。此处为 `oceanbase-cdc`。 @@ -93,15 +105,19 @@ scan.startup.timestamp | 否 | 无 | Long | 起始点的时间戳,单位为秒 username | 是 | 无 | String | 连接 OceanBase 数据库的用户的名称。 password | 是 | 无 | String | 连接 OceanBase 数据库时使用的密码。 tenant-name | 是 | 无 | String | 待监控 OceanBase 数据库的租户名,应该填入精确值。 -database-name | 是 | 无 | String | 待监控 OceanBase 数据库的数据库名。 -table-name | 是 | 无 | String | 待监控 OceanBase 数据库的表名。 +database-name | 是 | 无 | String | 待监控 OceanBase 数据库的数据库名,应该是正则表达式,该选项只支持和 'initial' 模式一起使用。 +table-name | 否 | 无 | String | 待监控 OceanBase 数据库的表名,应该是正则表达式,该选项只支持和 'initial' 模式一起使用。 +table-list | 否 | 无 | String | 待监控 OceanBase 数据库的全路径的表名列表,逗号分隔,如:"db1.table1, db2.table2"。 hostname | 否 | 无 | String | OceanBase 数据库或 OceanBbase 代理 ODP 的 IP 地址或主机名。 -port | 否 | 无 | String | Integer | OceanBase 数据库服务器的整数端口号。可以是 OceanBase 服务器的 SQL 端口号(默认值为 `2881`)或 ODP 的端口号(默认值为 `2883`)。 +port | 否 | 无 | Integer | OceanBase 数据库服务器的整数端口号。可以是 OceanBase 服务器的 SQL 端口号(默认值为 2881)
或 OceanBase代理服务的端口号(默认值为 2883)。 connect.timeout | 否 | 30s | Duration | 连接器在尝试连接到 OceanBase 数据库服务器超时前的最长时间。 -server-time-zone | 否 | UTC | String | 数据库服务器中的会话时区,例如 `"Asia/Shanghai"`。此选项控制 OceanBase 数据库中的 `TIMESTAMP` 类型在快照读取时如何转换为`STRING`。确保此选项与 `oblogproxy` 的时区设置相同。 -rootserver-list | 是 | 无 | String | OceanBase root 服务器列表,服务器格式为 `ip:rpc_port:sql_port`,多个服务器地址使用英文分号 `;` 隔开。 -logproxy.host | 是 | 无 | String | oblogproxy 的 IP 地址或主机名。 -logproxy.port | 是 | 无 | Integer | oblogproxy 的端口号。 +server-time-zone | 否 | +00:00 | String | 数据库服务器中的会话时区,用户控制 OceanBase 的时间类型如何转换为 STRING。
合法的值可以是格式为"±hh:mm"的 UTC 时区偏移量,
如果 mysql 数据库中的时区信息表已创建,合法的值则可以是创建的时区。 +logproxy.host | 是 | 无 | String | OceanBase 日志代理服务 的 IP 地址或主机名。 +logproxy.port | 是 | 无 | Integer | OceanBase 日志代理服务 的端口号。 +logproxy.client.id | 否 | 按规则生成 | String | OceanBase日志代理服务的客户端链接,默认值的生成规则是 {flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant}。 +rootserver-list | 是 | 无 | String | OceanBase root 服务器列表,服务器格式为 `ip:rpc_port:sql_port`,
多个服务器地址使用英文分号 `;` 隔开,OceanBase 社区版本必填。 +config-url | 否 | 无 | String | 从配置服务器获取服务器信息的 url, OceanBase 企业版本必填。 +working-mode | 否 | storage | String | 日志代理中 `obcdc` 的工作模式 , 可以是 `storage` 或 `memory`。 ## 支持的元数据 @@ -162,69 +178,101 @@ OceanBase 数据库是一个分布式数据库,它的日志也分散在不同 ### 消费提交日志 -OceanBase CDC 连接器使用 [oblogclient](https://github.com/oceanbase/oblogclient) 消费 oblogproxy 中的事务日志。 +OceanBase CDC 连接器使用 [oblogclient](https://github.com/oceanbase/oblogclient) 消费 OceanBase日志代理服务 中的事务日志。 ### DataStream Source OceanBase CDC 连接器也可以作为 DataStream Source 使用。您可以按照如下创建一个 SourceFunction: ```java +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; + import com.ververica.cdc.connectors.oceanbase.OceanBaseSource; -import com.ververica.cdc.connectors.oceanbase.table.OceanBaseTableSourceFactory; +import com.ververica.cdc.connectors.oceanbase.source.RowDataOceanBaseDeserializationSchema; +import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema; import com.ververica.cdc.connectors.oceanbase.table.StartupMode; -import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; -public class OceanBaseSourceExample { +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Collections; - public static void main(String[] args) throws Exception { - SourceFunction oceanBaseSource = - OceanBaseSource.builder() - .rsList("127.0.0.1:2882:2881") // 设置 rs 列表 - .startupMode(StartupMode.INITIAL) // 设置 startup 模式 - .username("user@test_tenant") // 设置集群用户名 - .password("pswd") // 设置集群密码 - .tenantName("test_tenant") // 设置捕获租户名,不支持正则表达式 - .databaseName("test_db") // 设置捕获数据库,支持正则表达式 - .tableName("test_table") // 设置捕获表,支持正则表达式 - .hostname("127.0.0.1") // 设置 OceanBase 服务器或代理的 hostname - .port(2881) // 设置 OceanBase 服务器或代理的 SQL 端口 - .logProxyHost("127.0.0.1") // 设置 log proxy 的 hostname - .logProxyPort(2983) // 设置 log proxy 的商品 - .deserializer(new JsonDebeziumDeserializationSchema()) // 把 SourceRecord 转化成 JSON 字符串 - .build(); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - // enable checkpoint - env.enableCheckpointing(3000); - - env.addSource(oceanBaseSource).print().setParallelism(1); - - env.execute("Print OceanBase Snapshot + Commit Log"); - } +public class OceanBaseSourceExample { + public static void main(String[] args) throws Exception { + ResolvedSchema resolvedSchema = + new ResolvedSchema( + Arrays.asList( + Column.physical("id", DataTypes.INT().notNull()), + Column.physical("name", DataTypes.STRING().notNull())), + Collections.emptyList(), + UniqueConstraint.primaryKey("pk", Collections.singletonList("id"))); + + RowType physicalDataType = + (RowType) resolvedSchema.toPhysicalRowDataType().getLogicalType(); + TypeInformation resultTypeInfo = InternalTypeInfo.of(physicalDataType); + String serverTimeZone = "+00:00"; + + OceanBaseDeserializationSchema deserializer = + RowDataOceanBaseDeserializationSchema.newBuilder() + .setPhysicalRowType(physicalDataType) + .setResultTypeInfo(resultTypeInfo) + .setServerTimeZone(ZoneId.of(serverTimeZone)) + .build(); + + SourceFunction oceanBaseSource = + OceanBaseSource.builder() + .rsList("127.0.0.1:2882:2881") + .startupMode(StartupMode.INITIAL) + .username("user@test_tenant") + .password("pswd") + .tenantName("test_tenant") + .databaseName("test_db") + .tableName("test_table") + .hostname("127.0.0.1") + .port(2881) + .logProxyHost("127.0.0.1") + .logProxyPort(2983) + .serverTimeZone(serverTimezone) + .deserializer(deserializer) + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // enable checkpoint + env.enableCheckpointing(3000); + + env.addSource(oceanBaseSource).print().setParallelism(1); + env.execute("Print OceanBase Snapshot + Change Events"); + } } ``` ## 数据类型映射 -当启动模式不是 `INITIAL` 时,连接器无法获得一个列的精度和比例。为兼容不同的启动模式,连接器不会将一个不同精度的 OceanBase 类型映射到不同的FLink 类型。例如,`BOOLEAN`、`TINYINT(1)` 或 `BIT(1)` 均可以转换成 `BOOLEAN`。在 OceanBase 数据库中,`BOOLEAN` 等同于 `TINYINT(1)`,所以 `BOOLEAN` 和 `TINYINT` 类型的列在 Flink 中会被映射为 `TINYINT`,而 `BIT(1)` 在 Flink 中会被映射为 `BINARY(1)`。 - OceanBase 数据类型 | Flink SQL 类型 | 描述 ----- | ----- | ------ -BOOLEAN
TINYINT | TINYINT | -SMALLINT
TINYINT
UNSIGNED | SMALLINT | -INT
MEDIUMINT
SMALLINT
UNSIGNED | INT | -BIGINT
INT UNSIGNED | BIGINT | +BOOLEAN
TINYINT(1)
BIT(1)    | BOOLEAN | +TINYINT | TINYINT | +SMALLINT
TINYINT UNSIGNED | SMALLINT | +INT
MEDIUMINT
SMALLINT UNSIGNED | INT | +BIGINT
INT UNSIGNED | BIGINT | BIGINT UNSIGNED | DECIMAL(20, 0) | -REAL FLOAT | FLOAT | +REAL
FLOAT | FLOAT | DOUBLE | DOUBLE | NUMERIC(p, s)
DECIMAL(p, s)
where p <= 38 | DECIMAL(p, s) | -NUMERIC(p, s)
DECIMAL(p, s)
where 38 < p <=65 | STRING | DECIMAL 等同于 NUMERIC。在 OceanBase 数据库中,DECIMAL 数据类型的精度最高为 65。但在 Flink 中,DECIMAL 的最高精度为 38。因此,如果你定义了一个精度大于 38 的 DECIMAL 列,你应当将其映射为 STRING,以避免精度损失。 | +NUMERIC(p, s)
DECIMAL(p, s)
where 38 < p <=65 | STRING | DECIMAL 等同于 NUMERIC。在 OceanBase 数据库中,DECIMAL 数据类型的精度最高为 65。
但在 Flink 中,DECIMAL 的最高精度为 38。因此,
如果你定义了一个精度大于 38 的 DECIMAL 列,你应当将其映射为 STRING,以避免精度损失。 | DATE | DATE | TIME [(p)] | TIME [(p)] | -TIMESTAMP [(p)]
DATETIME [(p)] | TIMESTAMP [(p)] | +DATETIME [(p)] | TIMESTAMP [(p)] | +TIMESTAMP [(p)] | TIMESTAMP_LTZ [(p)] | CHAR(n) | CHAR(n) | VARCHAR(n) | VARCHAR(n) | BIT(n) | BINARY(⌈n/8⌉) | @@ -234,4 +282,5 @@ TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT | STRING | TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB | BYTES | YEAR | INT | ENUM | STRING | -SET | STRING +SET | ARRAY<STRING> | 因为 OceanBase 的 SET 类型是用包含一个或多个值的字符串对象表示,
所以映射到 Flink 时是一个字符串数组 +JSON | STRING | JSON 类型的数据在 Flink 中会转化为 JSON 格式的字符串 diff --git a/docs/content/connectors/oceanbase-cdc.md b/docs/content/connectors/oceanbase-cdc.md index 9f2b937fc..12af31ac2 100644 --- a/docs/content/connectors/oceanbase-cdc.md +++ b/docs/content/connectors/oceanbase-cdc.md @@ -53,7 +53,7 @@ Setup OceanBase and LogProxy Server mysql> show parameters like 'obconfig_url'; ``` -6. Setup OceanBase LogProxy. For users of OceanBase Community Edition, you can follow the [quick start](https://github.com/oceanbase/oblogproxy#quick-start). +5. Setup OceanBase LogProxy. For users of OceanBase Community Edition, you can follow the [quick start](https://github.com/oceanbase/oblogproxy#quick-start). How to create a OceanBase CDC table ---------------- @@ -93,6 +93,8 @@ Flink SQL> CREATE TABLE orders ( Flink SQL> SELECT * FROM orders; ``` +You can also try the quickstart tutorial that sync data from OceanBase to Elasticsearch, please refer [Flink CDC Tutorial](https://ververica.github.io/flink-cdc-connectors/release-2.3//content/quickstart/oceanbase-tutorial.html) for more information. + Connector Options ----------------