[oceanbase] Add OceanBase CDC connector

This closes #778.
pull/959/head
He Wang 3 years ago committed by Leonard Xu
parent 196f9a4fca
commit 28b1b07661

@ -0,0 +1,149 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-cdc-connectors</artifactId>
<groupId>com.ververica</groupId>
<version>2.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-connector-oceanbase-cdc</artifactId>
<name>flink-connector-oceanbase-cdc</name>
<packaging>jar</packaging>
<dependencies>
<!-- Debezium dependencies -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-debezium</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>kafka-log4j-appender</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- OceanBase Log Client -->
<dependency>
<groupId>com.oceanbase.logclient</groupId>
<artifactId>logproxy-client</artifactId>
<version>${oblogclient.version}</version>
<exclusions>
<exclusion>
<artifactId>logback-classic</artifactId>
<groupId>ch.qos.logback</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- The MySQL JDBC driver for reading snapshot-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!-- test dependencies on Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- test dependencies on TestContainers -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mssqlserver</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dns-cache-manipulator</artifactId>
<version>1.7.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction;
import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import java.time.Duration;
import java.time.ZoneId;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A builder to build a SourceFunction which can read snapshot and continue to consume commit log.
*/
@PublicEvolving
public class OceanBaseSource {
public static <T> Builder<T> builder() {
return new Builder<>();
}
/** Builder class of {@link OceanBaseSource}. */
public static class Builder<T> {
private StartupMode startupMode;
private Long startupTimestamp;
private String username;
private String password;
private String tenantName;
private String databaseName;
private String tableName;
private String hostname;
private Integer port;
private Duration connectTimeout;
private String rsList;
private String logProxyHost;
private Integer logProxyPort;
private ZoneId serverTimeZone = ZoneId.of("UTC");
private DebeziumDeserializationSchema<T> deserializer;
public Builder<T> startupMode(StartupMode startupMode) {
this.startupMode = startupMode;
return this;
}
public Builder<T> startupTimestamp(Long startupTimestamp) {
this.startupTimestamp = startupTimestamp;
return this;
}
public Builder<T> username(String username) {
this.username = username;
return this;
}
public Builder<T> password(String password) {
this.password = password;
return this;
}
public Builder<T> tenantName(String tenantName) {
this.tenantName = tenantName;
return this;
}
public Builder<T> databaseName(String databaseName) {
this.databaseName = databaseName;
return this;
}
public Builder<T> tableName(String tableName) {
this.tableName = tableName;
return this;
}
public Builder<T> hostname(String hostname) {
this.hostname = hostname;
return this;
}
public Builder<T> port(int port) {
this.port = port;
return this;
}
public Builder<T> connectTimeout(Duration connectTimeout) {
this.connectTimeout = connectTimeout;
return this;
}
public Builder<T> rsList(String rsList) {
this.rsList = rsList;
return this;
}
public Builder<T> logProxyHost(String logProxyHost) {
this.logProxyHost = logProxyHost;
return this;
}
public Builder<T> logProxyPort(int logProxyPort) {
this.logProxyPort = logProxyPort;
return this;
}
public Builder<T> serverTimeZone(ZoneId serverTimeZone) {
this.serverTimeZone = serverTimeZone;
return this;
}
public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
this.deserializer = deserializer;
return this;
}
public SourceFunction<T> build() {
switch (startupMode) {
case INITIAL:
case LATEST_OFFSET:
startupTimestamp = 0L;
break;
case TIMESTAMP:
checkNotNull(startupTimestamp, "startupTimestamp shouldn't be null");
break;
default:
throw new UnsupportedOperationException(
startupMode + " mode is not supported.");
}
return new OceanBaseRichSourceFunction<T>(
startupMode.equals(StartupMode.INITIAL),
checkNotNull(startupTimestamp),
checkNotNull(username),
checkNotNull(password),
checkNotNull(tenantName),
checkNotNull(databaseName),
checkNotNull(tableName),
hostname,
port,
connectTimeout,
checkNotNull(rsList),
checkNotNull(logProxyHost),
checkNotNull(logProxyPort),
checkNotNull(serverTimeZone),
checkNotNull(deserializer));
}
}
}

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.source;
import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConnection;
import java.time.Duration;
/** {@link JdbcConnection} extension to be used with OceanBase server. */
public class OceanBaseConnection extends JdbcConnection {
protected static final String URL_PATTERN =
"jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=convertToNull&connectTimeout=${connectTimeout}";
protected static final String DRIVER_CLASS_NAME = "com.mysql.jdbc.Driver";
public OceanBaseConnection(
String hostname,
Integer port,
String user,
String password,
Duration timeout,
ClassLoader classLoader) {
super(config(hostname, port, user, password, timeout), factory(classLoader));
}
public static Configuration config(
String hostname, Integer port, String user, String password, Duration timeout) {
return Configuration.create()
.with("hostname", hostname)
.with("port", port)
.with("user", user)
.with("password", password)
.with("connectTimeout", timeout == null ? 30000 : timeout.toMillis())
.build();
}
public static JdbcConnection.ConnectionFactory factory(ClassLoader classLoader) {
return JdbcConnection.patternBasedFactory(URL_PATTERN, DRIVER_CLASS_NAME, classLoader);
}
}

@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.source;
import com.oceanbase.oms.logmessage.ByteString;
import com.oceanbase.oms.logmessage.DataMessage;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.ValueConverterProvider;
import io.debezium.util.NumberConversions;
import org.apache.kafka.connect.data.Schema;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.ZoneOffset;
import java.util.Arrays;
/** Utils to convert jdbc type and value of a field. */
public class OceanBaseJdbcConverter {
public static ValueConverterProvider valueConverterProvider(ZoneOffset zoneOffset) {
return new JdbcValueConverters(
JdbcValueConverters.DecimalMode.STRING,
TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS,
zoneOffset,
null,
JdbcValueConverters.BigIntUnsignedMode.PRECISE,
CommonConnectorConfig.BinaryHandlingMode.BYTES);
}
public static Object getField(int jdbcType, Object value) {
if (value == null) {
return null;
}
jdbcType = getType(jdbcType, null);
switch (jdbcType) {
case Types.BIT:
if (value instanceof Boolean) {
return new byte[] {NumberConversions.getByte((Boolean) value)};
}
return value;
case Types.INTEGER:
if (value instanceof Boolean) {
return NumberConversions.getInteger((Boolean) value);
}
if (value instanceof Date) {
return ((Date) value).getYear() + 1900;
}
return value;
case Types.FLOAT:
Float f = (Float) value;
return f.doubleValue();
case Types.DECIMAL:
if (value instanceof BigInteger) {
return value.toString();
}
BigDecimal decimal = (BigDecimal) value;
return decimal.toString();
case Types.DATE:
Date date = (Date) value;
return io.debezium.time.Date.toEpochDay(date, null);
case Types.TIME:
Time time = (Time) value;
return io.debezium.time.MicroTime.toMicroOfDay(time, true);
case Types.TIMESTAMP:
Timestamp timestamp = (Timestamp) value;
return io.debezium.time.MicroTimestamp.toEpochMicros(timestamp, null);
default:
return value;
}
}
public static Object getField(
Schema.Type schemaType, DataMessage.Record.Field.Type fieldType, ByteString value) {
if (value == null) {
return null;
}
int jdbcType = getType(fieldType);
switch (jdbcType) {
case Types.NULL:
return null;
case Types.INTEGER:
if (schemaType.equals(Schema.Type.INT64)) {
return Long.parseLong(value.toString());
}
return Integer.parseInt(value.toString());
case Types.BIGINT:
if (schemaType.equals(Schema.Type.STRING)) {
return value.toString();
}
return Long.parseLong(value.toString());
case Types.DOUBLE:
return Double.parseDouble(value.toString());
case Types.DATE:
Date date = Date.valueOf(value.toString());
return io.debezium.time.Date.toEpochDay(date, null);
case Types.TIME:
Time time = Time.valueOf(value.toString());
return io.debezium.time.MicroTime.toMicroOfDay(time, true);
case Types.TIMESTAMP:
Timestamp timestamp = Timestamp.valueOf(value.toString());
return io.debezium.time.MicroTimestamp.toEpochMicros(timestamp, null);
case Types.BIT:
long v = Long.parseLong(value.toString());
byte[] bytes = ByteBuffer.allocate(8).putLong(v).array();
int i = 0;
while (bytes[i] == 0 && i < Long.BYTES - 1) {
i++;
}
return Arrays.copyOfRange(bytes, i, Long.BYTES);
case Types.BINARY:
return ByteBuffer.wrap(value.toString().getBytes(StandardCharsets.UTF_8));
default:
return value.toString();
}
}
private static boolean isBoolean(int jdbcType, String typeName) {
return jdbcType == Types.BOOLEAN || (jdbcType == Types.BIT && "TINYINT".equals(typeName));
}
public static int getType(int jdbcType, String typeName) {
// treat boolean as tinyint type
if (isBoolean(jdbcType, typeName)) {
jdbcType = Types.TINYINT;
}
// treat year as int type
if ("YEAR".equals(typeName)) {
jdbcType = Types.INTEGER;
}
// upcasting
if ("INT UNSIGNED".equals(typeName)) {
jdbcType = Types.BIGINT;
}
if ("BIGINT UNSIGNED".equals(typeName)) {
jdbcType = Types.DECIMAL;
}
// widening conversion according to com.mysql.jdbc.ResultSetImpl#getObject
switch (jdbcType) {
case Types.TINYINT:
case Types.SMALLINT:
return Types.INTEGER;
case Types.REAL:
return Types.FLOAT;
default:
return jdbcType;
}
}
public static int getType(DataMessage.Record.Field.Type fieldType) {
switch (fieldType) {
case NULL:
return Types.NULL;
case INT8:
case INT16:
case INT24:
case INT32:
case YEAR:
return Types.INTEGER;
case INT64:
return Types.BIGINT;
case FLOAT:
case DOUBLE:
return Types.DOUBLE;
case DECIMAL:
return Types.DECIMAL;
case ENUM:
case SET:
case STRING:
case JSON:
return Types.CHAR;
case TIMESTAMP:
case DATETIME:
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case TIMESTAMP_NANO:
return Types.TIMESTAMP;
case DATE:
return Types.DATE;
case TIME:
return Types.TIME;
case BIT:
return Types.BIT;
case BLOB:
case BINARY:
return Types.BINARY;
case INTERVAL_YEAR_TO_MONTH:
case INTERVAL_DAY_TO_SECOND:
case GEOMETRY:
case RAW:
// it's weird to get wrong type from TEXT column, temporarily treat it as a string
case UNKOWN:
default:
return Types.VARCHAR;
}
}
}

