Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | The connector to use; must be 'oceanbase-cdc'. |
scan.startup.mode | required | (none) | String | Startup mode for the OceanBase CDC consumer. Valid values are 'initial', 'latest-offset', and 'timestamp'. |
scan.startup.timestamp | optional | (none) | Long | Start point as a timestamp in seconds. Only used with the 'timestamp' startup mode. |
username | required | (none) | String | Username to be used when connecting to OceanBase. |
password | required | (none) | String | Password to be used when connecting to OceanBase. |
tenant-name | required | (none) | String | Name of the OceanBase tenant to monitor. Must be an exact value. |
database-name | optional | (none) | String | Name of the OceanBase database to monitor, given as a regular expression. Can only be used with the 'initial' startup mode. |
table-name | optional | (none) | String | Name of the OceanBase table to monitor, given as a regular expression. Can only be used with the 'initial' startup mode. |
table-list | optional | (none) | String | List of full names of tables, separated by commas, e.g. "db1.table1, db2.table2". |
hostname | optional | (none) | String | IP address or hostname of the OceanBase database server or OceanBase Proxy server. |
port | optional | (none) | Integer | Integer port number to connect to OceanBase. It can be the SQL port of OceanBase server, which is 2881 by default, or the port of OceanBase proxy service, which is 2883 by default. |
connect.timeout | optional | 30s | Duration | The maximum time that the connector should wait after trying to connect to the OceanBase database server before timing out. |
server-time-zone | optional | UTC | String | The session time zone of the database server, e.g. "Asia/Shanghai". It controls how the TIMESTAMP type in OceanBase is converted to STRING during snapshot reading. Make sure it matches the time zone of the `oblogproxy` deployment. |
logproxy.host | required | (none) | String | Hostname or IP address of OceanBase log proxy service. |
logproxy.port | required | (none) | Integer | Port number of OceanBase log proxy service. |
logproxy.client.id | optional | By rule. | String | ID of the log proxy client connection. By default it is generated in the format `{flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant}`. |
rootserver-list | optional | (none) | String | Semicolon-separated list of OceanBase root servers in the format `ip:rpc_port:sql_port`. Required for OceanBase CE. |
config-url | optional | (none) | String | The URL used to get server info from the config server. Required for OceanBase EE. |
working-mode | optional | storage | String | Working mode of `obcdc` in LogProxy, can be `storage` or `memory`. |
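
As a minimal sketch of how the options above fit together, the following Flink SQL DDL declares a source table in 'initial' mode. The table schema, credentials, hosts, and ports are placeholder values chosen for illustration, not connector defaults; the log proxy port 2983 in particular is only an assumed value for a local deployment.

```sql
-- Hypothetical source table: column names and all option values are placeholders.
CREATE TABLE orders (
    order_id      INT,
    order_date    TIMESTAMP(0),
    customer_name STRING,
    price         DECIMAL(10, 5),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',            -- or 'latest-offset' / 'timestamp'
    'username' = 'user@test_tenant',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',              -- exact value
    'database-name' = '^test_db$',              -- regular expression
    'table-name' = '^orders$',                  -- regular expression
    'hostname' = '127.0.0.1',
    'port' = '2881',                            -- SQL port of the OceanBase server
    'rootserver-list' = '127.0.0.1:2882:2881',  -- ip:rpc_port:sql_port, for OceanBase CE
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983',                   -- assumed log proxy port
    'working-mode' = 'memory'
);
```

To start from a point in time instead, one would set 'scan.startup.mode' to 'timestamp' and add 'scan.startup.timestamp' with a value in seconds. Because 'database-name' and 'table-name' are documented for 'initial' mode only, the tables would then presumably be selected through 'table-list'.
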

Key | DataType | Description |
---|---|---|
tenant_name | STRING NOT NULL | Name of the tenant that contains the row. |
database_name | STRING NOT NULL | Name of the database that contains the row. |
table_name | STRING NOT NULL | Name of the table that contains the row. |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | Time at which the change was made in the database. If the record is read from the table snapshot instead of the change stream, the value is always 0. |
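
The metadata keys above can be exposed as columns with Flink SQL's `METADATA FROM` syntax. The sketch below assumes a hypothetical `products` table and placeholder connection values; only the four metadata keys come from the table above.

```sql
-- Hypothetical table: physical columns and option values are placeholders.
CREATE TABLE products (
    tenant_name  STRING METADATA FROM 'tenant_name' VIRTUAL,
    db_name      STRING METADATA FROM 'database_name' VIRTUAL,
    table_name   STRING METADATA FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    id           INT,
    name         STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'username' = 'user@test_tenant',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',
    'database-name' = '^test_db$',
    'table-name' = '^products$',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'rootserver-list' = '127.0.0.1:2882:2881',
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983'                    -- assumed log proxy port
);
```

The columns are declared `VIRTUAL` because they are only read from the source and are not part of the physical row.
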

OceanBase type | Flink SQL type | NOTE |
---|---|---|
BOOLEAN, TINYINT | TINYINT | |
SMALLINT, TINYINT UNSIGNED | SMALLINT | |
INT, MEDIUMINT, SMALLINT UNSIGNED | INT | |
BIGINT, INT UNSIGNED | BIGINT | |
BIGINT UNSIGNED | DECIMAL(20, 0) | |
REAL, FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
NUMERIC(p, s), DECIMAL(p, s) where p <= 38 | DECIMAL(p, s) | |
NUMERIC(p, s), DECIMAL(p, s) where 38 < p <= 65 | STRING | DECIMAL is equivalent to NUMERIC. The precision for DECIMAL data type is up to 65 in OceanBase, but the precision for DECIMAL is limited to 38 in Flink. So if you define a decimal column whose precision is greater than 38, you should map it to STRING to avoid precision loss. |
DATE | DATE | |
TIME [(p)] | TIME [(p)] | |
TIMESTAMP [(p)], DATETIME [(p)] | TIMESTAMP [(p)] | |
CHAR(n) | CHAR(n) | |
VARCHAR(n) | VARCHAR(n) | |
BIT(n) | BINARY(⌈n/8⌉) | |
BINARY(n) | BINARY(n) | |
VARBINARY(N) | VARBINARY(N) | |
TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT | STRING | |
TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES | |
YEAR | INT | |
ENUM | STRING | |
SET | STRING | |
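
To illustrate the less obvious mappings, here is a sketch of a Flink table declared over an assumed OceanBase table. The `ledger` table, its columns, and all option values are hypothetical; the type choices follow the mapping table above.

```sql
-- Assumed OceanBase-side definition (hypothetical):
--   CREATE TABLE ledger (
--     id          BIGINT UNSIGNED PRIMARY KEY,
--     amount      DECIMAL(50, 10),
--     flags       BIT(12),
--     created     DATETIME(3),
--     order_state ENUM('open', 'closed')
--   );
CREATE TABLE ledger (
    id          DECIMAL(20, 0),  -- BIGINT UNSIGNED
    amount      STRING,          -- DECIMAL(50, 10): precision > 38, so STRING
    flags       BINARY(2),       -- BIT(12): ceil(12 / 8) = 2 bytes
    created     TIMESTAMP(3),    -- DATETIME(3)
    order_state STRING,          -- ENUM
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'username' = 'user@test_tenant',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',
    'database-name' = '^test_db$',
    'table-name' = '^ledger$',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'rootserver-list' = '127.0.0.1:2882:2881',
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983'                    -- assumed log proxy port
);
```

Declaring `amount` as STRING keeps all 50 digits intact; casting it to DECIMAL(38, s) downstream is possible only if the actual values are known to fit.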