From b1b092c97e1e8dca89c841d45047909f7144c8c2 Mon Sep 17 00:00:00 2001 From: He Wang Date: Wed, 19 Oct 2022 19:30:02 +0800 Subject: [PATCH] [oceanbase] Add new deserialization schema with runtime converter (#1356) This closes #980. --- docs/content/connectors/oceanbase-cdc.md | 121 ++-- docs/content/quickstart/oceanbase-tutorial.md | 8 +- .../快速上手/oceanbase-tutorial-zh.md | 8 +- .../connectors/oceanbase/OceanBaseSource.java | 15 +- ...anBaseDeserializationRuntimeConverter.java | 47 ++ .../source/OceanBaseJdbcConverter.java | 220 ------ .../source/OceanBaseRichSourceFunction.java | 250 +------ .../source/OceanBaseSchemaUtils.java | 51 -- .../source/OceanBaseTableSchema.java | 128 ---- ...RowDataOceanBaseDeserializationSchema.java | 679 ++++++++++++++++++ .../OceanBaseAppendMetadataCollector.java | 56 ++ .../table/OceanBaseDeserializationSchema.java | 36 + .../table/OceanBaseMetadataConverter.java | 28 + .../table/OceanBaseReadableMetadata.java | 48 +- .../oceanbase/table/OceanBaseRecord.java | 118 +++ .../oceanbase/table/OceanBaseTableSource.java | 16 +- .../table/OceanBaseTableSourceFactory.java | 1 - .../oceanbase/OceanBaseTestBase.java | 4 +- .../table/OceanBaseConnectorITCase.java | 129 +++- .../test/resources/ddl/column_type_test.sql | 23 +- pom.xml | 2 +- 21 files changed, 1254 insertions(+), 734 deletions(-) create mode 100644 flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseDeserializationRuntimeConverter.java delete mode 100644 flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseJdbcConverter.java delete mode 100644 flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseSchemaUtils.java delete mode 100644 flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseTableSchema.java create mode 100644 flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/RowDataOceanBaseDeserializationSchema.java create mode 100644 flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseAppendMetadataCollector.java create mode 100644 flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseDeserializationSchema.java create mode 100644 flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseMetadataConverter.java create mode 100644 flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseRecord.java diff --git a/docs/content/connectors/oceanbase-cdc.md b/docs/content/connectors/oceanbase-cdc.md index b9e40a7e9..9705db26a 100644 --- a/docs/content/connectors/oceanbase-cdc.md +++ b/docs/content/connectors/oceanbase-cdc.md @@ -349,50 +349,78 @@ The OceanBase CDC Connector using [oblogclient](https://github.com/oceanbase/obl The OceanBase CDC connector can also be a DataStream source. 
You can create a SourceFunction as the following shows:

```java
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
 import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
-import com.ververica.cdc.connectors.oceanbase.table.OceanBaseTableSourceFactory;
+import com.ververica.cdc.connectors.oceanbase.source.RowDataOceanBaseDeserializationSchema;
+import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
 import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
-
-public class OceanBaseSourceExample {
+
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Collections;
 
-    public static void main(String[] args) throws Exception {
-        SourceFunction<String> oceanBaseSource =
-                OceanBaseSource.<String>builder()
-                        .rsList("127.0.0.1:2882:2881")  // set root server list
-                        .startupMode(StartupMode.INITIAL) // set startup mode
-                        .username("user@test_tenant") // set cluster username
-                        .password("pswd")  // set cluster password
-                        .tenantName("test_tenant")  // set captured tenant name, do not support regex
-                        .databaseName("test_db") // set captured database, support regex
-                        .tableName("test_table") // set captured table, support regex
-                        .hostname("127.0.0.1")  // set hostname of OceanBase server or proxy
-                        .port(2881)  // set the sql port for OceanBase server or proxy
-                        .logProxyHost("127.0.0.1") // set the hostname of log proxy
-                        .logProxyPort(2983) // set the port of log proxy
-                        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
-                        .build();
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        // enable checkpoint
-        env.enableCheckpointing(3000);
-
-        env.addSource(oceanBaseSource).print().setParallelism(1);
-
-        env.execute("Print OceanBase Snapshot + Commit Log");
-    }
+public class OceanBaseSourceExample {
+    public static void main(String[] args) throws Exception {
+        ResolvedSchema resolvedSchema =
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("id", DataTypes.INT().notNull()),
+                                Column.physical("name", DataTypes.STRING().notNull())),
+                        Collections.emptyList(),
+                        UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
+
+        RowType physicalDataType =
+                (RowType) resolvedSchema.toPhysicalRowDataType().getLogicalType();
+        TypeInformation<RowData> resultTypeInfo = InternalTypeInfo.of(physicalDataType);
+        String serverTimeZone = "+00:00";
+
+        OceanBaseDeserializationSchema<RowData> deserializer =
+                RowDataOceanBaseDeserializationSchema.newBuilder()
+                        .setPhysicalRowType(physicalDataType)
+                        .setResultTypeInfo(resultTypeInfo)
+                        .setServerTimeZone(ZoneId.of(serverTimeZone))
+                        .build();
+
+        SourceFunction<RowData> oceanBaseSource =
+                OceanBaseSource.<RowData>builder()
+                        .rsList("127.0.0.1:2882:2881")
+                        .startupMode(StartupMode.INITIAL)
+                        .username("user@test_tenant")
+                        .password("pswd")
+                        .tenantName("test_tenant")
+                        .databaseName("test_db")
+                        .tableName("test_table")
+                        .hostname("127.0.0.1")
+                        .port(2881)
+                        .logProxyHost("127.0.0.1")
+                        .logProxyPort(2983)
+                        .serverTimeZone(serverTimeZone)
+                        .deserializer(deserializer)
+                        .build();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // enable checkpoint
+        env.enableCheckpointing(3000);
+
+        env.addSource(oceanBaseSource).print().setParallelism(1);
+        env.execute("Print OceanBase Snapshot + Change Events");
+    }
 }
```
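The example above produces `RowData`. If a custom output type is wanted instead, the new `OceanBaseDeserializationSchema` interface can be implemented directly. Below is a minimal editorial sketch, not part of this patch: it relies only on the `OceanBaseRecord` accessors that appear later in this diff (`isSnapshotRecord`, `getJdbcFields`, `getOpt`, `getLogMessageFieldsBefore`/`After`); the class name and string formatting are purely illustrative.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;

import com.oceanbase.oms.logmessage.DataMessage;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseRecord;

/** Illustrative sketch: renders every record as a human-readable string. */
public class StringOceanBaseDeserializationSchema
        implements OceanBaseDeserializationSchema<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public void deserialize(OceanBaseRecord record, Collector<String> out) {
        if (record.isSnapshotRecord()) {
            // Snapshot phase: field values come from the JDBC "SELECT *" result.
            out.collect("INSERT: " + record.getJdbcFields());
        } else if (record.getOpt() == DataMessage.Record.Type.DELETE) {
            // Change phase: a delete only carries the pre-image fields.
            out.collect("DELETE: " + record.getLogMessageFieldsBefore());
        } else {
            out.collect(record.getOpt() + ": " + record.getLogMessageFieldsAfter());
        }
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```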
Data Type Mapping
----------------

-When the startup mode is not `INITIAL`, we will not be able to get the precision and scale of a column. In order to be compatible with different startup modes, we will not map one OceanBase type of different precision to different Flink types.
-
-For example, you can get a boolean from a column with type BOOLEAN, TINYINT(1) or BIT(1). BOOLEAN is equivalent to TINYINT(1) in OceanBase, so columns of BOOLEAN and TINYINT types will be mapped to TINYINT in Flink, and BIT(1) will be mapped to BINARY(1) in Flink.
-
@@ -405,7 +433,13 @@ For example, you can get a boolean from a column with type BOOLEAN, TINYINT(1) o
     <tr>
       <td>
         BOOLEAN<br>
-        TINYINT(1)
+        TINYINT(1)<br>
+        BIT(1)
       </td>
-      <td>TINYINT</td>
+      <td>BOOLEAN</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>TINYINT</td>
+      <td>TINYINT</td>
       <td></td>
     </tr>
@@ -483,11 +517,13 @@ For example, you can get a boolean from a column with type BOOLEAN, TINYINT(1) o
     <tr>
-      <td>
-        TIMESTAMP [(p)]<br>
-        DATETIME [(p)]
-      </td>
+      <td>DATETIME [(p)]</td>
       <td>TIMESTAMP [(p)]</td>
       <td></td>
     </tr>
+    <tr>
+      <td>TIMESTAMP [(p)]</td>
+      <td>TIMESTAMP_LTZ [(p)]</td>
+      <td></td>
+    </tr>
@@ -547,8 +583,13 @@ For example, you can get a boolean from a column with type BOOLEAN, TINYINT(1) o
+    <tr>
+      <td>SET</td>
+      <td>ARRAY&lt;STRING&gt;</td>
+      <td>As the SET data type in OceanBase is a string object that can have zero or more values, it should always be mapped to an array of string</td>
+    </tr>
     <tr>
       <td>JSON</td>
       <td>STRING</td>
-      <td></td>
+      <td>The JSON data type will be converted into STRING with JSON format in Flink.</td>
     </tr>
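To make the new `SET` mapping concrete: the runtime converter added later in this patch (`createArrayConverter()`) assumes the value of a SET column arrives as a single comma-separated string. A small self-contained sketch of that conversion, with a class name of our own choosing for illustration only:

```java
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.StringData;

/**
 * Standalone sketch of the SET -> ARRAY&lt;STRING&gt; mapping above, mirroring
 * createArrayConverter(): a SET value such as "a,b,c" is split on commas and
 * wrapped into Flink's internal array structure.
 */
public class SetMappingSketch {

    static GenericArrayData toArrayData(String setValue) {
        String[] parts = setValue.split(",");
        StringData[] elements = new StringData[parts.length];
        for (int i = 0; i < parts.length; i++) {
            elements[i] = StringData.fromString(parts[i]);
        }
        return new GenericArrayData(elements);
    }

    public static void main(String[] args) {
        GenericArrayData array = toArrayData("a,b,c");
        for (int i = 0; i < array.size(); i++) {
            System.out.println(array.getString(i)); // prints a, b, c on separate lines
        }
    }
}
```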
diff --git a/docs/content/quickstart/oceanbase-tutorial.md b/docs/content/quickstart/oceanbase-tutorial.md index 444d1b64b..98591958b 100644 --- a/docs/content/quickstart/oceanbase-tutorial.md +++ b/docs/content/quickstart/oceanbase-tutorial.md @@ -15,13 +15,13 @@ Create `docker-compose.yml`. version: '2.1' services: observer: - image: oceanbase/oceanbase-ce:3.1.3_bp1 + image: oceanbase/oceanbase-ce:3.1.4 container_name: observer environment: - 'OB_ROOT_PASSWORD=pswd' network_mode: "host" oblogproxy: - image: whhe/oblogproxy:1.0.2 + image: whhe/oblogproxy:1.0.3 container_name: oblogproxy environment: - 'OB_SYS_USERNAME=root' @@ -130,7 +130,7 @@ Flink SQL> CREATE TABLE orders ( customer_name STRING, price DECIMAL(10, 5), product_id INT, - order_status TINYINT, + order_status BOOLEAN, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'oceanbase-cdc', @@ -177,7 +177,7 @@ Flink SQL> CREATE TABLE enriched_orders ( customer_name STRING, price DECIMAL(10, 5), product_id INT, - order_status TINYINT, + order_status BOOLEAN, product_name STRING, product_description STRING, PRIMARY KEY (order_id) NOT ENFORCED diff --git a/docs/content/快速上手/oceanbase-tutorial-zh.md b/docs/content/快速上手/oceanbase-tutorial-zh.md index 116532e61..bebc13582 100644 --- a/docs/content/快速上手/oceanbase-tutorial-zh.md +++ b/docs/content/快速上手/oceanbase-tutorial-zh.md @@ -16,13 +16,13 @@ version: '2.1' services: observer: - image: oceanbase/oceanbase-ce:3.1.3_bp1 + image: oceanbase/oceanbase-ce:3.1.4 container_name: observer environment: - 'OB_ROOT_PASSWORD=pswd' network_mode: "host" oblogproxy: - image: whhe/oblogproxy:1.0.2 + image: whhe/oblogproxy:1.0.3 container_name: oblogproxy environment: - 'OB_SYS_USERNAME=root' @@ -129,7 +129,7 @@ Flink SQL> CREATE TABLE orders ( customer_name STRING, price DECIMAL(10, 5), product_id INT, - order_status TINYINT, + order_status BOOLEAN, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'oceanbase-cdc', @@ -176,7 +176,7 @@ Flink SQL> CREATE TABLE enriched_orders ( customer_name STRING, price DECIMAL(10, 5), product_id INT, - order_status TINYINT, + order_status BOOLEAN, product_name STRING, product_description STRING, PRIMARY KEY (order_id) NOT ENFORCED diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java index 0df42d866..dcf28431a 100644 --- a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java @@ -23,20 +23,15 @@ import com.oceanbase.clogproxy.client.config.ClientConf; import com.oceanbase.clogproxy.client.config.ObReaderConfig; import com.oceanbase.clogproxy.client.util.ClientIdGenerator; import com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction; +import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema; import com.ververica.cdc.connectors.oceanbase.table.StartupMode; -import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import org.apache.commons.lang3.StringUtils; import java.time.Duration; -import java.time.Instant; -import java.time.ZoneId; -import java.time.ZoneOffset; import static org.apache.flink.util.Preconditions.checkNotNull; -/** - * A builder to build a SourceFunction which can read snapshot and continue to consume commit log. 
- */ +/** A builder to build a SourceFunction which can read snapshot and change events of OceanBase. */ @PublicEvolving public class OceanBaseSource { @@ -71,7 +66,7 @@ public class OceanBaseSource { private String configUrl; private String workingMode; - private DebeziumDeserializationSchema deserializer; + private OceanBaseDeserializationSchema deserializer; public Builder startupMode(StartupMode startupMode) { this.startupMode = startupMode; @@ -163,7 +158,7 @@ public class OceanBaseSource { return this; } - public Builder deserializer(DebeziumDeserializationSchema deserializer) { + public Builder deserializer(OceanBaseDeserializationSchema deserializer) { this.deserializer = deserializer; return this; } @@ -202,7 +197,6 @@ public class OceanBaseSource { if (serverTimeZone == null) { serverTimeZone = "+00:00"; } - ZoneOffset zoneOffset = ZoneId.of(serverTimeZone).getRules().getOffset(Instant.now()); if (connectTimeout == null) { connectTimeout = Duration.ofSeconds(30); @@ -245,7 +239,6 @@ public class OceanBaseSource { databaseName, tableName, tableList, - zoneOffset, connectTimeout, hostname, port, diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseDeserializationRuntimeConverter.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseDeserializationRuntimeConverter.java new file mode 100644 index 000000000..54bdfb14e --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseDeserializationRuntimeConverter.java @@ -0,0 +1,47 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.oceanbase.source; + +import com.oceanbase.oms.logmessage.ByteString; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import java.io.Serializable; +import java.nio.charset.StandardCharsets; + +/** + * Runtime converter that converts objects of OceanBase into objects of Flink Table & SQL internal + * data structures. 
+ */ +public interface OceanBaseDeserializationRuntimeConverter extends Serializable { + + default Object convert(Object object) throws Exception { + if (object instanceof ByteString) { + return convertChangeEvent( + ((ByteString) object).toString(StandardCharsets.UTF_8.name())); + } else { + return convertSnapshotEvent(object); + } + } + + default Object convertSnapshotEvent(Object object) throws Exception { + throw new NotImplementedException(); + } + + default Object convertChangeEvent(String string) throws Exception { + throw new NotImplementedException(); + } +} diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseJdbcConverter.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseJdbcConverter.java deleted file mode 100644 index 432419e0d..000000000 --- a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseJdbcConverter.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Copyright 2022 Ververica Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ververica.cdc.connectors.oceanbase.source; - -import com.oceanbase.oms.logmessage.ByteString; -import com.oceanbase.oms.logmessage.DataMessage; -import io.debezium.config.CommonConnectorConfig; -import io.debezium.jdbc.JdbcValueConverters; -import io.debezium.jdbc.TemporalPrecisionMode; -import io.debezium.relational.ValueConverterProvider; -import io.debezium.util.NumberConversions; -import org.apache.kafka.connect.data.Schema; - -import java.math.BigDecimal; -import java.math.BigInteger; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; -import java.sql.Types; -import java.time.ZoneOffset; -import java.util.Arrays; - -/** Utils to convert jdbc type and value of a field. 
*/ -public class OceanBaseJdbcConverter { - - public static ValueConverterProvider valueConverterProvider(ZoneOffset zoneOffset) { - return new JdbcValueConverters( - JdbcValueConverters.DecimalMode.STRING, - TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS, - zoneOffset, - null, - JdbcValueConverters.BigIntUnsignedMode.PRECISE, - CommonConnectorConfig.BinaryHandlingMode.BYTES); - } - - public static Object getField(int jdbcType, Object value) { - if (value == null) { - return null; - } - jdbcType = getType(jdbcType, null); - switch (jdbcType) { - case Types.BIT: - if (value instanceof Boolean) { - return new byte[] {NumberConversions.getByte((Boolean) value)}; - } - return value; - case Types.INTEGER: - if (value instanceof Boolean) { - return NumberConversions.getInteger((Boolean) value); - } - if (value instanceof Date) { - return ((Date) value).getYear() + 1900; - } - return value; - case Types.FLOAT: - Float f = (Float) value; - return f.doubleValue(); - case Types.DECIMAL: - if (value instanceof BigInteger) { - return value.toString(); - } - BigDecimal decimal = (BigDecimal) value; - return decimal.toString(); - case Types.DATE: - Date date = (Date) value; - return io.debezium.time.Date.toEpochDay(date, null); - case Types.TIME: - Time time = (Time) value; - return io.debezium.time.MicroTime.toMicroOfDay(time, true); - case Types.TIMESTAMP: - Timestamp timestamp = (Timestamp) value; - return io.debezium.time.MicroTimestamp.toEpochMicros(timestamp, null); - default: - return value; - } - } - - public static Object getField( - Schema.Type schemaType, DataMessage.Record.Field.Type fieldType, ByteString value) { - if (value == null) { - return null; - } - int jdbcType = getType(fieldType); - switch (jdbcType) { - case Types.NULL: - return null; - case Types.INTEGER: - if (schemaType.equals(Schema.Type.INT64)) { - return Long.parseLong(value.toString()); - } - return Integer.parseInt(value.toString()); - case Types.BIGINT: - if (schemaType.equals(Schema.Type.STRING)) { - return value.toString(); - } - return Long.parseLong(value.toString()); - case Types.DOUBLE: - return Double.parseDouble(value.toString()); - case Types.DATE: - Date date = Date.valueOf(value.toString()); - return io.debezium.time.Date.toEpochDay(date, null); - case Types.TIME: - Time time = Time.valueOf(value.toString()); - return io.debezium.time.MicroTime.toMicroOfDay(time, true); - case Types.TIMESTAMP: - Timestamp timestamp = Timestamp.valueOf(value.toString()); - return io.debezium.time.MicroTimestamp.toEpochMicros(timestamp, null); - case Types.BIT: - long v = Long.parseLong(value.toString()); - byte[] bytes = ByteBuffer.allocate(8).putLong(v).array(); - int i = 0; - while (bytes[i] == 0 && i < Long.BYTES - 1) { - i++; - } - return Arrays.copyOfRange(bytes, i, Long.BYTES); - case Types.BINARY: - return ByteBuffer.wrap(value.toString().getBytes(StandardCharsets.UTF_8)); - default: - return value.toString(StandardCharsets.UTF_8.toString()); - } - } - - private static boolean isBoolean(int jdbcType, String typeName) { - return jdbcType == Types.BOOLEAN || (jdbcType == Types.BIT && "TINYINT".equals(typeName)); - } - - public static int getType(int jdbcType, String typeName) { - // treat boolean as tinyint type - if (isBoolean(jdbcType, typeName)) { - jdbcType = Types.TINYINT; - } - // treat year as int type - if ("YEAR".equals(typeName)) { - jdbcType = Types.INTEGER; - } - - // upcasting - if ("INT UNSIGNED".equals(typeName)) { - jdbcType = Types.BIGINT; - } - if ("BIGINT UNSIGNED".equals(typeName)) { - jdbcType = 
Types.DECIMAL; - } - - // widening conversion according to com.mysql.jdbc.ResultSetImpl#getObject - switch (jdbcType) { - case Types.TINYINT: - case Types.SMALLINT: - return Types.INTEGER; - case Types.REAL: - return Types.FLOAT; - default: - return jdbcType; - } - } - - public static int getType(DataMessage.Record.Field.Type fieldType) { - switch (fieldType) { - case NULL: - return Types.NULL; - case INT8: - case INT16: - case INT24: - case INT32: - case YEAR: - return Types.INTEGER; - case INT64: - return Types.BIGINT; - case FLOAT: - case DOUBLE: - return Types.DOUBLE; - case DECIMAL: - return Types.DECIMAL; - case ENUM: - case SET: - case STRING: - case JSON: - return Types.CHAR; - case TIMESTAMP: - case DATETIME: - case TIMESTAMP_WITH_TIME_ZONE: - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - case TIMESTAMP_NANO: - return Types.TIMESTAMP; - case DATE: - return Types.DATE; - case TIME: - return Types.TIME; - case BIT: - return Types.BIT; - case BLOB: - case BINARY: - return Types.BINARY; - case INTERVAL_YEAR_TO_MONTH: - case INTERVAL_DAY_TO_SECOND: - case GEOMETRY: - case RAW: - // it's weird to get wrong type from TEXT column, temporarily treat it as a string - case UNKOWN: - default: - return Types.VARCHAR; - } - } -} diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java index d48ebb62b..39d4a09ee 100644 --- a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java @@ -30,7 +30,6 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.FlinkRuntimeException; -import com.mysql.jdbc.ResultSetMetaData; import com.oceanbase.clogproxy.client.LogProxyClient; import com.oceanbase.clogproxy.client.config.ClientConf; import com.oceanbase.clogproxy.client.config.ObReaderConfig; @@ -38,26 +37,21 @@ import com.oceanbase.clogproxy.client.exception.LogProxyClientException; import com.oceanbase.clogproxy.client.listener.RecordListener; import com.oceanbase.oms.logmessage.DataMessage; import com.oceanbase.oms.logmessage.LogMessage; -import com.ververica.cdc.debezium.DebeziumDeserializationSchema; -import io.debezium.relational.TableSchema; +import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema; +import com.ververica.cdc.connectors.oceanbase.table.OceanBaseRecord; import org.apache.commons.lang3.StringUtils; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.time.Duration; -import java.time.ZoneOffset; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -86,7 +80,6 @@ public class OceanBaseRichSourceFunction extends RichSourceFunction private final String databaseName; 
private final String tableName; private final String tableList; - private final ZoneOffset zoneOffset; private final Duration connectTimeout; private final String hostname; private final Integer port; @@ -94,13 +87,12 @@ public class OceanBaseRichSourceFunction extends RichSourceFunction private final int logProxyPort; private final ClientConf logProxyClientConf; private final ObReaderConfig obReaderConfig; - private final DebeziumDeserializationSchema deserializer; + private final OceanBaseDeserializationSchema deserializer; private final AtomicBoolean snapshotCompleted = new AtomicBoolean(false); private final List logMessageBuffer = new LinkedList<>(); private transient Set tableSet; - private transient Map tableSchemaMap; private transient volatile long resolvedTimestamp; private transient volatile OceanBaseConnection snapshotConnection; private transient LogProxyClient logProxyClient; @@ -115,7 +107,6 @@ public class OceanBaseRichSourceFunction extends RichSourceFunction String databaseName, String tableName, String tableList, - ZoneOffset zoneOffset, Duration connectTimeout, String hostname, Integer port, @@ -123,7 +114,7 @@ public class OceanBaseRichSourceFunction extends RichSourceFunction int logProxyPort, ClientConf logProxyClientConf, ObReaderConfig obReaderConfig, - DebeziumDeserializationSchema deserializer) { + OceanBaseDeserializationSchema deserializer) { this.snapshot = checkNotNull(snapshot); this.username = checkNotNull(username); this.password = checkNotNull(password); @@ -131,7 +122,6 @@ public class OceanBaseRichSourceFunction extends RichSourceFunction this.databaseName = databaseName; this.tableName = tableName; this.tableList = tableList; - this.zoneOffset = checkNotNull(zoneOffset); this.connectTimeout = checkNotNull(connectTimeout); this.hostname = hostname; this.port = port; @@ -146,7 +136,6 @@ public class OceanBaseRichSourceFunction extends RichSourceFunction public void open(final Configuration config) throws Exception { super.open(config); this.outputCollector = new OutputCollector<>(); - this.tableSchemaMap = new ConcurrentHashMap<>(); this.resolvedTimestamp = -1; } @@ -157,25 +146,29 @@ public class OceanBaseRichSourceFunction extends RichSourceFunction LOG.info("Start to initial table whitelist"); initTableWhiteList(); - LOG.info("Start readChangeEvents process"); - readChangeEvents(); + LOG.info("Start readChangeRecords process"); + readChangeRecords(); if (shouldReadSnapshot()) { synchronized (ctx.getCheckpointLock()) { try { - readSnapshot(); + readSnapshotRecords(); } finally { closeSnapshotConnection(); } LOG.info("Snapshot reading finished"); } } else { - LOG.info("Skip snapshot read"); + LOG.info("Skip snapshot reading"); } logProxyClient.join(); } + private boolean shouldReadSnapshot() { + return resolvedTimestamp == -1 && snapshot; + } + private OceanBaseConnection getSnapshotConnection() { if (snapshotConnection == null) { snapshotConnection = @@ -253,20 +246,19 @@ public class OceanBaseRichSourceFunction extends RichSourceFunction .collect(Collectors.joining("|"))); } - protected void readSnapshot() { + protected void readSnapshotRecords() { tableSet.forEach( table -> { String[] schema = table.split("\\."); - readSnapshotFromTable(schema[0], schema[1]); + readSnapshotRecordsByTable(schema[0], schema[1]); }); snapshotCompleted.set(true); } - private void readSnapshotFromTable(String databaseName, String tableName) { - String topicName = getDefaultTopicName(tenantName, databaseName, tableName); - Map partition = getSourcePartition(tenantName, 
databaseName, tableName); - // the offset here is useless - Map offset = getSourceOffset(resolvedTimestamp); + private void readSnapshotRecordsByTable(String databaseName, String tableName) { + OceanBaseRecord.SourceInfo sourceInfo = + new OceanBaseRecord.SourceInfo( + tenantName, databaseName, tableName, resolvedTimestamp); String fullName = String.format("`%s`.`%s`", databaseName, tableName); String selectSql = "SELECT * FROM " + fullName; @@ -276,58 +268,17 @@ public class OceanBaseRichSourceFunction extends RichSourceFunction .query( selectSql, rs -> { - ResultSetMetaData metaData = (ResultSetMetaData) rs.getMetaData(); - String[] columnNames = new String[metaData.getColumnCount()]; - int[] jdbcTypes = new int[metaData.getColumnCount()]; - for (int i = 0; i < metaData.getColumnCount(); i++) { - columnNames[i] = metaData.getColumnName(i + 1); - jdbcTypes[i] = - OceanBaseJdbcConverter.getType( - metaData.getColumnType(i + 1), - metaData.getColumnTypeName(i + 1)); - } - - TableSchema tableSchema = tableSchemaMap.get(topicName); - if (tableSchema == null) { - tableSchema = - OceanBaseTableSchema.getTableSchema( - topicName, - databaseName, - tableName, - columnNames, - jdbcTypes, - zoneOffset); - tableSchemaMap.put(topicName, tableSchema); - } - - Struct source = - OceanBaseSchemaUtils.sourceStruct( - tenantName, databaseName, tableName, null, null); - + ResultSetMetaData metaData = rs.getMetaData(); while (rs.next()) { - Struct value = new Struct(tableSchema.valueSchema()); + Map fieldMap = new HashMap<>(); for (int i = 0; i < metaData.getColumnCount(); i++) { - value.put( - columnNames[i], - OceanBaseJdbcConverter.getField( - jdbcTypes[i], rs.getObject(i + 1))); + fieldMap.put( + metaData.getColumnName(i + 1), rs.getObject(i + 1)); } - Struct struct = - tableSchema - .getEnvelopeSchema() - .create(value, source, null); + OceanBaseRecord record = + new OceanBaseRecord(sourceInfo, fieldMap); try { - deserializer.deserialize( - new SourceRecord( - partition, - offset, - topicName, - null, - null, - null, - struct.schema(), - struct), - outputCollector); + deserializer.deserialize(record, outputCollector); } catch (Exception e) { LOG.error("Deserialize snapshot record failed ", e); throw new FlinkRuntimeException(e); @@ -341,7 +292,7 @@ public class OceanBaseRichSourceFunction extends RichSourceFunction } } - protected void readChangeEvents() throws InterruptedException, TimeoutException { + protected void readChangeRecords() throws InterruptedException, TimeoutException { if (resolvedTimestamp > 0) { obReaderConfig.updateCheckpoint(Long.toString(resolvedTimestamp)); LOG.info("Read change events from resolvedTimestamp: {}", resolvedTimestamp); @@ -382,8 +333,7 @@ public class OceanBaseRichSourceFunction extends RichSourceFunction msg -> { try { deserializer.deserialize( - getRecordFromLogMessage(msg), - outputCollector); + getChangeRecord(msg), outputCollector); } catch (Exception e) { throw new FlinkRuntimeException(e); } @@ -422,147 +372,15 @@ public class OceanBaseRichSourceFunction extends RichSourceFunction LOG.info("LogProxyClient packet processing started"); } - private SourceRecord getRecordFromLogMessage(LogMessage message) { - String databaseName = getDbName(message.getDbName()); - String topicName = getDefaultTopicName(tenantName, databaseName, message.getTableName()); - - if (tableSchemaMap.get(topicName) == null) { - String[] columnNames = new String[message.getFieldCount()]; - int[] jdbcTypes = new int[message.getFieldCount()]; - int i = 0; - for (DataMessage.Record.Field field 
: message.getFieldList()) { - if (message.getOpt() == DataMessage.Record.Type.UPDATE && field.isPrev()) { - continue; - } - columnNames[i] = field.getFieldname(); - jdbcTypes[i] = OceanBaseJdbcConverter.getType(field.getType()); - i++; - } - TableSchema tableSchema = - OceanBaseTableSchema.getTableSchema( - topicName, - databaseName, - message.getTableName(), - columnNames, - jdbcTypes, - zoneOffset); - tableSchemaMap.put(topicName, tableSchema); - } - - Struct source = - OceanBaseSchemaUtils.sourceStruct( + private OceanBaseRecord getChangeRecord(LogMessage message) { + String databaseName = message.getDbName().replace(tenantName + ".", ""); + OceanBaseRecord.SourceInfo sourceInfo = + new OceanBaseRecord.SourceInfo( tenantName, databaseName, message.getTableName(), - String.valueOf(getCheckpointTimestamp(message)), - message.getOB10UniqueId()); - Struct struct; - switch (message.getOpt()) { - case INSERT: - Struct after = getLogValueStruct(topicName, message.getFieldList()); - struct = - tableSchemaMap - .get(topicName) - .getEnvelopeSchema() - .create(after, source, null); - break; - case UPDATE: - List beforeFields = new ArrayList<>(); - List afterFields = new ArrayList<>(); - for (DataMessage.Record.Field field : message.getFieldList()) { - if (field.isPrev()) { - beforeFields.add(field); - } else { - afterFields.add(field); - } - } - after = getLogValueStruct(topicName, afterFields); - Struct before = getLogValueStruct(topicName, beforeFields); - struct = - tableSchemaMap - .get(topicName) - .getEnvelopeSchema() - .update(before, after, source, null); - break; - case DELETE: - before = getLogValueStruct(topicName, message.getFieldList()); - struct = - tableSchemaMap - .get(topicName) - .getEnvelopeSchema() - .delete(before, source, null); - break; - default: - throw new UnsupportedOperationException( - "Unsupported dml type: " + message.getOpt()); - } - return new SourceRecord( - getSourcePartition(tenantName, databaseName, message.getTableName()), - getSourceOffset(getCheckpointTimestamp(message)), - topicName, - null, - null, - null, - struct.schema(), - struct); - } - - private boolean shouldReadSnapshot() { - return resolvedTimestamp == -1 && snapshot; - } - - private String getDbName(String origin) { - if (origin == null) { - return null; - } - return origin.replace(tenantName + ".", ""); - } - - private String getDefaultTopicName(String tenantName, String databaseName, String tableName) { - return String.format("%s.%s.%s", tenantName, databaseName, tableName); - } - - private Map getSourcePartition( - String tenantName, String databaseName, String tableName) { - Map sourcePartition = new HashMap<>(); - sourcePartition.put("tenant", tenantName); - sourcePartition.put("database", databaseName); - sourcePartition.put("table", tableName); - return sourcePartition; - } - - private Map getSourceOffset(long timestamp) { - Map sourceOffset = new HashMap<>(); - sourceOffset.put("timestamp", timestamp); - return sourceOffset; - } - - private Struct getLogValueStruct(String topicName, List fieldList) { - TableSchema tableSchema = tableSchemaMap.get(topicName); - Struct value = new Struct(tableSchema.valueSchema()); - Object fieldValue; - for (DataMessage.Record.Field field : fieldList) { - try { - Schema fieldSchema = tableSchema.valueSchema().field(field.getFieldname()).schema(); - fieldValue = - OceanBaseJdbcConverter.getField( - fieldSchema.type(), field.getType(), field.getValue()); - value.put(field.getFieldname(), fieldValue); - } catch (NumberFormatException e) { - tableSchema = - 
OceanBaseTableSchema.upcastingTableSchema( - topicName, - tableSchema, - fieldList.stream() - .collect( - Collectors.toMap( - DataMessage.Record.Field::getFieldname, - f -> f.getValue().toString()))); - tableSchemaMap.put(topicName, tableSchema); - return getLogValueStruct(topicName, fieldList); - } - } - return value; + getCheckpointTimestamp(message)); + return new OceanBaseRecord(sourceInfo, message.getOpt(), message.getFieldList()); } /** diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseSchemaUtils.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseSchemaUtils.java deleted file mode 100644 index d9230b4d5..000000000 --- a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseSchemaUtils.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2022 Ververica Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ververica.cdc.connectors.oceanbase.source; - -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.apache.kafka.connect.data.Struct; - -/** Utils to deal with OceanBase SourceRecord schema. */ -public class OceanBaseSchemaUtils { - - public static Schema sourceSchema() { - return SchemaBuilder.struct() - .field("tenant", Schema.STRING_SCHEMA) - .field("database", Schema.STRING_SCHEMA) - .field("table", Schema.STRING_SCHEMA) - .field("timestamp", Schema.OPTIONAL_STRING_SCHEMA) - .field("unique_id", Schema.OPTIONAL_STRING_SCHEMA) - .build(); - } - - public static Struct sourceStruct( - String tenant, String database, String table, String timestamp, String uniqueId) { - Struct struct = - new Struct(sourceSchema()) - .put("tenant", tenant) - .put("database", database) - .put("table", table); - if (timestamp != null) { - struct.put("timestamp", timestamp); - } - if (uniqueId != null) { - struct.put("unique_id", uniqueId); - } - return struct; - } -} diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseTableSchema.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseTableSchema.java deleted file mode 100644 index ce0f099e6..000000000 --- a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseTableSchema.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright 2022 Ververica Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ververica.cdc.connectors.oceanbase.source; - -import io.debezium.data.Envelope; -import io.debezium.relational.Column; -import io.debezium.relational.ColumnEditor; -import io.debezium.relational.CustomConverterRegistry; -import io.debezium.relational.Table; -import io.debezium.relational.TableEditor; -import io.debezium.relational.TableId; -import io.debezium.relational.TableSchema; -import io.debezium.relational.TableSchemaBuilder; -import io.debezium.util.SchemaNameAdjuster; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; - -import java.math.BigInteger; -import java.sql.Types; -import java.time.ZoneOffset; -import java.util.Map; - -/** Utils to deal with table schema of OceanBase. */ -public class OceanBaseTableSchema { - - public static TableSchemaBuilder tableSchemaBuilder(ZoneOffset zoneOffset) { - return new TableSchemaBuilder( - OceanBaseJdbcConverter.valueConverterProvider(zoneOffset), - SchemaNameAdjuster.create(), - new CustomConverterRegistry(null), - OceanBaseSchemaUtils.sourceSchema(), - false); - } - - public static TableId tableId(String databaseName, String tableName) { - return new TableId(databaseName, null, tableName); - } - - public static Column getColumn(String name, int jdbcType) { - // we can't get the scale and length of decimal, timestamp and bit columns from log, - // so here we set a constant value to these fields to be compatible with the logic of - // JdbcValueConverters#schemaBuilder - ColumnEditor columnEditor = - Column.editor().name(name).jdbcType(jdbcType).optional(true).scale(0); - if (columnEditor.jdbcType() == Types.TIMESTAMP || columnEditor.jdbcType() == Types.BIT) { - columnEditor.length(6); - } - return columnEditor.create(); - } - - public static TableSchema getTableSchema( - String topicName, - String databaseName, - String tableName, - String[] columnNames, - int[] jdbcTypes, - ZoneOffset zoneOffset) { - TableEditor tableEditor = Table.editor().tableId(tableId(databaseName, tableName)); - for (int i = 0; i < columnNames.length; i++) { - tableEditor.addColumn(getColumn(columnNames[i], jdbcTypes[i])); - } - return tableSchemaBuilder(zoneOffset) - .create( - null, - Envelope.schemaName(topicName), - tableEditor.create(), - null, - null, - null); - } - - public static Schema upcastingSchemaType(Schema schema, String value) { - if (schema.type().equals(Schema.Type.INT32) && Long.parseLong(value) > Integer.MAX_VALUE) { - return Schema.INT64_SCHEMA; - } - if (schema.type().equals(Schema.Type.INT64)) { - BigInteger bigInt = new BigInteger(value); - if (bigInt.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0) { - return Schema.STRING_SCHEMA; - } - } - return schema; - } - - public static Schema upcastingValueSchema(Schema valueSchema, Map fields) { - SchemaBuilder schemaBuilder = SchemaBuilder.struct().optional(); - for (Map.Entry entry : fields.entrySet()) { - Schema fieldSchema = valueSchema.field(entry.getKey()).schema(); - fieldSchema = upcastingSchemaType(fieldSchema, entry.getValue()); - schemaBuilder.field(entry.getKey(), fieldSchema); - } - return schemaBuilder.build(); - } - - public static Envelope getEnvelope(String name, Schema valueSchema) { - return Envelope.defineSchema() - .withName(name) - .withRecord(valueSchema) - .withSource(OceanBaseSchemaUtils.sourceSchema()) - .build(); - } - - public static TableSchema upcastingTableSchema( - String topicName, TableSchema 
tableSchema, Map fields) { - Schema valueSchema = upcastingValueSchema(tableSchema.valueSchema(), fields); - return new TableSchema( - tableSchema.id(), - null, - null, - getEnvelope(Envelope.schemaName(topicName), valueSchema), - valueSchema, - null); - } -} diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/RowDataOceanBaseDeserializationSchema.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/RowDataOceanBaseDeserializationSchema.java new file mode 100644 index 000000000..a0c4b0b0e --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/RowDataOceanBaseDeserializationSchema.java @@ -0,0 +1,679 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.oceanbase.source; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; + +import com.oceanbase.oms.logmessage.ByteString; +import com.ververica.cdc.connectors.oceanbase.table.OceanBaseAppendMetadataCollector; +import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema; +import com.ververica.cdc.connectors.oceanbase.table.OceanBaseMetadataConverter; +import com.ververica.cdc.connectors.oceanbase.table.OceanBaseRecord; +import com.ververica.cdc.debezium.utils.TemporalConversions; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Deserialization schema from OceanBase object to Flink Table/SQL internal data structure {@link + * RowData}. + */ +public class RowDataOceanBaseDeserializationSchema + implements OceanBaseDeserializationSchema { + + private static final long serialVersionUID = 1L; + + /** TypeInformation of the produced {@link RowData}. * */ + private final TypeInformation resultTypeInfo; + + /** + * Runtime converter that OceanBase record data into {@link RowData} consisted of physical + * column values. + */ + private final OceanBaseDeserializationRuntimeConverter physicalConverter; + + /** Whether the deserializer needs to handle metadata columns. 
*/ + private final boolean hasMetadata; + + /** + * A wrapped output collector which is used to append metadata columns after physical columns. + */ + private final OceanBaseAppendMetadataCollector appendMetadataCollector; + + /** Returns a builder to build {@link RowDataOceanBaseDeserializationSchema}. */ + public static RowDataOceanBaseDeserializationSchema.Builder newBuilder() { + return new RowDataOceanBaseDeserializationSchema.Builder(); + } + + RowDataOceanBaseDeserializationSchema( + RowType physicalDataType, + OceanBaseMetadataConverter[] metadataConverters, + TypeInformation resultTypeInfo, + ZoneId serverTimeZone) { + this.hasMetadata = checkNotNull(metadataConverters).length > 0; + this.appendMetadataCollector = new OceanBaseAppendMetadataCollector(metadataConverters); + this.physicalConverter = createConverter(checkNotNull(physicalDataType), serverTimeZone); + this.resultTypeInfo = checkNotNull(resultTypeInfo); + } + + @Override + public void deserialize(OceanBaseRecord record, Collector out) throws Exception { + RowData physicalRow; + if (record.isSnapshotRecord()) { + physicalRow = (GenericRowData) physicalConverter.convert(record.getJdbcFields()); + physicalRow.setRowKind(RowKind.INSERT); + emit(record, physicalRow, out); + } else { + switch (record.getOpt()) { + case INSERT: + physicalRow = + (GenericRowData) + physicalConverter.convert(record.getLogMessageFieldsAfter()); + physicalRow.setRowKind(RowKind.INSERT); + emit(record, physicalRow, out); + break; + case DELETE: + physicalRow = + (GenericRowData) + physicalConverter.convert(record.getLogMessageFieldsBefore()); + physicalRow.setRowKind(RowKind.DELETE); + emit(record, physicalRow, out); + break; + case UPDATE: + physicalRow = + (GenericRowData) + physicalConverter.convert(record.getLogMessageFieldsBefore()); + physicalRow.setRowKind(RowKind.UPDATE_BEFORE); + emit(record, physicalRow, out); + physicalRow = + (GenericRowData) + physicalConverter.convert(record.getLogMessageFieldsAfter()); + physicalRow.setRowKind(RowKind.UPDATE_AFTER); + emit(record, physicalRow, out); + break; + default: + throw new IllegalArgumentException( + "Unsupported log message record type: " + record.getOpt()); + } + } + } + + private void emit(OceanBaseRecord row, RowData physicalRow, Collector collector) { + if (!hasMetadata) { + collector.collect(physicalRow); + return; + } + + appendMetadataCollector.inputRecord = row; + appendMetadataCollector.outputCollector = collector; + appendMetadataCollector.collect(physicalRow); + } + + @Override + public TypeInformation getProducedType() { + return resultTypeInfo; + } + + /** Builder class of {@link RowDataOceanBaseDeserializationSchema}. 
*/ + public static class Builder { + private RowType physicalRowType; + private TypeInformation resultTypeInfo; + private OceanBaseMetadataConverter[] metadataConverters = new OceanBaseMetadataConverter[0]; + private ZoneId serverTimeZone = ZoneId.of("UTC"); + + public RowDataOceanBaseDeserializationSchema.Builder setPhysicalRowType( + RowType physicalRowType) { + this.physicalRowType = physicalRowType; + return this; + } + + public RowDataOceanBaseDeserializationSchema.Builder setMetadataConverters( + OceanBaseMetadataConverter[] metadataConverters) { + this.metadataConverters = metadataConverters; + return this; + } + + public RowDataOceanBaseDeserializationSchema.Builder setResultTypeInfo( + TypeInformation resultTypeInfo) { + this.resultTypeInfo = resultTypeInfo; + return this; + } + + public RowDataOceanBaseDeserializationSchema.Builder setServerTimeZone( + ZoneId serverTimeZone) { + this.serverTimeZone = serverTimeZone; + return this; + } + + public RowDataOceanBaseDeserializationSchema build() { + return new RowDataOceanBaseDeserializationSchema( + physicalRowType, metadataConverters, resultTypeInfo, serverTimeZone); + } + } + + private static OceanBaseDeserializationRuntimeConverter createConverter( + LogicalType type, ZoneId serverTimeZone) { + return wrapIntoNullableConverter(createNotNullConverter(type, serverTimeZone)); + } + + private static OceanBaseDeserializationRuntimeConverter wrapIntoNullableConverter( + OceanBaseDeserializationRuntimeConverter converter) { + return new OceanBaseDeserializationRuntimeConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object object) throws Exception { + if (object == null) { + return null; + } + return converter.convert(object); + } + }; + } + + public static OceanBaseDeserializationRuntimeConverter createNotNullConverter( + LogicalType type, ZoneId serverTimeZone) { + switch (type.getTypeRoot()) { + case ROW: + return createRowConverter((RowType) type, serverTimeZone); + case NULL: + return convertToNull(); + case BOOLEAN: + return convertToBoolean(); + case TINYINT: + return convertToTinyInt(); + case SMALLINT: + return convertToSmallInt(); + case INTEGER: + case INTERVAL_YEAR_MONTH: + return convertToInt(); + case BIGINT: + case INTERVAL_DAY_TIME: + return convertToLong(); + case DATE: + return convertToDate(); + case TIME_WITHOUT_TIME_ZONE: + return convertToTime(); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return convertToTimestamp(); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return convertToLocalTimeZoneTimestamp(serverTimeZone); + case FLOAT: + return convertToFloat(); + case DOUBLE: + return convertToDouble(); + case CHAR: + case VARCHAR: + return convertToString(); + case BINARY: + return convertToBinary(); + case VARBINARY: + return convertToBytes(); + case DECIMAL: + return createDecimalConverter((DecimalType) type); + case ARRAY: + return createArrayConverter(); + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + } + + private static OceanBaseDeserializationRuntimeConverter createRowConverter( + RowType rowType, ZoneId serverTimeZone) { + final OceanBaseDeserializationRuntimeConverter[] fieldConverters = + rowType.getFields().stream() + .map(RowType.RowField::getType) + .map(logicType -> createConverter(logicType, serverTimeZone)) + .toArray(OceanBaseDeserializationRuntimeConverter[]::new); + final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]); + return new OceanBaseDeserializationRuntimeConverter() { + + private static 
final long serialVersionUID = 1L; + + @Override + public Object convert(Object object) throws Exception { + int arity = fieldNames.length; + GenericRowData row = new GenericRowData(arity); + Map fieldMap = (Map) object; + for (int i = 0; i < arity; i++) { + String fieldName = fieldNames[i]; + row.setField(i, fieldConverters[i].convert(fieldMap.get(fieldName))); + } + return row; + } + }; + } + + private static OceanBaseDeserializationRuntimeConverter convertToNull() { + return new OceanBaseDeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object object) { + return null; + } + }; + } + + private static OceanBaseDeserializationRuntimeConverter convertToBoolean() { + return new OceanBaseDeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convertSnapshotEvent(Object object) { + if (object instanceof byte[]) { + return "1".equals(new String((byte[]) object, StandardCharsets.UTF_8)); + } + return Boolean.parseBoolean(object.toString()) || "1".equals(object.toString()); + } + + @Override + public Object convertChangeEvent(String string) { + return "1".equals(string); + } + }; + } + + private static OceanBaseDeserializationRuntimeConverter convertToTinyInt() { + return new OceanBaseDeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convertSnapshotEvent(Object object) { + return Byte.parseByte(object.toString()); + } + + @Override + public Object convertChangeEvent(String string) { + return Byte.parseByte(string); + } + }; + } + + private static OceanBaseDeserializationRuntimeConverter convertToSmallInt() { + return new OceanBaseDeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convertSnapshotEvent(Object object) { + return Short.parseShort(object.toString()); + } + + @Override + public Object convertChangeEvent(String string) { + return Short.parseShort(string); + } + }; + } + + private static OceanBaseDeserializationRuntimeConverter convertToInt() { + return new OceanBaseDeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convertSnapshotEvent(Object object) { + if (object instanceof Integer) { + return object; + } else if (object instanceof Long) { + return ((Long) object).intValue(); + } else if (object instanceof Date) { + return ((Date) object).toLocalDate().getYear(); + } else { + return Integer.parseInt(object.toString()); + } + } + + @Override + public Object convertChangeEvent(String string) { + return Integer.parseInt(string); + } + }; + } + + private static OceanBaseDeserializationRuntimeConverter convertToLong() { + return new OceanBaseDeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convertSnapshotEvent(Object object) { + if (object instanceof Integer) { + return ((Integer) object).longValue(); + } else if (object instanceof Long) { + return object; + } else { + return Long.parseLong(object.toString()); + } + } + + @Override + public Object convertChangeEvent(String string) { + return Long.parseLong(string); + } + }; + } + + private static OceanBaseDeserializationRuntimeConverter convertToDouble() { + return new OceanBaseDeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convertSnapshotEvent(Object object) { 
+ if (object instanceof Float) { + return ((Float) object).doubleValue(); + } else if (object instanceof Double) { + return object; + } else { + return Double.parseDouble(object.toString()); + } + } + + @Override + public Object convertChangeEvent(String string) { + return Double.parseDouble(string); + } + }; + } + + private static OceanBaseDeserializationRuntimeConverter convertToFloat() { + return new OceanBaseDeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convertSnapshotEvent(Object object) { + if (object instanceof Float) { + return object; + } else if (object instanceof Double) { + return ((Double) object).floatValue(); + } else { + return Float.parseFloat(object.toString()); + } + } + + @Override + public Object convertChangeEvent(String string) { + return Float.parseFloat(string); + } + }; + } + + private static OceanBaseDeserializationRuntimeConverter convertToDate() { + return new OceanBaseDeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convertSnapshotEvent(Object object) { + return (int) TemporalConversions.toLocalDate(object).toEpochDay(); + } + + @Override + public Object convertChangeEvent(String string) { + return (int) Date.valueOf(string).toLocalDate().toEpochDay(); + } + }; + } + + private static OceanBaseDeserializationRuntimeConverter convertToTime() { + return new OceanBaseDeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convertSnapshotEvent(Object object) { + if (object instanceof Long) { + return (int) ((Long) object / 1000_000); + } + return TemporalConversions.toLocalTime(object).toSecondOfDay() * 1000; + } + + @Override + public Object convertChangeEvent(String string) { + return TemporalConversions.toLocalTime(Time.valueOf(string)).toSecondOfDay() * 1000; + } + }; + } + + private static OceanBaseDeserializationRuntimeConverter convertToTimestamp() { + return new OceanBaseDeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convertSnapshotEvent(Object object) { + if (object instanceof Timestamp) { + return TimestampData.fromLocalDateTime(((Timestamp) object).toLocalDateTime()); + } + throw new IllegalArgumentException( + "Unable to convert to TimestampData from unexpected value '" + + object + + "' of type " + + object.getClass().getName()); + } + + @Override + public Object convertChangeEvent(String string) { + return TimestampData.fromLocalDateTime(Timestamp.valueOf(string).toLocalDateTime()); + } + }; + } + + private static OceanBaseDeserializationRuntimeConverter convertToLocalTimeZoneTimestamp( + ZoneId serverTimeZone) { + return new OceanBaseDeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convertSnapshotEvent(Object object) { + if (object instanceof Timestamp) { + return TimestampData.fromInstant( + ((Timestamp) object) + .toLocalDateTime() + .atZone(serverTimeZone) + .toInstant()); + } + throw new IllegalArgumentException( + "Unable to convert to TimestampData from unexpected value '" + + object + + "' of type " + + object.getClass().getName()); + } + + @Override + public Object convertChangeEvent(String string) { + return TimestampData.fromInstant( + Timestamp.valueOf(string) + .toLocalDateTime() + .atZone(serverTimeZone) + .toInstant()); + } + }; + } + + private static OceanBaseDeserializationRuntimeConverter 
convertToString() { + return new OceanBaseDeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convertSnapshotEvent(Object object) { + return StringData.fromString(object.toString()); + } + + @Override + public Object convertChangeEvent(String string) { + return StringData.fromString(string); + } + }; + } + + private static OceanBaseDeserializationRuntimeConverter convertToBinary() { + return new OceanBaseDeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convertSnapshotEvent(Object object) { + if (object instanceof byte[]) { + String str = new String((byte[]) object, StandardCharsets.US_ASCII); + return str.getBytes(StandardCharsets.UTF_8); + } else if (object instanceof ByteBuffer) { + ByteBuffer byteBuffer = (ByteBuffer) object; + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return bytes; + } else { + throw new UnsupportedOperationException( + "Unsupported BINARY value type: " + object.getClass().getSimpleName()); + } + } + + @Override + public Object convertChangeEvent(String string) { + try { + long v = Long.parseLong(string); + byte[] bytes = ByteBuffer.allocate(8).putLong(v).array(); + int i = 0; + while (i < Long.BYTES - 1 && bytes[i] == 0) { + i++; + } + return Arrays.copyOfRange(bytes, i, Long.BYTES); + } catch (NumberFormatException e) { + return string.getBytes(StandardCharsets.UTF_8); + } + } + }; + } + + private static OceanBaseDeserializationRuntimeConverter convertToBytes() { + return new OceanBaseDeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convertSnapshotEvent(Object object) { + if (object instanceof byte[]) { + return object; + } else if (object instanceof ByteBuffer) { + ByteBuffer byteBuffer = (ByteBuffer) object; + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return bytes; + } else { + throw new UnsupportedOperationException( + "Unsupported BYTES value type: " + object.getClass().getSimpleName()); + } + } + + @Override + public Object convertChangeEvent(String string) { + return string.getBytes(StandardCharsets.UTF_8); + } + }; + } + + private static OceanBaseDeserializationRuntimeConverter createDecimalConverter( + DecimalType decimalType) { + final int precision = decimalType.getPrecision(); + final int scale = decimalType.getScale(); + + return new OceanBaseDeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convertSnapshotEvent(Object object) { + BigDecimal bigDecimal; + if (object instanceof String) { + bigDecimal = new BigDecimal((String) object); + } else if (object instanceof Long) { + bigDecimal = new BigDecimal((Long) object); + } else if (object instanceof BigInteger) { + bigDecimal = new BigDecimal((BigInteger) object); + } else if (object instanceof Double) { + bigDecimal = BigDecimal.valueOf((Double) object); + } else if (object instanceof BigDecimal) { + bigDecimal = (BigDecimal) object; + } else { + throw new IllegalArgumentException( + "Unable to convert to decimal from unexpected value '" + + object + + "' of type " + + object.getClass()); + } + return DecimalData.fromBigDecimal(bigDecimal, precision, scale); + } + + @Override + public Object convertChangeEvent(String string) { + return DecimalData.fromBigDecimal(new BigDecimal(string), precision, scale); + } + }; + } + + private static
OceanBaseDeserializationRuntimeConverter createArrayConverter() { + return new OceanBaseDeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object object) { + String s; + if (object instanceof ByteString) { + s = ((ByteString) object).toString(StandardCharsets.UTF_8.name()); + } else { + s = object.toString(); + } + String[] strArray = s.split(","); + StringData[] stringDataArray = new StringData[strArray.length]; + for (int i = 0; i < strArray.length; i++) { + stringDataArray[i] = StringData.fromString(strArray[i]); + } + return new GenericArrayData(stringDataArray); + } + }; + } +} diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseAppendMetadataCollector.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseAppendMetadataCollector.java new file mode 100644 index 000000000..cd695b499 --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseAppendMetadataCollector.java @@ -0,0 +1,56 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.oceanbase.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.util.Collector; + +import java.io.Serializable; + +/** Emits a row with physical fields and metadata fields.
*/ +@Internal +public class OceanBaseAppendMetadataCollector implements Collector<RowData>, Serializable { + private static final long serialVersionUID = 1L; + + private final OceanBaseMetadataConverter[] metadataConverters; + + public transient OceanBaseRecord inputRecord; + public transient Collector<RowData> outputCollector; + + public OceanBaseAppendMetadataCollector(OceanBaseMetadataConverter[] metadataConverters) { + this.metadataConverters = metadataConverters; + } + + @Override + public void collect(RowData physicalRow) { + GenericRowData metaRow = new GenericRowData(metadataConverters.length); + for (int i = 0; i < metadataConverters.length; i++) { + Object meta = metadataConverters[i].read(inputRecord); + metaRow.setField(i, meta); + } + RowData outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow); + outputCollector.collect(outRow); + } + + @Override + public void close() { + // nothing to do + } +} diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseDeserializationSchema.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseDeserializationSchema.java new file mode 100644 index 000000000..336c8f520 --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseDeserializationSchema.java @@ -0,0 +1,36 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.oceanbase.table; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.util.Collector; + +import java.io.Serializable; + +/** + * The deserialization schema describes how to turn the OceanBase record into data types (Java/Scala + * objects) that are processed by Flink. + * + * @param <T> The type created by the deserialization schema. + */ +@PublicEvolving +public interface OceanBaseDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { + + /** Deserialize the OceanBase record, which is represented in {@link OceanBaseRecord}. */ + void deserialize(OceanBaseRecord record, Collector<T> out) throws Exception; +} diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseMetadataConverter.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseMetadataConverter.java new file mode 100644 index 000000000..f1e984862 --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseMetadataConverter.java @@ -0,0 +1,28 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.oceanbase.table; + +import org.apache.flink.annotation.Internal; + +import java.io.Serializable; + +/** A converter converts OceanBase record metadata into Flink internal data structures. */ +@FunctionalInterface +@Internal +public interface OceanBaseMetadataConverter extends Serializable { + Object read(OceanBaseRecord record); +} diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseReadableMetadata.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseReadableMetadata.java index dddc2f58a..3139f7ede 100644 --- a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseReadableMetadata.java +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseReadableMetadata.java @@ -21,11 +21,6 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.DataType; -import com.ververica.cdc.debezium.table.MetadataConverter; -import io.debezium.data.Envelope; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.source.SourceRecord; - /** Defines the supported metadata columns for {@link OceanBaseTableSource}.
*/ public enum OceanBaseReadableMetadata { @@ -33,14 +28,12 @@ public enum OceanBaseReadableMetadata { TENANT( "tenant_name", DataTypes.STRING().notNull(), - new MetadataConverter() { + new OceanBaseMetadataConverter() { private static final long serialVersionUID = 1L; @Override - public Object read(SourceRecord record) { - Struct value = (Struct) record.value(); - Struct source = value.getStruct(Envelope.FieldName.SOURCE); - return StringData.fromString(source.getString("tenant")); + public Object read(OceanBaseRecord record) { + return StringData.fromString(record.getSourceInfo().getTenant()); } }), @@ -48,14 +41,12 @@ public enum OceanBaseReadableMetadata { DATABASE( "database_name", DataTypes.STRING().notNull(), - new MetadataConverter() { + new OceanBaseMetadataConverter() { private static final long serialVersionUID = 1L; @Override - public Object read(SourceRecord record) { - Struct value = (Struct) record.value(); - Struct source = value.getStruct(Envelope.FieldName.SOURCE); - return StringData.fromString(source.getString("database")); + public Object read(OceanBaseRecord record) { + return StringData.fromString(record.getSourceInfo().getDatabase()); } }), @@ -63,14 +54,12 @@ public enum OceanBaseReadableMetadata { TABLE( "table_name", DataTypes.STRING().notNull(), - new MetadataConverter() { + new OceanBaseMetadataConverter() { private static final long serialVersionUID = 1L; @Override - public Object read(SourceRecord record) { - Struct value = (Struct) record.value(); - Struct source = value.getStruct(Envelope.FieldName.SOURCE); - return StringData.fromString(source.getString("table")); + public Object read(OceanBaseRecord record) { + return StringData.fromString(record.getSourceInfo().getTable()); } }), @@ -81,18 +70,13 @@ public enum OceanBaseReadableMetadata { OP_TS( "op_ts", DataTypes.TIMESTAMP_LTZ(3).notNull(), - new MetadataConverter() { + new OceanBaseMetadataConverter() { private static final long serialVersionUID = 1L; @Override - public Object read(SourceRecord record) { - Struct value = (Struct) record.value(); - Struct source = value.getStruct(Envelope.FieldName.SOURCE); - String timestamp = source.getString("timestamp"); - if (timestamp == null) { - timestamp = "0"; - } - return TimestampData.fromEpochMillis(Long.parseLong(timestamp) * 1000); + public Object read(OceanBaseRecord record) { + return TimestampData.fromEpochMillis( + record.getSourceInfo().getTimestampS() * 1000); } }); @@ -100,9 +84,9 @@ public enum OceanBaseReadableMetadata { private final DataType dataType; - private final MetadataConverter converter; + private final OceanBaseMetadataConverter converter; - OceanBaseReadableMetadata(String key, DataType dataType, MetadataConverter converter) { + OceanBaseReadableMetadata(String key, DataType dataType, OceanBaseMetadataConverter converter) { this.key = key; this.dataType = dataType; this.converter = converter; @@ -116,7 +100,7 @@ public enum OceanBaseReadableMetadata { return dataType; } - public MetadataConverter getConverter() { + public OceanBaseMetadataConverter getConverter() { return converter; } } diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseRecord.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseRecord.java new file mode 100644 index 000000000..207fd58a2 --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseRecord.java @@ -0,0 +1,118 @@ +/* + * Copyright 2022
Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.oceanbase.table; + +import com.oceanbase.oms.logmessage.ByteString; +import com.oceanbase.oms.logmessage.DataMessage; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** An internal data structure representing a record of OceanBase. */ +public class OceanBaseRecord implements Serializable { + private static final long serialVersionUID = 1L; + + private final SourceInfo sourceInfo; + private final boolean isSnapshotRecord; + private Map<String, Object> jdbcFields; + private DataMessage.Record.Type opt; + private Map<String, ByteString> logMessageFieldsBefore; + private Map<String, ByteString> logMessageFieldsAfter; + + public OceanBaseRecord(SourceInfo sourceInfo, Map<String, Object> jdbcFields) { + this.sourceInfo = sourceInfo; + this.isSnapshotRecord = true; + this.jdbcFields = jdbcFields; + } + + public OceanBaseRecord( + SourceInfo sourceInfo, + DataMessage.Record.Type opt, + List<DataMessage.Record.Field> logMessageFieldList) { + this.sourceInfo = sourceInfo; + this.isSnapshotRecord = false; + this.opt = opt; + this.logMessageFieldsBefore = new HashMap<>(); + this.logMessageFieldsAfter = new HashMap<>(); + for (DataMessage.Record.Field field : logMessageFieldList) { + if (field.isPrev()) { + logMessageFieldsBefore.put(field.getFieldname(), field.getValue()); + } else { + logMessageFieldsAfter.put(field.getFieldname(), field.getValue()); + } + } + } + + public SourceInfo getSourceInfo() { + return sourceInfo; + } + + public boolean isSnapshotRecord() { + return isSnapshotRecord; + } + + public Map<String, Object> getJdbcFields() { + return jdbcFields; + } + + public DataMessage.Record.Type getOpt() { + return opt; + } + + public Map<String, ByteString> getLogMessageFieldsBefore() { + return logMessageFieldsBefore; + } + + public Map<String, ByteString> getLogMessageFieldsAfter() { + return logMessageFieldsAfter; + } + + /** Information about the source of the record.
*/ + public static class SourceInfo implements Serializable { + private static final long serialVersionUID = 1L; + + private final String tenant; + private final String database; + private final String table; + private final long timestampS; + + public SourceInfo(String tenant, String database, String table, long timestampS) { + this.tenant = tenant; + this.database = database; + this.table = table; + this.timestampS = timestampS; + } + + public String getTenant() { + return tenant; + } + + public String getDatabase() { + return database; + } + + public String getTable() { + return table; + } + + public long getTimestampS() { + return timestampS; + } + } +} diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java index f021276eb..32beb046b 100644 --- a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java @@ -29,9 +29,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; import com.ververica.cdc.connectors.oceanbase.OceanBaseSource; -import com.ververica.cdc.debezium.DebeziumDeserializationSchema; -import com.ververica.cdc.debezium.table.MetadataConverter; -import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema; +import com.ververica.cdc.connectors.oceanbase.source.RowDataOceanBaseDeserializationSchema; import java.time.Duration; import java.time.ZoneId; @@ -138,11 +136,11 @@ public class OceanBaseTableSource implements ScanTableSource, SupportsReadingMet public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { RowType physicalDataType = (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType(); - MetadataConverter[] metadataConverters = getMetadataConverters(); + OceanBaseMetadataConverter[] metadataConverters = getMetadataConverters(); TypeInformation<RowData> resultTypeInfo = context.createTypeInformation(producedDataType); - DebeziumDeserializationSchema<RowData> deserializer = - RowDataDebeziumDeserializeSchema.newBuilder() + RowDataOceanBaseDeserializationSchema deserializer = + RowDataOceanBaseDeserializationSchema.newBuilder() .setPhysicalRowType(physicalDataType) .setMetadataConverters(metadataConverters) .setResultTypeInfo(resultTypeInfo) @@ -173,9 +171,9 @@ public class OceanBaseTableSource implements ScanTableSource, SupportsReadingMet return SourceFunctionProvider.of(builder.build(), false); } - protected MetadataConverter[] getMetadataConverters() { + protected OceanBaseMetadataConverter[] getMetadataConverters() { if (metadataKeys.isEmpty()) { - return new MetadataConverter[0]; + return new OceanBaseMetadataConverter[0]; } return metadataKeys.stream() .map( @@ -185,7 +183,7 @@ public class OceanBaseTableSource implements ScanTableSource, SupportsReadingMet .findFirst() .orElseThrow(IllegalStateException::new)) .map(OceanBaseReadableMetadata::getConverter) - .toArray(MetadataConverter[]::new); + .toArray(OceanBaseMetadataConverter[]::new); } @Override diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java index c82c3d058..146e27013 100644 ---
a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java @@ -32,7 +32,6 @@ import java.util.Set; public class OceanBaseTableSourceFactory implements DynamicTableSourceFactory { private static final String IDENTIFIER = "oceanbase-cdc"; - private static final String OB_CDC_PREFIX = "obcdc."; public static final ConfigOption<String> SCAN_STARTUP_MODE = ConfigOptions.key("scan.startup.mode") diff --git a/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/OceanBaseTestBase.java b/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/OceanBaseTestBase.java index 9d0cc7598..732dac0f7 100644 --- a/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/OceanBaseTestBase.java +++ b/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/OceanBaseTestBase.java @@ -108,7 +108,7 @@ public class OceanBaseTestBase extends TestLogger { @ClassRule public static final GenericContainer<?> OB_SERVER = - new GenericContainer<>("oceanbase/oceanbase-ce:3.1.3_bp1") + new GenericContainer<>("oceanbase/oceanbase-ce:3.1.4") .withNetworkMode(NETWORK_MODE) .withExposedPorts(OB_SERVER_SQL_PORT, OB_SERVER_RPC_PORT) .withEnv("OB_ROOT_PASSWORD", OB_SYS_PASSWORD) @@ -118,7 +118,7 @@ public class OceanBaseTestBase extends TestLogger { @ClassRule public static final GenericContainer<?> LOG_PROXY = - new GenericContainer<>("whhe/oblogproxy:1.0.2") + new GenericContainer<>("whhe/oblogproxy:1.0.3") .withNetworkMode(NETWORK_MODE) .withExposedPorts(LOG_PROXY_PORT) .withEnv("OB_SYS_USERNAME", OB_SYS_USERNAME) diff --git a/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseConnectorITCase.java b/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseConnectorITCase.java index 69f2e19c7..aac661517 100644 --- a/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseConnectorITCase.java +++ b/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseConnectorITCase.java @@ -296,13 +296,16 @@ public class OceanBaseConnectorITCase extends OceanBaseTestBase { String.format( "CREATE TABLE ob_source (\n" + " `id` INT NOT NULL,\n" - + " bool_c TINYINT,\n" + + " bit1_c BOOLEAN,\n" + + " tiny1_c BOOLEAN,\n" + + " boolean_c BOOLEAN,\n" + " tiny_c TINYINT,\n" + " tiny_un_c SMALLINT,\n" + " small_c SMALLINT ,\n" + " small_un_c INT ,\n" + " medium_c INT,\n" + " medium_un_c INT,\n" + + " int11_c INT,\n" + " int_c INT,\n" + " int_un_c BIGINT,\n" + " big_c BIGINT,\n" @@ -317,18 +320,22 @@ public class OceanBaseConnectorITCase extends OceanBaseTestBase { + " time_c TIME(0),\n" + " datetime3_c TIMESTAMP(3),\n" + " datetime6_c TIMESTAMP(6),\n" - + " timestamp_c TIMESTAMP,\n" + + " timestamp_c TIMESTAMP_LTZ,\n" + + " timestamp3_c TIMESTAMP_LTZ(3),\n" + + " timestamp6_c TIMESTAMP_LTZ(6),\n" + " char_c CHAR(3),\n" + " varchar_c VARCHAR(255),\n" + + " file_uuid BINARY(16),\n" + " bit_c BINARY(8),\n" + " text_c STRING,\n" + " tiny_blob_c BYTES,\n" + " medium_blob_c BYTES,\n" - + " long_blob_c BYTES,\n" + " blob_c BYTES,\n" + + " long_blob_c BYTES,\n" + " year_c INT,\n" - + " set_c STRING,\n" + + " set_c ARRAY<STRING>,\n" + " enum_c STRING,\n" + + " json_c STRING,\n" + " primary key (`id`) not enforced" +
") WITH (" + " 'connector' = 'oceanbase-cdc'," @@ -360,13 +367,16 @@ public class OceanBaseConnectorITCase extends OceanBaseTestBase { String sinkDDL = "CREATE TABLE sink (" + " `id` INT NOT NULL,\n" - + " bool_c TINYINT,\n" + + " bit1_c BOOLEAN,\n" + + " tiny1_c BOOLEAN,\n" + + " boolean_c BOOLEAN,\n" + " tiny_c TINYINT,\n" + " tiny_un_c SMALLINT,\n" + " small_c SMALLINT ,\n" + " small_un_c INT ,\n" + " medium_c INT,\n" + " medium_un_c INT,\n" + + " int11_c INT,\n" + " int_c INT,\n" + " int_un_c BIGINT,\n" + " big_c BIGINT,\n" @@ -382,8 +392,11 @@ public class OceanBaseConnectorITCase extends OceanBaseTestBase { + " datetime3_c TIMESTAMP(3),\n" + " datetime6_c TIMESTAMP(6),\n" + " timestamp_c TIMESTAMP,\n" + + " timestamp3_c TIMESTAMP(3),\n" + + " timestamp6_c TIMESTAMP(6),\n" + " char_c CHAR(3),\n" + " varchar_c VARCHAR(255),\n" + + " file_uuid BINARY(16),\n" + " bit_c BINARY(8),\n" + " text_c STRING,\n" + " tiny_blob_c BYTES,\n" @@ -391,8 +404,9 @@ public class OceanBaseConnectorITCase extends OceanBaseTestBase { + " blob_c BYTES,\n" + " long_blob_c BYTES,\n" + " year_c INT,\n" + + " set_c ARRAY,\n" + " enum_c STRING,\n" - + " set_c STRING,\n" + + " json_c STRING,\n" + " primary key (`id`) not enforced" + ") WITH (" + " 'connector' = 'values'," @@ -417,8 +431,107 @@ public class OceanBaseConnectorITCase extends OceanBaseTestBase { List expected = Arrays.asList( - "+I(1,1,127,255,32767,65535,8388607,16777215,2147483647,4294967295,9223372036854775807,18446744073709551615,123.102,123.102,404.4443,123.4567,346,34567892.1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:00:22,abc,Hello World,[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2022,a,red)", - "+U(1,1,127,255,32767,65535,8388607,16777215,2147483647,4294967295,9223372036854775807,18446744073709551615,123.102,123.102,404.4443,123.4567,346,34567892.1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:33:22,abc,Hello World,[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2022,a,red)"); + "+I(1,false,true,true,127,255,32767,65535,8388607,16777215,2147483647,2147483647,4294967295,9223372036854775807,18446744073709551615,123.102,123.102,404.4443,123.4567,346,34567892.1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,abc,Hello World,[101, 26, -17, -65, -67, 8, 57, 15, 72, -17, -65, -67, -17, -65, -67, -17, -65, -67, 54, -17, -65, -67, 62, 123, 116, 0],[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2022,[a, b],red,{\"key1\": \"value1\"})", + "+U(1,false,true,true,127,255,32767,65535,8388607,16777215,2147483647,2147483647,4294967295,9223372036854775807,18446744073709551615,123.102,123.102,404.4443,123.4567,346,34567892.1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:33:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,abc,Hello World,[101, 26, -17, -65, -67, 8, 57, 15, 72, -17, -65, -67, -17, -65, -67, -17, -65, -67, 54, -17, -65, -67, 62, 123, 116, 0],[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2022,[a, b],red,{\"key1\": \"value1\"})"); + + List actual = TestValuesTableFactory.getRawResults("sink"); + assertContainsInAnyOrder(expected, actual); + result.getJobClient().get().cancel().get(); + } + + @Test + public void testTimezoneBerlin() throws Exception { + testTimeDataTypes("+02:00"); + } + + @Test + public void testTimezoneShanghai() throws Exception { + testTimeDataTypes("+08:00"); + } + + public void 
testTimeDataTypes(String serverTimeZone) throws Exception { + try (Connection connection = getJdbcConnection(""); + Statement statement = connection.createStatement()) { + statement.execute(String.format("SET GLOBAL time_zone = '%s';", serverTimeZone)); + } + tEnv.getConfig().setLocalTimeZone(ZoneId.of(serverTimeZone)); + initializeTable("column_type_test"); + String sourceDDL = + String.format( + "CREATE TABLE ob_source (\n" + + " `id` INT NOT NULL,\n" + + " date_c DATE,\n" + + " time_c TIME(0),\n" + + " datetime3_c TIMESTAMP(3),\n" + + " datetime6_c TIMESTAMP(6),\n" + + " timestamp_c TIMESTAMP_LTZ,\n" + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'oceanbase-cdc'," + + " 'scan.startup.mode' = 'initial'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'tenant-name' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'server-time-zone' = '%s'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'logproxy.host' = '%s'," + + " 'logproxy.port' = '%s'," + + " 'rootserver-list' = '%s'," + + " 'working-mode' = 'memory'" + + ")", + getUsername(), + getPassword(), + getTenant(), + "column_type_test", + "full_types", + serverTimeZone, + getObServerHost(), + getObServerSqlPort(), + getLogProxyHost(), + getLogProxyPort(), + getRsList()); + + String sinkDDL = + "CREATE TABLE sink (" + + " `id` INT NOT NULL,\n" + + " date_c DATE,\n" + + " time_c TIME(0),\n" + + " datetime3_c TIMESTAMP(3),\n" + + " datetime6_c TIMESTAMP(6),\n" + + " timestamp_c TIMESTAMP,\n" + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'," + + " 'sink-expected-messages-num' = '20'" + + ")"; + + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + TableResult result = + tEnv.executeSql( + "INSERT INTO sink SELECT `id`, date_c, time_c, datetime3_c, datetime6_c, cast(timestamp_c as timestamp) FROM ob_source"); + + // wait for snapshot finished and begin binlog + waitForSinkSize("sink", 1); + int snapshotSize = sinkSize("sink"); + + try (Connection connection = getJdbcConnection("column_type_test"); + Statement statement = connection.createStatement()) { + statement.execute( + "UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;"); + } + + waitForSinkSize("sink", snapshotSize + 1); + + List<String> expected = + Arrays.asList( + "+I(1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:00:22)", + "+U(1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:33:22)"); List<String> actual = TestValuesTableFactory.getRawResults("sink"); assertContainsInAnyOrder(expected, actual); diff --git a/flink-connector-oceanbase-cdc/src/test/resources/ddl/column_type_test.sql b/flink-connector-oceanbase-cdc/src/test/resources/ddl/column_type_test.sql index 1c48aafa3..1710dd250 100644 --- a/flink-connector-oceanbase-cdc/src/test/resources/ddl/column_type_test.sql +++ b/flink-connector-oceanbase-cdc/src/test/resources/ddl/column_type_test.sql @@ -21,13 +21,16 @@ USE column_type_test; CREATE TABLE full_types ( id INT AUTO_INCREMENT NOT NULL, - bool_c BOOLEAN, + bit1_c BIT, + tiny1_c TINYINT(1), + boolean_c BOOLEAN, tiny_c TINYINT, tiny_un_c TINYINT UNSIGNED, small_c SMALLINT, small_un_c SMALLINT UNSIGNED, medium_c MEDIUMINT, medium_un_c MEDIUMINT UNSIGNED, + int11_c INT(11), int_c INTEGER, int_un_c INTEGER UNSIGNED, big_c BIGINT, @@ -40,11 +43,14 @@ CREATE TABLE full_types big_decimal_c DECIMAL(65, 1), date_c DATE, time_c TIME(0), - datetime3_c
TIMESTAMP(3), - datetime6_c TIMESTAMP(6), + datetime3_c DATETIME(3), + datetime6_c DATETIME(6), timestamp_c TIMESTAMP, + timestamp3_c TIMESTAMP(3), + timestamp6_c TIMESTAMP(6), char_c CHAR(3), varchar_c VARCHAR(255), + file_uuid BINARY(16), bit_c BIT(64), text_c TEXT, tiny_blob_c TINYBLOB, @@ -54,12 +60,15 @@ CREATE TABLE full_types year_c YEAR, set_c SET ('a', 'b'), enum_c ENUM ('red', 'green', 'blue'), + json_c JSON, PRIMARY KEY (id) ) DEFAULT CHARSET = utf8mb4; INSERT INTO full_types -VALUES (DEFAULT, true, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 4294967295, 9223372036854775807, - 18446744073709551615, 123.102, 123.102, 404.4443, 123.4567, 345.6, 34567892.1, '2020-07-17', '18:00:22', - '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22', 'abc', 'Hello World', +VALUES (DEFAULT, 0, 1, true, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 2147483647, 4294967295, + 9223372036854775807, 18446744073709551615, 123.102, 123.102, 404.4443, 123.4567, 345.6, 34567892.1, + '2020-07-17', '18:00:22', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', + '2020-07-17 18:00:22', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', + 'abc', 'Hello World', unhex(replace('651aed08-390f-4893-b2f1-36923e7b7400', '-', '')), b'0000010000000100000001000000010000000100000001000000010000000100', 'text', UNHEX(HEX(16)), UNHEX(HEX(16)), - UNHEX(HEX(16)), UNHEX(HEX(16)), 2022, 'a', 'red'); + UNHEX(HEX(16)), UNHEX(HEX(16)), 2022, 'a,b,a', 'red', '{"key1":"value1"}'); diff --git a/pom.xml b/pom.xml index b2af3a898..629b9c2f1 100644 --- a/pom.xml +++ b/pom.xml @@ -89,7 +89,7 @@ under the License. <slf4j.version>1.7.15</slf4j.version> <log4j.version>2.17.1</log4j.version> <spotless.version>2.4.2</spotless.version> - <oblogclient.version>1.0.5</oblogclient.version> + <oblogclient.version>1.0.6</oblogclient.version> <flink.forkCount>1</flink.forkCount> <flink.reuseForks>true</flink.reuseForks>
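Note for reviewers: since `OceanBaseDeserializationSchema<T>` is now a `@PublicEvolving` interface, users are no longer tied to the `RowData`-producing schema. The sketch below is illustrative only (the class name and output format are ours, not part of this patch); it relies solely on the `OceanBaseRecord` API introduced above:

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;

import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseRecord;

/** A hypothetical deserializer that renders every record as a readable line of text. */
public class StringOceanBaseDeserializationSchema
        implements OceanBaseDeserializationSchema<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public void deserialize(OceanBaseRecord record, Collector<String> out) {
        OceanBaseRecord.SourceInfo source = record.getSourceInfo();
        String table =
                source.getTenant() + "." + source.getDatabase() + "." + source.getTable();
        if (record.isSnapshotRecord()) {
            // Snapshot phase: field values were read through JDBC.
            out.collect(table + " SNAPSHOT " + record.getJdbcFields());
        } else {
            // Incremental phase: before/after images come from the log messages.
            out.collect(
                    table
                            + " "
                            + record.getOpt()
                            + " before="
                            + record.getLogMessageFieldsBefore()
                            + " after="
                            + record.getLogMessageFieldsAfter());
        }
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```

An instance of such a schema should be usable with `OceanBaseSource.builder().deserializer(...)` in place of `RowDataOceanBaseDeserializationSchema`, as the builder is typed against the interface after this change.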