@ -0,0 +1,564 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.source;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkRuntimeException;
import com.mysql.jdbc.ResultSetMetaData;
import com.oceanbase.clogproxy.client.LogProxyClient;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.config.ObReaderConfig;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.listener.RecordListener;
import com.oceanbase.clogproxy.client.util.ClientIdGenerator;
import com.oceanbase.oms.logmessage.DataMessage;
import com.oceanbase.oms.logmessage.LogMessage;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.relational.TableSchema;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
* The source implementation for OceanBase that read snapshot events first and then read the change
* event.
*
* @param <T> The type created by the deserializer.
*/
public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
implements CheckpointListener, CheckpointedFunction, ResultTypeQueryable<T> {
private static final long serialVersionUID = 2844054619864617340L;
private static final Logger LOG = LoggerFactory.getLogger(OceanBaseRichSourceFunction.class);
private final boolean snapshot;
private final String username;
private final String password;
private final String tenantName;
private final String databaseName;
private final String tableName;
private final String hostname;
private final Integer port;
private final Duration connectTimeout;
private final String rsList;
private final String logProxyHost;
private final int logProxyPort;
private final long startTimestamp;
private final ZoneOffset zoneOffset;
private final DebeziumDeserializationSchema<T> deserializer;
private final AtomicBoolean snapshotCompleted = new AtomicBoolean(false);
private final List<LogMessage> logMessageBuffer = new LinkedList<>();
private transient Map<String, TableSchema> tableSchemaMap;
private transient volatile long resolvedTimestamp;
private transient volatile OceanBaseConnection snapshotConnection;
private transient LogProxyClient logProxyClient;
private transient ListState<Long> offsetState;
private transient OutputCollector<T> outputCollector;
public OceanBaseRichSourceFunction(
boolean snapshot,
long startTimestamp,
String username,
String password,
String tenantName,
String databaseName,
String tableName,
String hostname,
Integer port,
Duration connectTimeout,
String rsList,
String logProxyHost,
int logProxyPort,
ZoneId serverTimeZone,
DebeziumDeserializationSchema<T> deserializer) {
this.snapshot = snapshot;
this.username = username;
this.password = password;
this.tenantName = tenantName;
this.databaseName = databaseName;
this.tableName = tableName;
this.hostname = hostname;
this.port = port;
this.connectTimeout = connectTimeout;
this.rsList = rsList;
this.logProxyHost = logProxyHost;
this.logProxyPort = logProxyPort;
this.startTimestamp = startTimestamp;
this.zoneOffset = serverTimeZone.getRules().getOffset(Instant.now());
this.deserializer = deserializer;
}
@Override
public void open(final Configuration config) throws Exception {
super.open(config);
this.outputCollector = new OutputCollector<>();
this.tableSchemaMap = new ConcurrentHashMap<>();
this.resolvedTimestamp = -1;
}
@Override
public void run(SourceContext<T> ctx) throws Exception {
outputCollector.context = ctx;
LOG.info("Start readChangeEvents process");
readChangeEvents();
if (shouldReadSnapshot()) {
synchronized (ctx.getCheckpointLock()) {
try {
if (snapshotConnection == null) {
snapshotConnection =
new OceanBaseConnection(
hostname,
port,
username,
password,
connectTimeout,
getClass().getClassLoader());
}
readSnapshot();
} finally {
if (snapshotConnection != null) {
snapshotConnection.close();
}
}
LOG.info("Snapshot reading finished");
}
} else {
LOG.info("Skip snapshot read");
}
logProxyClient.join();
}
protected void readSnapshot() {
final Map<String, String> tableMap = new HashMap<>();
try {
String sql =
String.format(
"SELECT TABLE_SCHEMA, TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
+ "WHERE TABLE_TYPE='BASE TABLE' and TABLE_SCHEMA REGEXP '%s' and TABLE_NAME REGEXP '%s'",
databaseName, tableName);
snapshotConnection.query(
sql,
rs -> {
while (rs.next()) {
tableMap.put(rs.getString(1), rs.getString(2));
}
});
} catch (SQLException e) {
LOG.error("Query database and table name failed", e);
throw new FlinkRuntimeException(e);
}
tableMap.forEach(this::readSnapshotFromTable);
snapshotCompleted.set(true);
}
private void readSnapshotFromTable(String databaseName, String tableName) {
// TODO make topic name configurable
String topicName = getDefaultTopicName(tenantName, databaseName, tableName);
Map<String, String> partition = getSourcePartition(tenantName, databaseName, tableName);
// the offset here is useless
Map<String, Object> offset = getSourceOffset(resolvedTimestamp);
String fullName = String.format("`%s`.`%s`", databaseName, tableName);
String selectSql = "SELECT * FROM " + fullName;
try {
snapshotConnection.query(
selectSql,
rs -> {
ResultSetMetaData metaData = (ResultSetMetaData) rs.getMetaData();
String[] columnNames = new String[metaData.getColumnCount()];
int[] jdbcTypes = new int[metaData.getColumnCount()];
for (int i = 0; i < metaData.getColumnCount(); i++) {
columnNames[i] = metaData.getColumnName(i + 1);
jdbcTypes[i] =
OceanBaseJdbcConverter.getType(
metaData.getColumnType(i + 1),
metaData.getColumnTypeName(i + 1));
}
TableSchema tableSchema = tableSchemaMap.get(topicName);
if (tableSchema == null) {
tableSchema =
OceanBaseTableSchema.getTableSchema(
topicName,
databaseName,
tableName,
columnNames,
jdbcTypes,
zoneOffset);
tableSchemaMap.put(topicName, tableSchema);
}
Struct source =
OceanBaseSchemaUtils.sourceStruct(
tenantName, databaseName, tableName, null, null);
while (rs.next()) {
Struct value = new Struct(tableSchema.valueSchema());
for (int i = 0; i < metaData.getColumnCount(); i++) {
value.put(
columnNames[i],
OceanBaseJdbcConverter.getField(
jdbcTypes[i], rs.getObject(i + 1)));
}
Struct struct =
tableSchema.getEnvelopeSchema().create(value, source, null);
try {
deserializer.deserialize(
new SourceRecord(
partition,
offset,
topicName,
null,
null,
null,
struct.schema(),
struct),
outputCollector);
} catch (Exception e) {
LOG.error("Deserialize snapshot record failed ", e);
throw new FlinkRuntimeException(e);
}
}
});
} catch (SQLException e) {
LOG.error("Read snapshot from table " + fullName + " failed", e);
throw new FlinkRuntimeException(e);
}
}
protected void readChangeEvents() throws InterruptedException {
String tableWhiteList = String.format("%s.%s.%s", tenantName, databaseName, tableName);
ObReaderConfig obReaderConfig = new ObReaderConfig();
obReaderConfig.setRsList(rsList);
obReaderConfig.setUsername(username);
obReaderConfig.setPassword(password);
obReaderConfig.setTableWhiteList(tableWhiteList);
if (resolvedTimestamp > 0) {
obReaderConfig.setStartTimestamp(resolvedTimestamp);
LOG.info("Read change events from resolvedTimestamp: {}", resolvedTimestamp);
} else {
obReaderConfig.setStartTimestamp(startTimestamp);
LOG.info("Read change events from startTimestamp: {}", startTimestamp);
}
final CountDownLatch latch = new CountDownLatch(1);
// avoid client id duplication when starting multiple connectors in one etl
ClientConf.USER_DEFINED_CLIENTID = ClientIdGenerator.generate() + tableWhiteList;
logProxyClient = new LogProxyClient(logProxyHost, logProxyPort, obReaderConfig);
logProxyClient.addListener(
new RecordListener() {
boolean started = false;
@Override
public void notify(LogMessage message) {
switch (message.getOpt()) {
case HEARTBEAT:
case BEGIN:
if (!started) {
started = true;
latch.countDown();
}
break;
case INSERT:
case UPDATE:
case DELETE:
if (!started) {
break;
}
logMessageBuffer.add(message);
break;
case COMMIT:
// flush buffer after snapshot completed
if (!shouldReadSnapshot() || snapshotCompleted.get()) {
logMessageBuffer.forEach(
msg -> {
try {
deserializer.deserialize(
getRecordFromLogMessage(msg),
outputCollector);
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
});
logMessageBuffer.clear();
resolvedTimestamp = Long.parseLong(message.getTimestamp());
}
break;
case DDL:
// TODO record ddl and remove expired table schema
LOG.trace(
"Ddl: {}",
message.getFieldList().get(0).getValue().toString());
break;
default:
throw new UnsupportedOperationException(
"Unsupported type: " + message.getOpt());
}
}
@Override
public void onException(LogProxyClientException e) {
LOG.error("LogProxyClient exception", e);
logProxyClient.stop();
}
});
logProxyClient.start();
LOG.info("LogProxyClient started");
latch.await();
LOG.info("LogProxyClient packet processing started");
}
private SourceRecord getRecordFromLogMessage(LogMessage message) throws Exception {
String databaseName = message.getDbName().replace(tenantName + ".", "");
// TODO make topic name configurable
String topicName = getDefaultTopicName(tenantName, databaseName, message.getTableName());
if (tableSchemaMap.get(topicName) == null) {
String[] columnNames = new String[message.getFieldCount()];
int[] jdbcTypes = new int[message.getFieldCount()];
int i = 0;
for (DataMessage.Record.Field field : message.getFieldList()) {
if (message.getOpt() == DataMessage.Record.Type.UPDATE && field.isPrev()) {
continue;
}
columnNames[i] = field.getFieldname();
jdbcTypes[i] = OceanBaseJdbcConverter.getType(field.getType());
i++;
}
TableSchema tableSchema =
OceanBaseTableSchema.getTableSchema(
topicName, databaseName, tableName, columnNames, jdbcTypes, zoneOffset);
tableSchemaMap.put(topicName, tableSchema);
}
Struct source =
OceanBaseSchemaUtils.sourceStruct(
tenantName,
databaseName,
message.getTableName(),
message.getTimestamp(),
message.getOB10UniqueId());
Struct struct;
switch (message.getOpt()) {
case INSERT:
Struct after = getLogValueStruct(topicName, message.getFieldList());
struct =
tableSchemaMap
.get(topicName)
.getEnvelopeSchema()
.create(after, source, null);
break;
case UPDATE:
List<DataMessage.Record.Field> beforeFields = new ArrayList<>();
List<DataMessage.Record.Field> afterFields = new ArrayList<>();
for (DataMessage.Record.Field field : message.getFieldList()) {
if (field.isPrev()) {
beforeFields.add(field);
} else {
afterFields.add(field);
}
}
after = getLogValueStruct(topicName, afterFields);
Struct before = getLogValueStruct(topicName, beforeFields);
struct =
tableSchemaMap
.get(topicName)
.getEnvelopeSchema()
.update(before, after, source, null);
break;
case DELETE:
before = getLogValueStruct(topicName, message.getFieldList());
struct =
tableSchemaMap
.get(topicName)
.getEnvelopeSchema()
.delete(before, source, null);
break;
default:
throw new UnsupportedOperationException(
"Unsupported dml type: " + message.getOpt());
}
return new SourceRecord(
getSourcePartition(tenantName, databaseName, message.getTableName()),
getSourceOffset(Long.parseLong(message.getTimestamp())),
topicName,
null,
null,
null,
struct.schema(),
struct);
}
private boolean shouldReadSnapshot() {
return resolvedTimestamp == -1 && snapshot;
}
private String getDefaultTopicName(String tenantName, String databaseName, String tableName) {
return String.format("%s.%s.%s", tenantName, databaseName, tableName);
}
private Map<String, String> getSourcePartition(
String tenantName, String databaseName, String tableName) {
Map<String, String> sourcePartition = new HashMap<>();
sourcePartition.put("tenant", tenantName);
sourcePartition.put("database", databaseName);
sourcePartition.put("table", tableName);
return sourcePartition;
}
private Map<String, Object> getSourceOffset(long timestamp) {
Map<String, Object> sourceOffset = new HashMap<>();
sourceOffset.put("timestamp", timestamp);
return sourceOffset;
}
private Struct getLogValueStruct(String topicName, List<DataMessage.Record.Field> fieldList) {
TableSchema tableSchema = tableSchemaMap.get(topicName);
Struct value = new Struct(tableSchema.valueSchema());
Object fieldValue;
for (DataMessage.Record.Field field : fieldList) {
try {
Schema fieldSchema = tableSchema.valueSchema().field(field.getFieldname()).schema();
fieldValue =
OceanBaseJdbcConverter.getField(
fieldSchema.type(), field.getType(), field.getValue());
value.put(field.getFieldname(), fieldValue);
} catch (NumberFormatException e) {
tableSchema =
OceanBaseTableSchema.upcastingTableSchema(
topicName,
tableSchema,
fieldList.stream()
.collect(
Collectors.toMap(
DataMessage.Record.Field::getFieldname,
f -> f.getValue().toString())));
tableSchemaMap.put(topicName, tableSchema);
return getLogValueStruct(topicName, fieldList);
}
}
return value;
}
@Override
public void notifyCheckpointComplete(long l) {
// do nothing
}
@Override
public TypeInformation<T> getProducedType() {
return this.deserializer.getProducedType();
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
LOG.info(
"snapshotState checkpoint: {} at resolvedTimestamp: {}",
context.getCheckpointId(),
resolvedTimestamp);
offsetState.clear();
offsetState.add(resolvedTimestamp);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
LOG.info("initialize checkpoint");
offsetState =
context.getOperatorStateStore()
.getListState(
new ListStateDescriptor<>(
"resolvedTimestampState", LongSerializer.INSTANCE));
if (context.isRestored()) {
for (final Long offset : offsetState.get()) {
resolvedTimestamp = offset;
LOG.info("Restore State from resolvedTimestamp: {}", resolvedTimestamp);
return;
}
}
}
@Override
public void cancel() {
try {
if (snapshotConnection != null) {
snapshotConnection.close();
}
} catch (SQLException e) {
LOG.error("Failed to close snapshotConnection", e);
}
if (logProxyClient != null) {
logProxyClient.stop();
}
}
private static class OutputCollector<T> implements Collector<T> {
private SourceContext<T> context;
@Override
public void collect(T record) {
context.collect(record);
}
@Override
public void close() {
// do nothing
}
}
}

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.source;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
/** Utils to deal with OceanBase SourceRecord schema. */
public class OceanBaseSchemaUtils {
public static Schema sourceSchema() {
return SchemaBuilder.struct()
.field("tenant", Schema.STRING_SCHEMA)
.field("database", Schema.STRING_SCHEMA)
.field("table", Schema.STRING_SCHEMA)
.field("timestamp", Schema.OPTIONAL_STRING_SCHEMA)
.field("unique_id", Schema.OPTIONAL_STRING_SCHEMA)
.build();
}
public static Struct sourceStruct(
String tenant, String database, String table, String timestamp, String uniqueId) {
Struct struct =
new Struct(sourceSchema())
.put("tenant", tenant)
.put("database", database)
.put("table", table);
if (timestamp != null) {
struct.put("timestamp", timestamp);
}
if (uniqueId != null) {
struct.put("unique_id", uniqueId);
}
return struct;
}
}

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.source;
import io.debezium.data.Envelope;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.CustomConverterRegistry;
import io.debezium.relational.Table;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import java.math.BigInteger;
import java.sql.Types;
import java.time.ZoneOffset;
import java.util.Map;
/** Utils to deal with table schema of OceanBase. */
public class OceanBaseTableSchema {
public static TableSchemaBuilder tableSchemaBuilder(ZoneOffset zoneOffset) {
return new TableSchemaBuilder(
OceanBaseJdbcConverter.valueConverterProvider(zoneOffset),
SchemaNameAdjuster.create(),
new CustomConverterRegistry(null),
OceanBaseSchemaUtils.sourceSchema(),
false);
}
public static TableId tableId(String databaseName, String tableName) {
return new TableId(databaseName, null, tableName);
}
public static Column getColumn(String name, int jdbcType) {
// we can't get the scale and length of decimal, timestamp and bit columns from log,
// so here we set a constant value to these fields to be compatible with the logic of
// JdbcValueConverters#schemaBuilder
ColumnEditor columnEditor =
Column.editor().name(name).jdbcType(jdbcType).optional(true).scale(0);
if (columnEditor.jdbcType() == Types.TIMESTAMP || columnEditor.jdbcType() == Types.BIT) {
columnEditor.length(6);
}
return columnEditor.create();
}
public static TableSchema getTableSchema(
String topicName,
String databaseName,
String tableName,
String[] columnNames,
int[] jdbcTypes,
ZoneOffset zoneOffset) {
TableEditor tableEditor = Table.editor().tableId(tableId(databaseName, tableName));
for (int i = 0; i < columnNames.length; i++) {
tableEditor.addColumn(getColumn(columnNames[i], jdbcTypes[i]));
}
// TODO add column filter and mapper
return tableSchemaBuilder(zoneOffset)
.create(
null,
Envelope.schemaName(topicName),
tableEditor.create(),
null,
null,
null);
}
public static Schema upcastingSchemaType(Schema schema, String value) {
if (schema.type().equals(Schema.Type.INT32) && Long.parseLong(value) > Integer.MAX_VALUE) {
return Schema.INT64_SCHEMA;
}
if (schema.type().equals(Schema.Type.INT64)) {
BigInteger bigInt = new BigInteger(value);
if (bigInt.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
return Schema.STRING_SCHEMA;
}
}
return schema;
}
public static Schema upcastingValueSchema(Schema valueSchema, Map<String, String> fields) {
SchemaBuilder schemaBuilder = SchemaBuilder.struct().optional();
for (Map.Entry<String, String> entry : fields.entrySet()) {
Schema fieldSchema = valueSchema.field(entry.getKey()).schema();
fieldSchema = upcastingSchemaType(fieldSchema, entry.getValue());
schemaBuilder.field(entry.getKey(), fieldSchema);
}
return schemaBuilder.build();
}
public static Envelope getEnvelope(String name, Schema valueSchema) {
return Envelope.defineSchema()
.withName(name)
.withRecord(valueSchema)
.withSource(OceanBaseSchemaUtils.sourceSchema())
.build();
}
public static TableSchema upcastingTableSchema(
String topicName, TableSchema tableSchema, Map<String, String> fields) {
Schema valueSchema = upcastingValueSchema(tableSchema.valueSchema(), fields);
return new TableSchema(
tableSchema.id(),
null,
null,
getEnvelope(Envelope.schemaName(topicName), valueSchema),
valueSchema,
null);
}
}

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.table;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import com.ververica.cdc.debezium.table.MetadataConverter;
import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
/** Defines the supported metadata columns for {@link OceanBaseTableSource}. */
public enum OceanBaseReadableMetadata {
/** Name of the tenant that contains the row. */
TENANT(
"tenant_name",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
Struct value = (Struct) record.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
return StringData.fromString(source.getString("tenant"));
}
}),
/** Name of the database that contains the row. */
DATABASE(
"database_name",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
Struct value = (Struct) record.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
return StringData.fromString(source.getString("database"));
}
}),
/** Name of the table that contains the row. */
TABLE(
"table_name",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
Struct value = (Struct) record.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
return StringData.fromString(source.getString("table"));
}
}),
/**
* It indicates the time that the change was made in the database. If the record is read from
* snapshot of the table instead of the change stream, the value is always 0.
*/
OP_TS(
"op_ts",
DataTypes.TIMESTAMP_LTZ(3).notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
Struct value = (Struct) record.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
String timestamp = source.getString("timestamp");
if (timestamp == null) {
timestamp = "0";
}
return TimestampData.fromEpochMillis(Long.parseLong(timestamp) * 1000);
}
});
private final String key;
private final DataType dataType;
private final MetadataConverter converter;
OceanBaseReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
this.key = key;
this.dataType = dataType;
this.converter = converter;
}
public String getKey() {
return key;
}
public DataType getDataType() {
return dataType;
}
public MetadataConverter getConverter() {
return converter;
}
}

