[oceanbase] support libobcdc 4.x and fix restore timestamp config (#2161)

He Wang 2 years ago committed by GitHub
parent 8cef4af544
commit c2a6456ad0

@ -11,7 +11,7 @@ This README is meant as a brief walkthrough on the core features of CDC Connecto
|------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------|
| [mongodb-cdc](docs/content/connectors/mongodb-cdc.md) | <li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.1 |
| [mysql-cdc](docs/content/connectors/mysql-cdc.md) | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> [MariaDB](https://mariadb.org): 10.x <li> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.27 |
| [oceanbase-cdc](/docs/content/connectors/oceanbase-cdc.md) | <li> [OceanBase CE](https://open.oceanbase.com): 3.1.x <li> [OceanBase EE](https://www.oceanbase.com/product/oceanbase) (MySQL mode): 2.x, 3.x | JDBC Driver: 5.1.4x |
| [oceanbase-cdc](/docs/content/connectors/oceanbase-cdc.md) | <li> [OceanBase CE](https://open.oceanbase.com): 3.1.x <li> [OceanBase EE](https://www.oceanbase.com/product/oceanbase) (MySQL mode): 2.x, 3.x, 4.x | JDBC Driver: 5.1.4x |
| [oracle-cdc](docs/content/connectors/oracle-cdc.md) | <li> [Oracle](https://www.oracle.com/index.html): 11, 12, 19 | Oracle Driver: 19.3.0.0 |
| [postgres-cdc](docs/content/connectors/postgres-cdc.md) | <li> [PostgreSQL](https://www.postgresql.org): 9.6, 10, 11, 12 | JDBC Driver: 42.2.27 |
| [sqlserver-cdc](docs/content/connectors/sqlserver-cdc.md) | <li> [Sqlserver](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 7.2.2.jre8 |

@ -11,7 +11,7 @@ The CDC Connectors for Apache Flink<sup>®</sup> integrate Debezium as the engin
|----------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------|
| [mongodb-cdc](connectors/mongodb-cdc.md) | <li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.1 |
| [mysql-cdc](connectors/mysql-cdc.md) | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> [MariaDB](https://mariadb.org): 10.x <li> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.27 |
| [oceanbase-cdc](connectors/oceanbase-cdc.md) | <li> [OceanBase CE](https://open.oceanbase.com): 3.1.x <li> [OceanBase EE](https://www.oceanbase.com/product/oceanbase) (MySQL mode): 2.x, 3.x | JDBC Driver: 5.1.4x |
| [oceanbase-cdc](connectors/oceanbase-cdc.md) | <li> [OceanBase CE](https://open.oceanbase.com): 3.1.x <li> [OceanBase EE](https://www.oceanbase.com/product/oceanbase) (MySQL mode): 2.x, 3.x, 4.x | JDBC Driver: 5.1.4x |
| [oracle-cdc](connectors/oracle-cdc.md) | <li> [Oracle](https://www.oracle.com/index.html): 11, 12, 19 | Oracle Driver: 19.3.0.0 |
| [postgres-cdc](connectors/postgres-cdc.md) | <li> [PostgreSQL](https://www.postgresql.org): 9.6, 10, 11, 12 | JDBC Driver: 42.2.12 |
| [sqlserver-cdc](connectors/sqlserver-cdc.md) | <li> [Sqlserver](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 7.2.2.jre8 |

@ -25,8 +25,8 @@ The OceanBase CDC connector allows for reading snapshot data and incremental data from OceanBase.
### Configure OceanBase and the oblogproxy Service
1. Set up the OceanBase cluster following the [deployment doc](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.1/deploy-the-distributed-oceanbase-cluster).
2. In the sys tenant, create a user with a password for oblogproxy. For more information, see the [user management doc](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.1/create-user-3).
1. Set up the OceanBase cluster following the [doc](https://github.com/oceanbase/oceanbase#quick-start).
2. In the sys tenant, create a user with a password for oblogproxy.
```bash
mysql -h${host} -P${port} -uroot
@ -47,7 +47,7 @@ The OceanBase CDC connector allows for reading snapshot data and incremental data from OceanBase.
mysql> show parameters like 'obconfig_url';
```
5. Set up oblogproxy following the [quick start doc](https://github.com/oceanbase/oblogproxy#quick-start).
5. Set up oblogproxy following the [doc](https://github.com/oceanbase/oblogproxy#getting-started).
## Create an OceanBase CDC Table
@ -72,8 +72,8 @@ Flink SQL> CREATE TABLE orders (
'username' = 'user@test_tenant',
'password' = 'pswd',
'tenant-name' = 'test_tenant',
'database-name' = 'test_db',
'table-name' = 'orders',
'database-name' = '^test_db$',
'table-name' = '^orders$',
'hostname' = '127.0.0.1',
'port' = '2881',
'rootserver-list' = '127.0.0.1:2882:2881',
@ -96,39 +96,197 @@ The OceanBase CDC connector provides options for both the SQL and DataStream APIs, as shown in the table
1. Use `database-name` and `table-name` to match database and table names with regular expressions. Since `obcdc` (formerly `liboblog`) only supports `fnmatch` matching, regular expressions cannot be used directly to filter changelog events, so specifying the monitored tables with these two options only works in the `initial` startup mode.
2. Use `table-list` to specify an exact list of database and table names (see the sketch after this list).
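For the non-`initial` startup modes, the monitored tables therefore have to be enumerated explicitly. Below is a minimal sketch of the `table-list` style; the tenant, database, table, and connection values are placeholders reused from the example above:
```sql
-- Sketch: enumerate exact tables with 'table-list', which also works
-- when 'scan.startup.mode' is not 'initial'.
CREATE TABLE orders (
    order_id      INT,
    order_date    TIMESTAMP(0),
    customer_name STRING,
    price         DECIMAL(10, 5),
    product_id    INT,
    order_status  BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'latest-offset',
    'username' = 'user@test_tenant',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',
    'table-list' = 'test_db.orders',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'rootserver-list' = '127.0.0.1:2882:2881',
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983');
```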
Option | Required | Default | Type | Description
---- | ----- | ------ | ----- | ----
connector | Yes | None | String | Specify which connector to use; here it should be `oceanbase-cdc`.
scan.startup.mode | Yes | None | String | Startup mode of the OceanBase CDC consumer. Valid values are `initial`, `latest-offset` or `timestamp`.
scan.startup.timestamp | No | None | Long | Timestamp of the starting point, in seconds. Only applicable when `scan.startup.mode` is `timestamp`.
username | Yes | None | String | Username to use when connecting to the OceanBase database.
password | Yes | None | String | Password to use when connecting to the OceanBase database.
tenant-name | Yes | None | String | Tenant name of the OceanBase database to monitor; should be an exact value.
database-name | Yes | None | String | Database name of the OceanBase database to monitor; should be a regular expression. Only supported with the 'initial' startup mode.
table-name | No | None | String | Table name of the OceanBase database to monitor; should be a regular expression. Only supported with the 'initial' startup mode.
table-list | No | None | String | Comma-separated list of fully qualified table names of the OceanBase database to monitor, e.g. "db1.table1, db2.table2".
hostname | No | None | String | IP address or hostname of the OceanBase database server or the OceanBase proxy (ODP).
port | No | None | Integer | Integer port number of the OceanBase database server. It can be the SQL port of an OceanBase server (default 2881)<br>or the port of the OceanBase proxy service (default 2883).
connect.timeout | No | 30s | Duration | The maximum time the connector should wait before timing out when attempting to connect to the OceanBase database server.
server-time-zone | No | +00:00 | String | Session time zone of the database server, used to control how temporal types of OceanBase are converted to STRING.<br>Valid values are UTC offsets in the format "±hh:mm",<br>or named time zones if the time zone tables in the mysql database have been created.
logproxy.host | Yes | None | String | IP address or hostname of the OceanBase LogProxy service.
logproxy.port | Yes | None | Integer | Port of the OceanBase LogProxy service.
logproxy.client.id | No | Generated by rule | String | Client connection ID of the OceanBase LogProxy service; by default it is generated following the rule {flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant}.
rootserver-list | Yes | None | String | List of OceanBase root servers, each in the format `ip:rpc_port:sql_port`,<br>with multiple addresses separated by semicolons `;`. Required for OceanBase Community Edition.
config-url | No | None | String | URL to fetch server info from the config server. Required for OceanBase Enterprise Edition.
working-mode | No | storage | String | Working mode of `obcdc` in LogProxy; can be `storage` or `memory`.
<div class="highlight">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width: 10%">Option</th>
<th class="text-left" style="width: 8%">Required</th>
<th class="text-left" style="width: 7%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 65%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>connector</td>
<td>Yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify which connector to use; here it should be <code>'oceanbase-cdc'</code>.</td>
</tr>
<tr>
<td>scan.startup.mode</td>
<td>Yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Startup mode of the OceanBase CDC consumer. Valid values are <code>'initial'</code>, <code>'latest-offset'</code> or
<code>'timestamp'</code>.</td>
</tr>
<tr>
<td>scan.startup.timestamp</td>
<td>No</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>Timestamp of the starting point, in seconds. Only available when the startup mode is <code>'timestamp'</code>.</td>
</tr>
<tr>
<td>username</td>
<td>Yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Username to use when connecting to the OceanBase database.</td>
</tr>
<tr>
<td>password</td>
<td>Yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Password to use when connecting to the OceanBase database.</td>
</tr>
<tr>
<td>tenant-name</td>
<td>Yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Tenant name of the OceanBase database to monitor; should be an exact value.</td>
</tr>
<tr>
<td>database-name</td>
<td>No</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Database name of the OceanBase database to monitor; should be a regular expression. Only supported with the 'initial' startup mode.</td>
</tr>
<tr>
<td>table-name</td>
<td>No</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Table name of the OceanBase database to monitor; should be a regular expression. Only supported with the 'initial' startup mode.</td>
</tr>
<tr>
<td>table-list</td>
<td>No</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Comma-separated list of fully qualified table names of the OceanBase database to monitor, e.g. "db1.table1, db2.table2".</td>
</tr>
<tr>
<td>hostname</td>
<td>No</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>IP address or hostname of the OceanBase database server or the OceanBase proxy (ODP).</td>
</tr>
<tr>
<td>port</td>
<td>No</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>
Integer port number of the OceanBase database server. It can be the SQL port of an OceanBase server (default 2881)<br>
or the port of the OceanBase proxy service (default 2883).</td>
</tr>
<tr>
<td>connect.timeout</td>
<td>No</td>
<td style="word-wrap: break-word;">30s</td>
<td>Duration</td>
<td>The maximum time the connector should wait before timing out when attempting to connect to the OceanBase database server.</td>
</tr>
<tr>
<td>server-time-zone</td>
<td>No</td>
<td style="word-wrap: break-word;">+00:00</td>
<td>String</td>
<td>
Session time zone of the database server, used to control how temporal types of OceanBase are converted to STRING.<br>
Valid values are UTC offsets in the format "±hh:mm",<br>
or named time zones if the time zone tables in the mysql database have been created.
</td>
</tr>
<tr>
<td>logproxy.host</td>
<td>Yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>IP address or hostname of the OceanBase LogProxy service.</td>
</tr>
<tr>
<td>logproxy.port</td>
<td>Yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Port of the OceanBase LogProxy service.</td>
</tr>
<tr>
<td>logproxy.client.id</td>
<td>No</td>
<td style="word-wrap: break-word;">Generated by rule</td>
<td>String</td>
<td>Client connection ID of the OceanBase LogProxy service; by default it is generated following the rule {flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant}.</td>
</tr>
<tr>
<td>rootserver-list</td>
<td>No</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>List of OceanBase root servers, each in the format `ip:rpc_port:sql_port`,<br>with multiple addresses separated by semicolons `;`. Required for OceanBase Community Edition.</td>
</tr>
<tr>
<td>config-url</td>
<td>No</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>URL to fetch server info from the config server. Required for OceanBase Enterprise Edition.</td>
</tr>
<tr>
<td>working-mode</td>
<td>No</td>
<td style="word-wrap: break-word;">storage</td>
<td>String</td>
<td>Working mode of `libobcdc` in LogProxy; can be `storage` or `memory`.</td>
</tr>
</tbody>
</table>
</div>
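As a worked example of the startup options above, the following sketch (connection values are placeholders) resumes reading from a fixed point in time. Note that `scan.startup.timestamp` is given in seconds, and that the exact `table-list` option is used instead of the `database-name`/`table-name` regexes because the mode is not `initial`:
```sql
-- Sketch: start reading changelog events from
-- 2023-05-01 00:00:00 UTC (unix time 1682899200 seconds).
CREATE TABLE orders (
    order_id     INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp' = '1682899200',
    'username' = 'user@test_tenant',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',
    'table-list' = 'test_db.orders',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'rootserver-list' = '127.0.0.1:2882:2881',
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983');
```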
## Available Metadata
When creating a table, you can use metadata in the following format as read-only (VIRTUAL) columns.
Column name | Data type | Description
----- | ----- | -----
tenant_name | STRING NOT NULL | Name of the tenant that contains the row.
database_name | STRING NOT NULL | Name of the database that contains the row.
table_name | STRING NOT NULL | Name of the table that contains the row.
op_ts | TIMESTAMP_LTZ(3) NOT NULL | The time when the change was made in the database. If the record was read during the snapshot phase of the table, the value is always 0.
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width: 15%">列名</th>
<th class="text-left" style="width: 30%">数据类型</th>
<th class="text-left" style="width: 55%">描述</th>
</tr>
</thead>
<tbody>
<tr>
<td>tenant_name</td>
<td>STRING NOT NULL</td>
<td>Name of the tenant that contains the row.</td>
</tr>
<tr>
<td>database_name</td>
<td>STRING NOT NULL</td>
<td>Name of the database that contains the row.</td>
</tr>
<tr>
<td>table_name</td>
<td>STRING NOT NULL</td>
<td>Name of the table that contains the row.</td>
</tr>
<tr>
<td>op_ts</td>
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
<td>The time when the change was made in the database. If the record was read during the snapshot phase of the table, the value is always 0.</td>
</tr>
</tbody>
</table>
The following SQL shows how to use these metadata columns in a table:
@ -151,8 +309,8 @@ CREATE TABLE products (
'username' = 'user@test_tenant',
'password' = 'pswd',
'tenant-name' = 'test_tenant',
'database-name' = 'test_db',
'table-name' = 'orders',
'database-name' = '^test_db$',
'table-name' = '^orders$',
'hostname' = '127.0.0.1',
'port' = '2881',
'rootserver-list' = '127.0.0.1:2882:2881',
@ -234,13 +392,13 @@ public class OceanBaseSourceExample {
.username("user@test_tenant")
.password("pswd")
.tenantName("test_tenant")
.databaseName("test_db")
.tableName("test_table")
.databaseName("^test_db$")
.tableName("^test_table$")
.hostname("127.0.0.1")
.port(2881)
.logProxyHost("127.0.0.1")
.logProxyPort(2983)
.serverTimeZone(serverTimezone)
.serverTimeZone(serverTimeZone)
.deserializer(deserializer)
.build();
@ -257,30 +415,185 @@ public class OceanBaseSourceExample {
## Data Type Mapping
OceanBase type | Flink SQL type | Description
----- | ----- | ------
BOOLEAN <br>TINYINT(1) <br>BIT(1) | BOOLEAN |
TINYINT | TINYINT |
SMALLINT <br>TINYINT UNSIGNED | SMALLINT |
INT <br>MEDIUMINT <br>SMALLINT UNSIGNED | INT |
BIGINT <br>INT UNSIGNED | BIGINT |
BIGINT UNSIGNED | DECIMAL(20, 0) |
REAL <br>FLOAT | FLOAT |
DOUBLE | DOUBLE |
NUMERIC(p, s)<br>DECIMAL(p, s)<br>where p <= 38 | DECIMAL(p, s) |
NUMERIC(p, s)<br>DECIMAL(p, s)<br>where 38 < p <=65 | STRING | DECIMAL is equivalent to NUMERIC. In OceanBase, the precision of the DECIMAL data type can be up to 65,<br> while in Flink the precision of DECIMAL is limited to 38. So<br> if you define a DECIMAL column with a precision greater than 38, you should map it to STRING to avoid loss of precision. |
DATE | DATE |
TIME [(p)] | TIME [(p)] |
DATETIME [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] | TIMESTAMP_LTZ [(p)] |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈n/8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
TINYTEXT<br>TEXT<br>MEDIUMTEXT<br>LONGTEXT | STRING |
TINYBLOB<br>BLOB<br>MEDIUMBLOB<br>LONGBLOB | BYTES |
YEAR | INT |
ENUM | STRING |
SET | ARRAY&lt;STRING&gt; | As the SET type of OceanBase is represented as a string object containing one or more values,<br> it is mapped to an array of strings in Flink
JSON | STRING | Data of JSON type is converted to a JSON-formatted string in Flink
### MySQL Mode
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">OceanBase 数据类型</th>
<th class="text-left">Flink SQL 类型</th>
<th class="text-left">描述</th>
</tr>
</thead>
<tbody>
<tr>
<td>
BOOLEAN<br>
TINYINT(1)<br>
BIT(1)
</td>
<td>BOOLEAN</td>
<td></td>
</tr>
<tr>
<td>TINYINT</td>
<td>TINYINT</td>
<td></td>
</tr>
<tr>
<td>
SMALLINT<br>
TINYINT UNSIGNED
</td>
<td>SMALLINT</td>
<td></td>
</tr>
<tr>
<td>
INT<br>
MEDIUMINT<br>
SMALLINT UNSIGNED
</td>
<td>INT</td>
<td></td>
</tr>
<tr>
<td>
BIGINT<br>
INT UNSIGNED
</td>
<td>BIGINT</td>
<td></td>
</tr>
<tr>
<td>BIGINT UNSIGNED</td>
<td>DECIMAL(20, 0)</td>
<td></td>
</tr>
<tr>
<td>
REAL<br>
FLOAT
</td>
<td>FLOAT</td>
<td></td>
</tr>
<tr>
<td>
DOUBLE
</td>
<td>DOUBLE</td>
<td></td>
</tr>
<tr>
<td>
NUMERIC(p, s)<br>
DECIMAL(p, s)<br>
where p <= 38 </td>
<td>DECIMAL(p, s)</td>
<td></td>
</tr>
<tr>
<td>
NUMERIC(p, s)<br>
DECIMAL(p, s)<br>
where 38 < p <=65 </td>
<td>STRING</td>
<td>
DECIMAL is equivalent to NUMERIC. In OceanBase, the precision of the DECIMAL data type can be up to 65.<br>
In Flink, however, the precision of DECIMAL is limited to 38. So<br>
if you define a DECIMAL column with a precision greater than 38, you should map it to STRING to avoid loss of precision.
</td>
</tr>
<tr>
<td>DATE</td>
<td>DATE</td>
<td></td>
</tr>
<tr>
<td>TIME [(p)]</td>
<td>TIME [(p)]</td>
<td></td>
</tr>
<tr>
<td>DATETIME [(p)]</td>
<td>TIMESTAMP [(p)]</td>
<td></td>
</tr>
<tr>
<td>TIMESTAMP [(p)]</td>
<td>TIMESTAMP_LTZ [(p)]</td>
<td></td>
</tr>
<tr>
<td>CHAR(n)</td>
<td>CHAR(n)</td>
<td></td>
</tr>
<tr>
<td>VARCHAR(n)</td>
<td>VARCHAR(n)</td>
<td></td>
</tr>
<tr>
<td>BIT(n)</td>
<td>BINARY(⌈n/8⌉)</td>
<td></td>
</tr>
<tr>
<td>BINARY(n)</td>
<td>BINARY(n)</td>
<td></td>
</tr>
<tr>
<td>VARBINARY(N)</td>
<td>VARBINARY(N)</td>
<td></td>
</tr>
<tr>
<td>
TINYTEXT<br>
TEXT<br>
MEDIUMTEXT<br>
LONGTEXT
</td>
<td>STRING</td>
<td></td>
</tr>
<tr>
<td>
TINYBLOB<br>
BLOB<br>
MEDIUMBLOB<br>
LONGBLOB
</td>
<td>BYTES</td>
<td></td>
</tr>
<tr>
<td>YEAR</td>
<td>INT</td>
<td></td>
</tr>
<tr>
<td>ENUM</td>
<td>STRING</td>
<td></td>
</tr>
<tr>
<td>SET</td>
<td>ARRAY&lt;STRING&gt;</td>
<td>
As the SET type of OceanBase is represented as a string object containing one or more values,<br>
it is mapped to an array of strings in Flink
</td>
</tr>
<tr>
<td>JSON</td>
<td>STRING</td>
<td>Data of JSON type is converted to a JSON-formatted string in Flink</td>
</tr>
</tbody>
</table>
</div>
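To make the DECIMAL rule above concrete, here is a minimal sketch; the `accounts` table and its columns are made up for illustration. A column declared as `DECIMAL(50, 10)` in OceanBase exceeds Flink's maximum DECIMAL precision of 38, so it is declared as STRING on the Flink side:
```sql
-- OceanBase (MySQL mode) side, for reference:
--   CREATE TABLE test_db.accounts (id INT PRIMARY KEY, balance DECIMAL(50, 10));
CREATE TABLE accounts (
    id      INT,
    balance STRING,  -- DECIMAL(50, 10) has precision > 38, map to STRING
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'username' = 'user@test_tenant',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',
    'database-name' = '^test_db$',
    'table-name' = '^accounts$',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'rootserver-list' = '127.0.0.1:2882:2881',
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983');
```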

@ -1,11 +1,11 @@
# OceanBase CDC Connector
The OceanBase CDC connector allows for reading snapshot data and incremental data from OceanBase. This document describes how to setup the OceanBase CDC connector to run SQL queries against OceanBase.
The OceanBase CDC connector allows for reading snapshot data and incremental data from OceanBase. This document describes how to set up the OceanBase CDC connector to run SQL queries against OceanBase.
Dependencies
------------
In order to setup the OceanBase CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
In order to set up the OceanBase CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
```xml
<dependency>
@ -27,9 +27,9 @@ Download [flink-sql-connector-oceanbase-cdc-2.4-SNAPSHOT.jar](https://repo1.mave
Setup OceanBase and LogProxy Server
----------------------
1. Setup the OceanBase cluster following the [deployment doc](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.1/deploy-the-distributed-oceanbase-cluster).
1. Set up the OceanBase cluster following the [doc](https://github.com/oceanbase/oceanbase#quick-start).
2. Create a user with a password in the `sys` tenant; this user is used by OceanBase LogProxy. See the [user management doc](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.1/create-user-3).
2. Create a user with a password in the `sys` tenant; this user is used by OceanBase LogProxy.
```shell
mysql -h${host} -P${port} -uroot
@ -53,7 +53,7 @@ Setup OceanBase and LogProxy Server
mysql> show parameters like 'obconfig_url';
```
5. Set up OceanBase LogProxy. For users of OceanBase Community Edition, you can follow the [quick start](https://github.com/oceanbase/oblogproxy#quick-start).
5. Set up OceanBase LogProxy. For users of OceanBase Community Edition, you can follow the [quick start](https://github.com/oceanbase/oblogproxy#getting-started).
How to create an OceanBase CDC table
----------------
@ -79,8 +79,8 @@ Flink SQL> CREATE TABLE orders (
'username' = 'user@test_tenant',
'password' = 'pswd',
'tenant-name' = 'test_tenant',
'database-name' = 'test_db',
'table-name' = 'orders',
'database-name' = '^test_db$',
'table-name' = '^orders$',
'hostname' = '127.0.0.1',
'port' = '2881',
'rootserver-list' = '127.0.0.1:2882:2881',
@ -314,8 +314,8 @@ CREATE TABLE products (
'username' = 'user@test_tenant',
'password' = 'pswd',
'tenant-name' = 'test_tenant',
'database-name' = 'test_db',
'table-name' = 'orders',
'database-name' = '^test_db$',
'table-name' = '^orders$',
'hostname' = '127.0.0.1',
'port' = '2881',
'rootserver-list' = '127.0.0.1:2882:2881',
@ -400,13 +400,13 @@ public class OceanBaseSourceExample {
.username("user@test_tenant")
.password("pswd")
.tenantName("test_tenant")
.databaseName("test_db")
.tableName("test_table")
.databaseName("^test_db$")
.tableName("^test_table$")
.hostname("127.0.0.1")
.port(2881)
.logProxyHost("127.0.0.1")
.logProxyPort(2983)
.serverTimeZone(serverTimezone)
.serverTimeZone(serverTimeZone)
.deserializer(deserializer)
.build();
@ -423,6 +423,8 @@ public class OceanBaseSourceExample {
Data Type Mapping
----------------
### MySQL Mode
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>

@ -43,15 +43,9 @@ under the License.
<!-- OceanBase Log Client -->
<dependency>
<groupId>com.oceanbase.logclient</groupId>
<artifactId>logproxy-client</artifactId>
<groupId>com.oceanbase</groupId>
<artifactId>oblogclient-logproxy</artifactId>
<version>${oblogclient.version}</version>
<exclusions>
<exclusion>
<artifactId>logback-classic</artifactId>
<groupId>ch.qos.logback</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- The MySQL JDBC driver for reading snapshot-->

@ -183,6 +183,12 @@ public class OceanBaseSource {
startupMode + " mode is not supported.");
}
if (!startupMode.equals(StartupMode.INITIAL)
&& (StringUtils.isNotEmpty(databaseName)
|| StringUtils.isNotEmpty(tableName))) {
throw new IllegalArgumentException(
"If startup mode is not 'INITIAL', 'database-name' and 'table-name' must not be configured");
}
if (StringUtils.isNotEmpty(databaseName) || StringUtils.isNotEmpty(tableName)) {
if (StringUtils.isEmpty(databaseName) || StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException(
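For illustration, the new check in `OceanBaseSource` fails fast on table definitions like the following sketch (connection values are placeholders), where the regex options `database-name`/`table-name` are combined with a non-'initial' startup mode; in such modes, `table-list` should be used instead:
```sql
-- Invalid since this change: throws IllegalArgumentException with
-- "If startup mode is not 'INITIAL', 'database-name' and 'table-name'
-- must not be configured".
CREATE TABLE orders (
    order_id INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'latest-offset',
    'username' = 'user@test_tenant',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',
    'database-name' = '^test_db$',
    'table-name' = '^orders$',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983');
```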

@ -35,7 +35,6 @@ import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.config.ObReaderConfig;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.listener.RecordListener;
import com.oceanbase.oms.logmessage.DataMessage;
import com.oceanbase.oms.logmessage.LogMessage;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseRecord;
@ -46,6 +45,7 @@ import org.slf4j.LoggerFactory;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@ -56,7 +56,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
@ -90,10 +89,11 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
private final OceanBaseDeserializationSchema<T> deserializer;
private final AtomicBoolean snapshotCompleted = new AtomicBoolean(false);
private final List<LogMessage> logMessageBuffer = new LinkedList<>();
private final List<OceanBaseRecord> changeRecordBuffer = new LinkedList<>();
private transient Set<String> tableSet;
private transient volatile long resolvedTimestamp;
private transient volatile long startTimestamp;
private transient volatile OceanBaseConnection snapshotConnection;
private transient LogProxyClient logProxyClient;
private transient ListState<Long> offsetState;
@ -136,7 +136,6 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
public void open(final Configuration config) throws Exception {
super.open(config);
this.outputCollector = new OutputCollector<>();
this.resolvedTimestamp = -1;
}
@Override
@ -150,23 +149,18 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
readChangeRecords();
if (shouldReadSnapshot()) {
synchronized (ctx.getCheckpointLock()) {
try {
readSnapshotRecords();
} finally {
closeSnapshotConnection();
}
LOG.info("Snapshot reading finished");
}
LOG.info("Snapshot reading started");
readSnapshotRecords();
LOG.info("Snapshot reading finished");
} else {
LOG.info("Skip snapshot reading");
LOG.info("Snapshot reading skipped");
}
logProxyClient.join();
}
private boolean shouldReadSnapshot() {
return resolvedTimestamp == -1 && snapshot;
return resolvedTimestamp <= 0 && snapshot;
}
private OceanBaseConnection getSnapshotConnection() {
@ -210,27 +204,32 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
}
}
if (shouldReadSnapshot()
&& StringUtils.isNotBlank(databaseName)
&& StringUtils.isNotBlank(tableName)) {
if (StringUtils.isNotBlank(databaseName) && StringUtils.isNotBlank(tableName)) {
try {
String sql =
String.format(
"SELECT TABLE_SCHEMA, TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
+ "WHERE TABLE_TYPE='BASE TABLE' and TABLE_SCHEMA REGEXP '%s' and TABLE_NAME REGEXP '%s'",
databaseName, tableName);
final List<String> matchedTables = new ArrayList<>();
getSnapshotConnection()
.query(
sql,
rs -> {
while (rs.next()) {
localTableSet.add(
matchedTables.add(
String.format(
"%s.%s", rs.getString(1), rs.getString(2)));
}
});
LOG.info("Pattern matched tables: {}", matchedTables);
localTableSet.addAll(matchedTables);
} catch (SQLException e) {
LOG.error("Query database and table name failed", e);
LOG.error(
String.format(
"Query table list by 'database-name' %s and 'table-name' %s failed.",
databaseName, tableName),
e);
throw new FlinkRuntimeException(e);
}
}
@ -239,11 +238,9 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
throw new FlinkRuntimeException("No valid table found");
}
LOG.info("Table list: {}", localTableSet);
this.tableSet = localTableSet;
this.obReaderConfig.setTableWhiteList(
localTableSet.stream()
.map(table -> String.format("%s.%s", tenantName, table))
.collect(Collectors.joining("|")));
this.obReaderConfig.setTableWhiteList(String.format("%s.*.*", tenantName));
}
protected void readSnapshotRecords() {
@ -253,6 +250,7 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
readSnapshotRecordsByTable(schema[0], schema[1]);
});
snapshotCompleted.set(true);
resolvedTimestamp = startTimestamp;
}
private void readSnapshotRecordsByTable(String databaseName, String tableName) {
@ -315,6 +313,7 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
case BEGIN:
if (!started) {
started = true;
startTimestamp = Long.parseLong(message.getSafeTimestamp());
latch.countDown();
}
break;
@ -324,22 +323,24 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
if (!started) {
break;
}
logMessageBuffer.add(message);
OceanBaseRecord record = getChangeRecord(message);
if (record != null) {
changeRecordBuffer.add(record);
}
break;
case COMMIT:
// flush buffer after snapshot completed
if (!shouldReadSnapshot() || snapshotCompleted.get()) {
logMessageBuffer.forEach(
msg -> {
changeRecordBuffer.forEach(
r -> {
try {
deserializer.deserialize(
getChangeRecord(msg), outputCollector);
deserializer.deserialize(r, outputCollector);
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
});
logMessageBuffer.clear();
long timestamp = getCheckpointTimestamp(message);
changeRecordBuffer.clear();
long timestamp = Long.parseLong(message.getSafeTimestamp());
if (timestamp > resolvedTimestamp) {
resolvedTimestamp = timestamp;
}
@ -369,41 +370,23 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
if (!latch.await(connectTimeout.getSeconds(), TimeUnit.SECONDS)) {
throw new TimeoutException("Timeout to receive messages in RecordListener");
}
LOG.info("LogProxyClient packet processing started");
LOG.info("LogProxyClient packet processing started from timestamp {}", startTimestamp);
}
private OceanBaseRecord getChangeRecord(LogMessage message) {
String databaseName = message.getDbName().replace(tenantName + ".", "");
if (!tableSet.contains(String.format("%s.%s", databaseName, message.getTableName()))) {
return null;
}
OceanBaseRecord.SourceInfo sourceInfo =
new OceanBaseRecord.SourceInfo(
tenantName,
databaseName,
message.getTableName(),
getCheckpointTimestamp(message));
Long.parseLong(message.getSafeTimestamp()));
return new OceanBaseRecord(sourceInfo, message.getOpt(), message.getFieldList());
}
/**
* Get log message checkpoint timestamp in seconds. Refer to 'globalSafeTimestamp' in {@link
* LogMessage}.
*
* @param message Log message.
* @return Timestamp in seconds.
*/
private long getCheckpointTimestamp(LogMessage message) {
long timestamp = -1;
try {
if (DataMessage.Record.Type.HEARTBEAT.equals(message.getOpt())) {
timestamp = Long.parseLong(message.getTimestamp());
} else {
timestamp = message.getFileNameOffset();
}
} catch (Throwable t) {
LOG.error("Failed to get checkpoint from log message", t);
}
return timestamp;
}
@Override
public void notifyCheckpointComplete(long l) {
// do nothing

@ -56,8 +56,8 @@ under the License.
<include>com.ververica:flink-connector-debezium</include>
<include>com.ververica:flink-connector-oceanbase-cdc</include>
<include>mysql:mysql-connector-java</include>
<include>com.oceanbase.logclient:*</include>
<include>io.netty:netty-all</include>
<include>com.oceanbase:*</include>
<include>io.netty:*</include>
<include>com.google.protobuf:protobuf-java</include>
<include>commons-codec:commons-codec</include>
<include>org.lz4:lz4-java</include>

@ -91,7 +91,7 @@ under the License.
<slf4j.version>1.7.15</slf4j.version>
<log4j.version>2.17.1</log4j.version>
<spotless.version>2.4.2</spotless.version>
<oblogclient.version>1.0.6</oblogclient.version>
<oblogclient.version>1.1.0</oblogclient.version>
<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
<flink.forkCount>1</flink.forkCount>
<flink.reuseForks>true</flink.reuseForks>
