@@ -187,11 +266,12 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
scan.startup.mode |
- 是 |
- 无 |
+ 否 |
+ initial |
String |
- 指定 OceanBase CDC 消费者的启动模式。可取值为'initial' ,'latest-offset' or
- 'timestamp' 。 |
+ 指定 OceanBase CDC 消费者的启动模式。可取值为
+ 'initial' ,'latest-offset' ,'timestamp' 或 'snapshot' 。
+ |
scan.startup.timestamp |
@@ -216,7 +296,7 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
tenant-name |
- 是 |
+ 否 |
无 |
String |
待监控 OceanBase 数据库的租户名,应该填入精确值。 |
@@ -226,14 +306,14 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
否 |
无 |
String |
- 待监控 OceanBase 数据库的数据库名,应该是正则表达式,该选项只支持和 'initial' 模式一起使用。 |
+ 待监控 OceanBase 数据库的数据库名,应该是正则表达式。 |
table-name |
否 |
无 |
String |
- 待监控 OceanBase 数据库的表名,应该是正则表达式,该选项只支持和 'initial' 模式一起使用。 |
+ 待监控 OceanBase 数据库的表名,应该是正则表达式。 |
table-list |
@@ -244,14 +324,14 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
hostname |
- 否 |
+ 是 |
无 |
String |
OceanBase 数据库或 OceanBbase 代理 ODP 的 IP 地址或主机名。 |
port |
- 否 |
+ 是 |
无 |
Integer |
@@ -278,14 +358,14 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
|
logproxy.host |
- 是 |
+ 否 |
无 |
String |
OceanBase 日志代理服务 的 IP 地址或主机名。 |
logproxy.port |
- 是 |
+ 否 |
无 |
Integer |
OceanBase 日志代理服务 的端口号。 |
@@ -328,7 +408,7 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
jdbc.driver |
否 |
- com.mysql.jdbc.Driver |
+ com.mysql.cj.jdbc.Driver |
String |
全量读取时使用的 jdbc 驱动类名。 |
@@ -339,11 +419,19 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
String |
传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'。 |
+
+ obcdc.properties.* |
+ 否 |
+ 无 |
+ String |
+ 传递自定义 libobcdc 属性的选项,如 'obcdc.properties.sort_trans_participants' = '1'。详情参见 obcdc 配置项说明。 |
+
-## 支持的元数据
+支持的元数据
+----------------
在创建表时,您可以使用以下格式的元数据作为只读列(VIRTUAL)。
@@ -358,13 +446,18 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
tenant_name |
- STRING NOT NULL |
+ STRING |
当前记录所属的租户名称。 |
database_name |
- STRING NOT NULL |
- 当前记录所属的库名。 |
+ STRING |
+ 当前记录所属的 db 名。 |
+
+
+ schema_name |
+ STRING |
+ 当前记录所属的 schema 名。 |
table_name |
@@ -406,10 +499,13 @@ CREATE TABLE products (
'port' = '2881',
'rootserver-list' = '127.0.0.1:2882:2881',
'logproxy.host' = '127.0.0.1',
- 'logproxy.port' = '2983');
+ 'logproxy.port' = '2983',
+ 'working-mode' = 'memory'
+);
```
-## 特性
+特性
+--------
### At-Least-Once 处理
@@ -424,6 +520,7 @@ OceanBase 数据库是一个分布式数据库,它的日志也分散在不同
- `initial`(默认):在首次启动时对受监视的数据库表执行初始快照,并继续读取最新的提交日志。
- `latest-offset`:首次启动时,不对受监视的数据库表执行快照,仅从连接器启动时读取提交日志。
- `timestamp`:在首次启动时不对受监视的数据库表执行初始快照,仅从指定的 `scan.startup.timestamp` 读取最新的提交日志。
+- `snapshot`: 仅对受监视的数据库表执行初始快照。
### 消费提交日志
@@ -434,65 +531,31 @@ OceanBase CDC 连接器使用 [oblogclient](https://github.com/oceanbase/oblogcl
OceanBase CDC 连接器也可以作为 DataStream Source 使用。您可以按照如下创建一个 SourceFunction:
```java
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.oceanbase.OceanBaseSource;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.catalog.UniqueConstraint;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.types.logical.RowType;
-
-import org.apache.flink.cdc.connectors.oceanbase.OceanBaseSource;
-import org.apache.flink.cdc.connectors.oceanbase.source.RowDataOceanBaseDeserializationSchema;
-import org.apache.flink.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
-import org.apache.flink.cdc.connectors.oceanbase.table.StartupMode;
-
-import java.time.ZoneId;
-import java.util.Arrays;
-import java.util.Collections;
public class OceanBaseSourceExample {
public static void main(String[] args) throws Exception {
- ResolvedSchema resolvedSchema =
- new ResolvedSchema(
- Arrays.asList(
- Column.physical("id", DataTypes.INT().notNull()),
- Column.physical("name", DataTypes.STRING().notNull())),
- Collections.emptyList(),
- UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
-
- RowType physicalDataType =
- (RowType) resolvedSchema.toPhysicalRowDataType().getLogicalType();
- TypeInformation resultTypeInfo = InternalTypeInfo.of(physicalDataType);
- String serverTimeZone = "+00:00";
-
- OceanBaseDeserializationSchema deserializer =
- RowDataOceanBaseDeserializationSchema.newBuilder()
- .setPhysicalRowType(physicalDataType)
- .setResultTypeInfo(resultTypeInfo)
- .setServerTimeZone(ZoneId.of(serverTimeZone))
- .build();
-
- SourceFunction oceanBaseSource =
- OceanBaseSource.builder()
- .rsList("127.0.0.1:2882:2881")
- .startupMode(StartupMode.INITIAL)
+ SourceFunction oceanBaseSource =
+ OceanBaseSource.builder()
+ .startupOptions(StartupOptions.initial())
+ .hostname("127.0.0.1")
+ .port(2881)
.username("user@test_tenant")
.password("pswd")
+ .compatibleMode("mysql")
+ .jdbcDriver("com.mysql.cj.jdbc.Driver")
.tenantName("test_tenant")
.databaseName("^test_db$")
.tableName("^test_table$")
- .hostname("127.0.0.1")
- .port(2881)
- .compatibleMode("mysql")
- .jdbcDriver("com.mysql.jdbc.Driver")
.logProxyHost("127.0.0.1")
.logProxyPort(2983)
- .serverTimeZone(serverTimeZone)
- .deserializer(deserializer)
+ .rsList("127.0.0.1:2882:2881")
+ .serverTimeZone("+08:00")
+ .deserializer(new JsonDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -506,7 +569,8 @@ public class OceanBaseSourceExample {
}
```
-## 数据类型映射
+数据类型映射
+----------------
### Mysql 模式
@@ -591,7 +655,8 @@ public class OceanBaseSourceExample {
NUMERIC(p, s)
DECIMAL(p, s)
- where 38 < p <=65 |
+ where 38 < p <=65
+
STRING |
DECIMAL 等同于 NUMERIC。在 OceanBase 数据库中,DECIMAL 数据类型的精度最高为 65。
@@ -631,7 +696,7 @@ public class OceanBaseSourceExample {
|
BIT(n) |
- BINARY(⌈n/8⌉) |
+ BINARY(⌈(n + 7) / 8⌉) |
|
diff --git a/docs/content/docs/connectors/legacy-flink-cdc-sources/oceanbase-cdc.md b/docs/content/docs/connectors/legacy-flink-cdc-sources/oceanbase-cdc.md
index 94900e660..e0f25bbbe 100644
--- a/docs/content/docs/connectors/legacy-flink-cdc-sources/oceanbase-cdc.md
+++ b/docs/content/docs/connectors/legacy-flink-cdc-sources/oceanbase-cdc.md
@@ -28,22 +28,81 @@ under the License.
The OceanBase CDC connector allows for reading snapshot data and incremental data from OceanBase. This document describes how to set up the OceanBase CDC connector to run SQL queries against OceanBase.
+### OceanBase CDC Solutions
+
+Glossary:
+
+- *OceanBase CE*: OceanBase Community Edition. It's compatible with MySQL and has been open sourced at https://github.com/oceanbase/oceanbase.
+- *OceanBase EE*: OceanBase Enterprise Edition. It supports two compatibility modes: MySQL and Oracle. See https://en.oceanbase.com.
+- *OceanBase Cloud*: OceanBase Enterprise Edition on Cloud. See https://en.oceanbase.com/product/cloud.
+- *Log Proxy CE*: OceanBase Log Proxy Community Edition (CDC mode). It's a proxy service which can fetch the commit log data of OceanBase CE. It has been open sourced at https://github.com/oceanbase/oblogproxy.
+- *Log Proxy EE*: OceanBase Log Proxy Enterprise Edition (CDC mode). It's a proxy service which can fetch the commit log data of OceanBase EE. Limited support is available on OceanBase Cloud only, you can contact the provider support for more details.
+- *Binlog Service CE*: OceanBase Binlog Service Community Edition. It is a solution of OceanBase CE that is compatible with the MySQL replication protocol. See the docs of Log Proxy CE (Binlog mode) for details.
+- *Binlog Service EE*: OceanBase Binlog Service Enterprise Edition. It is a solution of OceanBase EE MySQL mode that is compatible with the MySQL replication protocol, and it's only available for users of Alibaba Cloud, see [User Guide](https://www.alibabacloud.com/help/en/apsaradb-for-oceanbase/latest/binlog-overview).
+- *MySQL Driver*: `mysql-connector-java` which can be used with OceanBase CE and OceanBase EE MySQL mode.
+- *OceanBase Driver*: The Jdbc driver for OceanBase, which supports both MySQL mode and Oracle mode of all OceanBase versions. It's open sourced at https://github.com/oceanbase/obconnector-j.
+
+CDC Source Solutions for OceanBase:
+
+
+
+
+
+ Database |
+ Supported Driver |
+ CDC Source Connector |
+ Other Required Components |
+
+
+
+
+ OceanBase CE |
+
+ MySQL Driver: 5.1.4x, 8.0.x
+ OceanBase Driver: 2.4.x
+ |
+ OceanBase CDC Connector |
+ Log Proxy CE |
+
+
+ MySQL Driver: 8.0.x |
+ MySQL CDC Connector |
+ Binlog Service CE |
+
+
+ OceanBase EE (MySQL Mode) |
+
+ MySQL Driver: 5.1.4x, 8.0.x
+ OceanBase Driver: 2.4.x
+ |
+ OceanBase CDC Connector |
+ Log Proxy EE |
+
+
+ MySQL Driver: 8.0.x |
+ MySQL CDC Connector |
+ Binlog Service EE |
+
+
+ OceanBase EE (Oracle Mode) |
+ OceanBase Driver: 2.4.x |
+ OceanBase CDC Connector |
+ Log Proxy EE (CDC Mode) |
+
+
+
+
+
+Note: For users of OceanBase CE or OceanBase EE MySQL Mode, we recommend that you follow the [MySQL CDC documentation](mysql-cdc.md) to use the MySQL CDC source connector with the Binlog service.
+
Dependencies
------------
In order to set up the OceanBase CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
-{{< artifact flink-connector-oceanbase-cdc >}}
-
-If you want to use OceanBase JDBC driver to connect to the enterprise edition database, you should also include the following dependency in your class path.
+### Maven dependency
-```xml
-
- com.oceanbase
- oceanbase-client
- 2.4.2
-
-```
+{{< artifact flink-connector-oceanbase-cdc >}}
### SQL Client JAR
@@ -53,7 +112,30 @@ Download [flink-sql-connector-oceanbase-cdc-3.0.1.jar](https://repo1.maven.org/m
**Note:** Refer to [flink-sql-connector-oceanbase-cdc](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-oceanbase-cdc), more released versions will be available in the Maven central warehouse.
-Due to the license issue, we can not include the OceanBase JDBC driver in the cdc jar. If you need to use it, you can download it from [here](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.2/oceanbase-client-2.4.2.jar) and put it under `/lib/`, you also need to set the start option `jdbc.driver` to `com.oceanbase.jdbc.Driver`.
+Since the licenses of MySQL Driver and OceanBase Driver are incompatible with Flink CDC project, we can't provide them in prebuilt connector jar packages. You may need to configure the following dependencies manually.
+
+
Setup OceanBase and LogProxy Server
----------------------
@@ -84,7 +166,7 @@ Setup OceanBase and LogProxy Server
mysql> show parameters like 'obconfig_url';
```
-5. Setup OceanBase LogProxy. For users of OceanBase Community Edition, you can follow the [quick start](https://github.com/oceanbase/oblogproxy#getting-started).
+5. Setup OceanBase LogProxy. For users of OceanBase Community Edition, you can follow the [docs (Chinese)](https://www.oceanbase.com/docs/community-oblogproxy-doc-1000000000531984).
How to create a OceanBase CDC table
----------------
@@ -162,7 +244,7 @@ Connector Options
The OceanBase CDC Connector contains some options for both sql and stream api as the following sheet.
*Note*: The connector supports two ways to specify the table list to listen to, and will get the union of the results when both way are used at the same time.
-1. Use `database-name` and `table-name` to match database and table names in regex. As the `obcdc` (former `liboblog`) only supports `fnmatch` now, we can't use regex directly to filter change events, so these two options can only be used in `initial` startup mode.
+1. Use `database-name` and `table-name` to match database and table names in regex.
2. Use `table-list` to match the exact value of database and table names.
@@ -186,11 +268,11 @@ The OceanBase CDC Connector contains some options for both sql and stream api as
scan.startup.mode |
- required |
- (none) |
+ optional |
+ initial |
String |
Specify the startup mode for OceanBase CDC consumer, valid enumerations are
- 'initial' ,'latest-offset' or 'timestamp' .
+ 'initial' ,'latest-offset' ,'timestamp' or 'snapshot' .
|
@@ -216,24 +298,24 @@ The OceanBase CDC Connector contains some options for both sql and stream api as
tenant-name |
- required |
+ optional |
(none) |
String |
- Tenant name of OceanBase to monitor, should be exact value. |
+ Tenant name of OceanBase to monitor, should be exact value. Required when 'scan.startup.mode' is not 'snapshot'. |
database-name |
optional |
(none) |
String |
- Database name of OceanBase to monitor, should be regular expression. Only can be used with 'initial' mode. |
+ Database name of OceanBase to monitor, should be regular expression. |
table-name |
optional |
(none) |
String |
- Table name of OceanBase to monitor, should be regular expression. Only can be used with 'initial' mode. |
+ Table name of OceanBase to monitor, should be regular expression. |
table-list |
@@ -244,14 +326,14 @@ The OceanBase CDC Connector contains some options for both sql and stream api as
hostname |
- optional |
+ required |
(none) |
String |
IP address or hostname of the OceanBase database server or OceanBase Proxy server. |
port |
- optional |
+ required |
(none) |
Integer |
Integer port number to connect to OceanBase. It can be the SQL port of OceanBase server, which is 2881 by default, or the port of OceanBase proxy service, which is 2883 by default. |
@@ -272,17 +354,17 @@ The OceanBase CDC Connector contains some options for both sql and stream api as
logproxy.host |
- required |
+ optional |
(none) |
String |
- Hostname or IP address of OceanBase log proxy service. |
+ Hostname or IP address of OceanBase log proxy service. Required when 'scan.startup.mode' is not 'snapshot'. |
logproxy.port |
- required |
+ optional |
(none) |
Integer |
- Port number of OceanBase log proxy service. |
+ Port number of OceanBase log proxy service. Required when 'scan.startup.mode' is not 'snapshot'. |
logproxy.client.id |
@@ -322,7 +404,7 @@ The OceanBase CDC Connector contains some options for both sql and stream api as
jdbc.driver |
optional |
- com.mysql.jdbc.Driver |
+ com.mysql.cj.jdbc.Driver |
String |
JDBC driver class for snapshot reading. |
@@ -360,14 +442,19 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a