@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.table;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.MetadataConverter;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import java.time.Duration;
import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** A {@link DynamicTableSource} implementation for OceanBase. */
public class OceanBaseTableSource implements ScanTableSource, SupportsReadingMetadata {
private final ResolvedSchema physicalSchema;
private final StartupMode startupMode;
private final Long startupTimestamp;
private final String username;
private final String password;
private final String tenantName;
private final String databaseName;
private final String tableName;
private final String hostname;
private final Integer port;
private final Duration connectTimeout;
private final ZoneId serverTimeZone;
private final String rsList;
private final String logProxyHost;
private final Integer logProxyPort;
// --------------------------------------------------------------------------------------------
// Mutable attributes
// --------------------------------------------------------------------------------------------
/** Data type that describes the final output of the source. */
protected DataType producedDataType;
/** Metadata that is appended at the end of a physical source row. */
protected List<String> metadataKeys;
public OceanBaseTableSource(
ResolvedSchema physicalSchema,
StartupMode startupMode,
Long startupTimestamp,
String username,
String password,
String tenantName,
String databaseName,
String tableName,
String hostname,
Integer port,
Duration connectTimeout,
ZoneId serverTimeZone,
String rsList,
String logProxyHost,
int logProxyPort) {
this.physicalSchema = physicalSchema;
this.startupMode = startupMode;
this.startupTimestamp = startupTimestamp;
this.username = checkNotNull(username);
this.password = checkNotNull(password);
this.tenantName = checkNotNull(tenantName);
this.databaseName = checkNotNull(databaseName);
this.tableName = checkNotNull(tableName);
this.hostname = hostname;
this.port = port;
this.serverTimeZone = serverTimeZone;
this.connectTimeout = connectTimeout;
this.rsList = checkNotNull(rsList);
this.logProxyHost = checkNotNull(logProxyHost);
this.logProxyPort = logProxyPort;
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
RowType physicalDataType =
(RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
MetadataConverter[] metadataConverters = getMetadataConverters();
TypeInformation<RowData> resultTypeInfo = context.createTypeInformation(producedDataType);
DebeziumDeserializationSchema<RowData> deserializer =
RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType(physicalDataType)
.setMetadataConverters(metadataConverters)
.setResultTypeInfo(resultTypeInfo)
.setServerTimeZone(serverTimeZone)
.build();
OceanBaseSource.Builder<RowData> builder =
OceanBaseSource.<RowData>builder()
.startupMode(startupMode)
.startupTimestamp(startupTimestamp)
.username(username)
.password(password)
.tenantName(tenantName)
.databaseName(databaseName)
.tableName(tableName)
.hostname(hostname)
.port(port)
.connectTimeout(connectTimeout)
.rsList(rsList)
.logProxyHost(logProxyHost)
.logProxyPort(logProxyPort)
.serverTimeZone(serverTimeZone)
.deserializer(deserializer);
return SourceFunctionProvider.of(builder.build(), false);
}
protected MetadataConverter[] getMetadataConverters() {
if (metadataKeys.isEmpty()) {
return new MetadataConverter[0];
}
return metadataKeys.stream()
.map(
key ->
Stream.of(OceanBaseReadableMetadata.values())
.filter(m -> m.getKey().equals(key))
.findFirst()
.orElseThrow(IllegalStateException::new))
.map(OceanBaseReadableMetadata::getConverter)
.toArray(MetadataConverter[]::new);
}
@Override
public Map<String, DataType> listReadableMetadata() {
return Stream.of(OceanBaseReadableMetadata.values())
.collect(
Collectors.toMap(
OceanBaseReadableMetadata::getKey,
OceanBaseReadableMetadata::getDataType));
}
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
this.metadataKeys = metadataKeys;
this.producedDataType = producedDataType;
}
@Override
public DynamicTableSource copy() {
OceanBaseTableSource source =
new OceanBaseTableSource(
physicalSchema,
startupMode,
startupTimestamp,
username,
password,
tenantName,
databaseName,
tableName,
hostname,
port,
connectTimeout,
serverTimeZone,
rsList,
logProxyHost,
logProxyPort);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OceanBaseTableSource that = (OceanBaseTableSource) o;
return Objects.equals(this.physicalSchema, that.physicalSchema)
&& Objects.equals(this.startupMode, that.startupMode)
&& Objects.equals(this.startupTimestamp, that.startupTimestamp)
&& Objects.equals(this.username, that.username)
&& Objects.equals(this.password, that.password)
&& Objects.equals(this.tenantName, that.tenantName)
&& Objects.equals(this.databaseName, that.databaseName)
&& Objects.equals(this.tableName, that.tableName)
&& Objects.equals(this.hostname, that.hostname)
&& Objects.equals(this.port, that.port)
&& Objects.equals(this.connectTimeout, that.connectTimeout)
&& Objects.equals(this.serverTimeZone, that.serverTimeZone)
&& Objects.equals(this.rsList, that.rsList)
&& Objects.equals(this.logProxyHost, that.logProxyHost)
&& Objects.equals(this.logProxyPort, that.logProxyPort)
&& Objects.equals(this.producedDataType, that.producedDataType)
&& Objects.equals(this.metadataKeys, that.metadataKeys);
}
@Override
public int hashCode() {
return Objects.hash(
physicalSchema,
startupMode,
startupTimestamp,
username,
password,
tenantName,
databaseName,
tableName,
hostname,
port,
connectTimeout,
serverTimeZone,
rsList,
logProxyHost,
logProxyPort,
producedDataType,
metadataKeys);
}
@Override
public String asSummaryString() {
return "OceanBase-CDC";
}
}

