[oceanbase] Add new deserialization schema with runtime converter (#1356)

This closes #980.
Authored by He Wang, committed via GitHub
parent d3435381a6
commit b1b092c97e

@@ -349,30 +349,63 @@ The OceanBase CDC Connector using [oblogclient](https://github.com/oceanbase/obl
The OceanBase CDC connector can also be a DataStream source. You can create a SourceFunction as the following shows:

```java
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;

import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
-import com.ververica.cdc.connectors.oceanbase.table.OceanBaseTableSourceFactory;
+import com.ververica.cdc.connectors.oceanbase.source.RowDataOceanBaseDeserializationSchema;
+import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Collections;

public class OceanBaseSourceExample {
    public static void main(String[] args) throws Exception {
-        SourceFunction<String> oceanBaseSource =
-                OceanBaseSource.<String>builder()
-                        .rsList("127.0.0.1:2882:2881") // set root server list
-                        .startupMode(StartupMode.INITIAL) // set startup mode
-                        .username("user@test_tenant") // set cluster username
-                        .password("pswd") // set cluster password
-                        .tenantName("test_tenant") // set captured tenant name, do not support regex
-                        .databaseName("test_db") // set captured database, support regex
-                        .tableName("test_table") // set captured table, support regex
-                        .hostname("127.0.0.1") // set hostname of OceanBase server or proxy
-                        .port(2881) // set the sql port for OceanBase server or proxy
-                        .logProxyHost("127.0.0.1") // set the hostname of log proxy
-                        .logProxyPort(2983) // set the port of log proxy
-                        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
-                        .build();
+        ResolvedSchema resolvedSchema =
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("id", DataTypes.INT().notNull()),
+                                Column.physical("name", DataTypes.STRING().notNull())),
+                        Collections.emptyList(),
+                        UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
+
+        RowType physicalDataType =
+                (RowType) resolvedSchema.toPhysicalRowDataType().getLogicalType();
+        TypeInformation<RowData> resultTypeInfo = InternalTypeInfo.of(physicalDataType);
+        String serverTimeZone = "+00:00";
+
+        OceanBaseDeserializationSchema<RowData> deserializer =
+                RowDataOceanBaseDeserializationSchema.newBuilder()
+                        .setPhysicalRowType(physicalDataType)
+                        .setResultTypeInfo(resultTypeInfo)
+                        .setServerTimeZone(ZoneId.of(serverTimeZone))
+                        .build();
+
+        SourceFunction<RowData> oceanBaseSource =
+                OceanBaseSource.<RowData>builder()
+                        .rsList("127.0.0.1:2882:2881")
+                        .startupMode(StartupMode.INITIAL)
+                        .username("user@test_tenant")
+                        .password("pswd")
+                        .tenantName("test_tenant")
+                        .databaseName("test_db")
+                        .tableName("test_table")
+                        .hostname("127.0.0.1")
+                        .port(2881)
+                        .logProxyHost("127.0.0.1")
+                        .logProxyPort(2983)
+                        .serverTimeZone(serverTimeZone)
+                        .deserializer(deserializer)
+                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

@@ -381,18 +414,13 @@ public class OceanBaseSourceExample {
        env.enableCheckpointing(3000);
        env.addSource(oceanBaseSource).print().setParallelism(1);

-        env.execute("Print OceanBase Snapshot + Commit Log");
+        env.execute("Print OceanBase Snapshot + Change Events");
    }
}
```
Data Type Mapping
----------------

+When the startup mode is not `INITIAL`, we will not be able to get the precision and scale of a column. In order to be compatible with different startup modes, we will not map one OceanBase type with different precisions to different Flink types.
+For example, you can get a boolean from a column of type BOOLEAN, TINYINT(1) or BIT(1). BOOLEAN is equivalent to TINYINT(1) in OceanBase, so columns of BOOLEAN, TINYINT(1) and BIT(1) types are all mapped to BOOLEAN in Flink.
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
@@ -405,7 +433,13 @@ For example, you can get a boolean from a column with type BOOLEAN, TINYINT(1) o
<tbody>
<tr>
<td>BOOLEAN<br>
-TINYINT</td>
+TINYINT(1)<br>
+BIT(1)</td>
+<td>BOOLEAN</td>
+<td></td>
+</tr>
+<tr>
+<td>TINYINT</td>
<td>TINYINT</td>
<td></td>
</tr>
@@ -483,11 +517,13 @@ For example, you can get a boolean from a column with type BOOLEAN, TINYINT(1) o
<td></td>
</tr>
<tr>
-<td>TIMESTAMP [(p)]<br>
-DATETIME [(p)]
-</td>
-<td>TIMESTAMP [(p)]
-</td>
+<td>DATETIME [(p)]</td>
+<td>TIMESTAMP [(p)]</td>
+<td></td>
+</tr>
+<tr>
+<td>TIMESTAMP [(p)]</td>
+<td>TIMESTAMP_LTZ [(p)]</td>
<td></td>
</tr>
<tr>
@@ -547,8 +583,13 @@ For example, you can get a boolean from a column with type BOOLEAN, TINYINT(1) o
</tr>
<tr>
<td>SET</td>
+<td>ARRAY&lt;STRING&gt;</td>
+<td>As the SET data type in OceanBase is a string object that can have zero or more values, it should always be mapped to an array of strings</td>
+</tr>
+<tr>
+<td>JSON</td>
<td>STRING</td>
-<td></td>
+<td>The JSON data type will be converted into STRING with JSON format in Flink.</td>
</tr>
</tbody>
</table>
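
To make the new BOOLEAN mapping concrete, here is a minimal editorial sketch (not part of this commit; the column names `is_active` and `tiny_code` are hypothetical) of how a physical schema for the DataStream example above would declare these columns:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;

import java.util.Arrays;
import java.util.Collections;

public class SchemaMappingSketch {
    public static ResolvedSchema schema() {
        return new ResolvedSchema(
                Arrays.asList(
                        Column.physical("id", DataTypes.INT().notNull()),
                        // OceanBase BOOLEAN, TINYINT(1) and BIT(1) all map to Flink BOOLEAN
                        Column.physical("is_active", DataTypes.BOOLEAN()),
                        // any other TINYINT stays TINYINT in Flink
                        Column.physical("tiny_code", DataTypes.TINYINT())),
                Collections.emptyList(),
                UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
    }
}
```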

@@ -15,13 +15,13 @@ Create `docker-compose.yml`.
version: '2.1'
services:
  observer:
-    image: oceanbase/oceanbase-ce:3.1.3_bp1
+    image: oceanbase/oceanbase-ce:3.1.4
    container_name: observer
    environment:
      - 'OB_ROOT_PASSWORD=pswd'
    network_mode: "host"
  oblogproxy:
-    image: whhe/oblogproxy:1.0.2
+    image: whhe/oblogproxy:1.0.3
    container_name: oblogproxy
    environment:
      - 'OB_SYS_USERNAME=root'
@@ -130,7 +130,7 @@ Flink SQL> CREATE TABLE orders (
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
-    order_status TINYINT,
+    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
@@ -177,7 +177,7 @@ Flink SQL> CREATE TABLE enriched_orders (
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
-    order_status TINYINT,
+    order_status BOOLEAN,
    product_name STRING,
    product_description STRING,
    PRIMARY KEY (order_id) NOT ENFORCED

@@ -16,13 +16,13 @@
version: '2.1'
services:
  observer:
-    image: oceanbase/oceanbase-ce:3.1.3_bp1
+    image: oceanbase/oceanbase-ce:3.1.4
    container_name: observer
    environment:
      - 'OB_ROOT_PASSWORD=pswd'
    network_mode: "host"
  oblogproxy:
-    image: whhe/oblogproxy:1.0.2
+    image: whhe/oblogproxy:1.0.3
    container_name: oblogproxy
    environment:
      - 'OB_SYS_USERNAME=root'
@@ -129,7 +129,7 @@ Flink SQL> CREATE TABLE orders (
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
-    order_status TINYINT,
+    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
@@ -176,7 +176,7 @@ Flink SQL> CREATE TABLE enriched_orders (
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
-    order_status TINYINT,
+    order_status BOOLEAN,
    product_name STRING,
    product_description STRING,
    PRIMARY KEY (order_id) NOT ENFORCED

@@ -23,20 +23,15 @@ import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.config.ObReaderConfig;
import com.oceanbase.clogproxy.client.util.ClientIdGenerator;
import com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction;
+import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.commons.lang3.StringUtils;

import java.time.Duration;
-import java.time.Instant;
-import java.time.ZoneId;
-import java.time.ZoneOffset;

import static org.apache.flink.util.Preconditions.checkNotNull;

-/**
- * A builder to build a SourceFunction which can read snapshot and continue to consume commit log.
- */
+/** A builder to build a SourceFunction which can read snapshot and change events of OceanBase. */
@PublicEvolving
public class OceanBaseSource {
@@ -71,7 +66,7 @@ public class OceanBaseSource {
        private String configUrl;
        private String workingMode;
-        private DebeziumDeserializationSchema<T> deserializer;
+        private OceanBaseDeserializationSchema<T> deserializer;

        public Builder<T> startupMode(StartupMode startupMode) {
            this.startupMode = startupMode;
@@ -163,7 +158,7 @@ public class OceanBaseSource {
            return this;
        }

-        public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
+        public Builder<T> deserializer(OceanBaseDeserializationSchema<T> deserializer) {
            this.deserializer = deserializer;
            return this;
        }
@@ -202,7 +197,6 @@ public class OceanBaseSource {
            if (serverTimeZone == null) {
                serverTimeZone = "+00:00";
            }
-            ZoneOffset zoneOffset = ZoneId.of(serverTimeZone).getRules().getOffset(Instant.now());

            if (connectTimeout == null) {
                connectTimeout = Duration.ofSeconds(30);
@@ -245,7 +239,6 @@ public class OceanBaseSource {
                    databaseName,
                    tableName,
                    tableList,
-                    zoneOffset,
                    connectTimeout,
                    hostname,
                    port,
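
Since the builder now accepts an `OceanBaseDeserializationSchema` instead of a `DebeziumDeserializationSchema`, a custom output type only needs to implement the new interface. Below is an editorial sketch (not part of this commit; the class name is hypothetical, and it relies on the `OceanBaseRecord` accessors used by `RowDataOceanBaseDeserializationSchema` later in this commit):

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;

import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseRecord;

/** Hypothetical schema that emits only the kind of each record. */
public class RecordKindDeserializationSchema
        implements OceanBaseDeserializationSchema<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public void deserialize(OceanBaseRecord record, Collector<String> out) {
        // isSnapshotRecord() and getOpt() are the accessors used by
        // RowDataOceanBaseDeserializationSchema in this commit.
        out.collect(record.isSnapshotRecord() ? "SNAPSHOT" : record.getOpt().name());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```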

@@ -0,0 +1,47 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.source;
import com.oceanbase.oms.logmessage.ByteString;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
/**
* Runtime converter that converts objects of OceanBase into objects of Flink Table & SQL internal
* data structures.
*/
public interface OceanBaseDeserializationRuntimeConverter extends Serializable {
default Object convert(Object object) throws Exception {
if (object instanceof ByteString) {
return convertChangeEvent(
((ByteString) object).toString(StandardCharsets.UTF_8.name()));
} else {
return convertSnapshotEvent(object);
}
}
default Object convertSnapshotEvent(Object object) throws Exception {
throw new NotImplementedException();
}
default Object convertChangeEvent(String string) throws Exception {
throw new NotImplementedException();
}
}
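
As an illustration of the snapshot/change-event split, a converter for an integer-like column would override the two hooks while letting the default `convert` dispatch on `ByteString`. This is an editorial sketch (not part of the commit; the class name is hypothetical), mirroring the converters added in `RowDataOceanBaseDeserializationSchema` below:

```java
import com.ververica.cdc.connectors.oceanbase.source.OceanBaseDeserializationRuntimeConverter;

public class IntConverterSketch {
    /** Builds a converter that yields java.lang.Integer for both event kinds. */
    public static OceanBaseDeserializationRuntimeConverter create() {
        return new OceanBaseDeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convertSnapshotEvent(Object object) {
                // Snapshot values come from JDBC and may be Integer or Long.
                if (object instanceof Number) {
                    return ((Number) object).intValue();
                }
                return Integer.parseInt(object.toString());
            }

            @Override
            public Object convertChangeEvent(String string) {
                // Change-event values arrive as the decoded ByteString text.
                return Integer.parseInt(string);
            }
        };
    }
}
```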

@@ -1,220 +0,0 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.source;
import com.oceanbase.oms.logmessage.ByteString;
import com.oceanbase.oms.logmessage.DataMessage;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.ValueConverterProvider;
import io.debezium.util.NumberConversions;
import org.apache.kafka.connect.data.Schema;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.ZoneOffset;
import java.util.Arrays;
/** Utils to convert jdbc type and value of a field. */
public class OceanBaseJdbcConverter {
public static ValueConverterProvider valueConverterProvider(ZoneOffset zoneOffset) {
return new JdbcValueConverters(
JdbcValueConverters.DecimalMode.STRING,
TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS,
zoneOffset,
null,
JdbcValueConverters.BigIntUnsignedMode.PRECISE,
CommonConnectorConfig.BinaryHandlingMode.BYTES);
}
public static Object getField(int jdbcType, Object value) {
if (value == null) {
return null;
}
jdbcType = getType(jdbcType, null);
switch (jdbcType) {
case Types.BIT:
if (value instanceof Boolean) {
return new byte[] {NumberConversions.getByte((Boolean) value)};
}
return value;
case Types.INTEGER:
if (value instanceof Boolean) {
return NumberConversions.getInteger((Boolean) value);
}
if (value instanceof Date) {
return ((Date) value).getYear() + 1900;
}
return value;
case Types.FLOAT:
Float f = (Float) value;
return f.doubleValue();
case Types.DECIMAL:
if (value instanceof BigInteger) {
return value.toString();
}
BigDecimal decimal = (BigDecimal) value;
return decimal.toString();
case Types.DATE:
Date date = (Date) value;
return io.debezium.time.Date.toEpochDay(date, null);
case Types.TIME:
Time time = (Time) value;
return io.debezium.time.MicroTime.toMicroOfDay(time, true);
case Types.TIMESTAMP:
Timestamp timestamp = (Timestamp) value;
return io.debezium.time.MicroTimestamp.toEpochMicros(timestamp, null);
default:
return value;
}
}
public static Object getField(
Schema.Type schemaType, DataMessage.Record.Field.Type fieldType, ByteString value) {
if (value == null) {
return null;
}
int jdbcType = getType(fieldType);
switch (jdbcType) {
case Types.NULL:
return null;
case Types.INTEGER:
if (schemaType.equals(Schema.Type.INT64)) {
return Long.parseLong(value.toString());
}
return Integer.parseInt(value.toString());
case Types.BIGINT:
if (schemaType.equals(Schema.Type.STRING)) {
return value.toString();
}
return Long.parseLong(value.toString());
case Types.DOUBLE:
return Double.parseDouble(value.toString());
case Types.DATE:
Date date = Date.valueOf(value.toString());
return io.debezium.time.Date.toEpochDay(date, null);
case Types.TIME:
Time time = Time.valueOf(value.toString());
return io.debezium.time.MicroTime.toMicroOfDay(time, true);
case Types.TIMESTAMP:
Timestamp timestamp = Timestamp.valueOf(value.toString());
return io.debezium.time.MicroTimestamp.toEpochMicros(timestamp, null);
case Types.BIT:
long v = Long.parseLong(value.toString());
byte[] bytes = ByteBuffer.allocate(8).putLong(v).array();
int i = 0;
while (bytes[i] == 0 && i < Long.BYTES - 1) {
i++;
}
return Arrays.copyOfRange(bytes, i, Long.BYTES);
case Types.BINARY:
return ByteBuffer.wrap(value.toString().getBytes(StandardCharsets.UTF_8));
default:
return value.toString(StandardCharsets.UTF_8.toString());
}
}
private static boolean isBoolean(int jdbcType, String typeName) {
return jdbcType == Types.BOOLEAN || (jdbcType == Types.BIT && "TINYINT".equals(typeName));
}
public static int getType(int jdbcType, String typeName) {
// treat boolean as tinyint type
if (isBoolean(jdbcType, typeName)) {
jdbcType = Types.TINYINT;
}
// treat year as int type
if ("YEAR".equals(typeName)) {
jdbcType = Types.INTEGER;
}
// upcasting
if ("INT UNSIGNED".equals(typeName)) {
jdbcType = Types.BIGINT;
}
if ("BIGINT UNSIGNED".equals(typeName)) {
jdbcType = Types.DECIMAL;
}
// widening conversion according to com.mysql.jdbc.ResultSetImpl#getObject
switch (jdbcType) {
case Types.TINYINT:
case Types.SMALLINT:
return Types.INTEGER;
case Types.REAL:
return Types.FLOAT;
default:
return jdbcType;
}
}
public static int getType(DataMessage.Record.Field.Type fieldType) {
switch (fieldType) {
case NULL:
return Types.NULL;
case INT8:
case INT16:
case INT24:
case INT32:
case YEAR:
return Types.INTEGER;
case INT64:
return Types.BIGINT;
case FLOAT:
case DOUBLE:
return Types.DOUBLE;
case DECIMAL:
return Types.DECIMAL;
case ENUM:
case SET:
case STRING:
case JSON:
return Types.CHAR;
case TIMESTAMP:
case DATETIME:
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case TIMESTAMP_NANO:
return Types.TIMESTAMP;
case DATE:
return Types.DATE;
case TIME:
return Types.TIME;
case BIT:
return Types.BIT;
case BLOB:
case BINARY:
return Types.BINARY;
case INTERVAL_YEAR_TO_MONTH:
case INTERVAL_DAY_TO_SECOND:
case GEOMETRY:
case RAW:
// it's weird to get wrong type from TEXT column, temporarily treat it as a string
case UNKOWN:
default:
return Types.VARCHAR;
}
}
}

@@ -30,7 +30,6 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkRuntimeException;

-import com.mysql.jdbc.ResultSetMetaData;
import com.oceanbase.clogproxy.client.LogProxyClient;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.config.ObReaderConfig;
@@ -38,26 +37,21 @@ import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.listener.RecordListener;
import com.oceanbase.oms.logmessage.DataMessage;
import com.oceanbase.oms.logmessage.LogMessage;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
-import io.debezium.relational.TableSchema;
+import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
+import com.ververica.cdc.connectors.oceanbase.table.OceanBaseRecord;
import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.time.Duration;
-import java.time.ZoneOffset;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -86,7 +80,6 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
    private final String databaseName;
    private final String tableName;
    private final String tableList;
-    private final ZoneOffset zoneOffset;
    private final Duration connectTimeout;
    private final String hostname;
    private final Integer port;
@@ -94,13 +87,12 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
    private final int logProxyPort;
    private final ClientConf logProxyClientConf;
    private final ObReaderConfig obReaderConfig;
-    private final DebeziumDeserializationSchema<T> deserializer;
+    private final OceanBaseDeserializationSchema<T> deserializer;

    private final AtomicBoolean snapshotCompleted = new AtomicBoolean(false);
    private final List<LogMessage> logMessageBuffer = new LinkedList<>();

    private transient Set<String> tableSet;
-    private transient Map<String, TableSchema> tableSchemaMap;
    private transient volatile long resolvedTimestamp;
    private transient volatile OceanBaseConnection snapshotConnection;
    private transient LogProxyClient logProxyClient;
@@ -115,7 +107,6 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
            String databaseName,
            String tableName,
            String tableList,
-            ZoneOffset zoneOffset,
            Duration connectTimeout,
            String hostname,
            Integer port,
@@ -123,7 +114,7 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
            int logProxyPort,
            ClientConf logProxyClientConf,
            ObReaderConfig obReaderConfig,
-            DebeziumDeserializationSchema<T> deserializer) {
+            OceanBaseDeserializationSchema<T> deserializer) {
        this.snapshot = checkNotNull(snapshot);
        this.username = checkNotNull(username);
        this.password = checkNotNull(password);
@@ -131,7 +122,6 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
        this.databaseName = databaseName;
        this.tableName = tableName;
        this.tableList = tableList;
-        this.zoneOffset = checkNotNull(zoneOffset);
        this.connectTimeout = checkNotNull(connectTimeout);
        this.hostname = hostname;
        this.port = port;
@@ -146,7 +136,6 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
    public void open(final Configuration config) throws Exception {
        super.open(config);
        this.outputCollector = new OutputCollector<>();
-        this.tableSchemaMap = new ConcurrentHashMap<>();
        this.resolvedTimestamp = -1;
    }

@@ -157,25 +146,29 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
        LOG.info("Start to initial table whitelist");
        initTableWhiteList();

-        LOG.info("Start readChangeEvents process");
-        readChangeEvents();
+        LOG.info("Start readChangeRecords process");
+        readChangeRecords();

        if (shouldReadSnapshot()) {
            synchronized (ctx.getCheckpointLock()) {
                try {
-                    readSnapshot();
+                    readSnapshotRecords();
                } finally {
                    closeSnapshotConnection();
                }
                LOG.info("Snapshot reading finished");
            }
        } else {
-            LOG.info("Skip snapshot read");
+            LOG.info("Skip snapshot reading");
        }

        logProxyClient.join();
    }

+    private boolean shouldReadSnapshot() {
+        return resolvedTimestamp == -1 && snapshot;
+    }
+
    private OceanBaseConnection getSnapshotConnection() {
        if (snapshotConnection == null) {
            snapshotConnection =
@@ -253,20 +246,19 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
                        .collect(Collectors.joining("|")));
    }

-    protected void readSnapshot() {
+    protected void readSnapshotRecords() {
        tableSet.forEach(
                table -> {
                    String[] schema = table.split("\\.");
-                    readSnapshotFromTable(schema[0], schema[1]);
+                    readSnapshotRecordsByTable(schema[0], schema[1]);
                });
        snapshotCompleted.set(true);
    }

-    private void readSnapshotFromTable(String databaseName, String tableName) {
-        String topicName = getDefaultTopicName(tenantName, databaseName, tableName);
-        Map<String, String> partition = getSourcePartition(tenantName, databaseName, tableName);
-        // the offset here is useless
-        Map<String, Object> offset = getSourceOffset(resolvedTimestamp);
+    private void readSnapshotRecordsByTable(String databaseName, String tableName) {
+        OceanBaseRecord.SourceInfo sourceInfo =
+                new OceanBaseRecord.SourceInfo(
+                        tenantName, databaseName, tableName, resolvedTimestamp);

        String fullName = String.format("`%s`.`%s`", databaseName, tableName);
        String selectSql = "SELECT * FROM " + fullName;
@@ -276,58 +268,17 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
                .query(
                        selectSql,
                        rs -> {
-                            ResultSetMetaData metaData = (ResultSetMetaData) rs.getMetaData();
-                            String[] columnNames = new String[metaData.getColumnCount()];
-                            int[] jdbcTypes = new int[metaData.getColumnCount()];
-                            for (int i = 0; i < metaData.getColumnCount(); i++) {
-                                columnNames[i] = metaData.getColumnName(i + 1);
-                                jdbcTypes[i] =
-                                        OceanBaseJdbcConverter.getType(
-                                                metaData.getColumnType(i + 1),
-                                                metaData.getColumnTypeName(i + 1));
-                            }
-                            TableSchema tableSchema = tableSchemaMap.get(topicName);
-                            if (tableSchema == null) {
-                                tableSchema =
-                                        OceanBaseTableSchema.getTableSchema(
-                                                topicName,
-                                                databaseName,
-                                                tableName,
-                                                columnNames,
-                                                jdbcTypes,
-                                                zoneOffset);
-                                tableSchemaMap.put(topicName, tableSchema);
-                            }
-                            Struct source =
-                                    OceanBaseSchemaUtils.sourceStruct(
-                                            tenantName, databaseName, tableName, null, null);
+                            ResultSetMetaData metaData = rs.getMetaData();
                            while (rs.next()) {
-                                Struct value = new Struct(tableSchema.valueSchema());
+                                Map<String, Object> fieldMap = new HashMap<>();
                                for (int i = 0; i < metaData.getColumnCount(); i++) {
-                                    value.put(
-                                            columnNames[i],
-                                            OceanBaseJdbcConverter.getField(
-                                                    jdbcTypes[i], rs.getObject(i + 1)));
+                                    fieldMap.put(
+                                            metaData.getColumnName(i + 1), rs.getObject(i + 1));
                                }
-                                Struct struct =
-                                        tableSchema
-                                                .getEnvelopeSchema()
-                                                .create(value, source, null);
+                                OceanBaseRecord record =
+                                        new OceanBaseRecord(sourceInfo, fieldMap);
                                try {
-                                    deserializer.deserialize(
-                                            new SourceRecord(
-                                                    partition,
-                                                    offset,
-                                                    topicName,
-                                                    null,
-                                                    null,
-                                                    null,
-                                                    struct.schema(),
-                                                    struct),
-                                            outputCollector);
+                                    deserializer.deserialize(record, outputCollector);
                                } catch (Exception e) {
                                    LOG.error("Deserialize snapshot record failed ", e);
                                    throw new FlinkRuntimeException(e);
@@ -341,7 +292,7 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
            }
        }

-    protected void readChangeEvents() throws InterruptedException, TimeoutException {
+    protected void readChangeRecords() throws InterruptedException, TimeoutException {
        if (resolvedTimestamp > 0) {
            obReaderConfig.updateCheckpoint(Long.toString(resolvedTimestamp));
            LOG.info("Read change events from resolvedTimestamp: {}", resolvedTimestamp);
@@ -382,8 +333,7 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
                        msg -> {
                            try {
                                deserializer.deserialize(
-                                        getRecordFromLogMessage(msg),
-                                        outputCollector);
+                                        getChangeRecord(msg), outputCollector);
                            } catch (Exception e) {
                                throw new FlinkRuntimeException(e);
                            }
@@ -422,147 +372,15 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
        LOG.info("LogProxyClient packet processing started");
    }

-    private SourceRecord getRecordFromLogMessage(LogMessage message) {
-        String databaseName = getDbName(message.getDbName());
-        String topicName = getDefaultTopicName(tenantName, databaseName, message.getTableName());
-
-        if (tableSchemaMap.get(topicName) == null) {
-            String[] columnNames = new String[message.getFieldCount()];
-            int[] jdbcTypes = new int[message.getFieldCount()];
-            int i = 0;
-            for (DataMessage.Record.Field field : message.getFieldList()) {
-                if (message.getOpt() == DataMessage.Record.Type.UPDATE && field.isPrev()) {
-                    continue;
-                }
-                columnNames[i] = field.getFieldname();
-                jdbcTypes[i] = OceanBaseJdbcConverter.getType(field.getType());
-                i++;
-            }
-            TableSchema tableSchema =
-                    OceanBaseTableSchema.getTableSchema(
-                            topicName,
-                            databaseName,
-                            message.getTableName(),
-                            columnNames,
-                            jdbcTypes,
-                            zoneOffset);
-            tableSchemaMap.put(topicName, tableSchema);
-        }
-        Struct source =
-                OceanBaseSchemaUtils.sourceStruct(
+    private OceanBaseRecord getChangeRecord(LogMessage message) {
+        String databaseName = message.getDbName().replace(tenantName + ".", "");
+        OceanBaseRecord.SourceInfo sourceInfo =
+                new OceanBaseRecord.SourceInfo(
                        tenantName,
                        databaseName,
                        message.getTableName(),
-                        String.valueOf(getCheckpointTimestamp(message)),
-                        message.getOB10UniqueId());
+                        getCheckpointTimestamp(message));
+        return new OceanBaseRecord(sourceInfo, message.getOpt(), message.getFieldList());
-        Struct struct;
-        switch (message.getOpt()) {
-            case INSERT:
-                Struct after = getLogValueStruct(topicName, message.getFieldList());
-                struct =
-                        tableSchemaMap
-                                .get(topicName)
-                                .getEnvelopeSchema()
-                                .create(after, source, null);
-                break;
-            case UPDATE:
-                List<DataMessage.Record.Field> beforeFields = new ArrayList<>();
-                List<DataMessage.Record.Field> afterFields = new ArrayList<>();
-                for (DataMessage.Record.Field field : message.getFieldList()) {
-                    if (field.isPrev()) {
-                        beforeFields.add(field);
-                    } else {
-                        afterFields.add(field);
-                    }
-                }
-                after = getLogValueStruct(topicName, afterFields);
-                Struct before = getLogValueStruct(topicName, beforeFields);
-                struct =
-                        tableSchemaMap
-                                .get(topicName)
-                                .getEnvelopeSchema()
-                                .update(before, after, source, null);
-                break;
-            case DELETE:
-                before = getLogValueStruct(topicName, message.getFieldList());
-                struct =
-                        tableSchemaMap
-                                .get(topicName)
-                                .getEnvelopeSchema()
-                                .delete(before, source, null);
-                break;
-            default:
-                throw new UnsupportedOperationException(
-                        "Unsupported dml type: " + message.getOpt());
-        }
-        return new SourceRecord(
-                getSourcePartition(tenantName, databaseName, message.getTableName()),
-                getSourceOffset(getCheckpointTimestamp(message)),
-                topicName,
-                null,
-                null,
-                null,
-                struct.schema(),
-                struct);
-    }
-
-    private boolean shouldReadSnapshot() {
-        return resolvedTimestamp == -1 && snapshot;
-    }
-
-    private String getDbName(String origin) {
-        if (origin == null) {
-            return null;
-        }
-        return origin.replace(tenantName + ".", "");
-    }
-
-    private String getDefaultTopicName(String tenantName, String databaseName, String tableName) {
-        return String.format("%s.%s.%s", tenantName, databaseName, tableName);
-    }
-
-    private Map<String, String> getSourcePartition(
-            String tenantName, String databaseName, String tableName) {
-        Map<String, String> sourcePartition = new HashMap<>();
-        sourcePartition.put("tenant", tenantName);
-        sourcePartition.put("database", databaseName);
-        sourcePartition.put("table", tableName);
-        return sourcePartition;
-    }
-
-    private Map<String, Object> getSourceOffset(long timestamp) {
-        Map<String, Object> sourceOffset = new HashMap<>();
-        sourceOffset.put("timestamp", timestamp);
-        return sourceOffset;
-    }
-
-    private Struct getLogValueStruct(String topicName, List<DataMessage.Record.Field> fieldList) {
-        TableSchema tableSchema = tableSchemaMap.get(topicName);
-        Struct value = new Struct(tableSchema.valueSchema());
-        Object fieldValue;
-        for (DataMessage.Record.Field field : fieldList) {
-            try {
-                Schema fieldSchema = tableSchema.valueSchema().field(field.getFieldname()).schema();
-                fieldValue =
-                        OceanBaseJdbcConverter.getField(
-                                fieldSchema.type(), field.getType(), field.getValue());
-                value.put(field.getFieldname(), fieldValue);
-            } catch (NumberFormatException e) {
-                tableSchema =
-                        OceanBaseTableSchema.upcastingTableSchema(
-                                topicName,
-                                tableSchema,
-                                fieldList.stream()
-                                        .collect(
-                                                Collectors.toMap(
-                                                        DataMessage.Record.Field::getFieldname,
-                                                        f -> f.getValue().toString())));
-                tableSchemaMap.put(topicName, tableSchema);
-                return getLogValueStruct(topicName, fieldList);
-            }
-        }
-        return value;
    }

    /**

@@ -1,51 +0,0 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.source;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
/** Utils to deal with OceanBase SourceRecord schema. */
public class OceanBaseSchemaUtils {
public static Schema sourceSchema() {
return SchemaBuilder.struct()
.field("tenant", Schema.STRING_SCHEMA)
.field("database", Schema.STRING_SCHEMA)
.field("table", Schema.STRING_SCHEMA)
.field("timestamp", Schema.OPTIONAL_STRING_SCHEMA)
.field("unique_id", Schema.OPTIONAL_STRING_SCHEMA)
.build();
}
public static Struct sourceStruct(
String tenant, String database, String table, String timestamp, String uniqueId) {
Struct struct =
new Struct(sourceSchema())
.put("tenant", tenant)
.put("database", database)
.put("table", table);
if (timestamp != null) {
struct.put("timestamp", timestamp);
}
if (uniqueId != null) {
struct.put("unique_id", uniqueId);
}
return struct;
}
}

@@ -1,128 +0,0 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.source;
import io.debezium.data.Envelope;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.CustomConverterRegistry;
import io.debezium.relational.Table;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import java.math.BigInteger;
import java.sql.Types;
import java.time.ZoneOffset;
import java.util.Map;
/** Utils to deal with table schema of OceanBase. */
public class OceanBaseTableSchema {
public static TableSchemaBuilder tableSchemaBuilder(ZoneOffset zoneOffset) {
return new TableSchemaBuilder(
OceanBaseJdbcConverter.valueConverterProvider(zoneOffset),
SchemaNameAdjuster.create(),
new CustomConverterRegistry(null),
OceanBaseSchemaUtils.sourceSchema(),
false);
}
public static TableId tableId(String databaseName, String tableName) {
return new TableId(databaseName, null, tableName);
}
public static Column getColumn(String name, int jdbcType) {
// we can't get the scale and length of decimal, timestamp and bit columns from log,
// so here we set a constant value to these fields to be compatible with the logic of
// JdbcValueConverters#schemaBuilder
ColumnEditor columnEditor =
Column.editor().name(name).jdbcType(jdbcType).optional(true).scale(0);
if (columnEditor.jdbcType() == Types.TIMESTAMP || columnEditor.jdbcType() == Types.BIT) {
columnEditor.length(6);
}
return columnEditor.create();
}
public static TableSchema getTableSchema(
String topicName,
String databaseName,
String tableName,
String[] columnNames,
int[] jdbcTypes,
ZoneOffset zoneOffset) {
TableEditor tableEditor = Table.editor().tableId(tableId(databaseName, tableName));
for (int i = 0; i < columnNames.length; i++) {
tableEditor.addColumn(getColumn(columnNames[i], jdbcTypes[i]));
}
return tableSchemaBuilder(zoneOffset)
.create(
null,
Envelope.schemaName(topicName),
tableEditor.create(),
null,
null,
null);
}
public static Schema upcastingSchemaType(Schema schema, String value) {
if (schema.type().equals(Schema.Type.INT32) && Long.parseLong(value) > Integer.MAX_VALUE) {
return Schema.INT64_SCHEMA;
}
if (schema.type().equals(Schema.Type.INT64)) {
BigInteger bigInt = new BigInteger(value);
if (bigInt.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
return Schema.STRING_SCHEMA;
}
}
return schema;
}
public static Schema upcastingValueSchema(Schema valueSchema, Map<String, String> fields) {
SchemaBuilder schemaBuilder = SchemaBuilder.struct().optional();
for (Map.Entry<String, String> entry : fields.entrySet()) {
Schema fieldSchema = valueSchema.field(entry.getKey()).schema();
fieldSchema = upcastingSchemaType(fieldSchema, entry.getValue());
schemaBuilder.field(entry.getKey(), fieldSchema);
}
return schemaBuilder.build();
}
public static Envelope getEnvelope(String name, Schema valueSchema) {
return Envelope.defineSchema()
.withName(name)
.withRecord(valueSchema)
.withSource(OceanBaseSchemaUtils.sourceSchema())
.build();
}
public static TableSchema upcastingTableSchema(
String topicName, TableSchema tableSchema, Map<String, String> fields) {
Schema valueSchema = upcastingValueSchema(tableSchema.valueSchema(), fields);
return new TableSchema(
tableSchema.id(),
null,
null,
getEnvelope(Envelope.schemaName(topicName), valueSchema),
valueSchema,
null);
}
}

@@ -0,0 +1,679 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.source;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import com.oceanbase.oms.logmessage.ByteString;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseAppendMetadataCollector;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseMetadataConverter;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseRecord;
import com.ververica.cdc.debezium.utils.TemporalConversions;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Deserialization schema from OceanBase object to Flink Table/SQL internal data structure {@link
* RowData}.
*/
public class RowDataOceanBaseDeserializationSchema
implements OceanBaseDeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
/** TypeInformation of the produced {@link RowData}. */
private final TypeInformation<RowData> resultTypeInfo;
/**
* Runtime converter that converts OceanBase record data into {@link RowData} consisting of
* physical column values.
*/
private final OceanBaseDeserializationRuntimeConverter physicalConverter;
/** Whether the deserializer needs to handle metadata columns. */
private final boolean hasMetadata;
/**
* A wrapped output collector which is used to append metadata columns after physical columns.
*/
private final OceanBaseAppendMetadataCollector appendMetadataCollector;
/** Returns a builder to build {@link RowDataOceanBaseDeserializationSchema}. */
public static RowDataOceanBaseDeserializationSchema.Builder newBuilder() {
return new RowDataOceanBaseDeserializationSchema.Builder();
}
RowDataOceanBaseDeserializationSchema(
RowType physicalDataType,
OceanBaseMetadataConverter[] metadataConverters,
TypeInformation<RowData> resultTypeInfo,
ZoneId serverTimeZone) {
this.hasMetadata = checkNotNull(metadataConverters).length > 0;
this.appendMetadataCollector = new OceanBaseAppendMetadataCollector(metadataConverters);
this.physicalConverter = createConverter(checkNotNull(physicalDataType), serverTimeZone);
this.resultTypeInfo = checkNotNull(resultTypeInfo);
}
@Override
public void deserialize(OceanBaseRecord record, Collector<RowData> out) throws Exception {
RowData physicalRow;
if (record.isSnapshotRecord()) {
physicalRow = (GenericRowData) physicalConverter.convert(record.getJdbcFields());
physicalRow.setRowKind(RowKind.INSERT);
emit(record, physicalRow, out);
} else {
switch (record.getOpt()) {
case INSERT:
physicalRow =
(GenericRowData)
physicalConverter.convert(record.getLogMessageFieldsAfter());
physicalRow.setRowKind(RowKind.INSERT);
emit(record, physicalRow, out);
break;
case DELETE:
physicalRow =
(GenericRowData)
physicalConverter.convert(record.getLogMessageFieldsBefore());
physicalRow.setRowKind(RowKind.DELETE);
emit(record, physicalRow, out);
break;
case UPDATE:
physicalRow =
(GenericRowData)
physicalConverter.convert(record.getLogMessageFieldsBefore());
physicalRow.setRowKind(RowKind.UPDATE_BEFORE);
emit(record, physicalRow, out);
physicalRow =
(GenericRowData)
physicalConverter.convert(record.getLogMessageFieldsAfter());
physicalRow.setRowKind(RowKind.UPDATE_AFTER);
emit(record, physicalRow, out);
break;
default:
throw new IllegalArgumentException(
"Unsupported log message record type: " + record.getOpt());
}
}
}
private void emit(OceanBaseRecord row, RowData physicalRow, Collector<RowData> collector) {
if (!hasMetadata) {
collector.collect(physicalRow);
return;
}
appendMetadataCollector.inputRecord = row;
appendMetadataCollector.outputCollector = collector;
appendMetadataCollector.collect(physicalRow);
}
@Override
public TypeInformation<RowData> getProducedType() {
return resultTypeInfo;
}
/** Builder class of {@link RowDataOceanBaseDeserializationSchema}. */
public static class Builder {
private RowType physicalRowType;
private TypeInformation<RowData> resultTypeInfo;
private OceanBaseMetadataConverter[] metadataConverters = new OceanBaseMetadataConverter[0];
private ZoneId serverTimeZone = ZoneId.of("UTC");
public RowDataOceanBaseDeserializationSchema.Builder setPhysicalRowType(
RowType physicalRowType) {
this.physicalRowType = physicalRowType;
return this;
}
public RowDataOceanBaseDeserializationSchema.Builder setMetadataConverters(
OceanBaseMetadataConverter[] metadataConverters) {
this.metadataConverters = metadataConverters;
return this;
}
public RowDataOceanBaseDeserializationSchema.Builder setResultTypeInfo(
TypeInformation<RowData> resultTypeInfo) {
this.resultTypeInfo = resultTypeInfo;
return this;
}
public RowDataOceanBaseDeserializationSchema.Builder setServerTimeZone(
ZoneId serverTimeZone) {
this.serverTimeZone = serverTimeZone;
return this;
}
public RowDataOceanBaseDeserializationSchema build() {
return new RowDataOceanBaseDeserializationSchema(
physicalRowType, metadataConverters, resultTypeInfo, serverTimeZone);
}
}
private static OceanBaseDeserializationRuntimeConverter createConverter(
LogicalType type, ZoneId serverTimeZone) {
return wrapIntoNullableConverter(createNotNullConverter(type, serverTimeZone));
}
private static OceanBaseDeserializationRuntimeConverter wrapIntoNullableConverter(
OceanBaseDeserializationRuntimeConverter converter) {
return new OceanBaseDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object object) throws Exception {
if (object == null) {
return null;
}
return converter.convert(object);
}
};
}
public static OceanBaseDeserializationRuntimeConverter createNotNullConverter(
LogicalType type, ZoneId serverTimeZone) {
switch (type.getTypeRoot()) {
case ROW:
return createRowConverter((RowType) type, serverTimeZone);
case NULL:
return convertToNull();
case BOOLEAN:
return convertToBoolean();
case TINYINT:
return convertToTinyInt();
case SMALLINT:
return convertToSmallInt();
case INTEGER:
case INTERVAL_YEAR_MONTH:
return convertToInt();
case BIGINT:
case INTERVAL_DAY_TIME:
return convertToLong();
case DATE:
return convertToDate();
case TIME_WITHOUT_TIME_ZONE:
return convertToTime();
case TIMESTAMP_WITHOUT_TIME_ZONE:
return convertToTimestamp();
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return convertToLocalTimeZoneTimestamp(serverTimeZone);
case FLOAT:
return convertToFloat();
case DOUBLE:
return convertToDouble();
case CHAR:
case VARCHAR:
return convertToString();
case BINARY:
return convertToBinary();
case VARBINARY:
return convertToBytes();
case DECIMAL:
return createDecimalConverter((DecimalType) type);
case ARRAY:
return createArrayConverter();
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}
private static OceanBaseDeserializationRuntimeConverter createRowConverter(
RowType rowType, ZoneId serverTimeZone) {
final OceanBaseDeserializationRuntimeConverter[] fieldConverters =
rowType.getFields().stream()
.map(RowType.RowField::getType)
.map(logicType -> createConverter(logicType, serverTimeZone))
.toArray(OceanBaseDeserializationRuntimeConverter[]::new);
final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
return new OceanBaseDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object object) throws Exception {
int arity = fieldNames.length;
GenericRowData row = new GenericRowData(arity);
Map<String, Object> fieldMap = (Map<String, Object>) object;
for (int i = 0; i < arity; i++) {
String fieldName = fieldNames[i];
row.setField(i, fieldConverters[i].convert(fieldMap.get(fieldName)));
}
return row;
}
};
}
private static OceanBaseDeserializationRuntimeConverter convertToNull() {
return new OceanBaseDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object object) {
return null;
}
};
}
private static OceanBaseDeserializationRuntimeConverter convertToBoolean() {
return new OceanBaseDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convertSnapshotEvent(Object object) {
if (object instanceof byte[]) {
return "1".equals(new String((byte[]) object, StandardCharsets.UTF_8));
}
return Boolean.parseBoolean(object.toString()) || "1".equals(object.toString());
}
@Override
public Object convertChangeEvent(String string) {
return "1".equals(string);
}
};
}
private static OceanBaseDeserializationRuntimeConverter convertToTinyInt() {
return new OceanBaseDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convertSnapshotEvent(Object object) {
return Byte.parseByte(object.toString());
}
@Override
public Object convertChangeEvent(String string) {
return Byte.parseByte(string);
}
};
}
private static OceanBaseDeserializationRuntimeConverter convertToSmallInt() {
return new OceanBaseDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convertSnapshotEvent(Object object) {
return Short.parseShort(object.toString());
}
@Override
public Object convertChangeEvent(String string) {
return Short.parseShort(string);
}
};
}
private static OceanBaseDeserializationRuntimeConverter convertToInt() {
return new OceanBaseDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convertSnapshotEvent(Object object) {
if (object instanceof Integer) {
return object;
} else if (object instanceof Long) {
return ((Long) object).intValue();
} else if (object instanceof Date) {
return ((Date) object).toLocalDate().getYear();
} else {
return Integer.parseInt(object.toString());
}
}
@Override
public Object convertChangeEvent(String string) {
return Integer.parseInt(string);
}
};
}
private static OceanBaseDeserializationRuntimeConverter convertToLong() {
return new OceanBaseDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convertSnapshotEvent(Object object) {
if (object instanceof Integer) {
return ((Integer) object).longValue();
} else if (object instanceof Long) {
return object;
} else {
return Long.parseLong(object.toString());
}
}
@Override
public Object convertChangeEvent(String string) {
return Long.parseLong(string);
}
};
}
private static OceanBaseDeserializationRuntimeConverter convertToDouble() {
return new OceanBaseDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convertSnapshotEvent(Object object) {
if (object instanceof Float) {
return ((Float) object).doubleValue();
} else if (object instanceof Double) {
return object;
} else {
return Double.parseDouble(object.toString());
}
}
@Override
public Object convertChangeEvent(String string) {
return Double.parseDouble(string);
}
};
}
private static OceanBaseDeserializationRuntimeConverter convertToFloat() {
return new OceanBaseDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convertSnapshotEvent(Object object) {
if (object instanceof Float) {
return object;
} else if (object instanceof Double) {
return ((Double) object).floatValue();
} else {
return Float.parseFloat(object.toString());
}
}
@Override
public Object convertChangeEvent(String string) {
return Float.parseFloat(string);
}
};
}
private static OceanBaseDeserializationRuntimeConverter convertToDate() {
return new OceanBaseDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convertSnapshotEvent(Object object) {
return (int) TemporalConversions.toLocalDate(object).toEpochDay();
}
@Override
public Object convertChangeEvent(String string) {
return (int) Date.valueOf(string).toLocalDate().toEpochDay();
}
};
}
private static OceanBaseDeserializationRuntimeConverter convertToTime() {
return new OceanBaseDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convertSnapshotEvent(Object object) {
if (object instanceof Long) {
return (int) ((Long) object / 1000_000);
}
return TemporalConversions.toLocalTime(object).toSecondOfDay() * 1000;
}
@Override
public Object convertChangeEvent(String string) {
return TemporalConversions.toLocalTime(Time.valueOf(string)).toSecondOfDay() * 1000;
}
};
}
private static OceanBaseDeserializationRuntimeConverter convertToTimestamp() {
return new OceanBaseDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convertSnapshotEvent(Object object) {
if (object instanceof Timestamp) {
return TimestampData.fromLocalDateTime(((Timestamp) object).toLocalDateTime());
}
throw new IllegalArgumentException(
"Unable to convert to TimestampData from unexpected value '"
+ object
+ "' of type "
+ object.getClass().getName());
}
@Override
public Object convertChangeEvent(String string) {
return TimestampData.fromLocalDateTime(Timestamp.valueOf(string).toLocalDateTime());
}
};
}
private static OceanBaseDeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(
ZoneId serverTimeZone) {
return new OceanBaseDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convertSnapshotEvent(Object object) {
if (object instanceof Timestamp) {
return TimestampData.fromInstant(
((Timestamp) object)
.toLocalDateTime()
.atZone(serverTimeZone)
.toInstant());
}
throw new IllegalArgumentException(
"Unable to convert to TimestampData from unexpected value '"
+ object
+ "' of type "
+ object.getClass().getName());
}
@Override
public Object convertChangeEvent(String string) {
return TimestampData.fromInstant(
Timestamp.valueOf(string)
.toLocalDateTime()
.atZone(serverTimeZone)
.toInstant());
}
};
}
private static OceanBaseDeserializationRuntimeConverter convertToString() {
return new OceanBaseDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convertSnapshotEvent(Object object) {
return StringData.fromString(object.toString());
}
@Override
public Object convertChangeEvent(String string) {
return StringData.fromString(string);
}
};
}
    private static OceanBaseDeserializationRuntimeConverter convertToBinary() {
        return new OceanBaseDeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convertSnapshotEvent(Object object) {
                if (object instanceof byte[]) {
                    // normalize the raw bytes through an ASCII round trip to UTF-8
                    String str = new String((byte[]) object, StandardCharsets.US_ASCII);
                    return str.getBytes(StandardCharsets.UTF_8);
                } else if (object instanceof ByteBuffer) {
                    ByteBuffer byteBuffer = (ByteBuffer) object;
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    return bytes;
                } else {
                    throw new UnsupportedOperationException(
                            "Unsupported BINARY value type: " + object.getClass().getSimpleName());
                }
            }

            @Override
            public Object convertChangeEvent(String string) {
                try {
                    // a value that parses as a long (e.g. a BIT column in the change log)
                    // is rendered as its big-endian bytes with leading zero bytes stripped
                    long v = Long.parseLong(string);
                    byte[] bytes = ByteBuffer.allocate(8).putLong(v).array();
                    int i = 0;
                    while (i < Long.BYTES - 1 && bytes[i] == 0) {
                        i++;
                    }
                    return Arrays.copyOfRange(bytes, i, Long.BYTES);
                } catch (NumberFormatException e) {
                    // anything else is treated as a plain UTF-8 string
                    return string.getBytes(StandardCharsets.UTF_8);
                }
            }
        };
    }
private static OceanBaseDeserializationRuntimeConverter convertToBytes() {
return new OceanBaseDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convertSnapshotEvent(Object object) {
if (object instanceof byte[]) {
return object;
} else if (object instanceof ByteBuffer) {
ByteBuffer byteBuffer = (ByteBuffer) object;
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return bytes;
} else {
throw new UnsupportedOperationException(
"Unsupported BYTES value type: " + object.getClass().getSimpleName());
}
}
@Override
public Object convertChangeEvent(String string) {
return string.getBytes(StandardCharsets.UTF_8);
}
};
}
private static OceanBaseDeserializationRuntimeConverter createDecimalConverter(
DecimalType decimalType) {
final int precision = decimalType.getPrecision();
final int scale = decimalType.getScale();
return new OceanBaseDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convertSnapshotEvent(Object object) {
BigDecimal bigDecimal;
if (object instanceof String) {
bigDecimal = new BigDecimal((String) object);
} else if (object instanceof Long) {
bigDecimal = new BigDecimal((Long) object);
} else if (object instanceof BigInteger) {
bigDecimal = new BigDecimal((BigInteger) object);
} else if (object instanceof Double) {
bigDecimal = BigDecimal.valueOf((Double) object);
} else if (object instanceof BigDecimal) {
bigDecimal = (BigDecimal) object;
} else {
throw new IllegalArgumentException(
"Unable to convert to decimal from unexpected value '"
+ object
+ "' of type "
+ object.getClass());
}
return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
}
@Override
public Object convertChangeEvent(String string) {
return DecimalData.fromBigDecimal(new BigDecimal(string), precision, scale);
}
};
}
    private static OceanBaseDeserializationRuntimeConverter createArrayConverter() {
        return new OceanBaseDeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object object) {
                // SET values arrive as a comma-separated string, e.g. "a,b"
                String s;
                if (object instanceof ByteString) {
                    s = ((ByteString) object).toString(StandardCharsets.UTF_8.name());
                } else {
                    s = object.toString();
                }
                String[] strArray = s.split(",");
                StringData[] stringDataArray = new StringData[strArray.length];
                for (int i = 0; i < strArray.length; i++) {
                    stringDataArray[i] = StringData.fromString(strArray[i]);
                }
                return new GenericArrayData(stringDataArray);
            }
        };
    }
}
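
Each runtime converter above follows the same two-path contract: `convertSnapshotEvent` receives the typed JDBC object read during the snapshot phase, while `convertChangeEvent` receives the string form carried by log proxy messages. The sketch below illustrates that contract with a hypothetical BOOLEAN converter; the local `ConverterSketch` interface is a stand-in for the connector's converter type, and none of this is part of the commit itself.

```java
import java.io.Serializable;

// Minimal stand-in for the converter contract used in this class (sketch only).
interface ConverterSketch extends Serializable {
    Object convertSnapshotEvent(Object object);

    Object convertChangeEvent(String string);
}

// A hypothetical BOOLEAN converter mirroring the structure of the methods above.
class BooleanConverterSketch {
    static ConverterSketch convertToBoolean() {
        return new ConverterSketch() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convertSnapshotEvent(Object object) {
                // snapshot phase: the value arrives as a typed JDBC object
                if (object instanceof Boolean) {
                    return object;
                }
                return Long.parseLong(object.toString()) != 0;
            }

            @Override
            public Object convertChangeEvent(String string) {
                // incremental phase: log messages carry all values as strings
                return Long.parseLong(string) != 0;
            }
        };
    }
}
```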

@@ -0,0 +1,56 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.table;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.util.Collector;

import java.io.Serializable;

/** Emits a row with physical fields and metadata fields. */
@Internal
public class OceanBaseAppendMetadataCollector implements Collector<RowData>, Serializable {

    private static final long serialVersionUID = 1L;

    private final OceanBaseMetadataConverter[] metadataConverters;

    public transient OceanBaseRecord inputRecord;
    public transient Collector<RowData> outputCollector;

    public OceanBaseAppendMetadataCollector(OceanBaseMetadataConverter[] metadataConverters) {
        this.metadataConverters = metadataConverters;
    }

    @Override
    public void collect(RowData physicalRow) {
        GenericRowData metaRow = new GenericRowData(metadataConverters.length);
        for (int i = 0; i < metadataConverters.length; i++) {
            Object meta = metadataConverters[i].read(inputRecord);
            metaRow.setField(i, meta);
        }
        RowData outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow);
        outputCollector.collect(outRow);
    }

    @Override
    public void close() {
        // nothing to do
    }
}
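
The collector is driven by the deserialization schema, which sets `inputRecord` and `outputCollector` before forwarding each physical row. A minimal wiring sketch, where the helper class and method names are hypothetical:

```java
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;

// Hypothetical helper showing how the collector is wired per record.
public class MetadataCollectorSketch {
    public static void emitWithTenant(
            OceanBaseRecord record, RowData physicalRow, Collector<RowData> out) {
        OceanBaseAppendMetadataCollector collector =
                new OceanBaseAppendMetadataCollector(
                        new OceanBaseMetadataConverter[] {
                            OceanBaseReadableMetadata.TENANT.getConverter()
                        });
        collector.inputRecord = record; // record whose metadata is appended
        collector.outputCollector = out; // downstream collector
        collector.collect(physicalRow); // emits physicalRow joined with tenant_name
    }
}
```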

@@ -0,0 +1,36 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.table;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;

import java.io.Serializable;

/**
 * The deserialization schema describes how to turn the OceanBase record into data types (Java/Scala
 * objects) that are processed by Flink.
 *
 * @param <T> The type created by the deserialization schema.
 */
@PublicEvolving
public interface OceanBaseDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    /** Deserializes the OceanBase record, which is represented as an {@link OceanBaseRecord}. */
    void deserialize(OceanBaseRecord record, Collector<T> out) throws Exception;
}
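
Since the interface only requires `deserialize` plus `getProducedType` from `ResultTypeQueryable`, a custom schema can be quite small. A sketch that emits the qualified table name of every record; the class name is hypothetical:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;

// Hypothetical schema emitting "database.table" for every record.
public class TableNameDeserializationSchema implements OceanBaseDeserializationSchema<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public void deserialize(OceanBaseRecord record, Collector<String> out) {
        OceanBaseRecord.SourceInfo source = record.getSourceInfo();
        out.collect(source.getDatabase() + "." + source.getTable());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```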

@@ -0,0 +1,28 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.table;

import org.apache.flink.annotation.Internal;

import java.io.Serializable;

/** A converter that converts OceanBase record metadata into Flink internal data structures. */
@FunctionalInterface
@Internal
public interface OceanBaseMetadataConverter extends Serializable {
    Object read(OceanBaseRecord record);
}
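
Because the interface is a `@FunctionalInterface` with a single `read` method, a converter can also be written as a lambda. A sketch, with a hypothetical holder class and field name:

```java
// Hypothetical converter exposing the change timestamp as epoch milliseconds.
public class MetadataConverterSketch {
    public static final OceanBaseMetadataConverter OP_TS_MILLIS =
            record -> record.getSourceInfo().getTimestampS() * 1000;
}
```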

@@ -21,11 +21,6 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 
-import com.ververica.cdc.debezium.table.MetadataConverter;
-import io.debezium.data.Envelope;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.source.SourceRecord;
-
 /** Defines the supported metadata columns for {@link OceanBaseTableSource}. */
 public enum OceanBaseReadableMetadata {
@@ -33,14 +28,12 @@ public enum OceanBaseReadableMetadata {
     TENANT(
             "tenant_name",
             DataTypes.STRING().notNull(),
-            new MetadataConverter() {
+            new OceanBaseMetadataConverter() {
                 private static final long serialVersionUID = 1L;
 
                 @Override
-                public Object read(SourceRecord record) {
-                    Struct value = (Struct) record.value();
-                    Struct source = value.getStruct(Envelope.FieldName.SOURCE);
-                    return StringData.fromString(source.getString("tenant"));
+                public Object read(OceanBaseRecord record) {
+                    return StringData.fromString(record.getSourceInfo().getTenant());
                 }
             }),
@@ -48,14 +41,12 @@ public enum OceanBaseReadableMetadata {
     DATABASE(
             "database_name",
             DataTypes.STRING().notNull(),
-            new MetadataConverter() {
+            new OceanBaseMetadataConverter() {
                 private static final long serialVersionUID = 1L;
 
                 @Override
-                public Object read(SourceRecord record) {
-                    Struct value = (Struct) record.value();
-                    Struct source = value.getStruct(Envelope.FieldName.SOURCE);
-                    return StringData.fromString(source.getString("database"));
+                public Object read(OceanBaseRecord record) {
+                    return StringData.fromString(record.getSourceInfo().getDatabase());
                 }
             }),
@@ -63,14 +54,12 @@ public enum OceanBaseReadableMetadata {
     TABLE(
             "table_name",
             DataTypes.STRING().notNull(),
-            new MetadataConverter() {
+            new OceanBaseMetadataConverter() {
                 private static final long serialVersionUID = 1L;
 
                 @Override
-                public Object read(SourceRecord record) {
-                    Struct value = (Struct) record.value();
-                    Struct source = value.getStruct(Envelope.FieldName.SOURCE);
-                    return StringData.fromString(source.getString("table"));
+                public Object read(OceanBaseRecord record) {
+                    return StringData.fromString(record.getSourceInfo().getTable());
                 }
             }),
@@ -81,18 +70,13 @@ public enum OceanBaseReadableMetadata {
     OP_TS(
             "op_ts",
             DataTypes.TIMESTAMP_LTZ(3).notNull(),
-            new MetadataConverter() {
+            new OceanBaseMetadataConverter() {
                 private static final long serialVersionUID = 1L;
 
                 @Override
-                public Object read(SourceRecord record) {
-                    Struct value = (Struct) record.value();
-                    Struct source = value.getStruct(Envelope.FieldName.SOURCE);
-                    String timestamp = source.getString("timestamp");
-                    if (timestamp == null) {
-                        timestamp = "0";
-                    }
-                    return TimestampData.fromEpochMillis(Long.parseLong(timestamp) * 1000);
+                public Object read(OceanBaseRecord record) {
+                    return TimestampData.fromEpochMillis(
+                            record.getSourceInfo().getTimestampS() * 1000);
                 }
             });
@@ -100,9 +84,9 @@ public enum OceanBaseReadableMetadata {
     private final DataType dataType;
 
-    private final MetadataConverter converter;
+    private final OceanBaseMetadataConverter converter;
 
-    OceanBaseReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+    OceanBaseReadableMetadata(String key, DataType dataType, OceanBaseMetadataConverter converter) {
         this.key = key;
         this.dataType = dataType;
         this.converter = converter;
@@ -116,7 +100,7 @@ public enum OceanBaseReadableMetadata {
         return dataType;
     }
 
-    public MetadataConverter getConverter() {
+    public OceanBaseMetadataConverter getConverter() {
         return converter;
     }
 }

@@ -0,0 +1,118 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.table;
import com.oceanbase.oms.logmessage.ByteString;
import com.oceanbase.oms.logmessage.DataMessage;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** An internal data structure representing a record of OceanBase. */
public class OceanBaseRecord implements Serializable {
private static final long serialVersionUID = 1L;
private final SourceInfo sourceInfo;
private final boolean isSnapshotRecord;
private Map<String, Object> jdbcFields;
private DataMessage.Record.Type opt;
private Map<String, ByteString> logMessageFieldsBefore;
private Map<String, ByteString> logMessageFieldsAfter;
public OceanBaseRecord(SourceInfo sourceInfo, Map<String, Object> jdbcFields) {
this.sourceInfo = sourceInfo;
this.isSnapshotRecord = true;
this.jdbcFields = jdbcFields;
}
public OceanBaseRecord(
SourceInfo sourceInfo,
DataMessage.Record.Type opt,
List<DataMessage.Record.Field> logMessageFieldList) {
this.sourceInfo = sourceInfo;
this.isSnapshotRecord = false;
this.opt = opt;
this.logMessageFieldsBefore = new HashMap<>();
this.logMessageFieldsAfter = new HashMap<>();
for (DataMessage.Record.Field field : logMessageFieldList) {
if (field.isPrev()) {
logMessageFieldsBefore.put(field.getFieldname(), field.getValue());
} else {
logMessageFieldsAfter.put(field.getFieldname(), field.getValue());
}
}
}
public SourceInfo getSourceInfo() {
return sourceInfo;
}
public boolean isSnapshotRecord() {
return isSnapshotRecord;
}
public Map<String, Object> getJdbcFields() {
return jdbcFields;
}
public DataMessage.Record.Type getOpt() {
return opt;
}
public Map<String, ByteString> getLogMessageFieldsBefore() {
return logMessageFieldsBefore;
}
public Map<String, ByteString> getLogMessageFieldsAfter() {
return logMessageFieldsAfter;
}
/** Information about the source of the record. */
public static class SourceInfo implements Serializable {
private static final long serialVersionUID = 1L;
private final String tenant;
private final String database;
private final String table;
private final long timestampS;
public SourceInfo(String tenant, String database, String table, long timestampS) {
this.tenant = tenant;
this.database = database;
this.table = table;
this.timestampS = timestampS;
}
public String getTenant() {
return tenant;
}
public String getDatabase() {
return database;
}
public String getTable() {
return table;
}
public long getTimestampS() {
return timestampS;
}
}
}
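
The two constructors correspond to the connector's two phases: the snapshot constructor takes the row as a JDBC field map, while the change-event constructor splits log message fields into before/after images via `field.isPrev()`. A construction sketch; the class name and the tenant/database/table values are hypothetical, and `Type.UPDATE` is assumed to be one of the oblogclient record types:

```java
import com.oceanbase.oms.logmessage.DataMessage;

import java.util.List;
import java.util.Map;

// Hypothetical factory showing both construction paths.
public class OceanBaseRecordSketch {
    public static OceanBaseRecord snapshotRecord(Map<String, Object> jdbcRow) {
        OceanBaseRecord.SourceInfo source =
                new OceanBaseRecord.SourceInfo(
                        "test_tenant", "test_db", "test_table",
                        System.currentTimeMillis() / 1000);
        return new OceanBaseRecord(source, jdbcRow); // isSnapshotRecord == true
    }

    public static OceanBaseRecord changeRecord(
            OceanBaseRecord.SourceInfo source, List<DataMessage.Record.Field> fields) {
        // fields are split into before/after images based on Field#isPrev()
        return new OceanBaseRecord(source, DataMessage.Record.Type.UPDATE, fields);
    }
}
```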

@@ -29,9 +29,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
 import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.table.MetadataConverter;
-import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import com.ververica.cdc.connectors.oceanbase.source.RowDataOceanBaseDeserializationSchema;
 
 import java.time.Duration;
 import java.time.ZoneId;
@@ -138,11 +136,11 @@ public class OceanBaseTableSource implements ScanTableSource, SupportsReadingMetadata {
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
         RowType physicalDataType =
                 (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
-        MetadataConverter[] metadataConverters = getMetadataConverters();
+        OceanBaseMetadataConverter[] metadataConverters = getMetadataConverters();
         TypeInformation<RowData> resultTypeInfo = context.createTypeInformation(producedDataType);
 
-        DebeziumDeserializationSchema<RowData> deserializer =
-                RowDataDebeziumDeserializeSchema.newBuilder()
+        RowDataOceanBaseDeserializationSchema deserializer =
+                RowDataOceanBaseDeserializationSchema.newBuilder()
                         .setPhysicalRowType(physicalDataType)
                         .setMetadataConverters(metadataConverters)
                         .setResultTypeInfo(resultTypeInfo)
@@ -173,9 +171,9 @@ public class OceanBaseTableSource implements ScanTableSource, SupportsReadingMetadata {
         return SourceFunctionProvider.of(builder.build(), false);
     }
 
-    protected MetadataConverter[] getMetadataConverters() {
+    protected OceanBaseMetadataConverter[] getMetadataConverters() {
         if (metadataKeys.isEmpty()) {
-            return new MetadataConverter[0];
+            return new OceanBaseMetadataConverter[0];
         }
 
         return metadataKeys.stream()
                 .map(
@@ -185,7 +183,7 @@ public class OceanBaseTableSource implements ScanTableSource, SupportsReadingMetadata {
                         .findFirst()
                         .orElseThrow(IllegalStateException::new))
                 .map(OceanBaseReadableMetadata::getConverter)
-                .toArray(MetadataConverter[]::new);
+                .toArray(OceanBaseMetadataConverter[]::new);
     }
 
     @Override

@@ -32,7 +32,6 @@ import java.util.Set;
 public class OceanBaseTableSourceFactory implements DynamicTableSourceFactory {
 
     private static final String IDENTIFIER = "oceanbase-cdc";
-    private static final String OB_CDC_PREFIX = "obcdc.";
 
     public static final ConfigOption<String> SCAN_STARTUP_MODE =
             ConfigOptions.key("scan.startup.mode")

@@ -108,7 +108,7 @@ public class OceanBaseTestBase extends TestLogger {
     @ClassRule
     public static final GenericContainer<?> OB_SERVER =
-            new GenericContainer<>("oceanbase/oceanbase-ce:3.1.3_bp1")
+            new GenericContainer<>("oceanbase/oceanbase-ce:3.1.4")
                     .withNetworkMode(NETWORK_MODE)
                     .withExposedPorts(OB_SERVER_SQL_PORT, OB_SERVER_RPC_PORT)
                     .withEnv("OB_ROOT_PASSWORD", OB_SYS_PASSWORD)
@@ -118,7 +118,7 @@ public class OceanBaseTestBase extends TestLogger {
     @ClassRule
     public static final GenericContainer<?> LOG_PROXY =
-            new GenericContainer<>("whhe/oblogproxy:1.0.2")
+            new GenericContainer<>("whhe/oblogproxy:1.0.3")
                     .withNetworkMode(NETWORK_MODE)
                     .withExposedPorts(LOG_PROXY_PORT)
                     .withEnv("OB_SYS_USERNAME", OB_SYS_USERNAME)

@@ -296,13 +296,16 @@ public class OceanBaseConnectorITCase extends OceanBaseTestBase {
                 String.format(
                         "CREATE TABLE ob_source (\n"
                                 + " `id` INT NOT NULL,\n"
-                                + " bool_c TINYINT,\n"
+                                + " bit1_c BOOLEAN,\n"
+                                + " tiny1_c BOOLEAN,\n"
+                                + " boolean_c BOOLEAN,\n"
                                 + " tiny_c TINYINT,\n"
                                 + " tiny_un_c SMALLINT,\n"
                                 + " small_c SMALLINT ,\n"
                                 + " small_un_c INT ,\n"
                                 + " medium_c INT,\n"
                                 + " medium_un_c INT,\n"
+                                + " int11_c INT,\n"
                                 + " int_c INT,\n"
                                 + " int_un_c BIGINT,\n"
                                 + " big_c BIGINT,\n"
@@ -317,18 +320,22 @@ public class OceanBaseConnectorITCase extends OceanBaseTestBase {
                                 + " time_c TIME(0),\n"
                                 + " datetime3_c TIMESTAMP(3),\n"
                                 + " datetime6_c TIMESTAMP(6),\n"
-                                + " timestamp_c TIMESTAMP,\n"
+                                + " timestamp_c TIMESTAMP_LTZ,\n"
+                                + " timestamp3_c TIMESTAMP_LTZ(3),\n"
+                                + " timestamp6_c TIMESTAMP_LTZ(6),\n"
                                 + " char_c CHAR(3),\n"
                                 + " varchar_c VARCHAR(255),\n"
+                                + " file_uuid BINARY(16),\n"
                                 + " bit_c BINARY(8),\n"
                                 + " text_c STRING,\n"
                                 + " tiny_blob_c BYTES,\n"
                                 + " medium_blob_c BYTES,\n"
-                                + " long_blob_c BYTES,\n"
                                 + " blob_c BYTES,\n"
+                                + " long_blob_c BYTES,\n"
                                 + " year_c INT,\n"
-                                + " set_c STRING,\n"
+                                + " set_c ARRAY<STRING>,\n"
                                 + " enum_c STRING,\n"
+                                + " json_c STRING,\n"
                                 + " primary key (`id`) not enforced"
                                 + ") WITH ("
                                 + " 'connector' = 'oceanbase-cdc',"
@@ -360,13 +367,16 @@ public class OceanBaseConnectorITCase extends OceanBaseTestBase {
         String sinkDDL =
                 "CREATE TABLE sink ("
                         + " `id` INT NOT NULL,\n"
-                        + " bool_c TINYINT,\n"
+                        + " bit1_c BOOLEAN,\n"
+                        + " tiny1_c BOOLEAN,\n"
+                        + " boolean_c BOOLEAN,\n"
                         + " tiny_c TINYINT,\n"
                         + " tiny_un_c SMALLINT,\n"
                         + " small_c SMALLINT ,\n"
                         + " small_un_c INT ,\n"
                         + " medium_c INT,\n"
                         + " medium_un_c INT,\n"
+                        + " int11_c INT,\n"
                         + " int_c INT,\n"
                         + " int_un_c BIGINT,\n"
                         + " big_c BIGINT,\n"
@@ -382,8 +392,11 @@ public class OceanBaseConnectorITCase extends OceanBaseTestBase {
                         + " datetime3_c TIMESTAMP(3),\n"
                         + " datetime6_c TIMESTAMP(6),\n"
                         + " timestamp_c TIMESTAMP,\n"
+                        + " timestamp3_c TIMESTAMP(3),\n"
+                        + " timestamp6_c TIMESTAMP(6),\n"
                         + " char_c CHAR(3),\n"
                         + " varchar_c VARCHAR(255),\n"
+                        + " file_uuid BINARY(16),\n"
                         + " bit_c BINARY(8),\n"
                         + " text_c STRING,\n"
                         + " tiny_blob_c BYTES,\n"
@@ -391,8 +404,9 @@ public class OceanBaseConnectorITCase extends OceanBaseTestBase {
                         + " blob_c BYTES,\n"
                         + " long_blob_c BYTES,\n"
                         + " year_c INT,\n"
+                        + " set_c ARRAY<STRING>,\n"
                         + " enum_c STRING,\n"
-                        + " set_c STRING,\n"
+                        + " json_c STRING,\n"
                         + " primary key (`id`) not enforced"
                         + ") WITH ("
                         + " 'connector' = 'values',"
@@ -417,8 +431,107 @@ public class OceanBaseConnectorITCase extends OceanBaseTestBase {
         List<String> expected =
                 Arrays.asList(
-                        "+I(1,1,127,255,32767,65535,8388607,16777215,2147483647,4294967295,9223372036854775807,18446744073709551615,123.102,123.102,404.4443,123.4567,346,34567892.1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:00:22,abc,Hello World,[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2022,a,red)",
-                        "+U(1,1,127,255,32767,65535,8388607,16777215,2147483647,4294967295,9223372036854775807,18446744073709551615,123.102,123.102,404.4443,123.4567,346,34567892.1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:33:22,abc,Hello World,[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2022,a,red)");
+                        "+I(1,false,true,true,127,255,32767,65535,8388607,16777215,2147483647,2147483647,4294967295,9223372036854775807,18446744073709551615,123.102,123.102,404.4443,123.4567,346,34567892.1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,abc,Hello World,[101, 26, -17, -65, -67, 8, 57, 15, 72, -17, -65, -67, -17, -65, -67, -17, -65, -67, 54, -17, -65, -67, 62, 123, 116, 0],[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2022,[a, b],red,{\"key1\": \"value1\"})",
+                        "+U(1,false,true,true,127,255,32767,65535,8388607,16777215,2147483647,2147483647,4294967295,9223372036854775807,18446744073709551615,123.102,123.102,404.4443,123.4567,346,34567892.1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:33:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,abc,Hello World,[101, 26, -17, -65, -67, 8, 57, 15, 72, -17, -65, -67, -17, -65, -67, -17, -65, -67, 54, -17, -65, -67, 62, 123, 116, 0],[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2022,[a, b],red,{\"key1\": \"value1\"})");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
assertContainsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
@Test
public void testTimezoneBerlin() throws Exception {
testTimeDataTypes("+02:00");
}
@Test
public void testTimezoneShanghai() throws Exception {
testTimeDataTypes("+08:00");
}
public void testTimeDataTypes(String serverTimeZone) throws Exception {
try (Connection connection = getJdbcConnection("");
Statement statement = connection.createStatement()) {
statement.execute(String.format("SET GLOBAL time_zone = '%s';", serverTimeZone));
}
tEnv.getConfig().setLocalTimeZone(ZoneId.of(serverTimeZone));
initializeTable("column_type_test");
String sourceDDL =
String.format(
"CREATE TABLE ob_source (\n"
+ " `id` INT NOT NULL,\n"
+ " date_c DATE,\n"
+ " time_c TIME(0),\n"
+ " datetime3_c TIMESTAMP(3),\n"
+ " datetime6_c TIMESTAMP(6),\n"
+ " timestamp_c TIMESTAMP_LTZ,\n"
+ " primary key (`id`) not enforced"
+ ") WITH ("
+ " 'connector' = 'oceanbase-cdc',"
+ " 'scan.startup.mode' = 'initial',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'tenant-name' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'server-time-zone' = '%s',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'logproxy.host' = '%s',"
+ " 'logproxy.port' = '%s',"
+ " 'rootserver-list' = '%s',"
+ " 'working-mode' = 'memory'"
+ ")",
getUsername(),
getPassword(),
getTenant(),
"column_type_test",
"full_types",
serverTimeZone,
getObServerHost(),
getObServerSqlPort(),
getLogProxyHost(),
getLogProxyPort(),
getRsList());
String sinkDDL =
"CREATE TABLE sink ("
+ " `id` INT NOT NULL,\n"
+ " date_c DATE,\n"
+ " time_c TIME(0),\n"
+ " datetime3_c TIMESTAMP(3),\n"
+ " datetime6_c TIMESTAMP(6),\n"
+ " timestamp_c TIMESTAMP,\n"
+ " primary key (`id`) not enforced"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false',"
+ " 'sink-expected-messages-num' = '20'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
TableResult result =
tEnv.executeSql(
"INSERT INTO sink SELECT `id`, date_c, time_c, datetime3_c, datetime6_c, cast(timestamp_c as timestamp) FROM ob_source");
        // wait for the snapshot phase to finish and incremental change events to begin
waitForSinkSize("sink", 1);
int snapshotSize = sinkSize("sink");
try (Connection connection = getJdbcConnection("column_type_test");
Statement statement = connection.createStatement()) {
statement.execute(
"UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
}
waitForSinkSize("sink", snapshotSize + 1);
List<String> expected =
Arrays.asList(
"+I(1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:00:22)",
"+U(1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:33:22)");
List<String> actual = TestValuesTableFactory.getRawResults("sink"); List<String> actual = TestValuesTableFactory.getRawResults("sink");
assertContainsInAnyOrder(expected, actual); assertContainsInAnyOrder(expected, actual);

@@ -21,13 +21,16 @@ USE column_type_test;
 CREATE TABLE full_types
 (
     id INT AUTO_INCREMENT NOT NULL,
-    bool_c BOOLEAN,
+    bit1_c BIT,
+    tiny1_c TINYINT(1),
+    boolean_c BOOLEAN,
     tiny_c TINYINT,
     tiny_un_c TINYINT UNSIGNED,
     small_c SMALLINT,
     small_un_c SMALLINT UNSIGNED,
     medium_c MEDIUMINT,
     medium_un_c MEDIUMINT UNSIGNED,
+    int11_c INT(11),
     int_c INTEGER,
     int_un_c INTEGER UNSIGNED,
     big_c BIGINT,
@@ -40,11 +43,14 @@ CREATE TABLE full_types
     big_decimal_c DECIMAL(65, 1),
     date_c DATE,
     time_c TIME(0),
-    datetime3_c TIMESTAMP(3),
-    datetime6_c TIMESTAMP(6),
+    datetime3_c DATETIME(3),
+    datetime6_c DATETIME(6),
     timestamp_c TIMESTAMP,
+    timestamp3_c TIMESTAMP(3),
+    timestamp6_c TIMESTAMP(6),
     char_c CHAR(3),
     varchar_c VARCHAR(255),
+    file_uuid BINARY(16),
     bit_c BIT(64),
     text_c TEXT,
     tiny_blob_c TINYBLOB,
@@ -54,12 +60,15 @@ CREATE TABLE full_types
     year_c YEAR,
     set_c SET ('a', 'b'),
     enum_c ENUM ('red', 'green', 'blue'),
+    json_c JSON,
     PRIMARY KEY (id)
 ) DEFAULT CHARSET = utf8mb4;
 
 INSERT INTO full_types
-VALUES (DEFAULT, true, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 4294967295, 9223372036854775807,
-        18446744073709551615, 123.102, 123.102, 404.4443, 123.4567, 345.6, 34567892.1, '2020-07-17', '18:00:22',
-        '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22', 'abc', 'Hello World',
+VALUES (DEFAULT, 0, 1, true, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 2147483647, 4294967295,
+        9223372036854775807, 18446744073709551615, 123.102, 123.102, 404.4443, 123.4567, 345.6, 34567892.1,
+        '2020-07-17', '18:00:22', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',
+        '2020-07-17 18:00:22', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',
+        'abc', 'Hello World', unhex(replace('651aed08-390f-4893-b2f1-36923e7b7400', '-', '')),
         b'0000010000000100000001000000010000000100000001000000010000000100', 'text', UNHEX(HEX(16)), UNHEX(HEX(16)),
-        UNHEX(HEX(16)), UNHEX(HEX(16)), 2022, 'a', 'red');
+        UNHEX(HEX(16)), UNHEX(HEX(16)), 2022, 'a,b,a', 'red', '{"key1":"value1"}');

@@ -89,7 +89,7 @@ under the License.
         <slf4j.version>1.7.15</slf4j.version>
         <log4j.version>2.17.1</log4j.version>
         <spotless.version>2.4.2</spotless.version>
-        <oblogclient.version>1.0.5</oblogclient.version>
+        <oblogclient.version>1.0.6</oblogclient.version>
         <!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
         <flink.forkCount>1</flink.forkCount>
         <flink.reuseForks>true</flink.reuseForks>
