diff --git a/flink-connector-oceanbase-cdc/pom.xml b/flink-connector-oceanbase-cdc/pom.xml
new file mode 100644
index 000000000..3f170f210
--- /dev/null
+++ b/flink-connector-oceanbase-cdc/pom.xml
@@ -0,0 +1,149 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-cdc-connectors</artifactId>
+        <groupId>com.ververica</groupId>
+        <version>2.2-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-connector-oceanbase-cdc</artifactId>
+    <name>flink-connector-oceanbase-cdc</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-debezium</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>kafka-log4j-appender</artifactId>
+                    <groupId>org.apache.kafka</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.oceanbase.logclient</groupId>
+            <artifactId>logproxy-client</artifactId>
+            <version>${oblogclient.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>logback-classic</artifactId>
+                    <groupId>ch.qos.logback</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.47</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mssqlserver</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>dns-cache-manipulator</artifactId>
+            <version>1.7.1</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
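A downstream project would consume this module with a dependency along these lines (a sketch; the coordinates mirror this pom, and the version tracks the 2.2-SNAPSHOT parent):

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-oceanbase-cdc</artifactId>
    <version>2.2-SNAPSHOT</version>
</dependency>

diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java
new file mode 100644
index 000000000..cd39380e6
--- /dev/null
+++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.oceanbase;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction;
+import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+
+import java.time.Duration;
+import java.time.ZoneId;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A builder to build a SourceFunction which can read snapshot and continue to consume commit log.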
+ */
+@PublicEvolving
+public class OceanBaseSource {
+
+    public static <T> Builder<T> builder() {
+        return new Builder<>();
+    }
+
+    /** Builder class of {@link OceanBaseSource}. */
+    public static class Builder<T> {
+
+        private StartupMode startupMode;
+        private Long startupTimestamp;
+
+        private String username;
+        private String password;
+        private String tenantName;
+        private String databaseName;
+        private String tableName;
+        private String hostname;
+        private Integer port;
+        private Duration connectTimeout;
+        private String rsList;
+        private String logProxyHost;
+        private Integer logProxyPort;
+        private ZoneId serverTimeZone = ZoneId.of("UTC");
+
+        private DebeziumDeserializationSchema<T> deserializer;
+
+        public Builder<T> startupMode(StartupMode startupMode) {
+            this.startupMode = startupMode;
+            return this;
+        }
+
+        public Builder<T> startupTimestamp(Long startupTimestamp) {
+            this.startupTimestamp = startupTimestamp;
+            return this;
+        }
+
+        public Builder<T> username(String username) {
+            this.username = username;
+            return this;
+        }
+
+        public Builder<T> password(String password) {
+            this.password = password;
+            return this;
+        }
+
+        public Builder<T> tenantName(String tenantName) {
+            this.tenantName = tenantName;
+            return this;
+        }
+
+        public Builder<T> databaseName(String databaseName) {
+            this.databaseName = databaseName;
+            return this;
+        }
+
+        public Builder<T> tableName(String tableName) {
+            this.tableName = tableName;
+            return this;
+        }
+
+        public Builder<T> hostname(String hostname) {
+            this.hostname = hostname;
+            return this;
+        }
+
+        public Builder<T> port(int port) {
+            this.port = port;
+            return this;
+        }
+
+        public Builder<T> connectTimeout(Duration connectTimeout) {
+            this.connectTimeout = connectTimeout;
+            return this;
+        }
+
+        public Builder<T> rsList(String rsList) {
+            this.rsList = rsList;
+            return this;
+        }
+
+        public Builder<T> logProxyHost(String logProxyHost) {
+            this.logProxyHost = logProxyHost;
+            return this;
+        }
+
+        public Builder<T> logProxyPort(int logProxyPort) {
+            this.logProxyPort = logProxyPort;
+            return this;
+        }
+
+        public Builder<T> serverTimeZone(ZoneId serverTimeZone) {
+            this.serverTimeZone = serverTimeZone;
+            return this;
+        }
+
+        public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
+            this.deserializer = deserializer;
+            return this;
+        }
+
+        public SourceFunction<T> build() {
+            switch (startupMode) {
+                case INITIAL:
+                case LATEST_OFFSET:
+                    startupTimestamp = 0L;
+                    break;
+                case TIMESTAMP:
+                    checkNotNull(startupTimestamp, "startupTimestamp shouldn't be null");
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            startupMode + " mode is not supported.");
+            }
+
+            return new OceanBaseRichSourceFunction<>(
+                    startupMode.equals(StartupMode.INITIAL),
+                    checkNotNull(startupTimestamp),
+                    checkNotNull(username),
+                    checkNotNull(password),
+                    checkNotNull(tenantName),
+                    checkNotNull(databaseName),
+                    checkNotNull(tableName),
+                    hostname,
+                    port,
+                    connectTimeout,
+                    checkNotNull(rsList),
+                    checkNotNull(logProxyHost),
+                    checkNotNull(logProxyPort),
+                    checkNotNull(serverTimeZone),
+                    checkNotNull(deserializer));
+        }
+    }
+}
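For orientation, a minimal sketch of wiring this builder into a DataStream job. The endpoints, credentials, and ports below are placeholders, and JsonDebeziumDeserializationSchema stands in for any DebeziumDeserializationSchema<T> implementation:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class OceanBaseSourceExample {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoints: SQL port 2881, rpc port 2882, oblogproxy on 2983.
        SourceFunction<String> source =
                OceanBaseSource.<String>builder()
                        .startupMode(StartupMode.INITIAL) // snapshot first, then commit log
                        .username("user@test_tenant")
                        .password("pswd")
                        .tenantName("test_tenant")
                        .databaseName("test_db") // matched as a regex by the snapshot reader
                        .tableName("test_table") // matched as a regex as well
                        .hostname("127.0.0.1")
                        .port(2881)
                        .rsList("127.0.0.1:2882:2881")
                        .logProxyHost("127.0.0.1")
                        .logProxyPort(2983)
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is what persists the resolved timestamp across restarts.
        env.enableCheckpointing(3000L);
        env.addSource(source).print();
        env.execute("OceanBase CDC example");
    }
}

diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseConnection.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseConnection.java
new file mode 100644
index 000000000..6b4cfe263
--- /dev/null
+++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseConnection.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software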
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.oceanbase.source; + +import io.debezium.config.Configuration; +import io.debezium.jdbc.JdbcConnection; + +import java.time.Duration; + +/** {@link JdbcConnection} extension to be used with OceanBase server. */ +public class OceanBaseConnection extends JdbcConnection { + + protected static final String URL_PATTERN = + "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=convertToNull&connectTimeout=${connectTimeout}"; + protected static final String DRIVER_CLASS_NAME = "com.mysql.jdbc.Driver"; + + public OceanBaseConnection( + String hostname, + Integer port, + String user, + String password, + Duration timeout, + ClassLoader classLoader) { + super(config(hostname, port, user, password, timeout), factory(classLoader)); + } + + public static Configuration config( + String hostname, Integer port, String user, String password, Duration timeout) { + return Configuration.create() + .with("hostname", hostname) + .with("port", port) + .with("user", user) + .with("password", password) + .with("connectTimeout", timeout == null ? 30000 : timeout.toMillis()) + .build(); + } + + public static JdbcConnection.ConnectionFactory factory(ClassLoader classLoader) { + return JdbcConnection.patternBasedFactory(URL_PATTERN, DRIVER_CLASS_NAME, classLoader); + } +} diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseJdbcConverter.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseJdbcConverter.java new file mode 100644 index 000000000..4df932efa --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseJdbcConverter.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ververica.cdc.connectors.oceanbase.source; + +import com.oceanbase.oms.logmessage.ByteString; +import com.oceanbase.oms.logmessage.DataMessage; +import io.debezium.config.CommonConnectorConfig; +import io.debezium.jdbc.JdbcValueConverters; +import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.relational.ValueConverterProvider; +import io.debezium.util.NumberConversions; +import org.apache.kafka.connect.data.Schema; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.time.ZoneOffset; +import java.util.Arrays; + +/** Utils to convert jdbc type and value of a field. */ +public class OceanBaseJdbcConverter { + + public static ValueConverterProvider valueConverterProvider(ZoneOffset zoneOffset) { + return new JdbcValueConverters( + JdbcValueConverters.DecimalMode.STRING, + TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS, + zoneOffset, + null, + JdbcValueConverters.BigIntUnsignedMode.PRECISE, + CommonConnectorConfig.BinaryHandlingMode.BYTES); + } + + public static Object getField(int jdbcType, Object value) { + if (value == null) { + return null; + } + jdbcType = getType(jdbcType, null); + switch (jdbcType) { + case Types.BIT: + if (value instanceof Boolean) { + return new byte[] {NumberConversions.getByte((Boolean) value)}; + } + return value; + case Types.INTEGER: + if (value instanceof Boolean) { + return NumberConversions.getInteger((Boolean) value); + } + if (value instanceof Date) { + return ((Date) value).getYear() + 1900; + } + return value; + case Types.FLOAT: + Float f = (Float) value; + return f.doubleValue(); + case Types.DECIMAL: + if (value instanceof BigInteger) { + return value.toString(); + } + BigDecimal decimal = (BigDecimal) value; + return decimal.toString(); + case Types.DATE: + Date date = (Date) value; + return io.debezium.time.Date.toEpochDay(date, null); + case Types.TIME: + Time time = (Time) value; + return io.debezium.time.MicroTime.toMicroOfDay(time, true); + case Types.TIMESTAMP: + Timestamp timestamp = (Timestamp) value; + return io.debezium.time.MicroTimestamp.toEpochMicros(timestamp, null); + default: + return value; + } + } + + public static Object getField( + Schema.Type schemaType, DataMessage.Record.Field.Type fieldType, ByteString value) { + if (value == null) { + return null; + } + int jdbcType = getType(fieldType); + switch (jdbcType) { + case Types.NULL: + return null; + case Types.INTEGER: + if (schemaType.equals(Schema.Type.INT64)) { + return Long.parseLong(value.toString()); + } + return Integer.parseInt(value.toString()); + case Types.BIGINT: + if (schemaType.equals(Schema.Type.STRING)) { + return value.toString(); + } + return Long.parseLong(value.toString()); + case Types.DOUBLE: + return Double.parseDouble(value.toString()); + case Types.DATE: + Date date = Date.valueOf(value.toString()); + return io.debezium.time.Date.toEpochDay(date, null); + case Types.TIME: + Time time = Time.valueOf(value.toString()); + return io.debezium.time.MicroTime.toMicroOfDay(time, true); + case Types.TIMESTAMP: + Timestamp timestamp = Timestamp.valueOf(value.toString()); + return io.debezium.time.MicroTimestamp.toEpochMicros(timestamp, null); + case Types.BIT: + long v = Long.parseLong(value.toString()); + byte[] bytes = ByteBuffer.allocate(8).putLong(v).array(); + int i = 0; + while (bytes[i] == 0 && i < Long.BYTES - 1) { + i++; + } + return 
Arrays.copyOfRange(bytes, i, Long.BYTES);
+            case Types.BINARY:
+                return ByteBuffer.wrap(value.toString().getBytes(StandardCharsets.UTF_8));
+            default:
+                return value.toString();
+        }
+    }
+
+    private static boolean isBoolean(int jdbcType, String typeName) {
+        return jdbcType == Types.BOOLEAN || (jdbcType == Types.BIT && "TINYINT".equals(typeName));
+    }
+
+    public static int getType(int jdbcType, String typeName) {
+        // treat boolean as tinyint type
+        if (isBoolean(jdbcType, typeName)) {
+            jdbcType = Types.TINYINT;
+        }
+        // treat year as int type
+        if ("YEAR".equals(typeName)) {
+            jdbcType = Types.INTEGER;
+        }
+
+        // upcasting
+        if ("INT UNSIGNED".equals(typeName)) {
+            jdbcType = Types.BIGINT;
+        }
+        if ("BIGINT UNSIGNED".equals(typeName)) {
+            jdbcType = Types.DECIMAL;
+        }
+
+        // widening conversion according to com.mysql.jdbc.ResultSetImpl#getObject
+        switch (jdbcType) {
+            case Types.TINYINT:
+            case Types.SMALLINT:
+                return Types.INTEGER;
+            case Types.REAL:
+                return Types.FLOAT;
+            default:
+                return jdbcType;
+        }
+    }
+
+    public static int getType(DataMessage.Record.Field.Type fieldType) {
+        switch (fieldType) {
+            case NULL:
+                return Types.NULL;
+            case INT8:
+            case INT16:
+            case INT24:
+            case INT32:
+            case YEAR:
+                return Types.INTEGER;
+            case INT64:
+                return Types.BIGINT;
+            case FLOAT:
+            case DOUBLE:
+                return Types.DOUBLE;
+            case DECIMAL:
+                return Types.DECIMAL;
+            case ENUM:
+            case SET:
+            case STRING:
+            case JSON:
+                return Types.CHAR;
+            case TIMESTAMP:
+            case DATETIME:
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case TIMESTAMP_NANO:
+                return Types.TIMESTAMP;
+            case DATE:
+                return Types.DATE;
+            case TIME:
+                return Types.TIME;
+            case BIT:
+                return Types.BIT;
+            case BLOB:
+            case BINARY:
+                return Types.BINARY;
+            case INTERVAL_YEAR_TO_MONTH:
+            case INTERVAL_DAY_TO_SECOND:
+            case GEOMETRY:
+            case RAW:
+                // a TEXT column may be reported with a wrong type here, so temporarily
+                // treat it as a string
+            case UNKOWN:
+            default:
+                return Types.VARCHAR;
+        }
+    }
+}
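To make the widening rules in getType concrete, a small illustrative check (the demo class is hypothetical and assumes it sits in the same package as the converter; every println prints true):

import java.sql.Types;

public class OceanBaseJdbcConverterDemo {
    public static void main(String[] args) {
        // BOOLEAN (and BIT reported as "TINYINT") is treated as TINYINT, then widened to INTEGER.
        System.out.println(OceanBaseJdbcConverter.getType(Types.BOOLEAN, null) == Types.INTEGER);
        // YEAR is treated as an int regardless of the reported JDBC type.
        System.out.println(OceanBaseJdbcConverter.getType(Types.DATE, "YEAR") == Types.INTEGER);
        // Unsigned integral types are upcast one size so values cannot overflow.
        System.out.println(OceanBaseJdbcConverter.getType(Types.INTEGER, "INT UNSIGNED") == Types.BIGINT);
        System.out.println(OceanBaseJdbcConverter.getType(Types.BIGINT, "BIGINT UNSIGNED") == Types.DECIMAL);
        // REAL follows the MySQL driver's widening and becomes FLOAT.
        System.out.println(OceanBaseJdbcConverter.getType(Types.REAL, null) == Types.FLOAT);
    }
}

diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java
new file mode 100644
index 000000000..538e1ed1a
--- /dev/null
+++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.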
+ */
+
+package com.ververica.cdc.connectors.oceanbase.source;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.mysql.jdbc.ResultSetMetaData;
+import com.oceanbase.clogproxy.client.LogProxyClient;
+import com.oceanbase.clogproxy.client.config.ClientConf;
+import com.oceanbase.clogproxy.client.config.ObReaderConfig;
+import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
+import com.oceanbase.clogproxy.client.listener.RecordListener;
+import com.oceanbase.clogproxy.client.util.ClientIdGenerator;
+import com.oceanbase.oms.logmessage.DataMessage;
+import com.oceanbase.oms.logmessage.LogMessage;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import io.debezium.relational.TableSchema;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+/**
+ * The source implementation for OceanBase that reads snapshot events first and then reads change
+ * events.
+ *
+ * @param <T> The type created by the deserializer.
+ */ +public class OceanBaseRichSourceFunction extends RichSourceFunction + implements CheckpointListener, CheckpointedFunction, ResultTypeQueryable { + + private static final long serialVersionUID = 2844054619864617340L; + + private static final Logger LOG = LoggerFactory.getLogger(OceanBaseRichSourceFunction.class); + + private final boolean snapshot; + private final String username; + private final String password; + private final String tenantName; + private final String databaseName; + private final String tableName; + private final String hostname; + private final Integer port; + private final Duration connectTimeout; + private final String rsList; + private final String logProxyHost; + private final int logProxyPort; + private final long startTimestamp; + private final ZoneOffset zoneOffset; + private final DebeziumDeserializationSchema deserializer; + + private final AtomicBoolean snapshotCompleted = new AtomicBoolean(false); + private final List logMessageBuffer = new LinkedList<>(); + + private transient Map tableSchemaMap; + private transient volatile long resolvedTimestamp; + private transient volatile OceanBaseConnection snapshotConnection; + private transient LogProxyClient logProxyClient; + private transient ListState offsetState; + private transient OutputCollector outputCollector; + + public OceanBaseRichSourceFunction( + boolean snapshot, + long startTimestamp, + String username, + String password, + String tenantName, + String databaseName, + String tableName, + String hostname, + Integer port, + Duration connectTimeout, + String rsList, + String logProxyHost, + int logProxyPort, + ZoneId serverTimeZone, + DebeziumDeserializationSchema deserializer) { + this.snapshot = snapshot; + this.username = username; + this.password = password; + this.tenantName = tenantName; + this.databaseName = databaseName; + this.tableName = tableName; + this.hostname = hostname; + this.port = port; + this.connectTimeout = connectTimeout; + this.rsList = rsList; + this.logProxyHost = logProxyHost; + this.logProxyPort = logProxyPort; + this.startTimestamp = startTimestamp; + this.zoneOffset = serverTimeZone.getRules().getOffset(Instant.now()); + this.deserializer = deserializer; + } + + @Override + public void open(final Configuration config) throws Exception { + super.open(config); + this.outputCollector = new OutputCollector<>(); + this.tableSchemaMap = new ConcurrentHashMap<>(); + this.resolvedTimestamp = -1; + } + + @Override + public void run(SourceContext ctx) throws Exception { + outputCollector.context = ctx; + + LOG.info("Start readChangeEvents process"); + readChangeEvents(); + + if (shouldReadSnapshot()) { + synchronized (ctx.getCheckpointLock()) { + try { + if (snapshotConnection == null) { + snapshotConnection = + new OceanBaseConnection( + hostname, + port, + username, + password, + connectTimeout, + getClass().getClassLoader()); + } + readSnapshot(); + } finally { + if (snapshotConnection != null) { + snapshotConnection.close(); + } + } + LOG.info("Snapshot reading finished"); + } + } else { + LOG.info("Skip snapshot read"); + } + + logProxyClient.join(); + } + + protected void readSnapshot() { + final Map tableMap = new HashMap<>(); + try { + String sql = + String.format( + "SELECT TABLE_SCHEMA, TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " + + "WHERE TABLE_TYPE='BASE TABLE' and TABLE_SCHEMA REGEXP '%s' and TABLE_NAME REGEXP '%s'", + databaseName, tableName); + snapshotConnection.query( + sql, + rs -> { + while (rs.next()) { + tableMap.put(rs.getString(1), rs.getString(2)); + } + 
}); + } catch (SQLException e) { + LOG.error("Query database and table name failed", e); + throw new FlinkRuntimeException(e); + } + tableMap.forEach(this::readSnapshotFromTable); + snapshotCompleted.set(true); + } + + private void readSnapshotFromTable(String databaseName, String tableName) { + // TODO make topic name configurable + String topicName = getDefaultTopicName(tenantName, databaseName, tableName); + Map partition = getSourcePartition(tenantName, databaseName, tableName); + // the offset here is useless + Map offset = getSourceOffset(resolvedTimestamp); + + String fullName = String.format("`%s`.`%s`", databaseName, tableName); + String selectSql = "SELECT * FROM " + fullName; + try { + snapshotConnection.query( + selectSql, + rs -> { + ResultSetMetaData metaData = (ResultSetMetaData) rs.getMetaData(); + String[] columnNames = new String[metaData.getColumnCount()]; + int[] jdbcTypes = new int[metaData.getColumnCount()]; + for (int i = 0; i < metaData.getColumnCount(); i++) { + columnNames[i] = metaData.getColumnName(i + 1); + jdbcTypes[i] = + OceanBaseJdbcConverter.getType( + metaData.getColumnType(i + 1), + metaData.getColumnTypeName(i + 1)); + } + + TableSchema tableSchema = tableSchemaMap.get(topicName); + if (tableSchema == null) { + tableSchema = + OceanBaseTableSchema.getTableSchema( + topicName, + databaseName, + tableName, + columnNames, + jdbcTypes, + zoneOffset); + tableSchemaMap.put(topicName, tableSchema); + } + + Struct source = + OceanBaseSchemaUtils.sourceStruct( + tenantName, databaseName, tableName, null, null); + + while (rs.next()) { + Struct value = new Struct(tableSchema.valueSchema()); + for (int i = 0; i < metaData.getColumnCount(); i++) { + value.put( + columnNames[i], + OceanBaseJdbcConverter.getField( + jdbcTypes[i], rs.getObject(i + 1))); + } + Struct struct = + tableSchema.getEnvelopeSchema().create(value, source, null); + try { + deserializer.deserialize( + new SourceRecord( + partition, + offset, + topicName, + null, + null, + null, + struct.schema(), + struct), + outputCollector); + } catch (Exception e) { + LOG.error("Deserialize snapshot record failed ", e); + throw new FlinkRuntimeException(e); + } + } + }); + } catch (SQLException e) { + LOG.error("Read snapshot from table " + fullName + " failed", e); + throw new FlinkRuntimeException(e); + } + } + + protected void readChangeEvents() throws InterruptedException { + String tableWhiteList = String.format("%s.%s.%s", tenantName, databaseName, tableName); + ObReaderConfig obReaderConfig = new ObReaderConfig(); + obReaderConfig.setRsList(rsList); + obReaderConfig.setUsername(username); + obReaderConfig.setPassword(password); + obReaderConfig.setTableWhiteList(tableWhiteList); + + if (resolvedTimestamp > 0) { + obReaderConfig.setStartTimestamp(resolvedTimestamp); + LOG.info("Read change events from resolvedTimestamp: {}", resolvedTimestamp); + } else { + obReaderConfig.setStartTimestamp(startTimestamp); + LOG.info("Read change events from startTimestamp: {}", startTimestamp); + } + + final CountDownLatch latch = new CountDownLatch(1); + + // avoid client id duplication when starting multiple connectors in one etl + ClientConf.USER_DEFINED_CLIENTID = ClientIdGenerator.generate() + tableWhiteList; + logProxyClient = new LogProxyClient(logProxyHost, logProxyPort, obReaderConfig); + + logProxyClient.addListener( + new RecordListener() { + + boolean started = false; + + @Override + public void notify(LogMessage message) { + switch (message.getOpt()) { + case HEARTBEAT: + case BEGIN: + if (!started) { + 
started = true; + latch.countDown(); + } + break; + case INSERT: + case UPDATE: + case DELETE: + if (!started) { + break; + } + logMessageBuffer.add(message); + break; + case COMMIT: + // flush buffer after snapshot completed + if (!shouldReadSnapshot() || snapshotCompleted.get()) { + logMessageBuffer.forEach( + msg -> { + try { + deserializer.deserialize( + getRecordFromLogMessage(msg), + outputCollector); + } catch (Exception e) { + throw new FlinkRuntimeException(e); + } + }); + logMessageBuffer.clear(); + resolvedTimestamp = Long.parseLong(message.getTimestamp()); + } + break; + case DDL: + // TODO record ddl and remove expired table schema + LOG.trace( + "Ddl: {}", + message.getFieldList().get(0).getValue().toString()); + break; + default: + throw new UnsupportedOperationException( + "Unsupported type: " + message.getOpt()); + } + } + + @Override + public void onException(LogProxyClientException e) { + LOG.error("LogProxyClient exception", e); + logProxyClient.stop(); + } + }); + + logProxyClient.start(); + LOG.info("LogProxyClient started"); + latch.await(); + LOG.info("LogProxyClient packet processing started"); + } + + private SourceRecord getRecordFromLogMessage(LogMessage message) throws Exception { + String databaseName = message.getDbName().replace(tenantName + ".", ""); + // TODO make topic name configurable + String topicName = getDefaultTopicName(tenantName, databaseName, message.getTableName()); + + if (tableSchemaMap.get(topicName) == null) { + String[] columnNames = new String[message.getFieldCount()]; + int[] jdbcTypes = new int[message.getFieldCount()]; + int i = 0; + for (DataMessage.Record.Field field : message.getFieldList()) { + if (message.getOpt() == DataMessage.Record.Type.UPDATE && field.isPrev()) { + continue; + } + columnNames[i] = field.getFieldname(); + jdbcTypes[i] = OceanBaseJdbcConverter.getType(field.getType()); + i++; + } + TableSchema tableSchema = + OceanBaseTableSchema.getTableSchema( + topicName, databaseName, tableName, columnNames, jdbcTypes, zoneOffset); + tableSchemaMap.put(topicName, tableSchema); + } + + Struct source = + OceanBaseSchemaUtils.sourceStruct( + tenantName, + databaseName, + message.getTableName(), + message.getTimestamp(), + message.getOB10UniqueId()); + Struct struct; + switch (message.getOpt()) { + case INSERT: + Struct after = getLogValueStruct(topicName, message.getFieldList()); + struct = + tableSchemaMap + .get(topicName) + .getEnvelopeSchema() + .create(after, source, null); + break; + case UPDATE: + List beforeFields = new ArrayList<>(); + List afterFields = new ArrayList<>(); + for (DataMessage.Record.Field field : message.getFieldList()) { + if (field.isPrev()) { + beforeFields.add(field); + } else { + afterFields.add(field); + } + } + after = getLogValueStruct(topicName, afterFields); + Struct before = getLogValueStruct(topicName, beforeFields); + struct = + tableSchemaMap + .get(topicName) + .getEnvelopeSchema() + .update(before, after, source, null); + break; + case DELETE: + before = getLogValueStruct(topicName, message.getFieldList()); + struct = + tableSchemaMap + .get(topicName) + .getEnvelopeSchema() + .delete(before, source, null); + break; + default: + throw new UnsupportedOperationException( + "Unsupported dml type: " + message.getOpt()); + } + return new SourceRecord( + getSourcePartition(tenantName, databaseName, message.getTableName()), + getSourceOffset(Long.parseLong(message.getTimestamp())), + topicName, + null, + null, + null, + struct.schema(), + struct); + } + + private boolean shouldReadSnapshot() { 
+        return resolvedTimestamp == -1 && snapshot;
+    }
+
+    private String getDefaultTopicName(String tenantName, String databaseName, String tableName) {
+        return String.format("%s.%s.%s", tenantName, databaseName, tableName);
+    }
+
+    private Map<String, String> getSourcePartition(
+            String tenantName, String databaseName, String tableName) {
+        Map<String, String> sourcePartition = new HashMap<>();
+        sourcePartition.put("tenant", tenantName);
+        sourcePartition.put("database", databaseName);
+        sourcePartition.put("table", tableName);
+        return sourcePartition;
+    }
+
+    private Map<String, Object> getSourceOffset(long timestamp) {
+        Map<String, Object> sourceOffset = new HashMap<>();
+        sourceOffset.put("timestamp", timestamp);
+        return sourceOffset;
+    }
+
+    private Struct getLogValueStruct(String topicName, List<DataMessage.Record.Field> fieldList) {
+        TableSchema tableSchema = tableSchemaMap.get(topicName);
+        Struct value = new Struct(tableSchema.valueSchema());
+        Object fieldValue;
+        for (DataMessage.Record.Field field : fieldList) {
+            try {
+                Schema fieldSchema = tableSchema.valueSchema().field(field.getFieldname()).schema();
+                fieldValue =
+                        OceanBaseJdbcConverter.getField(
+                                fieldSchema.type(), field.getType(), field.getValue());
+                value.put(field.getFieldname(), fieldValue);
+            } catch (NumberFormatException e) {
+                tableSchema =
+                        OceanBaseTableSchema.upcastingTableSchema(
+                                topicName,
+                                tableSchema,
+                                fieldList.stream()
+                                        .collect(
+                                                Collectors.toMap(
+                                                        DataMessage.Record.Field::getFieldname,
+                                                        f -> f.getValue().toString())));
+                tableSchemaMap.put(topicName, tableSchema);
+                return getLogValueStruct(topicName, fieldList);
+            }
+        }
+        return value;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long l) {
+        // do nothing
+    }
+
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return this.deserializer.getProducedType();
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        LOG.info(
+                "snapshotState checkpoint: {} at resolvedTimestamp: {}",
+                context.getCheckpointId(),
+                resolvedTimestamp);
+        offsetState.clear();
+        offsetState.add(resolvedTimestamp);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        LOG.info("initialize checkpoint");
+        offsetState =
+                context.getOperatorStateStore()
+                        .getListState(
+                                new ListStateDescriptor<>(
+                                        "resolvedTimestampState", LongSerializer.INSTANCE));
+        if (context.isRestored()) {
+            for (final Long offset : offsetState.get()) {
+                resolvedTimestamp = offset;
+                LOG.info("Restore State from resolvedTimestamp: {}", resolvedTimestamp);
+                return;
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        try {
+            if (snapshotConnection != null) {
+                snapshotConnection.close();
+            }
+        } catch (SQLException e) {
+            LOG.error("Failed to close snapshotConnection", e);
+        }
+        if (logProxyClient != null) {
+            logProxyClient.stop();
+        }
+    }
+
+    private static class OutputCollector<T> implements Collector<T> {
+
+        private SourceContext<T> context;
+
+        @Override
+        public void collect(T record) {
+            context.collect(record);
+        }
+
+        @Override
+        public void close() {
+            // do nothing
+        }
+    }
+}
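Every record emitted by this function, whether from the snapshot or the commit log, carries the Debezium envelope built above, so a custom deserializer can stay small. A hedged sketch (the class name and its output format are made up for illustration; note that snapshot rows are emitted via Envelope.create here, so their op is "c" as well):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/** Illustrative deserializer that emits "tenant.db.table:op" summaries. */
public class OpSummaryDeserializationSchema implements DebeziumDeserializationSchema<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
        Struct value = (Struct) record.value();
        Struct source = value.getStruct(Envelope.FieldName.SOURCE);
        // "op" is "c", "u" or "d" for Envelope.create/update/delete respectively.
        String op = value.getString(Envelope.FieldName.OPERATION);
        out.collect(
                source.getString("tenant")
                        + "."
                        + source.getString("database")
                        + "."
                        + source.getString("table")
                        + ":"
                        + op);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseSchemaUtils.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseSchemaUtils.java
new file mode 100644
index 000000000..50e6e4227
--- /dev/null
+++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseSchemaUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software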
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.oceanbase.source; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; + +/** Utils to deal with OceanBase SourceRecord schema. */ +public class OceanBaseSchemaUtils { + + public static Schema sourceSchema() { + return SchemaBuilder.struct() + .field("tenant", Schema.STRING_SCHEMA) + .field("database", Schema.STRING_SCHEMA) + .field("table", Schema.STRING_SCHEMA) + .field("timestamp", Schema.OPTIONAL_STRING_SCHEMA) + .field("unique_id", Schema.OPTIONAL_STRING_SCHEMA) + .build(); + } + + public static Struct sourceStruct( + String tenant, String database, String table, String timestamp, String uniqueId) { + Struct struct = + new Struct(sourceSchema()) + .put("tenant", tenant) + .put("database", database) + .put("table", table); + if (timestamp != null) { + struct.put("timestamp", timestamp); + } + if (uniqueId != null) { + struct.put("unique_id", uniqueId); + } + return struct; + } +} diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseTableSchema.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseTableSchema.java new file mode 100644 index 000000000..2eba11995 --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseTableSchema.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ververica.cdc.connectors.oceanbase.source; + +import io.debezium.data.Envelope; +import io.debezium.relational.Column; +import io.debezium.relational.ColumnEditor; +import io.debezium.relational.CustomConverterRegistry; +import io.debezium.relational.Table; +import io.debezium.relational.TableEditor; +import io.debezium.relational.TableId; +import io.debezium.relational.TableSchema; +import io.debezium.relational.TableSchemaBuilder; +import io.debezium.util.SchemaNameAdjuster; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; + +import java.math.BigInteger; +import java.sql.Types; +import java.time.ZoneOffset; +import java.util.Map; + +/** Utils to deal with table schema of OceanBase. */ +public class OceanBaseTableSchema { + + public static TableSchemaBuilder tableSchemaBuilder(ZoneOffset zoneOffset) { + return new TableSchemaBuilder( + OceanBaseJdbcConverter.valueConverterProvider(zoneOffset), + SchemaNameAdjuster.create(), + new CustomConverterRegistry(null), + OceanBaseSchemaUtils.sourceSchema(), + false); + } + + public static TableId tableId(String databaseName, String tableName) { + return new TableId(databaseName, null, tableName); + } + + public static Column getColumn(String name, int jdbcType) { + // we can't get the scale and length of decimal, timestamp and bit columns from log, + // so here we set a constant value to these fields to be compatible with the logic of + // JdbcValueConverters#schemaBuilder + ColumnEditor columnEditor = + Column.editor().name(name).jdbcType(jdbcType).optional(true).scale(0); + if (columnEditor.jdbcType() == Types.TIMESTAMP || columnEditor.jdbcType() == Types.BIT) { + columnEditor.length(6); + } + return columnEditor.create(); + } + + public static TableSchema getTableSchema( + String topicName, + String databaseName, + String tableName, + String[] columnNames, + int[] jdbcTypes, + ZoneOffset zoneOffset) { + TableEditor tableEditor = Table.editor().tableId(tableId(databaseName, tableName)); + for (int i = 0; i < columnNames.length; i++) { + tableEditor.addColumn(getColumn(columnNames[i], jdbcTypes[i])); + } + // TODO add column filter and mapper + return tableSchemaBuilder(zoneOffset) + .create( + null, + Envelope.schemaName(topicName), + tableEditor.create(), + null, + null, + null); + } + + public static Schema upcastingSchemaType(Schema schema, String value) { + if (schema.type().equals(Schema.Type.INT32) && Long.parseLong(value) > Integer.MAX_VALUE) { + return Schema.INT64_SCHEMA; + } + if (schema.type().equals(Schema.Type.INT64)) { + BigInteger bigInt = new BigInteger(value); + if (bigInt.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0) { + return Schema.STRING_SCHEMA; + } + } + return schema; + } + + public static Schema upcastingValueSchema(Schema valueSchema, Map fields) { + SchemaBuilder schemaBuilder = SchemaBuilder.struct().optional(); + for (Map.Entry entry : fields.entrySet()) { + Schema fieldSchema = valueSchema.field(entry.getKey()).schema(); + fieldSchema = upcastingSchemaType(fieldSchema, entry.getValue()); + schemaBuilder.field(entry.getKey(), fieldSchema); + } + return schemaBuilder.build(); + } + + public static Envelope getEnvelope(String name, Schema valueSchema) { + return Envelope.defineSchema() + .withName(name) + .withRecord(valueSchema) + .withSource(OceanBaseSchemaUtils.sourceSchema()) + .build(); + } + + public static TableSchema upcastingTableSchema( + String topicName, TableSchema tableSchema, Map fields) { + Schema valueSchema = 
upcastingValueSchema(tableSchema.valueSchema(), fields); + return new TableSchema( + tableSchema.id(), + null, + null, + getEnvelope(Envelope.schemaName(topicName), valueSchema), + valueSchema, + null); + } +} diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseReadableMetadata.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseReadableMetadata.java new file mode 100644 index 000000000..b1216ea71 --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseReadableMetadata.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.oceanbase.table; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; + +import com.ververica.cdc.debezium.table.MetadataConverter; +import io.debezium.data.Envelope; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; + +/** Defines the supported metadata columns for {@link OceanBaseTableSource}. */ +public enum OceanBaseReadableMetadata { + + /** Name of the tenant that contains the row. */ + TENANT( + "tenant_name", + DataTypes.STRING().notNull(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(SourceRecord record) { + Struct value = (Struct) record.value(); + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + return StringData.fromString(source.getString("tenant")); + } + }), + + /** Name of the database that contains the row. */ + DATABASE( + "database_name", + DataTypes.STRING().notNull(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(SourceRecord record) { + Struct value = (Struct) record.value(); + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + return StringData.fromString(source.getString("database")); + } + }), + + /** Name of the table that contains the row. */ + TABLE( + "table_name", + DataTypes.STRING().notNull(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(SourceRecord record) { + Struct value = (Struct) record.value(); + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + return StringData.fromString(source.getString("table")); + } + }), + + /** + * It indicates the time that the change was made in the database. If the record is read from + * snapshot of the table instead of the change stream, the value is always 0. 
+ */ + OP_TS( + "op_ts", + DataTypes.TIMESTAMP_LTZ(3).notNull(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(SourceRecord record) { + Struct value = (Struct) record.value(); + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + String timestamp = source.getString("timestamp"); + if (timestamp == null) { + timestamp = "0"; + } + return TimestampData.fromEpochMillis(Long.parseLong(timestamp) * 1000); + } + }); + + private final String key; + + private final DataType dataType; + + private final MetadataConverter converter; + + OceanBaseReadableMetadata(String key, DataType dataType, MetadataConverter converter) { + this.key = key; + this.dataType = dataType; + this.converter = converter; + } + + public String getKey() { + return key; + } + + public DataType getDataType() { + return dataType; + } + + public MetadataConverter getConverter() { + return converter; + } +} diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java new file mode 100644 index 000000000..259062078 --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ververica.cdc.connectors.oceanbase.table; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; + +import com.ververica.cdc.connectors.oceanbase.OceanBaseSource; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.ververica.cdc.debezium.table.MetadataConverter; +import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema; + +import java.time.Duration; +import java.time.ZoneId; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A {@link DynamicTableSource} implementation for OceanBase. */ +public class OceanBaseTableSource implements ScanTableSource, SupportsReadingMetadata { + + private final ResolvedSchema physicalSchema; + + private final StartupMode startupMode; + private final Long startupTimestamp; + + private final String username; + private final String password; + private final String tenantName; + private final String databaseName; + private final String tableName; + private final String hostname; + private final Integer port; + private final Duration connectTimeout; + private final ZoneId serverTimeZone; + + private final String rsList; + private final String logProxyHost; + private final Integer logProxyPort; + + // -------------------------------------------------------------------------------------------- + // Mutable attributes + // -------------------------------------------------------------------------------------------- + + /** Data type that describes the final output of the source. */ + protected DataType producedDataType; + + /** Metadata that is appended at the end of a physical source row. 
*/ + protected List metadataKeys; + + public OceanBaseTableSource( + ResolvedSchema physicalSchema, + StartupMode startupMode, + Long startupTimestamp, + String username, + String password, + String tenantName, + String databaseName, + String tableName, + String hostname, + Integer port, + Duration connectTimeout, + ZoneId serverTimeZone, + String rsList, + String logProxyHost, + int logProxyPort) { + this.physicalSchema = physicalSchema; + this.startupMode = startupMode; + this.startupTimestamp = startupTimestamp; + this.username = checkNotNull(username); + this.password = checkNotNull(password); + this.tenantName = checkNotNull(tenantName); + this.databaseName = checkNotNull(databaseName); + this.tableName = checkNotNull(tableName); + this.hostname = hostname; + this.port = port; + this.serverTimeZone = serverTimeZone; + this.connectTimeout = connectTimeout; + this.rsList = checkNotNull(rsList); + this.logProxyHost = checkNotNull(logProxyHost); + this.logProxyPort = logProxyPort; + + this.producedDataType = physicalSchema.toPhysicalRowDataType(); + this.metadataKeys = Collections.emptyList(); + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_BEFORE) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { + RowType physicalDataType = + (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType(); + MetadataConverter[] metadataConverters = getMetadataConverters(); + TypeInformation resultTypeInfo = context.createTypeInformation(producedDataType); + + DebeziumDeserializationSchema deserializer = + RowDataDebeziumDeserializeSchema.newBuilder() + .setPhysicalRowType(physicalDataType) + .setMetadataConverters(metadataConverters) + .setResultTypeInfo(resultTypeInfo) + .setServerTimeZone(serverTimeZone) + .build(); + + OceanBaseSource.Builder builder = + OceanBaseSource.builder() + .startupMode(startupMode) + .startupTimestamp(startupTimestamp) + .username(username) + .password(password) + .tenantName(tenantName) + .databaseName(databaseName) + .tableName(tableName) + .hostname(hostname) + .port(port) + .connectTimeout(connectTimeout) + .rsList(rsList) + .logProxyHost(logProxyHost) + .logProxyPort(logProxyPort) + .serverTimeZone(serverTimeZone) + .deserializer(deserializer); + return SourceFunctionProvider.of(builder.build(), false); + } + + protected MetadataConverter[] getMetadataConverters() { + if (metadataKeys.isEmpty()) { + return new MetadataConverter[0]; + } + return metadataKeys.stream() + .map( + key -> + Stream.of(OceanBaseReadableMetadata.values()) + .filter(m -> m.getKey().equals(key)) + .findFirst() + .orElseThrow(IllegalStateException::new)) + .map(OceanBaseReadableMetadata::getConverter) + .toArray(MetadataConverter[]::new); + } + + @Override + public Map listReadableMetadata() { + return Stream.of(OceanBaseReadableMetadata.values()) + .collect( + Collectors.toMap( + OceanBaseReadableMetadata::getKey, + OceanBaseReadableMetadata::getDataType)); + } + + @Override + public void applyReadableMetadata(List metadataKeys, DataType producedDataType) { + this.metadataKeys = metadataKeys; + this.producedDataType = producedDataType; + } + + @Override + public DynamicTableSource copy() { + OceanBaseTableSource source = + new OceanBaseTableSource( + physicalSchema, + startupMode, + startupTimestamp, + username, + password, + tenantName, + 
databaseName, + tableName, + hostname, + port, + connectTimeout, + serverTimeZone, + rsList, + logProxyHost, + logProxyPort); + source.metadataKeys = metadataKeys; + source.producedDataType = producedDataType; + return source; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + OceanBaseTableSource that = (OceanBaseTableSource) o; + return Objects.equals(this.physicalSchema, that.physicalSchema) + && Objects.equals(this.startupMode, that.startupMode) + && Objects.equals(this.startupTimestamp, that.startupTimestamp) + && Objects.equals(this.username, that.username) + && Objects.equals(this.password, that.password) + && Objects.equals(this.tenantName, that.tenantName) + && Objects.equals(this.databaseName, that.databaseName) + && Objects.equals(this.tableName, that.tableName) + && Objects.equals(this.hostname, that.hostname) + && Objects.equals(this.port, that.port) + && Objects.equals(this.connectTimeout, that.connectTimeout) + && Objects.equals(this.serverTimeZone, that.serverTimeZone) + && Objects.equals(this.rsList, that.rsList) + && Objects.equals(this.logProxyHost, that.logProxyHost) + && Objects.equals(this.logProxyPort, that.logProxyPort) + && Objects.equals(this.producedDataType, that.producedDataType) + && Objects.equals(this.metadataKeys, that.metadataKeys); + } + + @Override + public int hashCode() { + return Objects.hash( + physicalSchema, + startupMode, + startupTimestamp, + username, + password, + tenantName, + databaseName, + tableName, + hostname, + port, + connectTimeout, + serverTimeZone, + rsList, + logProxyHost, + logProxyPort, + producedDataType, + metadataKeys); + } + + @Override + public String asSummaryString() { + return "OceanBase-CDC"; + } +} diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java new file mode 100644 index 000000000..4f8a1f380 --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.ververica.cdc.connectors.oceanbase.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Factory for creating configured instances of {@link OceanBaseTableSource}. */
+public class OceanBaseTableSourceFactory implements DynamicTableSourceFactory {
+
+    private static final String IDENTIFIER = "oceanbase-cdc";
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for the OceanBase CDC consumer, valid enumerations are "
+                                    + "\"initial\", \"latest-offset\" or \"timestamp\"");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP =
+            ConfigOptions.key("scan.startup.timestamp")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional timestamp in seconds used in case of \"timestamp\" startup mode.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Username to be used when connecting to OceanBase.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Password to be used when connecting to OceanBase.");
+
+    public static final ConfigOption<String> TENANT_NAME =
+            ConfigOptions.key("tenant-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Tenant name of OceanBase to monitor.");
+
+    public static final ConfigOption<String> DATABASE_NAME =
+            ConfigOptions.key("database-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Database name of OceanBase to monitor.");
+
+    public static final ConfigOption<String> TABLE_NAME =
+            ConfigOptions.key("table-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Table name of OceanBase to monitor.");
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "IP address or hostname of the OceanBase database server or OceanBase proxy server.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Integer port number of the OceanBase database server or OceanBase proxy server.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait after trying to connect to the OceanBase database server before timing out.");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .defaultValue("UTC")
+                    .withDescription("The session time zone of the database server.");
+
+    public static final ConfigOption<String> RS_LIST =
+            ConfigOptions.key("rootserver-list")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The semicolon-separated list of OceanBase root servers in format `ip:rpc_port:sql_port`.");
+
+    public static final ConfigOption<String> LOG_PROXY_HOST =
+            ConfigOptions.key("logproxy.host")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Hostname or IP address of OceanBase log proxy service.");
+
+    public static final ConfigOption<Integer> LOG_PROXY_PORT =
+            ConfigOptions.key("logproxy.port")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("Port number of OceanBase log proxy service.");
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+        helper.validate();
+
+        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
+
+        ReadableConfig config = helper.getOptions();
+
+        StartupMode startupMode = StartupMode.getStartupMode(config.get(SCAN_STARTUP_MODE));
+        Long startupTimestamp = config.get(SCAN_STARTUP_TIMESTAMP);
+
+        String username = config.get(USERNAME);
+        String password = config.get(PASSWORD);
+        String tenantName = config.get(TENANT_NAME);
+        String databaseName = config.get(DATABASE_NAME);
+        String tableName = config.get(TABLE_NAME);
+        String rsList = config.get(RS_LIST);
+        String logProxyHost = config.get(LOG_PROXY_HOST);
+        int logProxyPort = config.get(LOG_PROXY_PORT);
+        String hostname = config.get(HOSTNAME);
+        Integer port = config.get(PORT);
+        Duration connectTimeout = config.get(CONNECT_TIMEOUT);
+        ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
+
+        return new OceanBaseTableSource(
+                physicalSchema,
+                startupMode,
+                startupTimestamp,
+                username,
+                password,
+                tenantName,
+                databaseName,
+                tableName,
+                hostname,
+                port,
+                connectTimeout,
+                serverTimeZone,
+                rsList,
+                logProxyHost,
+                logProxyPort);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(SCAN_STARTUP_MODE);
+        options.add(USERNAME);
+        options.add(PASSWORD);
+        options.add(TENANT_NAME);
+        options.add(DATABASE_NAME);
+        options.add(TABLE_NAME);
+        options.add(RS_LIST);
+        options.add(LOG_PROXY_HOST);
+        options.add(LOG_PROXY_PORT);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(SCAN_STARTUP_TIMESTAMP);
+        options.add(HOSTNAME);
+        options.add(PORT);
+        options.add(CONNECT_TIMEOUT);
+        options.add(SERVER_TIME_ZONE);
+        return options;
+    }
+} diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/StartupMode.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/StartupMode.java new file mode 100644 index 000000000..0e440d186 --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/StartupMode.java @@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.oceanbase.table;
+
+import org.apache.flink.table.api.ValidationException;
+
+/** Startup modes for the OceanBase CDC Consumer. */
+public enum StartupMode {
+    /**
+     * Performs an initial snapshot on the monitored database tables upon first startup, and
+     * then continues to read the commit log.
+     */
+    INITIAL,
+
+    /**
+     * Never performs a snapshot of the monitored database tables, but starts reading from the
+     * end of the commit log, which means the connector only captures changes made after it
+     * was started.
+     */
+    LATEST_OFFSET,
+
+    /**
+     * Never performs a snapshot of the monitored database tables, but reads the commit log
+     * directly from the specified timestamp.
+     */
+    TIMESTAMP;
+
+    public static StartupMode getStartupMode(String modeString) {
+        switch (modeString.toLowerCase()) {
+            case "initial":
+                return INITIAL;
+            case "latest-offset":
+                return LATEST_OFFSET;
+            case "timestamp":
+                return TIMESTAMP;
+            default:
+                throw new ValidationException(
+                        String.format("Invalid startup mode '%s'.", modeString));
+        }
+    }
+} diff --git a/flink-connector-oceanbase-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector-oceanbase-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 000000000..3d02010b5 --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+com.ververica.cdc.connectors.oceanbase.table.OceanBaseTableSourceFactory diff --git a/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/OceanBaseTestBase.java b/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/OceanBaseTestBase.java new file mode 100644 index 000000000..ef50aa686 --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/OceanBaseTestBase.java @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.oceanbase;
+
+import org.apache.flink.util.TestLogger;
+
+import com.alibaba.dcm.DnsCacheManipulator;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
+import org.testcontainers.lifecycle.Startables;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertNotNull;
+
+/** Base class for testing the OceanBase source. */
+public class OceanBaseTestBase extends TestLogger {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OceanBaseTestBase.class);
+
+    private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
+
+    public static final String OB_LOG_PROXY_SERVICE_NAME = "oblogproxy";
+
+    // Should be replaced once official OceanBase images are released.
+    public static final String DOCKER_IMAGE_NAME = "whhe/oblogproxy:obce_3.1.1";
+
+    // For details about config, see https://github.com/whhe/dockerfiles/tree/master/oblogproxy
+    public static final int OB_LOG_PROXY_PORT = 2983;
+    public static final int OB_SERVER_SQL_PORT = 2881;
+    public static final int OB_SERVER_RPC_PORT = 2882;
+
+    // Here we use the root user of the system tenant for testing, as the log proxy service
+    // needs a user of the system tenant for authentication. This is not recommended for
+    // production.
+    public static final String OB_SYS_USERNAME = "root";
+    public static final String OB_SYS_PASSWORD = "pswd";
+
+    @ClassRule public static final Network NETWORK = Network.newNetwork();
+
+    @ClassRule
+    public static final GenericContainer<?> OB_WITH_LOG_PROXY =
+            new GenericContainer<>(DOCKER_IMAGE_NAME)
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(OB_LOG_PROXY_SERVICE_NAME)
+                    .withExposedPorts(OB_SERVER_SQL_PORT, OB_SERVER_RPC_PORT, OB_LOG_PROXY_PORT)
+                    .withEnv("OB_ROOT_PASSWORD", OB_SYS_PASSWORD)
+                    .waitingFor(
+                            new WaitAllStrategy()
+                                    .withStrategy(Wait.forListeningPort())
+                                    .withStrategy(Wait.forLogMessage(".*boot success!.*", 1)))
+                    .withStartupTimeout(Duration.ofSeconds(120))
+                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    @BeforeClass
+    public static void startContainers() {
+        // Add a JVM DNS cache entry so that Flink can reach the OceanBase interfaces.
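+        // The container exposes its ports on 127.0.0.1 of the host, while inside the
+        // Docker network it is known by the alias "oblogproxy"; mapping that alias to
+        // 127.0.0.1 lets host-side test code resolve it through the mapped ports.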
+        DnsCacheManipulator.setDnsCache(OB_LOG_PROXY_SERVICE_NAME, "127.0.0.1");
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(OB_WITH_LOG_PROXY)).join();
+        LOG.info("Containers are started.");
+    }
+
+    @AfterClass
+    public static void stopContainers() {
+        DnsCacheManipulator.removeDnsCache(OB_LOG_PROXY_SERVICE_NAME);
+        LOG.info("Stopping containers...");
+        Stream.of(OB_WITH_LOG_PROXY).forEach(GenericContainer::stop);
+        LOG.info("Containers are stopped.");
+    }
+
+    public static String getJdbcUrl(String databaseName) {
+        return "jdbc:mysql://"
+                + OB_WITH_LOG_PROXY.getContainerIpAddress()
+                + ":"
+                + OB_WITH_LOG_PROXY.getMappedPort(OB_SERVER_SQL_PORT)
+                + "/"
+                + databaseName
+                + "?useSSL=false";
+    }
+
+    protected static Connection getJdbcConnection(String databaseName) throws SQLException {
+        return DriverManager.getConnection(
+                getJdbcUrl(databaseName), OB_SYS_USERNAME, OB_SYS_PASSWORD);
+    }
+
+    private static void dropTestDatabase(Connection connection, String databaseName) {
+        try {
+            Awaitility.await(String.format("Dropping database %s", databaseName))
+                    .atMost(120, TimeUnit.SECONDS)
+                    .until(
+                            () -> {
+                                try {
+                                    String sql =
+                                            String.format(
+                                                    "DROP DATABASE IF EXISTS %s", databaseName);
+                                    connection.createStatement().execute(sql);
+                                    return true;
+                                } catch (SQLException e) {
+                                    LOG.warn(
+                                            "DROP DATABASE {} failed: {}",
+                                            databaseName,
+                                            e.getMessage());
+                                    return false;
+                                }
+                            });
+        } catch (ConditionTimeoutException e) {
+            throw new IllegalStateException("Failed to drop test database", e);
+        }
+    }
+
+    protected void initializeTable(String sqlFile) {
+        final String ddlFile = String.format("ddl/%s.sql", sqlFile);
+        final URL ddlTestFile = getClass().getClassLoader().getResource(ddlFile);
+        assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
+        try (Connection connection = getJdbcConnection("");
+                Statement statement = connection.createStatement()) {
+            dropTestDatabase(connection, sqlFile);
+            final List<String> statements =
+                    Arrays.stream(
+                                    Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
+                                            .map(String::trim)
+                                            .filter(x -> !x.startsWith("--") && !x.isEmpty())
+                                            .map(
+                                                    x -> {
+                                                        final Matcher m =
+                                                                COMMENT_PATTERN.matcher(x);
+                                                        return m.matches() ? m.group(1) : x;
+                                                    })
+                                            .collect(Collectors.joining("\n"))
+                                            .split(";"))
+                            .collect(Collectors.toList());
+            for (String stmt : statements) {
+                statement.execute(stmt);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+} diff --git a/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseConnectorITCase.java b/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseConnectorITCase.java new file mode 100644 index 000000000..20a34c3d3 --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseConnectorITCase.java @@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.oceanbase.table; + +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.utils.LegacyRowResource; +import org.apache.flink.test.util.MiniClusterWithClientResource; + +import com.ververica.cdc.connectors.oceanbase.OceanBaseTestBase; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.Statement; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertTrue; + +/** Integration tests for OceanBase change stream event SQL source. */ +public class OceanBaseConnectorITCase extends OceanBaseTestBase { + + private static final int DEFAULT_PARALLELISM = 2; + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment() + .setParallelism(DEFAULT_PARALLELISM); + private final StreamTableEnvironment tEnv = + StreamTableEnvironment.create( + env, + EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()); + + @ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE; + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .withHaLeadershipControl() + .build()); + + @Before + public void before() { + TestValuesTableFactory.clearAllData(); + env.enableCheckpointing(1000); + env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); + env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); + } + + @Test + public void testConsumingAllEvents() throws Exception { + initializeTable("inventory"); + + String sourceDDL = + String.format( + "CREATE TABLE ob_source (" + + " `id` INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(20, 10)," + + " PRIMARY KEY (`id`) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'oceanbase-cdc'," + + " 'scan.startup.mode' = 'initial'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'tenant-name' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'rootserver-list' = '%s'," + + " 'logproxy.host' = '%s'," + + " 'logproxy.port' = '%s'" + + ")", + OB_SYS_USERNAME, + OB_SYS_PASSWORD, + "sys", + "inventory", + "products", + 
OB_WITH_LOG_PROXY.getContainerIpAddress(), + OB_WITH_LOG_PROXY.getMappedPort(OB_SERVER_SQL_PORT), + "127.0.0.1:2882:2881", + OB_WITH_LOG_PROXY.getContainerIpAddress(), + OB_WITH_LOG_PROXY.getMappedPort(OB_LOG_PROXY_PORT)); + + String sinkDDL = + "CREATE TABLE sink (" + + " `id` INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(20, 10)," + + " PRIMARY KEY (`id`) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'," + + " 'sink-expected-messages-num' = '30'" + + ")"; + + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM ob_source"); + + waitForSinkSize("sink", 9); + int snapshotSize = sinkSize("sink"); + + try (Connection connection = getJdbcConnection("inventory"); + Statement statement = connection.createStatement()) { + statement.execute( + "UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + statement.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + statement.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + statement.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); + statement.execute( + "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + statement.execute("UPDATE products SET weight='5.17' WHERE id=111;"); + statement.execute("DELETE FROM products WHERE id=111;"); + } + + waitForSinkSize("sink", snapshotSize + 7); + + /* + *
+         * The final database table looks like this:
+         *
+         * > SELECT * FROM products;
+         * +-----+--------------------+---------------------------------------------------------+--------+
+         * | id  | name               | description                                             | weight |
+         * +-----+--------------------+---------------------------------------------------------+--------+
+         * | 101 | scooter            | Small 2-wheel scooter                                   |   3.14 |
+         * | 102 | car battery        | 12V car battery                                         |    8.1 |
+         * | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 |    0.8 |
+         * | 104 | hammer             | 12oz carpenter's hammer                                 |   0.75 |
+         * | 105 | hammer             | 14oz carpenter's hammer                                 |  0.875 |
+         * | 106 | hammer             | 18oz carpenter hammer                                   |      1 |
+         * | 107 | rocks              | box of assorted rocks                                   |    5.1 |
+         * | 108 | jacket             | water resistent black wind breaker                      |    0.1 |
+         * | 109 | spare tire         | 24 inch spare tire                                      |   22.2 |
+         * | 110 | jacket             | new water resistent white wind breaker                  |    0.5 |
+         * +-----+--------------------+---------------------------------------------------------+--------+
+         * 
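+         * The seven DML statements executed above map one-to-one onto the seven
+         * non-snapshot changelog records asserted below: +U for ids 106 and 107,
+         * +I for the new rows 110 and 111, +U for the follow-up updates of 110
+         * and 111, and finally -D for the delete of id 111.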
+ */ + + List expected = + Arrays.asList( + "+I(101,scooter,Small 2-wheel scooter,3.1400000000)", + "+I(102,car battery,12V car battery,8.1000000000)", + "+I(103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000)", + "+I(104,hammer,12oz carpenter's hammer,0.7500000000)", + "+I(105,hammer,14oz carpenter's hammer,0.8750000000)", + "+I(106,hammer,16oz carpenter's hammer,1.0000000000)", + "+I(107,rocks,box of assorted rocks,5.3000000000)", + "+I(108,jacket,water resistent black wind breaker,0.1000000000)", + "+I(109,spare tire,24 inch spare tire,22.2000000000)", + "+U(106,hammer,18oz carpenter hammer,1.0000000000)", + "+U(107,rocks,box of assorted rocks,5.1000000000)", + "+I(110,jacket,water resistent white wind breaker,0.2000000000)", + "+I(111,scooter,Big 2-wheel scooter ,5.1800000000)", + "+U(110,jacket,new water resistent white wind breaker,0.5000000000)", + "+U(111,scooter,Big 2-wheel scooter ,5.1700000000)", + "-D(111,scooter,Big 2-wheel scooter ,5.1700000000)"); + List actual = TestValuesTableFactory.getRawResults("sink"); + assertContainsInAnyOrder(expected, actual); + + result.getJobClient().get().cancel().get(); + } + + @Test + public void testMetadataColumns() throws Exception { + initializeTable("inventory_meta"); + + String sourceDDL = + String.format( + "CREATE TABLE ob_source (" + + " tenant STRING METADATA FROM 'tenant_name' VIRTUAL," + + " database STRING METADATA FROM 'database_name' VIRTUAL," + + " `table` STRING METADATA FROM 'table_name' VIRTUAL," + + " `id` INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(20, 10)," + + " PRIMARY KEY (`id`) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'oceanbase-cdc'," + + " 'scan.startup.mode' = 'initial'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'tenant-name' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'rootserver-list' = '%s'," + + " 'logproxy.host' = '%s'," + + " 'logproxy.port' = '%s'" + + ")", + OB_SYS_USERNAME, + OB_SYS_PASSWORD, + "sys", + "inventory_meta", + "products", + OB_WITH_LOG_PROXY.getContainerIpAddress(), + OB_WITH_LOG_PROXY.getMappedPort(OB_SERVER_SQL_PORT), + "127.0.0.1:2882:2881", + OB_WITH_LOG_PROXY.getContainerIpAddress(), + OB_WITH_LOG_PROXY.getMappedPort(OB_LOG_PROXY_PORT)); + + String sinkDDL = + "CREATE TABLE sink (" + + " tenant STRING," + + " database STRING," + + " `table` STRING," + + " `id` DECIMAL(20, 0) NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(20, 10)," + + " primary key (tenant, database, `table`, `id`) not enforced" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'," + + " 'sink-expected-messages-num' = '20'" + + ")"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM ob_source"); + + waitForSinkSize("sink", 9); + int snapshotSize = sinkSize("sink"); + + try (Connection connection = getJdbcConnection("inventory_meta"); + Statement statement = connection.createStatement()) { + statement.execute( + "UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + } + + waitForSinkSize("sink", snapshotSize + 1); + + List expected = + Arrays.asList( + "+I(sys,inventory_meta,products,101,scooter,Small 2-wheel scooter,3.1400000000)", + "+I(sys,inventory_meta,products,102,car battery,12V car battery,8.1000000000)", + "+I(sys,inventory_meta,products,103,12-pack 
drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000)", + "+I(sys,inventory_meta,products,104,hammer,12oz carpenter's hammer,0.7500000000)", + "+I(sys,inventory_meta,products,105,hammer,14oz carpenter's hammer,0.8750000000)", + "+I(sys,inventory_meta,products,106,hammer,16oz carpenter's hammer,1.0000000000)", + "+I(sys,inventory_meta,products,107,rocks,box of assorted rocks,5.3000000000)", + "+I(sys,inventory_meta,products,108,jacket,water resistent black wind breaker,0.1000000000)", + "+I(sys,inventory_meta,products,109,spare tire,24 inch spare tire,22.2000000000)", + "+U(sys,inventory_meta,products,106,hammer,18oz carpenter hammer,1.0000000000)"); + List actual = TestValuesTableFactory.getRawResults("sink"); + assertContainsInAnyOrder(expected, actual); + result.getJobClient().get().cancel().get(); + } + + @Test + public void testAllDataTypes() throws Exception { + ZoneId serverTimeZone = ZoneId.systemDefault(); + ZoneOffset zoneOffset = serverTimeZone.getRules().getOffset(Instant.now()); + try (Connection connection = getJdbcConnection(""); + Statement statement = connection.createStatement()) { + statement.execute(String.format("SET GLOBAL time_zone = '%s';", zoneOffset.getId())); + } + tEnv.getConfig().setLocalTimeZone(serverTimeZone); + initializeTable("column_type_test"); + String sourceDDL = + String.format( + "CREATE TABLE ob_source (\n" + + " `id` INT NOT NULL,\n" + + " bool_c TINYINT,\n" + + " tiny_c TINYINT,\n" + + " tiny_un_c SMALLINT,\n" + + " small_c SMALLINT ,\n" + + " small_un_c INT ,\n" + + " medium_c INT,\n" + + " medium_un_c INT,\n" + + " int_c INT,\n" + + " int_un_c BIGINT,\n" + + " big_c BIGINT,\n" + + " big_un_c DECIMAL(20, 0),\n" + + " real_c FLOAT,\n" + + " float_c FLOAT,\n" + + " double_c DOUBLE,\n" + + " decimal_c DECIMAL(8, 4),\n" + + " numeric_c DECIMAL(6, 0),\n" + + " big_decimal_c STRING,\n" + + " date_c DATE,\n" + + " time_c TIME(0),\n" + + " datetime3_c TIMESTAMP(3),\n" + + " datetime6_c TIMESTAMP(6),\n" + + " timestamp_c TIMESTAMP,\n" + + " char_c CHAR(3),\n" + + " varchar_c VARCHAR(255),\n" + + " bit_c BINARY(8),\n" + + " text_c STRING,\n" + + " tiny_blob_c BYTES,\n" + + " medium_blob_c BYTES,\n" + + " long_blob_c BYTES,\n" + + " blob_c BYTES,\n" + + " year_c INT,\n" + + " set_c STRING,\n" + + " enum_c STRING,\n" + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'oceanbase-cdc'," + + " 'scan.startup.mode' = 'initial'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'tenant-name' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'rootserver-list' = '%s'," + + " 'logproxy.host' = '%s'," + + " 'logproxy.port' = '%s'," + + " 'server-time-zone' = '%s'" + + ")", + OB_SYS_USERNAME, + OB_SYS_PASSWORD, + "sys", + "column_type_test", + "full_types", + OB_WITH_LOG_PROXY.getContainerIpAddress(), + OB_WITH_LOG_PROXY.getMappedPort(OB_SERVER_SQL_PORT), + "127.0.0.1:2882:2881", + OB_WITH_LOG_PROXY.getContainerIpAddress(), + OB_WITH_LOG_PROXY.getMappedPort(OB_LOG_PROXY_PORT), + serverTimeZone); + String sinkDDL = + "CREATE TABLE sink (" + + " `id` INT NOT NULL,\n" + + " bool_c TINYINT,\n" + + " tiny_c TINYINT,\n" + + " tiny_un_c SMALLINT,\n" + + " small_c SMALLINT ,\n" + + " small_un_c INT ,\n" + + " medium_c INT,\n" + + " medium_un_c INT,\n" + + " int_c INT,\n" + + " int_un_c BIGINT,\n" + + " big_c BIGINT,\n" + + " big_un_c DECIMAL(20, 0),\n" + + " real_c FLOAT,\n" + + " float_c FLOAT,\n" + + " double_c DOUBLE,\n" + + " decimal_c 
DECIMAL(8, 4),\n" + + " numeric_c DECIMAL(6, 0),\n" + + " big_decimal_c STRING,\n" + + " date_c DATE,\n" + + " time_c TIME(0),\n" + + " datetime3_c TIMESTAMP(3),\n" + + " datetime6_c TIMESTAMP(6),\n" + + " timestamp_c TIMESTAMP,\n" + + " char_c CHAR(3),\n" + + " varchar_c VARCHAR(255),\n" + + " bit_c BINARY(8),\n" + + " text_c STRING,\n" + + " tiny_blob_c BYTES,\n" + + " medium_blob_c BYTES,\n" + + " blob_c BYTES,\n" + + " long_blob_c BYTES,\n" + + " year_c INT,\n" + + " enum_c STRING,\n" + + " set_c STRING,\n" + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'," + + " 'sink-expected-messages-num' = '3'" + + ")"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM ob_source"); + + waitForSinkSize("sink", 1); + int snapshotSize = sinkSize("sink"); + + try (Connection connection = getJdbcConnection("column_type_test"); + Statement statement = connection.createStatement()) { + statement.execute( + "UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;"); + } + + waitForSinkSize("sink", snapshotSize + 1); + + List expected = + Arrays.asList( + "+I(1,1,127,255,32767,65535,8388607,16777215,2147483647,4294967295,9223372036854775807,18446744073709551615,123.102,123.102,404.4443,123.4567,346,34567892.1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:00:22,abc,Hello World,[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2022,a,red)", + "+U(1,1,127,255,32767,65535,8388607,16777215,2147483647,4294967295,9223372036854775807,18446744073709551615,123.102,123.102,404.4443,123.4567,346,34567892.1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:33:22,abc,Hello World,[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2022,a,red)"); + + List actual = TestValuesTableFactory.getRawResults("sink"); + assertContainsInAnyOrder(expected, actual); + result.getJobClient().get().cancel().get(); + } + + private static void waitForSinkSize(String sinkName, int expectedSize) + throws InterruptedException { + while (sinkSize(sinkName) < expectedSize) { + Thread.sleep(100); + } + } + + private static int sinkSize(String sinkName) { + synchronized (TestValuesTableFactory.class) { + try { + return TestValuesTableFactory.getRawResults(sinkName).size(); + } catch (IllegalArgumentException e) { + // job is not started yet + return 0; + } + } + } + + public static void assertContainsInAnyOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertTrue(actual.containsAll(expected)); + } +} diff --git a/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableFactoryTest.java b/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableFactoryTest.java new file mode 100644 index 000000000..55e7bc868 --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableFactoryTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.oceanbase.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Test for {@link OceanBaseTableSource} created by {@link OceanBaseTableSourceFactory}. */
+public class OceanBaseTableFactoryTest {
+
+    private static final ResolvedSchema SCHEMA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("aaa", DataTypes.INT().notNull()),
+                            Column.physical("bbb", DataTypes.STRING().notNull()),
+                            Column.physical("ccc", DataTypes.DOUBLE()),
+                            Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
+                            Column.physical("eee", DataTypes.TIMESTAMP(3))),
+                    Collections.emptyList(),
+                    UniqueConstraint.primaryKey("pk", Collections.singletonList("aaa")));
+
+    private static final ResolvedSchema SCHEMA_WITH_METADATA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("aaa", DataTypes.INT().notNull()),
+                            Column.physical("bbb", DataTypes.STRING().notNull()),
+                            Column.physical("ccc", DataTypes.DOUBLE()),
+                            Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
+                            Column.physical("eee", DataTypes.TIMESTAMP(3)),
+                            Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true),
+                            Column.metadata("tenant", DataTypes.STRING(), "tenant_name", true),
+                            Column.metadata("database", DataTypes.STRING(), "database_name", true),
+                            Column.metadata("table", DataTypes.STRING(), "table_name", true)),
+                    Collections.emptyList(),
+                    UniqueConstraint.primaryKey("pk", Collections.singletonList("aaa")));
+
+    private static final String STARTUP_MODE = "latest-offset";
+    private static final String USERNAME = "user@sys";
+    private static final String PASSWORD = "pswd";
+    private static final String TENANT_NAME = "sys";
+    private static final String DATABASE_NAME = "db";
+    private static final String TABLE_NAME = "table";
+    private static final String RS_LIST = "127.0.0.1:2882:2881";
+    private static final String LOG_PROXY_HOST = "127.0.0.1";
+    private static final String LOG_PROXY_PORT = "2983";
+
+    @Test
+    public void testCommonProperties() {
+        Map<String, String> properties = getRequiredOptions();
+
+        DynamicTableSource actualSource = createTableSource(SCHEMA, properties);
+        OceanBaseTableSource expectedSource =
+                new OceanBaseTableSource(
+                        SCHEMA,
+                        StartupMode.LATEST_OFFSET,
+                        null,
+                        USERNAME,
+                        PASSWORD,
+                        TENANT_NAME,
+                        DATABASE_NAME,
+                        TABLE_NAME,
+                        null,
+                        null,
+                        Duration.ofSeconds(30),
+                        ZoneId.of("UTC"),
+                        RS_LIST,
+                        LOG_PROXY_HOST,
+                        2983);
+        assertEquals(expectedSource, actualSource);
+    }
+
+    @Test
+    public void testOptionalProperties() {
+        Map<String, String> options = getRequiredOptions();
+        options.put("scan.startup.mode", "timestamp");
+        options.put("scan.startup.timestamp", "0");
+        options.put("hostname", "127.0.0.1");
+        options.put("port", "2881");
+        DynamicTableSource actualSource = createTableSource(SCHEMA, options);
+
+        OceanBaseTableSource expectedSource =
+                new OceanBaseTableSource(
+                        SCHEMA,
+                        StartupMode.TIMESTAMP,
+                        0L,
+                        USERNAME,
+                        PASSWORD,
+                        TENANT_NAME,
+                        DATABASE_NAME,
+                        TABLE_NAME,
+                        "127.0.0.1",
+                        2881,
+                        Duration.ofSeconds(30),
+                        ZoneId.of("UTC"),
+                        RS_LIST,
+                        LOG_PROXY_HOST,
+                        2983);
+        assertEquals(expectedSource, actualSource);
+    }
+
+    @Test
+    public void testMetadataColumns() {
+        Map<String, String> properties = getRequiredOptions();
+
+        DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
+        OceanBaseTableSource oceanBaseTableSource = (OceanBaseTableSource) actualSource;
+        oceanBaseTableSource.applyReadableMetadata(
+                Arrays.asList("op_ts", "tenant_name", "database_name", "table_name"),
+                SCHEMA_WITH_METADATA.toSourceRowDataType());
+        actualSource = oceanBaseTableSource.copy();
+
+        OceanBaseTableSource expectedSource =
+                new OceanBaseTableSource(
+                        SCHEMA_WITH_METADATA,
+                        StartupMode.LATEST_OFFSET,
+                        null,
+                        USERNAME,
+                        PASSWORD,
+                        TENANT_NAME,
+                        DATABASE_NAME,
+                        TABLE_NAME,
+                        null,
+                        null,
+                        Duration.ofSeconds(30),
+                        ZoneId.of("UTC"),
+                        RS_LIST,
+                        LOG_PROXY_HOST,
+                        2983);
+        expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
+        expectedSource.metadataKeys =
+                Arrays.asList("op_ts", "tenant_name", "database_name", "table_name");
+
+        assertEquals(expectedSource, actualSource);
+    }
+
+    @Test
+    public void testValidation() {
+        try {
+            Map<String, String> properties = getRequiredOptions();
+            properties.put("unknown", "abc");
+
+            createTableSource(SCHEMA, properties);
+            fail("exception expected");
+        } catch (Throwable t) {
+            assertTrue(
+                    ExceptionUtils.findThrowableWithMessage(t, "Unsupported options:\n\nunknown")
+                            .isPresent());
+        }
+    }
+
+    private Map<String, String> getRequiredOptions() {
+        Map<String, String> options = new HashMap<>();
+        options.put("connector", "oceanbase-cdc");
+        options.put("scan.startup.mode", STARTUP_MODE);
+        options.put("username", USERNAME);
+        options.put("password", PASSWORD);
+        options.put("tenant-name", TENANT_NAME);
+        options.put("database-name", DATABASE_NAME);
+        options.put("table-name", TABLE_NAME);
+        options.put("rootserver-list", RS_LIST);
+        options.put("logproxy.host", LOG_PROXY_HOST);
+        options.put("logproxy.port", LOG_PROXY_PORT);
+        return options;
+    }
+
+    private static DynamicTableSource createTableSource(
+            ResolvedSchema schema, Map<String, String> options) {
+        return FactoryUtil.createTableSource(
+                null,
+                ObjectIdentifier.of("default", "default", "t1"),
+                new ResolvedCatalogTable(
+                        CatalogTable.of(
+                                Schema.newBuilder().fromResolvedSchema(schema).build(),
+                                "mock source",
+                                new ArrayList<>(),
+                                options),
+                        schema),
+                new Configuration(),
+                OceanBaseTableFactoryTest.class.getClassLoader(),
+                false);
+    }
+} diff --git a/flink-connector-oceanbase-cdc/src/test/resources/ddl/column_type_test.sql b/flink-connector-oceanbase-cdc/src/test/resources/ddl/column_type_test.sql new file mode 100644 index 000000000..bf33870be --- /dev/null +++
b/flink-connector-oceanbase-cdc/src/test/resources/ddl/column_type_test.sql @@ -0,0 +1,67 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: column_type_test +-- ---------------------------------------------------------------------------------------------------------------- + +CREATE DATABASE column_type_test; +USE column_type_test; + +CREATE TABLE full_types +( + id INT AUTO_INCREMENT NOT NULL, + bool_c BOOLEAN, + tiny_c TINYINT, + tiny_un_c TINYINT UNSIGNED, + small_c SMALLINT, + small_un_c SMALLINT UNSIGNED, + medium_c MEDIUMINT, + medium_un_c MEDIUMINT UNSIGNED, + int_c INTEGER, + int_un_c INTEGER UNSIGNED, + big_c BIGINT, + big_un_c BIGINT UNSIGNED, + real_c REAL, + float_c FLOAT, + double_c DOUBLE, + decimal_c DECIMAL(8, 4), + numeric_c NUMERIC(6, 0), + big_decimal_c DECIMAL(65, 1), + date_c DATE, + time_c TIME(0), + datetime3_c TIMESTAMP(3), + datetime6_c TIMESTAMP(6), + timestamp_c TIMESTAMP, + char_c CHAR(3), + varchar_c VARCHAR(255), + bit_c BIT(64), + text_c TEXT, + tiny_blob_c TINYBLOB, + medium_blob_c MEDIUMBLOB, + blob_c BLOB, + long_blob_c LONGBLOB, + year_c YEAR, + set_c SET ('a', 'b'), + enum_c ENUM ('red', 'green', 'blue'), + PRIMARY KEY (id) +) DEFAULT CHARSET = utf8mb4; + +INSERT INTO full_types +VALUES (DEFAULT, true, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 4294967295, 9223372036854775807, + 18446744073709551615, 123.102, 123.102, 404.4443, 123.4567, 345.6, 34567892.1, '2020-07-17', '18:00:22', + '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22', 'abc', 'Hello World', + b'0000010000000100000001000000010000000100000001000000010000000100', 'text', UNHEX(HEX(16)), UNHEX(HEX(16)), + UNHEX(HEX(16)), UNHEX(HEX(16)), 2022, 'a', 'red'); diff --git a/flink-connector-oceanbase-cdc/src/test/resources/ddl/inventory.sql b/flink-connector-oceanbase-cdc/src/test/resources/ddl/inventory.sql new file mode 100644 index 000000000..cd96f9186 --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/test/resources/ddl/inventory.sql @@ -0,0 +1,42 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. 
You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: inventory +-- ---------------------------------------------------------------------------------------------------------------- + +CREATE DATABASE inventory; +USE inventory; + +-- Create and populate our products using a single insert with many rows +CREATE TABLE products +( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight DECIMAL(20, 10) +); +ALTER TABLE products AUTO_INCREMENT = 101; + +INSERT INTO products +VALUES (default, "scooter", "Small 2-wheel scooter", 3.14), + (default, "car battery", "12V car battery", 8.1), + (default, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3", 0.8), + (default, "hammer", "12oz carpenter's hammer", 0.75), + (default, "hammer", "14oz carpenter's hammer", 0.875), + (default, "hammer", "16oz carpenter's hammer", 1.0), + (default, "rocks", "box of assorted rocks", 5.3), + (default, "jacket", "water resistent black wind breaker", 0.1), + (default, "spare tire", "24 inch spare tire", 22.2); diff --git a/flink-connector-oceanbase-cdc/src/test/resources/ddl/inventory_meta.sql b/flink-connector-oceanbase-cdc/src/test/resources/ddl/inventory_meta.sql new file mode 100644 index 000000000..a53f3e61a --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/test/resources/ddl/inventory_meta.sql @@ -0,0 +1,42 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. 
+ +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: inventory_meta +-- ---------------------------------------------------------------------------------------------------------------- + +CREATE DATABASE inventory_meta; +USE inventory_meta; + +-- Create and populate our products using a single insert with many rows +CREATE TABLE products +( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight DECIMAL(20, 10) +); +ALTER TABLE products AUTO_INCREMENT = 101; + +INSERT INTO products +VALUES (default, "scooter", "Small 2-wheel scooter", 3.14), + (default, "car battery", "12V car battery", 8.1), + (default, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3", 0.8), + (default, "hammer", "12oz carpenter's hammer", 0.75), + (default, "hammer", "14oz carpenter's hammer", 0.875), + (default, "hammer", "16oz carpenter's hammer", 1.0), + (default, "rocks", "box of assorted rocks", 5.3), + (default, "jacket", "water resistent black wind breaker", 0.1), + (default, "spare tire", "24 inch spare tire", 22.2); diff --git a/flink-connector-oceanbase-cdc/src/test/resources/log4j2-test.properties b/flink-connector-oceanbase-cdc/src/test/resources/log4j2-test.properties new file mode 100644 index 000000000..b82a9606d --- /dev/null +++ b/flink-connector-oceanbase-cdc/src/test/resources/log4j2-test.properties @@ -0,0 +1,28 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level=INFO +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n diff --git a/flink-sql-connector-oceanbase-cdc/pom.xml b/flink-sql-connector-oceanbase-cdc/pom.xml new file mode 100644 index 000000000..5fc948a8f --- /dev/null +++ b/flink-sql-connector-oceanbase-cdc/pom.xml @@ -0,0 +1,121 @@ + + + + + flink-cdc-connectors + com.ververica + 2.2-SNAPSHOT + + 4.0.0 + + flink-sql-connector-oceanbase-cdc + + + + com.ververica + flink-connector-oceanbase-cdc + ${project.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + shade-flink + package + + shade + + + false + + + io.debezium:debezium-api + io.debezium:debezium-embedded + io.debezium:debezium-core + com.ververica:flink-connector-debezium + com.ververica:flink-connector-oceanbase-cdc + mysql:mysql-connector-java + com.oceanbase.logclient:* + io.netty:netty-all + com.google.protobuf:protobuf-java + commons-codec:commons-codec + org.lz4:lz4-java + org.apache.avro:avro + org.apache.commons:* + org.apache.kafka:* + com.fasterxml.*:* + com.google.guava:* + + org.apache.flink:flink-shaded-guava + + + + + org.apache.kafka:* + + kafka/kafka-version.properties + LICENSE + + NOTICE + common/** + + + + + + org.apache.kafka + + com.ververica.cdc.connectors.shaded.org.apache.kafka + + + + org.apache.avro + + com.ververica.cdc.connectors.shaded.org.apache.avro + + + + com.fasterxml + + com.ververica.cdc.connectors.shaded.com.fasterxml + + + + com.google + + com.ververica.cdc.connectors.shaded.com.google + + + + + + + + + + diff --git a/flink-sql-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/DummyDocs.java b/flink-sql-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/DummyDocs.java new file mode 100644 index 000000000..ae8ed7569 --- /dev/null +++ b/flink-sql-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/DummyDocs.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.oceanbase; + +/** This is used to generate a dummy docs jar for this module to pass OSS repository rule. 
*/ +public class DummyDocs {} diff --git a/flink-sql-connector-oceanbase-cdc/src/main/resources/META-INF/NOTICE b/flink-sql-connector-oceanbase-cdc/src/main/resources/META-INF/NOTICE new file mode 100644 index 000000000..3b14d567b --- /dev/null +++ b/flink-sql-connector-oceanbase-cdc/src/main/resources/META-INF/NOTICE @@ -0,0 +1,6 @@ +flink-sql-connector-oceanbase-cdc +Copyright 2020 Ververica Inc. + +This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) + +- org.apache.kafka:kafka-clients:2.7.0 diff --git a/pom.xml b/pom.xml index 088b66a7b..ca7f081a6 100644 --- a/pom.xml +++ b/pom.xml @@ -39,12 +39,14 @@ under the License. flink-connector-postgres-cdc flink-connector-oracle-cdc flink-connector-mongodb-cdc + flink-connector-oceanbase-cdc flink-connector-sqlserver-cdc flink-connector-tidb-cdc flink-sql-connector-mysql-cdc flink-sql-connector-postgres-cdc flink-sql-connector-mongodb-cdc flink-sql-connector-oracle-cdc + flink-sql-connector-oceanbase-cdc flink-sql-connector-sqlserver-cdc flink-sql-connector-tidb-cdc flink-cdc-e2e-tests @@ -88,6 +90,7 @@ under the License. 1.7.15 2.17.1 2.4.2 + 1.0.2 1 true
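
Usage sketch (not part of the diff): a minimal Flink job wired against the 'oceanbase-cdc' factory added above, following the pattern of the integration tests. The addresses, ports and credentials below are placeholders borrowed from the test setup and must be replaced for a real cluster; the option keys are the ones defined in OceanBaseTableSourceFactory.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class OceanBaseCdcSqlExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The integration tests enable checkpointing; do the same for a real job.
        env.enableCheckpointing(3000);
        StreamTableEnvironment tEnv =
                StreamTableEnvironment.create(
                        env,
                        EnvironmentSettings.newInstance()
                                .useBlinkPlanner()
                                .inStreamingMode()
                                .build());

        // Register a table backed by the factory; option values are placeholders.
        tEnv.executeSql(
                "CREATE TABLE products ("
                        + " `id` INT NOT NULL,"
                        + " name STRING,"
                        + " description STRING,"
                        + " weight DECIMAL(20, 10),"
                        + " PRIMARY KEY (`id`) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'oceanbase-cdc',"
                        + " 'scan.startup.mode' = 'initial',"
                        + " 'username' = 'root',"
                        + " 'password' = 'pswd',"
                        + " 'tenant-name' = 'sys',"
                        + " 'database-name' = 'inventory',"
                        + " 'table-name' = 'products',"
                        + " 'hostname' = '127.0.0.1',"
                        + " 'port' = '2881',"
                        + " 'rootserver-list' = '127.0.0.1:2882:2881',"
                        + " 'logproxy.host' = '127.0.0.1',"
                        + " 'logproxy.port' = '2983'"
                        + ")");

        // With 'initial' startup mode this prints the snapshot rows first and then
        // keeps printing changes read from the commit log.
        tEnv.executeSql("SELECT * FROM products").print();
    }
}

The metadata columns exercised in the tests (tenant_name, database_name, table_name, op_ts) can be declared the same way with the STRING METADATA FROM '...' VIRTUAL syntax shown in OceanBaseConnectorITCase.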