@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.time.Duration;
import java.time.ZoneId;
import java.util.HashSet;
import java.util.Set;
/** Factory for creating configured instance of {@link OceanBaseTableSource}. */
public class OceanBaseTableSourceFactory implements DynamicTableSourceFactory {
private static final String IDENTIFIER = "oceanbase-cdc";
public static final ConfigOption<String> SCAN_STARTUP_MODE =
ConfigOptions.key("scan.startup.mode")
.stringType()
.defaultValue("initial")
.withDescription(
"Optional startup mode for OceanBase CDC consumer, valid enumerations are "
+ "\"initial\", \"latest-offset\" or \"timestamp\"");
public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP =
ConfigOptions.key("scan.startup.timestamp")
.longType()
.noDefaultValue()
.withDescription(
"Optional timestamp in seconds used in case of \"timestamp\" startup mode.");
public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
.noDefaultValue()
.withDescription("Username to be used when connecting to OceanBase.");
public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password")
.stringType()
.noDefaultValue()
.withDescription("Password to be used when connecting to OceanBase.");
public static final ConfigOption<String> TENANT_NAME =
ConfigOptions.key("tenant-name")
.stringType()
.noDefaultValue()
.withDescription("Tenant name of OceanBase to monitor.");
public static final ConfigOption<String> DATABASE_NAME =
ConfigOptions.key("database-name")
.stringType()
.noDefaultValue()
.withDescription("Database name of OceanBase to monitor.");
public static final ConfigOption<String> TABLE_NAME =
ConfigOptions.key("table-name")
.stringType()
.noDefaultValue()
.withDescription("Table name of OceanBase to monitor.");
public static final ConfigOption<String> HOSTNAME =
ConfigOptions.key("hostname")
.stringType()
.noDefaultValue()
.withDescription(
"IP address or hostname of the OceanBase database server or OceanBase proxy server.");
public static final ConfigOption<Integer> PORT =
ConfigOptions.key("port")
.intType()
.noDefaultValue()
.withDescription(
"Integer port number of OceanBase database server or OceanBase proxy server.");
public static final ConfigOption<Duration> CONNECT_TIMEOUT =
ConfigOptions.key("connect.timeout")
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"The maximum time that the connector should wait after trying to connect to the OceanBase database server before timing out.");
public static final ConfigOption<String> SERVER_TIME_ZONE =
ConfigOptions.key("server-time-zone")
.stringType()
.defaultValue("UTC")
.withDescription("The session time zone in database server.");
public static final ConfigOption<String> RS_LIST =
ConfigOptions.key("rootserver-list")
.stringType()
.noDefaultValue()
.withDescription(
"The semicolon-separated list of OceanBase root servers in format `ip:rpc_port:sql_port`.");
public static final ConfigOption<String> LOG_PROXY_HOST =
ConfigOptions.key("logproxy.host")
.stringType()
.noDefaultValue()
.withDescription("Hostname or IP address of OceanBase log proxy service.");
public static final ConfigOption<Integer> LOG_PROXY_PORT =
ConfigOptions.key("logproxy.port")
.intType()
.noDefaultValue()
.withDescription("Port number of OceanBase log proxy service.");
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
ReadableConfig config = helper.getOptions();
StartupMode startupMode = StartupMode.getStartupMode(config.get(SCAN_STARTUP_MODE));
Long startupTimestamp = config.get(SCAN_STARTUP_TIMESTAMP);
String username = config.get(USERNAME);
String password = config.get(PASSWORD);
String tenantName = config.get(TENANT_NAME);
String databaseName = config.get(DATABASE_NAME);
String tableName = config.get(TABLE_NAME);
String rsList = config.get(RS_LIST);
String logProxyHost = config.get(LOG_PROXY_HOST);
int logProxyPort = config.get(LOG_PROXY_PORT);
String hostname = config.get(HOSTNAME);
Integer port = config.get(PORT);
Duration connectTimeout = config.get(CONNECT_TIMEOUT);
ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
return new OceanBaseTableSource(
physicalSchema,
startupMode,
startupTimestamp,
username,
password,
tenantName,
databaseName,
tableName,
hostname,
port,
connectTimeout,
serverTimeZone,
rsList,
logProxyHost,
logProxyPort);
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(SCAN_STARTUP_MODE);
options.add(USERNAME);
options.add(PASSWORD);
options.add(TENANT_NAME);
options.add(DATABASE_NAME);
options.add(TABLE_NAME);
options.add(RS_LIST);
options.add(LOG_PROXY_HOST);
options.add(LOG_PROXY_PORT);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(SCAN_STARTUP_TIMESTAMP);
options.add(HOSTNAME);
options.add(PORT);
options.add(CONNECT_TIMEOUT);
options.add(SERVER_TIME_ZONE);
return options;
}
}

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.table;
import org.apache.flink.table.api.ValidationException;
/** Startup modes for the OceanBase CDC Consumer. */
public enum StartupMode {
/**
* Performs an initial snapshot on the monitored database tables upon first startup, and
* continue to read the commit log.
*/
INITIAL,
/**
* Never to perform snapshot on the monitored database tables upon first startup, just read from
* the end of the commit log which means only have the changes since the connector was started.
*/
LATEST_OFFSET,
/**
* Never to perform snapshot on the monitored database tables upon first startup, and directly
* read commit log from the specified timestamp.
*/
TIMESTAMP;
public static StartupMode getStartupMode(String modeString) {
switch (modeString.toLowerCase()) {
case "initial":
return INITIAL;
case "latest-offset":
return LATEST_OFFSET;
case "timestamp":
return TIMESTAMP;
default:
throw new ValidationException(
String.format("Invalid startup mode '%s'.", modeString));
}
}
}

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.ververica.cdc.connectors.oceanbase.table.OceanBaseTableSourceFactory

@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase;
import org.apache.flink.util.TestLogger;
import com.alibaba.dcm.DnsCacheManipulator;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
import org.testcontainers.lifecycle.Startables;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertNotNull;
/** Basic class for testing OceanBase source. */
public class OceanBaseTestBase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(OceanBaseTestBase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
public static final String OB_LOG_PROXY_SERVICE_NAME = "oblogproxy";
// Should be deprecated after official images are released.
public static final String DOCKER_IMAGE_NAME = "whhe/oblogproxy:obce_3.1.1";
// For details about config, see https://github.com/whhe/dockerfiles/tree/master/oblogproxy
public static final int OB_LOG_PROXY_PORT = 2983;
public static final int OB_SERVER_SQL_PORT = 2881;
public static final int OB_SERVER_RPC_PORT = 2882;
// Here we use root user of system tenant for testing, as the log proxy service needs
// a user of system tenant for authentication. It is not recommended for production.
public static final String OB_SYS_USERNAME = "root";
public static final String OB_SYS_PASSWORD = "pswd";
@ClassRule public static final Network NETWORK = Network.newNetwork();
@ClassRule
public static final GenericContainer<?> OB_WITH_LOG_PROXY =
new GenericContainer<>(DOCKER_IMAGE_NAME)
.withNetwork(NETWORK)
.withNetworkAliases(OB_LOG_PROXY_SERVICE_NAME)
.withExposedPorts(OB_SERVER_SQL_PORT, OB_SERVER_RPC_PORT, OB_LOG_PROXY_PORT)
.withEnv("OB_ROOT_PASSWORD", OB_SYS_PASSWORD)
.waitingFor(
new WaitAllStrategy()
.withStrategy(Wait.forListeningPort())
.withStrategy(Wait.forLogMessage(".*boot success!.*", 1)))
.withStartupTimeout(Duration.ofSeconds(120))
.withLogConsumer(new Slf4jLogConsumer(LOG));
@BeforeClass
public static void startContainers() {
// Add jvm dns cache for flink to invoke ob interface.
DnsCacheManipulator.setDnsCache(OB_LOG_PROXY_SERVICE_NAME, "127.0.0.1");
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(OB_WITH_LOG_PROXY)).join();
LOG.info("Containers are started.");
}
@AfterClass
public static void stopContainers() {
DnsCacheManipulator.removeDnsCache(OB_LOG_PROXY_SERVICE_NAME);
LOG.info("Stopping containers...");
Stream.of(OB_WITH_LOG_PROXY).forEach(GenericContainer::stop);
LOG.info("Containers are stopped.");
}
public static String getJdbcUrl(String databaseName) {
return "jdbc:mysql://"
+ OB_WITH_LOG_PROXY.getContainerIpAddress()
+ ":"
+ OB_WITH_LOG_PROXY.getMappedPort(OB_SERVER_SQL_PORT)
+ "/"
+ databaseName
+ "?useSSL=false";
}
protected static Connection getJdbcConnection(String databaseName) throws SQLException {
return DriverManager.getConnection(
getJdbcUrl(databaseName), OB_SYS_USERNAME, OB_SYS_PASSWORD);
}
private static void dropTestDatabase(Connection connection, String databaseName) {
try {
Awaitility.await(String.format("Dropping database %s", databaseName))
.atMost(120, TimeUnit.SECONDS)
.until(
() -> {
try {
String sql =
String.format(
"DROP DATABASE IF EXISTS %s", databaseName);
connection.createStatement().execute(sql);
return true;
} catch (SQLException e) {
LOG.warn(
String.format(
"DROP DATABASE %s failed: {}", databaseName),
e.getMessage());
return false;
}
});
} catch (ConditionTimeoutException e) {
throw new IllegalStateException("Failed to drop test database", e);
}
}
protected void initializeTable(String sqlFile) {
final String ddlFile = String.format("ddl/%s.sql", sqlFile);
final URL ddlTestFile = getClass().getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try (Connection connection = getJdbcConnection("");
Statement statement = connection.createStatement()) {
dropTestDatabase(connection, sqlFile);
final List<String> statements =
Arrays.stream(
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.map(String::trim)
.filter(x -> !x.startsWith("--") && !x.isEmpty())
.map(
x -> {
final Matcher m =
COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.collect(Collectors.joining("\n"))
.split(";"))
.collect(Collectors.toList());
for (String stmt : statements) {
statement.execute(stmt);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

@ -0,0 +1,455 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.table;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import com.ververica.cdc.connectors.oceanbase.OceanBaseTestBase;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import java.sql.Connection;
import java.sql.Statement;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertTrue;
/** Integration tests for OceanBase change stream event SQL source. */
public class OceanBaseConnectorITCase extends OceanBaseTestBase {
private static final int DEFAULT_PARALLELISM = 2;
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment()
.setParallelism(DEFAULT_PARALLELISM);
private final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
@ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;
@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.build());
@Before
public void before() {
TestValuesTableFactory.clearAllData();
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
}
@Test
public void testConsumingAllEvents() throws Exception {
initializeTable("inventory");
String sourceDDL =
String.format(
"CREATE TABLE ob_source ("
+ " `id` INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(20, 10),"
+ " PRIMARY KEY (`id`) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'oceanbase-cdc',"
+ " 'scan.startup.mode' = 'initial',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'tenant-name' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'rootserver-list' = '%s',"
+ " 'logproxy.host' = '%s',"
+ " 'logproxy.port' = '%s'"
+ ")",
OB_SYS_USERNAME,
OB_SYS_PASSWORD,
"sys",
"inventory",
"products",
OB_WITH_LOG_PROXY.getContainerIpAddress(),
OB_WITH_LOG_PROXY.getMappedPort(OB_SERVER_SQL_PORT),
"127.0.0.1:2882:2881",
OB_WITH_LOG_PROXY.getContainerIpAddress(),
OB_WITH_LOG_PROXY.getMappedPort(OB_LOG_PROXY_PORT));
String sinkDDL =
"CREATE TABLE sink ("
+ " `id` INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(20, 10),"
+ " PRIMARY KEY (`id`) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false',"
+ " 'sink-expected-messages-num' = '30'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM ob_source");
waitForSinkSize("sink", 9);
int snapshotSize = sinkSize("sink");
try (Connection connection = getJdbcConnection("inventory");
Statement statement = connection.createStatement()) {
statement.execute(
"UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");
statement.execute(
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute(
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute(
"UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM products WHERE id=111;");
}
waitForSinkSize("sink", snapshotSize + 7);
/*
* <pre>
* The final database table looks like this:
*
* > SELECT * FROM products;
* +-----+--------------------+---------------------------------------------------------+--------+
* | id | name | description | weight |
* +-----+--------------------+---------------------------------------------------------+--------+
* | 101 | scooter | Small 2-wheel scooter | 3.14 |
* | 102 | car battery | 12V car battery | 8.1 |
* | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 |
* | 104 | hammer | 12oz carpenter's hammer | 0.75 |
* | 105 | hammer | 14oz carpenter's hammer | 0.875 |
* | 106 | hammer | 18oz carpenter hammer | 1 |
* | 107 | rocks | box of assorted rocks | 5.1 |
* | 108 | jacket | water resistent black wind breaker | 0.1 |
* | 109 | spare tire | 24 inch spare tire | 22.2 |
* | 110 | jacket | new water resistent white wind breaker | 0.5 |
* +-----+--------------------+---------------------------------------------------------+--------+
* </pre>
*/
List<String> expected =
Arrays.asList(
"+I(101,scooter,Small 2-wheel scooter,3.1400000000)",
"+I(102,car battery,12V car battery,8.1000000000)",
"+I(103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000)",
"+I(104,hammer,12oz carpenter's hammer,0.7500000000)",
"+I(105,hammer,14oz carpenter's hammer,0.8750000000)",
"+I(106,hammer,16oz carpenter's hammer,1.0000000000)",
"+I(107,rocks,box of assorted rocks,5.3000000000)",
"+I(108,jacket,water resistent black wind breaker,0.1000000000)",
"+I(109,spare tire,24 inch spare tire,22.2000000000)",
"+U(106,hammer,18oz carpenter hammer,1.0000000000)",
"+U(107,rocks,box of assorted rocks,5.1000000000)",
"+I(110,jacket,water resistent white wind breaker,0.2000000000)",
"+I(111,scooter,Big 2-wheel scooter ,5.1800000000)",
"+U(110,jacket,new water resistent white wind breaker,0.5000000000)",
"+U(111,scooter,Big 2-wheel scooter ,5.1700000000)",
"-D(111,scooter,Big 2-wheel scooter ,5.1700000000)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
assertContainsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
@Test
public void testMetadataColumns() throws Exception {
initializeTable("inventory_meta");
String sourceDDL =
String.format(
"CREATE TABLE ob_source ("
+ " tenant STRING METADATA FROM 'tenant_name' VIRTUAL,"
+ " database STRING METADATA FROM 'database_name' VIRTUAL,"
+ " `table` STRING METADATA FROM 'table_name' VIRTUAL,"
+ " `id` INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(20, 10),"
+ " PRIMARY KEY (`id`) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'oceanbase-cdc',"
+ " 'scan.startup.mode' = 'initial',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'tenant-name' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'rootserver-list' = '%s',"
+ " 'logproxy.host' = '%s',"
+ " 'logproxy.port' = '%s'"
+ ")",
OB_SYS_USERNAME,
OB_SYS_PASSWORD,
"sys",
"inventory_meta",
"products",
OB_WITH_LOG_PROXY.getContainerIpAddress(),
OB_WITH_LOG_PROXY.getMappedPort(OB_SERVER_SQL_PORT),
"127.0.0.1:2882:2881",
OB_WITH_LOG_PROXY.getContainerIpAddress(),
OB_WITH_LOG_PROXY.getMappedPort(OB_LOG_PROXY_PORT));
String sinkDDL =
"CREATE TABLE sink ("
+ " tenant STRING,"
+ " database STRING,"
+ " `table` STRING,"
+ " `id` DECIMAL(20, 0) NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(20, 10),"
+ " primary key (tenant, database, `table`, `id`) not enforced"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false',"
+ " 'sink-expected-messages-num' = '20'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM ob_source");
waitForSinkSize("sink", 9);
int snapshotSize = sinkSize("sink");
try (Connection connection = getJdbcConnection("inventory_meta");
Statement statement = connection.createStatement()) {
statement.execute(
"UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
}
waitForSinkSize("sink", snapshotSize + 1);
List<String> expected =
Arrays.asList(
"+I(sys,inventory_meta,products,101,scooter,Small 2-wheel scooter,3.1400000000)",
"+I(sys,inventory_meta,products,102,car battery,12V car battery,8.1000000000)",
"+I(sys,inventory_meta,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000)",
"+I(sys,inventory_meta,products,104,hammer,12oz carpenter's hammer,0.7500000000)",
"+I(sys,inventory_meta,products,105,hammer,14oz carpenter's hammer,0.8750000000)",
"+I(sys,inventory_meta,products,106,hammer,16oz carpenter's hammer,1.0000000000)",
"+I(sys,inventory_meta,products,107,rocks,box of assorted rocks,5.3000000000)",
"+I(sys,inventory_meta,products,108,jacket,water resistent black wind breaker,0.1000000000)",
"+I(sys,inventory_meta,products,109,spare tire,24 inch spare tire,22.2000000000)",
"+U(sys,inventory_meta,products,106,hammer,18oz carpenter hammer,1.0000000000)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
assertContainsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
@Test
public void testAllDataTypes() throws Exception {
ZoneId serverTimeZone = ZoneId.systemDefault();
ZoneOffset zoneOffset = serverTimeZone.getRules().getOffset(Instant.now());
try (Connection connection = getJdbcConnection("");
Statement statement = connection.createStatement()) {
statement.execute(String.format("SET GLOBAL time_zone = '%s';", zoneOffset.getId()));
}
tEnv.getConfig().setLocalTimeZone(serverTimeZone);
initializeTable("column_type_test");
String sourceDDL =
String.format(
"CREATE TABLE ob_source (\n"
+ " `id` INT NOT NULL,\n"
+ " bool_c TINYINT,\n"
+ " tiny_c TINYINT,\n"
+ " tiny_un_c SMALLINT,\n"
+ " small_c SMALLINT ,\n"
+ " small_un_c INT ,\n"
+ " medium_c INT,\n"
+ " medium_un_c INT,\n"
+ " int_c INT,\n"
+ " int_un_c BIGINT,\n"
+ " big_c BIGINT,\n"
+ " big_un_c DECIMAL(20, 0),\n"
+ " real_c FLOAT,\n"
+ " float_c FLOAT,\n"
+ " double_c DOUBLE,\n"
+ " decimal_c DECIMAL(8, 4),\n"
+ " numeric_c DECIMAL(6, 0),\n"
+ " big_decimal_c STRING,\n"
+ " date_c DATE,\n"
+ " time_c TIME(0),\n"
+ " datetime3_c TIMESTAMP(3),\n"
+ " datetime6_c TIMESTAMP(6),\n"
+ " timestamp_c TIMESTAMP,\n"
+ " char_c CHAR(3),\n"
+ " varchar_c VARCHAR(255),\n"
+ " bit_c BINARY(8),\n"
+ " text_c STRING,\n"
+ " tiny_blob_c BYTES,\n"
+ " medium_blob_c BYTES,\n"
+ " long_blob_c BYTES,\n"
+ " blob_c BYTES,\n"
+ " year_c INT,\n"
+ " set_c STRING,\n"
+ " enum_c STRING,\n"
+ " primary key (`id`) not enforced"
+ ") WITH ("
+ " 'connector' = 'oceanbase-cdc',"
+ " 'scan.startup.mode' = 'initial',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'tenant-name' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'rootserver-list' = '%s',"
+ " 'logproxy.host' = '%s',"
+ " 'logproxy.port' = '%s',"
+ " 'server-time-zone' = '%s'"
+ ")",
OB_SYS_USERNAME,
OB_SYS_PASSWORD,
"sys",
"column_type_test",
"full_types",
OB_WITH_LOG_PROXY.getContainerIpAddress(),
OB_WITH_LOG_PROXY.getMappedPort(OB_SERVER_SQL_PORT),
"127.0.0.1:2882:2881",
OB_WITH_LOG_PROXY.getContainerIpAddress(),
OB_WITH_LOG_PROXY.getMappedPort(OB_LOG_PROXY_PORT),
serverTimeZone);
String sinkDDL =
"CREATE TABLE sink ("
+ " `id` INT NOT NULL,\n"
+ " bool_c TINYINT,\n"
+ " tiny_c TINYINT,\n"
+ " tiny_un_c SMALLINT,\n"
+ " small_c SMALLINT ,\n"
+ " small_un_c INT ,\n"
+ " medium_c INT,\n"
+ " medium_un_c INT,\n"
+ " int_c INT,\n"
+ " int_un_c BIGINT,\n"
+ " big_c BIGINT,\n"
+ " big_un_c DECIMAL(20, 0),\n"
+ " real_c FLOAT,\n"
+ " float_c FLOAT,\n"
+ " double_c DOUBLE,\n"
+ " decimal_c DECIMAL(8, 4),\n"
+ " numeric_c DECIMAL(6, 0),\n"
+ " big_decimal_c STRING,\n"
+ " date_c DATE,\n"
+ " time_c TIME(0),\n"
+ " datetime3_c TIMESTAMP(3),\n"
+ " datetime6_c TIMESTAMP(6),\n"
+ " timestamp_c TIMESTAMP,\n"
+ " char_c CHAR(3),\n"
+ " varchar_c VARCHAR(255),\n"
+ " bit_c BINARY(8),\n"
+ " text_c STRING,\n"
+ " tiny_blob_c BYTES,\n"
+ " medium_blob_c BYTES,\n"
+ " blob_c BYTES,\n"
+ " long_blob_c BYTES,\n"
+ " year_c INT,\n"
+ " enum_c STRING,\n"
+ " set_c STRING,\n"
+ " primary key (`id`) not enforced"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false',"
+ " 'sink-expected-messages-num' = '3'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM ob_source");
waitForSinkSize("sink", 1);
int snapshotSize = sinkSize("sink");
try (Connection connection = getJdbcConnection("column_type_test");
Statement statement = connection.createStatement()) {
statement.execute(
"UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
}
waitForSinkSize("sink", snapshotSize + 1);
List<String> expected =
Arrays.asList(
"+I(1,1,127,255,32767,65535,8388607,16777215,2147483647,4294967295,9223372036854775807,18446744073709551615,123.102,123.102,404.4443,123.4567,346,34567892.1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:00:22,abc,Hello World,[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2022,a,red)",
"+U(1,1,127,255,32767,65535,8388607,16777215,2147483647,4294967295,9223372036854775807,18446744073709551615,123.102,123.102,404.4443,123.4567,346,34567892.1,2020-07-17,18:00:22,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17T18:33:22,abc,Hello World,[4, 4, 4, 4, 4, 4, 4, 4],text,[16],[16],[16],[16],2022,a,red)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
assertContainsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
private static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
while (sinkSize(sinkName) < expectedSize) {
Thread.sleep(100);
}
}
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
}
}
}
public static void assertContainsInAnyOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertTrue(actual.containsAll(expected));
}
}

@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Test;
import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** Test for {@link OceanBaseTableSource} created by {@link OceanBaseTableSourceFactory}. */
public class OceanBaseTableFactoryTest {
private static final ResolvedSchema SCHEMA =
new ResolvedSchema(
Arrays.asList(
Column.physical("aaa", DataTypes.INT().notNull()),
Column.physical("bbb", DataTypes.STRING().notNull()),
Column.physical("ccc", DataTypes.DOUBLE()),
Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
Column.physical("eee", DataTypes.TIMESTAMP(3))),
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("aaa")));
private static final ResolvedSchema SCHEMA_WITH_METADATA =
new ResolvedSchema(
Arrays.asList(
Column.physical("aaa", DataTypes.INT().notNull()),
Column.physical("bbb", DataTypes.STRING().notNull()),
Column.physical("ccc", DataTypes.DOUBLE()),
Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
Column.physical("eee", DataTypes.TIMESTAMP(3)),
Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true),
Column.metadata("tenant", DataTypes.STRING(), "tenant_name", true),
Column.metadata("database", DataTypes.STRING(), "database_name", true),
Column.metadata("table", DataTypes.STRING(), "table_name", true)),
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("aaa")));
private static final String STARTUP_MODE = "latest-offset";
private static final String USERNAME = "user@sys";
private static final String PASSWORD = "pswd";
private static final String TENANT_NAME = "sys";
private static final String DATABASE_NAME = "db";
private static final String TABLE_NAME = "table";
private static final String RS_LIST = "127.0.0.1:2882:2881";
private static final String LOG_PROXY_HOST = "127.0.0.1";
private static final String LOG_PROXY_PORT = "2983";
@Test
public void testCommonProperties() {
Map<String, String> properties = getRequiredOptions();
DynamicTableSource actualSource = createTableSource(SCHEMA, properties);
OceanBaseTableSource expectedSource =
new OceanBaseTableSource(
SCHEMA,
StartupMode.LATEST_OFFSET,
null,
USERNAME,
PASSWORD,
TENANT_NAME,
DATABASE_NAME,
TABLE_NAME,
null,
null,
Duration.ofSeconds(30),
ZoneId.of("UTC"),
RS_LIST,
LOG_PROXY_HOST,
2983);
assertEquals(expectedSource, actualSource);
}
@Test
public void testOptionalProperties() {
Map<String, String> options = getRequiredOptions();
options.put("scan.startup.mode", "timestamp");
options.put("scan.startup.timestamp", "0");
options.put("hostname", "127.0.0.1");
options.put("port", "2881");
DynamicTableSource actualSource = createTableSource(SCHEMA, options);
OceanBaseTableSource expectedSource =
new OceanBaseTableSource(
SCHEMA,
StartupMode.TIMESTAMP,
0L,
USERNAME,
PASSWORD,
TENANT_NAME,
DATABASE_NAME,
TABLE_NAME,
"127.0.0.1",
2881,
Duration.ofSeconds(30),
ZoneId.of("UTC"),
RS_LIST,
LOG_PROXY_HOST,
2983);
assertEquals(expectedSource, actualSource);
}
@Test
public void testMetadataColumns() {
Map<String, String> properties = getRequiredOptions();
DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
OceanBaseTableSource oceanBaseTableSource = (OceanBaseTableSource) actualSource;
oceanBaseTableSource.applyReadableMetadata(
Arrays.asList("op_ts", "tenant_name", "database_name", "table_name"),
SCHEMA_WITH_METADATA.toSourceRowDataType());
actualSource = oceanBaseTableSource.copy();
OceanBaseTableSource expectedSource =
new OceanBaseTableSource(
SCHEMA_WITH_METADATA,
StartupMode.LATEST_OFFSET,
null,
USERNAME,
PASSWORD,
TENANT_NAME,
DATABASE_NAME,
TABLE_NAME,
null,
null,
Duration.ofSeconds(30),
ZoneId.of("UTC"),
RS_LIST,
LOG_PROXY_HOST,
2983);
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =
Arrays.asList("op_ts", "tenant_name", "database_name", "table_name");
assertEquals(expectedSource, actualSource);
}
@Test
public void testValidation() {
try {
Map<String, String> properties = getRequiredOptions();
properties.put("unknown", "abc");
createTableSource(SCHEMA, properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(t, "Unsupported options:\n\nunknown")
.isPresent());
}
}
private Map<String, String> getRequiredOptions() {
Map<String, String> options = new HashMap<>();
options.put("connector", "oceanbase-cdc");
options.put("scan.startup.mode", STARTUP_MODE);
options.put("username", USERNAME);
options.put("password", PASSWORD);
options.put("tenant-name", TENANT_NAME);
options.put("database-name", DATABASE_NAME);
options.put("table-name", TABLE_NAME);
options.put("rootserver-list", RS_LIST);
options.put("logproxy.host", LOG_PROXY_HOST);
options.put("logproxy.port", LOG_PROXY_PORT);
return options;
}
private static DynamicTableSource createTableSource(
ResolvedSchema schema, Map<String, String> options) {
return FactoryUtil.createTableSource(
null,
ObjectIdentifier.of("default", "default", "t1"),
new ResolvedCatalogTable(
CatalogTable.of(
Schema.newBuilder().fromResolvedSchema(schema).build(),
"mock source",
new ArrayList<>(),
options),
schema),
new Configuration(),
OceanBaseTableFactoryTest.class.getClassLoader(),
false);
}
}

@ -0,0 +1,67 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: column_type_test
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE column_type_test;
USE column_type_test;
CREATE TABLE full_types
(
id INT AUTO_INCREMENT NOT NULL,
bool_c BOOLEAN,
tiny_c TINYINT,
tiny_un_c TINYINT UNSIGNED,
small_c SMALLINT,
small_un_c SMALLINT UNSIGNED,
medium_c MEDIUMINT,
medium_un_c MEDIUMINT UNSIGNED,
int_c INTEGER,
int_un_c INTEGER UNSIGNED,
big_c BIGINT,
big_un_c BIGINT UNSIGNED,
real_c REAL,
float_c FLOAT,
double_c DOUBLE,
decimal_c DECIMAL(8, 4),
numeric_c NUMERIC(6, 0),
big_decimal_c DECIMAL(65, 1),
date_c DATE,
time_c TIME(0),
datetime3_c TIMESTAMP(3),
datetime6_c TIMESTAMP(6),
timestamp_c TIMESTAMP,
char_c CHAR(3),
varchar_c VARCHAR(255),
bit_c BIT(64),
text_c TEXT,
tiny_blob_c TINYBLOB,
medium_blob_c MEDIUMBLOB,
blob_c BLOB,
long_blob_c LONGBLOB,
year_c YEAR,
set_c SET ('a', 'b'),
enum_c ENUM ('red', 'green', 'blue'),
PRIMARY KEY (id)
) DEFAULT CHARSET = utf8mb4;
INSERT INTO full_types
VALUES (DEFAULT, true, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 4294967295, 9223372036854775807,
18446744073709551615, 123.102, 123.102, 404.4443, 123.4567, 345.6, 34567892.1, '2020-07-17', '18:00:22',
'2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22', 'abc', 'Hello World',
b'0000010000000100000001000000010000000100000001000000010000000100', 'text', UNHEX(HEX(16)), UNHEX(HEX(16)),
UNHEX(HEX(16)), UNHEX(HEX(16)), 2022, 'a', 'red');

@ -0,0 +1,42 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: inventory
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE inventory;
USE inventory;
-- Create and populate our products using a single insert with many rows
CREATE TABLE products
(
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
description VARCHAR(512),
weight DECIMAL(20, 10)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default, "scooter", "Small 2-wheel scooter", 3.14),
(default, "car battery", "12V car battery", 8.1),
(default, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3", 0.8),
(default, "hammer", "12oz carpenter's hammer", 0.75),
(default, "hammer", "14oz carpenter's hammer", 0.875),
(default, "hammer", "16oz carpenter's hammer", 1.0),
(default, "rocks", "box of assorted rocks", 5.3),
(default, "jacket", "water resistent black wind breaker", 0.1),
(default, "spare tire", "24 inch spare tire", 22.2);

@ -0,0 +1,42 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: inventory_meta
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE inventory_meta;
USE inventory_meta;
-- Create and populate our products using a single insert with many rows
CREATE TABLE products
(
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
description VARCHAR(512),
weight DECIMAL(20, 10)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default, "scooter", "Small 2-wheel scooter", 3.14),
(default, "car battery", "12V car battery", 8.1),
(default, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3", 0.8),
(default, "hammer", "12oz carpenter's hammer", 0.75),
(default, "hammer", "14oz carpenter's hammer", 0.875),
(default, "hammer", "16oz carpenter's hammer", 1.0),
(default, "rocks", "box of assorted rocks", 5.3),
(default, "jacket", "water resistent black wind breaker", 0.1),
(default, "spare tire", "24 inch spare tire", 22.2);

@ -0,0 +1,28 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level=INFO
rootLogger.appenderRef.test.ref = TestLogger
appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n

@ -0,0 +1,121 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-cdc-connectors</artifactId>
<groupId>com.ververica</groupId>
<version>2.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-sql-connector-oceanbase-cdc</artifactId>
<dependencies>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oceanbase-cdc</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<id>shade-flink</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadeTestJar>false</shadeTestJar>
<artifactSet>
<includes>
<include>io.debezium:debezium-api</include>
<include>io.debezium:debezium-embedded</include>
<include>io.debezium:debezium-core</include>
<include>com.ververica:flink-connector-debezium</include>
<include>com.ververica:flink-connector-oceanbase-cdc</include>
<include>mysql:mysql-connector-java</include>
<include>com.oceanbase.logclient:*</include>
<include>io.netty:netty-all</include>
<include>com.google.protobuf:protobuf-java</include>
<include>commons-codec:commons-codec</include>
<include>org.lz4:lz4-java</include>
<include>org.apache.avro:avro</include>
<include>org.apache.commons:*</include>
<include>org.apache.kafka:*</include>
<include>com.fasterxml.*:*</include>
<include>com.google.guava:*</include>
<!-- Include fixed version 18.0-13.0 of flink shaded guava -->
<include>org.apache.flink:flink-shaded-guava</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>org.apache.kafka:*</artifact>
<excludes>
<exclude>kafka/kafka-version.properties</exclude>
<exclude>LICENSE</exclude>
<!-- Does not contain anything relevant.
Cites a binary dependency on jersey, but this is neither reflected in the
dependency graph, nor are any jersey files bundled. -->
<exclude>NOTICE</exclude>
<exclude>common/**</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>org.apache.kafka</pattern>
<shadedPattern>
com.ververica.cdc.connectors.shaded.org.apache.kafka
</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.avro</pattern>
<shadedPattern>
com.ververica.cdc.connectors.shaded.org.apache.avro
</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml</pattern>
<shadedPattern>
com.ververica.cdc.connectors.shaded.com.fasterxml
</shadedPattern>
</relocation>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>
com.ververica.cdc.connectors.shaded.com.google
</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oceanbase;
/** This is used to generate a dummy docs jar for this module to pass OSS repository rule. */
public class DummyDocs {}

@ -0,0 +1,6 @@
flink-sql-connector-oceanbase-cdc
Copyright 2020 Ververica Inc.
This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
- org.apache.kafka:kafka-clients:2.7.0

@ -39,12 +39,14 @@ under the License.
<module>flink-connector-postgres-cdc</module>
<module>flink-connector-oracle-cdc</module>
<module>flink-connector-mongodb-cdc</module>
<module>flink-connector-oceanbase-cdc</module>
<module>flink-connector-sqlserver-cdc</module>
<module>flink-connector-tidb-cdc</module>
<module>flink-sql-connector-mysql-cdc</module>
<module>flink-sql-connector-postgres-cdc</module>
<module>flink-sql-connector-mongodb-cdc</module>
<module>flink-sql-connector-oracle-cdc</module>
<module>flink-sql-connector-oceanbase-cdc</module>
<module>flink-sql-connector-sqlserver-cdc</module>
<module>flink-sql-connector-tidb-cdc</module>
<module>flink-cdc-e2e-tests</module>
@ -88,6 +90,7 @@ under the License.
<slf4j.version>1.7.15</slf4j.version>
<log4j.version>2.17.1</log4j.version>
<spotless.version>2.4.2</spotless.version>
<oblogclient.version>1.0.2</oblogclient.version>
<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
<flink.forkCount>1</flink.forkCount>
<flink.reuseForks>true</flink.reuseForks>

Loading…
Cancel
Save