[FLINK-34639][cdc][oceanbase] Support debezium deserializer in OceanBase source connector (#3124)
parent
bdca0e328b
commit
05281e5d6f
@ -1,632 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.oceanbase.source;
|
||||
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.cdc.connectors.oceanbase.table.OceanBaseAppendMetadataCollector;
|
||||
import org.apache.flink.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
|
||||
import org.apache.flink.cdc.connectors.oceanbase.table.OceanBaseMetadataConverter;
|
||||
import org.apache.flink.cdc.connectors.oceanbase.table.OceanBaseRecord;
|
||||
import org.apache.flink.cdc.debezium.utils.TemporalConversions;
|
||||
import org.apache.flink.table.data.DecimalData;
|
||||
import org.apache.flink.table.data.GenericArrayData;
|
||||
import org.apache.flink.table.data.GenericRowData;
|
||||
import org.apache.flink.table.data.RowData;
|
||||
import org.apache.flink.table.data.StringData;
|
||||
import org.apache.flink.table.data.TimestampData;
|
||||
import org.apache.flink.table.types.logical.DecimalType;
|
||||
import org.apache.flink.table.types.logical.LogicalType;
|
||||
import org.apache.flink.table.types.logical.RowType;
|
||||
import org.apache.flink.types.RowKind;
|
||||
import org.apache.flink.util.Collector;
|
||||
|
||||
import com.oceanbase.oms.logmessage.ByteString;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.math.BigInteger;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.sql.Date;
|
||||
import java.sql.Time;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.flink.util.Preconditions.checkNotNull;
|
||||
|
||||
/**
|
||||
* Deserialization schema from OceanBase object to Flink Table/SQL internal data structure {@link
|
||||
* RowData}.
|
||||
*/
|
||||
public class RowDataOceanBaseDeserializationSchema
|
||||
implements OceanBaseDeserializationSchema<RowData> {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
/** TypeInformation of the produced {@link RowData}. * */
|
||||
private final TypeInformation<RowData> resultTypeInfo;
|
||||
|
||||
/**
|
||||
* Runtime converter that OceanBase record data into {@link RowData} consisted of physical
|
||||
* column values.
|
||||
*/
|
||||
private final OceanBaseDeserializationRuntimeConverter physicalConverter;
|
||||
|
||||
/** Whether the deserializer needs to handle metadata columns. */
|
||||
private final boolean hasMetadata;
|
||||
|
||||
/**
|
||||
* A wrapped output collector which is used to append metadata columns after physical columns.
|
||||
*/
|
||||
private final OceanBaseAppendMetadataCollector appendMetadataCollector;
|
||||
|
||||
/** Returns a builder to build {@link RowDataOceanBaseDeserializationSchema}. */
|
||||
public static RowDataOceanBaseDeserializationSchema.Builder newBuilder() {
|
||||
return new RowDataOceanBaseDeserializationSchema.Builder();
|
||||
}
|
||||
|
||||
RowDataOceanBaseDeserializationSchema(
|
||||
RowType physicalDataType,
|
||||
OceanBaseMetadataConverter[] metadataConverters,
|
||||
TypeInformation<RowData> resultTypeInfo,
|
||||
ZoneId serverTimeZone) {
|
||||
this.hasMetadata = checkNotNull(metadataConverters).length > 0;
|
||||
this.appendMetadataCollector = new OceanBaseAppendMetadataCollector(metadataConverters);
|
||||
this.physicalConverter = createConverter(checkNotNull(physicalDataType), serverTimeZone);
|
||||
this.resultTypeInfo = checkNotNull(resultTypeInfo);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deserialize(OceanBaseRecord record, Collector<RowData> out) throws Exception {
|
||||
RowData physicalRow;
|
||||
if (record.isSnapshotRecord()) {
|
||||
physicalRow = (GenericRowData) physicalConverter.convert(record.getJdbcFields());
|
||||
physicalRow.setRowKind(RowKind.INSERT);
|
||||
emit(record, physicalRow, out);
|
||||
} else {
|
||||
switch (record.getOpt()) {
|
||||
case INSERT:
|
||||
physicalRow =
|
||||
(GenericRowData)
|
||||
physicalConverter.convert(record.getLogMessageFieldsAfter());
|
||||
physicalRow.setRowKind(RowKind.INSERT);
|
||||
emit(record, physicalRow, out);
|
||||
break;
|
||||
case DELETE:
|
||||
physicalRow =
|
||||
(GenericRowData)
|
||||
physicalConverter.convert(record.getLogMessageFieldsBefore());
|
||||
physicalRow.setRowKind(RowKind.DELETE);
|
||||
emit(record, physicalRow, out);
|
||||
break;
|
||||
case UPDATE:
|
||||
physicalRow =
|
||||
(GenericRowData)
|
||||
physicalConverter.convert(record.getLogMessageFieldsBefore());
|
||||
physicalRow.setRowKind(RowKind.UPDATE_BEFORE);
|
||||
emit(record, physicalRow, out);
|
||||
physicalRow =
|
||||
(GenericRowData)
|
||||
physicalConverter.convert(record.getLogMessageFieldsAfter());
|
||||
physicalRow.setRowKind(RowKind.UPDATE_AFTER);
|
||||
emit(record, physicalRow, out);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException(
|
||||
"Unsupported log message record type: " + record.getOpt());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void emit(OceanBaseRecord row, RowData physicalRow, Collector<RowData> collector) {
|
||||
if (!hasMetadata) {
|
||||
collector.collect(physicalRow);
|
||||
return;
|
||||
}
|
||||
|
||||
appendMetadataCollector.inputRecord = row;
|
||||
appendMetadataCollector.outputCollector = collector;
|
||||
appendMetadataCollector.collect(physicalRow);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TypeInformation<RowData> getProducedType() {
|
||||
return resultTypeInfo;
|
||||
}
|
||||
|
||||
/** Builder class of {@link RowDataOceanBaseDeserializationSchema}. */
|
||||
public static class Builder {
|
||||
private RowType physicalRowType;
|
||||
private TypeInformation<RowData> resultTypeInfo;
|
||||
private OceanBaseMetadataConverter[] metadataConverters = new OceanBaseMetadataConverter[0];
|
||||
private ZoneId serverTimeZone = ZoneId.of("UTC");
|
||||
|
||||
public RowDataOceanBaseDeserializationSchema.Builder setPhysicalRowType(
|
||||
RowType physicalRowType) {
|
||||
this.physicalRowType = physicalRowType;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RowDataOceanBaseDeserializationSchema.Builder setMetadataConverters(
|
||||
OceanBaseMetadataConverter[] metadataConverters) {
|
||||
this.metadataConverters = metadataConverters;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RowDataOceanBaseDeserializationSchema.Builder setResultTypeInfo(
|
||||
TypeInformation<RowData> resultTypeInfo) {
|
||||
this.resultTypeInfo = resultTypeInfo;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RowDataOceanBaseDeserializationSchema.Builder setServerTimeZone(
|
||||
ZoneId serverTimeZone) {
|
||||
this.serverTimeZone = serverTimeZone;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RowDataOceanBaseDeserializationSchema build() {
|
||||
return new RowDataOceanBaseDeserializationSchema(
|
||||
physicalRowType, metadataConverters, resultTypeInfo, serverTimeZone);
|
||||
}
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter createConverter(
|
||||
LogicalType type, ZoneId serverTimeZone) {
|
||||
return wrapIntoNullableConverter(createNotNullConverter(type, serverTimeZone));
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter wrapIntoNullableConverter(
|
||||
OceanBaseDeserializationRuntimeConverter converter) {
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) throws Exception {
|
||||
if (object == null) {
|
||||
return null;
|
||||
}
|
||||
return converter.convert(object);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static OceanBaseDeserializationRuntimeConverter createNotNullConverter(
|
||||
LogicalType type, ZoneId serverTimeZone) {
|
||||
switch (type.getTypeRoot()) {
|
||||
case ROW:
|
||||
return createRowConverter((RowType) type, serverTimeZone);
|
||||
case NULL:
|
||||
return convertToNull();
|
||||
case BOOLEAN:
|
||||
return convertToBoolean();
|
||||
case TINYINT:
|
||||
return convertToTinyInt();
|
||||
case SMALLINT:
|
||||
return convertToSmallInt();
|
||||
case INTEGER:
|
||||
case INTERVAL_YEAR_MONTH:
|
||||
return convertToInt();
|
||||
case BIGINT:
|
||||
case INTERVAL_DAY_TIME:
|
||||
return convertToLong();
|
||||
case DATE:
|
||||
return convertToDate();
|
||||
case TIME_WITHOUT_TIME_ZONE:
|
||||
return convertToTime();
|
||||
case TIMESTAMP_WITHOUT_TIME_ZONE:
|
||||
return convertToTimestamp();
|
||||
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
|
||||
return convertToLocalTimeZoneTimestamp(serverTimeZone);
|
||||
case FLOAT:
|
||||
return convertToFloat();
|
||||
case DOUBLE:
|
||||
return convertToDouble();
|
||||
case CHAR:
|
||||
case VARCHAR:
|
||||
return convertToString();
|
||||
case BINARY:
|
||||
return convertToBinary();
|
||||
case VARBINARY:
|
||||
return convertToBytes();
|
||||
case DECIMAL:
|
||||
return createDecimalConverter((DecimalType) type);
|
||||
case ARRAY:
|
||||
return createArrayConverter();
|
||||
default:
|
||||
throw new UnsupportedOperationException("Unsupported type: " + type);
|
||||
}
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter createRowConverter(
|
||||
RowType rowType, ZoneId serverTimeZone) {
|
||||
final OceanBaseDeserializationRuntimeConverter[] fieldConverters =
|
||||
rowType.getFields().stream()
|
||||
.map(RowType.RowField::getType)
|
||||
.map(logicType -> createConverter(logicType, serverTimeZone))
|
||||
.toArray(OceanBaseDeserializationRuntimeConverter[]::new);
|
||||
final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
int arity = fieldNames.length;
|
||||
GenericRowData row = new GenericRowData(arity);
|
||||
Map<String, Object> fieldMap = (Map<String, Object>) object;
|
||||
for (int i = 0; i < arity; i++) {
|
||||
String fieldName = fieldNames[i];
|
||||
Object value = fieldMap.get(fieldName);
|
||||
try {
|
||||
row.setField(i, fieldConverters[i].convert(value));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(
|
||||
"Failed to convert field '" + fieldName + "' with value: " + value,
|
||||
e);
|
||||
}
|
||||
}
|
||||
return row;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter convertToNull() {
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter convertToBoolean() {
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
if (object instanceof byte[]) {
|
||||
return "1".equals(new String((byte[]) object, StandardCharsets.UTF_8));
|
||||
}
|
||||
return Boolean.parseBoolean(object.toString()) || "1".equals(object.toString());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter convertToTinyInt() {
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
return Byte.parseByte(object.toString());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter convertToSmallInt() {
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
return Short.parseShort(object.toString());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter convertToInt() {
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
if (object instanceof Integer) {
|
||||
return object;
|
||||
} else if (object instanceof Long) {
|
||||
return ((Long) object).intValue();
|
||||
} else if (object instanceof Date) {
|
||||
return ((Date) object).toLocalDate().getYear();
|
||||
} else {
|
||||
return Integer.parseInt(object.toString());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter convertToLong() {
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
if (object instanceof Integer) {
|
||||
return ((Integer) object).longValue();
|
||||
} else if (object instanceof Long) {
|
||||
return object;
|
||||
} else {
|
||||
return Long.parseLong(object.toString());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter convertToDouble() {
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
if (object instanceof Float) {
|
||||
return ((Float) object).doubleValue();
|
||||
} else if (object instanceof Double) {
|
||||
return object;
|
||||
} else {
|
||||
return Double.parseDouble(object.toString());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter convertToFloat() {
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
if (object instanceof Float) {
|
||||
return object;
|
||||
} else if (object instanceof Double) {
|
||||
return ((Double) object).floatValue();
|
||||
} else {
|
||||
return Float.parseFloat(object.toString());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter convertToDate() {
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
if (object instanceof String) {
|
||||
object = Date.valueOf((String) object);
|
||||
}
|
||||
return (int) TemporalConversions.toLocalDate(object).toEpochDay();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter convertToTime() {
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
if (object instanceof Long) {
|
||||
return (int) ((Long) object / 1000_000);
|
||||
}
|
||||
if (object instanceof String) {
|
||||
object = Time.valueOf((String) object);
|
||||
}
|
||||
return TemporalConversions.toLocalTime(object).toSecondOfDay() * 1000;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter convertToTimestamp() {
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
if (object instanceof String) {
|
||||
object = Timestamp.valueOf((String) object);
|
||||
}
|
||||
if (object instanceof Timestamp) {
|
||||
return TimestampData.fromTimestamp((Timestamp) object);
|
||||
}
|
||||
if (object instanceof LocalDateTime) {
|
||||
return TimestampData.fromLocalDateTime((LocalDateTime) object);
|
||||
}
|
||||
throw new IllegalArgumentException(
|
||||
"Unable to convert to TimestampData from unexpected value '"
|
||||
+ object
|
||||
+ "' of type "
|
||||
+ object.getClass().getName());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(
|
||||
ZoneId serverTimeZone) {
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
if (object instanceof String) {
|
||||
object = Timestamp.valueOf((String) object);
|
||||
}
|
||||
if (object instanceof Timestamp) {
|
||||
return TimestampData.fromInstant(
|
||||
((Timestamp) object)
|
||||
.toLocalDateTime()
|
||||
.atZone(serverTimeZone)
|
||||
.toInstant());
|
||||
}
|
||||
if (object instanceof LocalDateTime) {
|
||||
return TimestampData.fromInstant(
|
||||
((LocalDateTime) object).atZone(serverTimeZone).toInstant());
|
||||
}
|
||||
throw new IllegalArgumentException(
|
||||
"Unable to convert to TimestampData from unexpected value '"
|
||||
+ object
|
||||
+ "' of type "
|
||||
+ object.getClass().getName());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter convertToString() {
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
return StringData.fromString(object.toString());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter convertToBinary() {
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
if (object instanceof String) {
|
||||
try {
|
||||
long v = Long.parseLong((String) object);
|
||||
byte[] bytes = ByteBuffer.allocate(8).putLong(v).array();
|
||||
int i = 0;
|
||||
while (i < Long.BYTES - 1 && bytes[i] == 0) {
|
||||
i++;
|
||||
}
|
||||
return Arrays.copyOfRange(bytes, i, Long.BYTES);
|
||||
} catch (NumberFormatException e) {
|
||||
return ((String) object).getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
} else if (object instanceof byte[]) {
|
||||
String str = new String((byte[]) object, StandardCharsets.US_ASCII);
|
||||
return str.getBytes(StandardCharsets.UTF_8);
|
||||
} else if (object instanceof ByteBuffer) {
|
||||
ByteBuffer byteBuffer = (ByteBuffer) object;
|
||||
byte[] bytes = new byte[byteBuffer.remaining()];
|
||||
byteBuffer.get(bytes);
|
||||
return bytes;
|
||||
} else {
|
||||
throw new UnsupportedOperationException(
|
||||
"Unsupported BINARY value type: " + object.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter convertToBytes() {
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
if (object instanceof String) {
|
||||
return ((String) object).getBytes(StandardCharsets.UTF_8);
|
||||
} else if (object instanceof byte[]) {
|
||||
return object;
|
||||
} else if (object instanceof ByteBuffer) {
|
||||
ByteBuffer byteBuffer = (ByteBuffer) object;
|
||||
byte[] bytes = new byte[byteBuffer.remaining()];
|
||||
byteBuffer.get(bytes);
|
||||
return bytes;
|
||||
} else {
|
||||
throw new UnsupportedOperationException(
|
||||
"Unsupported BYTES value type: " + object.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter createDecimalConverter(
|
||||
DecimalType decimalType) {
|
||||
final int precision = decimalType.getPrecision();
|
||||
final int scale = decimalType.getScale();
|
||||
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
BigDecimal bigDecimal;
|
||||
if (object instanceof String) {
|
||||
bigDecimal = new BigDecimal((String) object);
|
||||
} else if (object instanceof Long) {
|
||||
bigDecimal = new BigDecimal((Long) object);
|
||||
} else if (object instanceof BigInteger) {
|
||||
bigDecimal = new BigDecimal((BigInteger) object);
|
||||
} else if (object instanceof Double) {
|
||||
bigDecimal = BigDecimal.valueOf((Double) object);
|
||||
} else if (object instanceof BigDecimal) {
|
||||
bigDecimal = (BigDecimal) object;
|
||||
} else {
|
||||
throw new IllegalArgumentException(
|
||||
"Unable to convert to decimal from unexpected value '"
|
||||
+ object
|
||||
+ "' of type "
|
||||
+ object.getClass());
|
||||
}
|
||||
return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static OceanBaseDeserializationRuntimeConverter createArrayConverter() {
|
||||
return new OceanBaseDeserializationRuntimeConverter() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object object) {
|
||||
String s;
|
||||
if (object instanceof ByteString) {
|
||||
s = ((ByteString) object).toString(StandardCharsets.UTF_8.name());
|
||||
} else {
|
||||
s = object.toString();
|
||||
}
|
||||
String[] strArray = s.split(",");
|
||||
StringData[] stringDataArray = new StringData[strArray.length];
|
||||
for (int i = 0; i < strArray.length; i++) {
|
||||
stringDataArray[i] = StringData.fromString(strArray[i]);
|
||||
}
|
||||
return new GenericArrayData(stringDataArray);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
@ -0,0 +1,88 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.oceanbase.source.config;
|
||||
|
||||
import org.apache.flink.cdc.connectors.oceanbase.source.offset.OceanBaseSourceInfoStructMaker;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.SourceInfoStructMaker;
|
||||
import io.debezium.relational.ColumnFilterMode;
|
||||
import io.debezium.relational.RelationalDatabaseConnectorConfig;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.relational.Tables;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
/** Debezium connector config. */
|
||||
public class OceanBaseConnectorConfig extends RelationalDatabaseConnectorConfig {
|
||||
|
||||
protected static final String LOGICAL_NAME = "oceanbase_cdc_connector";
|
||||
protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = Integer.MIN_VALUE;
|
||||
protected static final List<String> BUILT_IN_DB_NAMES =
|
||||
Collections.unmodifiableList(
|
||||
Arrays.asList(
|
||||
"information_schema", "mysql", "oceanbase", "LBACSYS", "ORAAUDITOR"));
|
||||
|
||||
private final String compatibleMode;
|
||||
private final String serverTimeZone;
|
||||
|
||||
public OceanBaseConnectorConfig(
|
||||
String compatibleMode, String serverTimeZone, Properties properties) {
|
||||
super(
|
||||
Configuration.from(properties),
|
||||
LOGICAL_NAME,
|
||||
Tables.TableFilter.fromPredicate(
|
||||
tableId ->
|
||||
"mysql".equalsIgnoreCase(compatibleMode)
|
||||
? !BUILT_IN_DB_NAMES.contains(tableId.catalog())
|
||||
: !BUILT_IN_DB_NAMES.contains(tableId.schema())),
|
||||
TableId::identifier,
|
||||
DEFAULT_SNAPSHOT_FETCH_SIZE,
|
||||
"mysql".equalsIgnoreCase(compatibleMode)
|
||||
? ColumnFilterMode.CATALOG
|
||||
: ColumnFilterMode.SCHEMA);
|
||||
this.compatibleMode = compatibleMode;
|
||||
this.serverTimeZone = serverTimeZone;
|
||||
}
|
||||
|
||||
public String getCompatibleMode() {
|
||||
return compatibleMode;
|
||||
}
|
||||
|
||||
public String getServerTimeZone() {
|
||||
return serverTimeZone;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getConnectorName() {
|
||||
return "oceanbase";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getContextName() {
|
||||
return "OceanBase";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SourceInfoStructMaker<?> getSourceInfoStructMaker(Version version) {
|
||||
return new OceanBaseSourceInfoStructMaker();
|
||||
}
|
||||
}
|
168
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/OceanBaseConnection.java → flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/connection/OceanBaseConnection.java
168
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/OceanBaseConnection.java → flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/connection/OceanBaseConnection.java
@ -0,0 +1,74 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.oceanbase.source.converter;
|
||||
|
||||
import org.apache.flink.cdc.debezium.table.DeserializationRuntimeConverter;
|
||||
import org.apache.flink.cdc.debezium.table.DeserializationRuntimeConverterFactory;
|
||||
import org.apache.flink.table.data.GenericArrayData;
|
||||
import org.apache.flink.table.data.StringData;
|
||||
import org.apache.flink.table.types.logical.LogicalType;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
|
||||
import java.time.ZoneId;
|
||||
import java.util.Optional;
|
||||
|
||||
/** Used to create {@link DeserializationRuntimeConverterFactory} specified to OceanBase. */
|
||||
public class OceanBaseDeserializationConverterFactory {
|
||||
|
||||
public static DeserializationRuntimeConverterFactory instance() {
|
||||
return new DeserializationRuntimeConverterFactory() {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Optional<DeserializationRuntimeConverter> createUserDefinedConverter(
|
||||
LogicalType logicalType, ZoneId serverTimeZone) {
|
||||
switch (logicalType.getTypeRoot()) {
|
||||
case ARRAY:
|
||||
return createArrayConverter();
|
||||
default:
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static Optional<DeserializationRuntimeConverter> createArrayConverter() {
|
||||
return Optional.of(
|
||||
new DeserializationRuntimeConverter() {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public Object convert(Object dbzObj, Schema schema) throws Exception {
|
||||
if (dbzObj instanceof String) {
|
||||
String[] enums = ((String) dbzObj).split(",");
|
||||
StringData[] elements = new StringData[enums.length];
|
||||
for (int i = 0; i < enums.length; i++) {
|
||||
elements[i] = StringData.fromString(enums[i]);
|
||||
}
|
||||
return new GenericArrayData(elements);
|
||||
}
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"Unable convert to Flink ARRAY type from unexpected value '%s'",
|
||||
dbzObj));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
@ -0,0 +1,509 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.oceanbase.source.converter;
|
||||
|
||||
import org.apache.flink.cdc.connectors.oceanbase.source.config.OceanBaseConnectorConfig;
|
||||
|
||||
import io.debezium.config.CommonConnectorConfig;
|
||||
import io.debezium.data.Bits;
|
||||
import io.debezium.data.SpecialValueDecimal;
|
||||
import io.debezium.jdbc.JdbcValueConverters;
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.relational.ValueConverter;
|
||||
import io.debezium.time.MicroTimestamp;
|
||||
import io.debezium.time.NanoTimestamp;
|
||||
import org.apache.kafka.connect.data.Decimal;
|
||||
import org.apache.kafka.connect.data.Field;
|
||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.math.BigInteger;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.sql.Blob;
|
||||
import java.sql.Clob;
|
||||
import java.sql.Date;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Time;
|
||||
import java.sql.Timestamp;
|
||||
import java.sql.Types;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.format.DateTimeFormatterBuilder;
|
||||
import java.time.temporal.ChronoField;
|
||||
import java.util.Locale;
|
||||
|
||||
/** JdbcValueConverters for OceanBase. */
|
||||
public class OceanBaseValueConverters extends JdbcValueConverters {
|
||||
|
||||
public static final String EMPTY_BLOB_FUNCTION = "EMPTY_BLOB()";
|
||||
public static final String EMPTY_CLOB_FUNCTION = "EMPTY_CLOB()";
|
||||
|
||||
private static final DateTimeFormatter TIMESTAMP_FORMATTER =
|
||||
new DateTimeFormatterBuilder()
|
||||
.parseCaseInsensitive()
|
||||
.appendPattern("yyyy-MM-dd HH:mm:ss")
|
||||
.optionalStart()
|
||||
.appendPattern(".")
|
||||
.appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false)
|
||||
.optionalEnd()
|
||||
.toFormatter();
|
||||
|
||||
private static final DateTimeFormatter TIMESTAMP_AM_PM_SHORT_FORMATTER =
|
||||
new DateTimeFormatterBuilder()
|
||||
.parseCaseInsensitive()
|
||||
.appendPattern("dd-MMM-yy hh.mm.ss")
|
||||
.optionalStart()
|
||||
.appendPattern(".")
|
||||
.appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false)
|
||||
.optionalEnd()
|
||||
.appendPattern(" a")
|
||||
.toFormatter(Locale.ENGLISH);
|
||||
|
||||
private final String compatibleMode;
|
||||
private final String serverTimezone;
|
||||
|
||||
public OceanBaseValueConverters(OceanBaseConnectorConfig connectorConfig) {
|
||||
super(
|
||||
connectorConfig.getDecimalMode(),
|
||||
connectorConfig.getTemporalPrecisionMode(),
|
||||
ZoneOffset.UTC,
|
||||
x -> x,
|
||||
BigIntUnsignedMode.PRECISE,
|
||||
connectorConfig.binaryHandlingMode());
|
||||
this.compatibleMode = connectorConfig.getCompatibleMode();
|
||||
this.serverTimezone = connectorConfig.getServerTimeZone();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getTimePrecision(Column column) {
|
||||
if ("mysql".equalsIgnoreCase(compatibleMode)) {
|
||||
return super.getTimePrecision(column);
|
||||
}
|
||||
return column.scale().orElse(0);
|
||||
}
|
||||
|
||||
protected boolean isUnsignedColumn(Column column) {
|
||||
return column.typeName().toUpperCase().contains("UNSIGNED");
|
||||
}
|
||||
|
||||
@Override
|
||||
public SchemaBuilder schemaBuilder(Column column) {
|
||||
logger.debug(
|
||||
"Building schema for column {} of type {} named {} with constraints ({},{})",
|
||||
column.name(),
|
||||
column.jdbcType(),
|
||||
column.typeName(),
|
||||
column.length(),
|
||||
column.scale());
|
||||
|
||||
switch (column.jdbcType()) {
|
||||
case Types.BIT:
|
||||
if (column.length() > 1) {
|
||||
return Bits.builder(column.length());
|
||||
}
|
||||
return SchemaBuilder.bool();
|
||||
case Types.TINYINT:
|
||||
if (column.length() == 1) {
|
||||
return SchemaBuilder.bool();
|
||||
}
|
||||
if (isUnsignedColumn(column)) {
|
||||
return SchemaBuilder.int16();
|
||||
}
|
||||
return SchemaBuilder.int8();
|
||||
case Types.SMALLINT:
|
||||
if (isUnsignedColumn(column)) {
|
||||
return SchemaBuilder.int32();
|
||||
}
|
||||
return SchemaBuilder.int16();
|
||||
case Types.INTEGER:
|
||||
if (!column.typeName().toUpperCase().startsWith("MEDIUMINT")
|
||||
&& isUnsignedColumn(column)) {
|
||||
return SchemaBuilder.int64();
|
||||
}
|
||||
return SchemaBuilder.int32();
|
||||
case Types.BIGINT:
|
||||
if (isUnsignedColumn(column)) {
|
||||
return Decimal.builder(0);
|
||||
}
|
||||
return SchemaBuilder.int64();
|
||||
case Types.FLOAT:
|
||||
return getDecimalSchema(column);
|
||||
case Types.NUMERIC:
|
||||
case Types.DECIMAL:
|
||||
if ("mysql".equalsIgnoreCase(compatibleMode)) {
|
||||
return getDecimalSchema(column);
|
||||
}
|
||||
return getNumericSchema(column);
|
||||
case Types.REAL:
|
||||
return SchemaBuilder.float32();
|
||||
case Types.DOUBLE:
|
||||
return SchemaBuilder.float64();
|
||||
case Types.DATE:
|
||||
if ("mysql".equalsIgnoreCase(compatibleMode)) {
|
||||
if (column.typeName().equalsIgnoreCase("YEAR")) {
|
||||
return io.debezium.time.Year.builder();
|
||||
}
|
||||
if (adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode) {
|
||||
return io.debezium.time.Date.builder();
|
||||
}
|
||||
return org.apache.kafka.connect.data.Date.builder();
|
||||
}
|
||||
return getTimestampSchema(column);
|
||||
case Types.TIME:
|
||||
if (adaptiveTimeMicrosecondsPrecisionMode) {
|
||||
return io.debezium.time.MicroTime.builder();
|
||||
}
|
||||
if (adaptiveTimePrecisionMode) {
|
||||
if (getTimePrecision(column) <= 3) {
|
||||
return io.debezium.time.Time.builder();
|
||||
}
|
||||
if (getTimePrecision(column) <= 6) {
|
||||
return io.debezium.time.MicroTime.builder();
|
||||
}
|
||||
return io.debezium.time.NanoTime.builder();
|
||||
}
|
||||
return org.apache.kafka.connect.data.Time.builder();
|
||||
case Types.TIMESTAMP:
|
||||
return getTimestampSchema(column);
|
||||
case Types.CHAR:
|
||||
case Types.VARCHAR:
|
||||
case Types.LONGVARCHAR:
|
||||
case Types.NCHAR:
|
||||
case Types.NVARCHAR:
|
||||
case Types.CLOB:
|
||||
return SchemaBuilder.string();
|
||||
case Types.BINARY:
|
||||
case Types.VARBINARY:
|
||||
case Types.LONGVARBINARY:
|
||||
case Types.BLOB:
|
||||
return binaryMode.getSchema();
|
||||
default:
|
||||
return super.schemaBuilder(column);
|
||||
}
|
||||
}
|
||||
|
||||
protected SchemaBuilder getNumericSchema(Column column) {
|
||||
if (column.scale().isPresent()) {
|
||||
int scale = column.scale().get();
|
||||
if (scale <= 0) {
|
||||
int width = column.length() - scale;
|
||||
if (width < 3) {
|
||||
return SchemaBuilder.int8();
|
||||
} else if (width < 5) {
|
||||
return SchemaBuilder.int16();
|
||||
} else if (width < 10) {
|
||||
return SchemaBuilder.int32();
|
||||
} else if (width < 19) {
|
||||
return SchemaBuilder.int64();
|
||||
}
|
||||
}
|
||||
}
|
||||
return getDecimalSchema(column);
|
||||
}
|
||||
|
||||
protected SchemaBuilder getDecimalSchema(Column column) {
|
||||
return SpecialValueDecimal.builder(decimalMode, column.length(), column.scale().orElse(0));
|
||||
}
|
||||
|
||||
protected SchemaBuilder getTimestampSchema(Column column) {
|
||||
if (adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode) {
|
||||
if (getTimePrecision(column) <= 3) {
|
||||
return io.debezium.time.Timestamp.builder();
|
||||
}
|
||||
if (getTimePrecision(column) <= 6) {
|
||||
return MicroTimestamp.builder();
|
||||
}
|
||||
return NanoTimestamp.builder();
|
||||
}
|
||||
return org.apache.kafka.connect.data.Timestamp.builder();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueConverter converter(Column column, Field fieldDefn) {
|
||||
switch (column.jdbcType()) {
|
||||
case Types.BIT:
|
||||
return convertBits(column, fieldDefn);
|
||||
case Types.TINYINT:
|
||||
if (column.length() == 1) {
|
||||
return data -> convertBit(column, fieldDefn, data);
|
||||
}
|
||||
if (isUnsignedColumn(column)) {
|
||||
return data -> convertSmallInt(column, fieldDefn, data);
|
||||
}
|
||||
return data -> convertTinyInt(column, fieldDefn, data);
|
||||
case Types.SMALLINT:
|
||||
if (isUnsignedColumn(column)) {
|
||||
return data -> convertInteger(column, fieldDefn, data);
|
||||
}
|
||||
return data -> convertSmallInt(column, fieldDefn, data);
|
||||
case Types.INTEGER:
|
||||
if (column.typeName().toUpperCase().startsWith("MEDIUMINT")) {
|
||||
return data -> convertInteger(column, fieldDefn, data);
|
||||
}
|
||||
if (isUnsignedColumn(column)) {
|
||||
return data -> convertBigInt(column, fieldDefn, data);
|
||||
}
|
||||
return data -> convertInteger(column, fieldDefn, data);
|
||||
case Types.BIGINT:
|
||||
if (isUnsignedColumn(column)) {
|
||||
switch (bigIntUnsignedMode) {
|
||||
case LONG:
|
||||
return (data) -> convertBigInt(column, fieldDefn, data);
|
||||
case PRECISE:
|
||||
return (data) -> convertUnsignedBigint(column, fieldDefn, data);
|
||||
}
|
||||
}
|
||||
return (data) -> convertBigInt(column, fieldDefn, data);
|
||||
case Types.FLOAT:
|
||||
return data -> convertDecimal(column, fieldDefn, data);
|
||||
case Types.NUMERIC:
|
||||
case Types.DECIMAL:
|
||||
if ("mysql".equalsIgnoreCase(compatibleMode)) {
|
||||
return data -> convertDecimal(column, fieldDefn, data);
|
||||
}
|
||||
return data -> convertNumeric(column, fieldDefn, data);
|
||||
case Types.REAL:
|
||||
return data -> convertReal(column, fieldDefn, data);
|
||||
case Types.DOUBLE:
|
||||
return data -> convertDouble(column, fieldDefn, data);
|
||||
case Types.DATE:
|
||||
if ("mysql".equalsIgnoreCase(compatibleMode)) {
|
||||
if (column.typeName().equalsIgnoreCase("YEAR")) {
|
||||
return (data) -> convertYearToInt(column, fieldDefn, data);
|
||||
}
|
||||
if (adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode) {
|
||||
return (data) -> convertDateToEpochDays(column, fieldDefn, data);
|
||||
}
|
||||
return (data) -> convertDateToEpochDaysAsDate(column, fieldDefn, data);
|
||||
}
|
||||
return (data) -> convertTimestamp(column, fieldDefn, data);
|
||||
case Types.TIME:
|
||||
return (data) -> convertTime(column, fieldDefn, data);
|
||||
case Types.TIMESTAMP:
|
||||
return data -> convertTimestamp(column, fieldDefn, data);
|
||||
case Types.CHAR:
|
||||
case Types.VARCHAR:
|
||||
case Types.LONGVARCHAR:
|
||||
case Types.NCHAR:
|
||||
case Types.NVARCHAR:
|
||||
case Types.CLOB:
|
||||
return data -> convertString(column, fieldDefn, data);
|
||||
case Types.BINARY:
|
||||
case Types.VARBINARY:
|
||||
case Types.LONGVARBINARY:
|
||||
case Types.BLOB:
|
||||
return (data) -> convertBinary(column, fieldDefn, data, binaryMode);
|
||||
default:
|
||||
return super.converter(column, fieldDefn);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertBits(Column column, Field fieldDefn, Object data, int numBytes) {
|
||||
if (data instanceof String) {
|
||||
return ByteBuffer.allocate(numBytes).putLong(Long.parseLong((String) data)).array();
|
||||
}
|
||||
return super.convertBits(column, fieldDefn, data, numBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertBit(Column column, Field fieldDefn, Object data) {
|
||||
if (data instanceof String) {
|
||||
return Boolean.parseBoolean((String) data) || "1".equals(data);
|
||||
}
|
||||
return super.convertBit(column, fieldDefn, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertTinyInt(Column column, Field fieldDefn, Object data) {
|
||||
if (data instanceof String) {
|
||||
return Byte.parseByte((String) data);
|
||||
}
|
||||
if (data instanceof Number) {
|
||||
return ((Number) data).byteValue();
|
||||
}
|
||||
throw new IllegalArgumentException(
|
||||
"Unexpected value for JDBC type "
|
||||
+ column.jdbcType()
|
||||
+ " and column "
|
||||
+ column
|
||||
+ ": class="
|
||||
+ data.getClass());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertBigInt(Column column, Field fieldDefn, Object data) {
|
||||
if (data instanceof String) {
|
||||
return new BigInteger((String) data).longValue();
|
||||
}
|
||||
return super.convertBigInt(column, fieldDefn, data);
|
||||
}
|
||||
|
||||
protected Object convertUnsignedBigint(Column column, Field fieldDefn, Object data) {
|
||||
if (data instanceof String) {
|
||||
return new BigDecimal((String) data);
|
||||
}
|
||||
if (data instanceof BigInteger) {
|
||||
return new BigDecimal((BigInteger) data);
|
||||
}
|
||||
return convertDecimal(column, fieldDefn, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertReal(Column column, Field fieldDefn, Object data) {
|
||||
if (data instanceof String) {
|
||||
return Float.parseFloat((String) data);
|
||||
}
|
||||
return super.convertReal(column, fieldDefn, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertDouble(Column column, Field fieldDefn, Object data) {
|
||||
if (data instanceof String) {
|
||||
return Double.parseDouble((String) data);
|
||||
}
|
||||
return super.convertDouble(column, fieldDefn, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertNumeric(Column column, Field fieldDefn, Object data) {
|
||||
if (column.scale().isPresent()) {
|
||||
int scale = column.scale().get();
|
||||
|
||||
if (scale <= 0) {
|
||||
int width = column.length() - scale;
|
||||
if (width < 3) {
|
||||
return convertTinyInt(column, fieldDefn, data);
|
||||
} else if (width < 5) {
|
||||
return convertSmallInt(column, fieldDefn, data);
|
||||
} else if (width < 10) {
|
||||
return convertInteger(column, fieldDefn, data);
|
||||
} else if (width < 19) {
|
||||
return convertBigInt(column, fieldDefn, data);
|
||||
}
|
||||
}
|
||||
}
|
||||
return convertDecimal(column, fieldDefn, data);
|
||||
}
|
||||
|
||||
protected Object convertYearToInt(Column column, Field fieldDefn, Object data) {
|
||||
if (data instanceof Date) {
|
||||
return ((Date) data).toLocalDate().getYear();
|
||||
}
|
||||
return convertInteger(column, fieldDefn, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertDateToEpochDays(Column column, Field fieldDefn, Object data) {
|
||||
if (data instanceof String) {
|
||||
data = Date.valueOf((String) data);
|
||||
}
|
||||
return super.convertDateToEpochDays(column, fieldDefn, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertDateToEpochDaysAsDate(Column column, Field fieldDefn, Object data) {
|
||||
if (data instanceof String) {
|
||||
data = Date.valueOf((String) data);
|
||||
}
|
||||
return super.convertDateToEpochDaysAsDate(column, fieldDefn, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertTime(Column column, Field fieldDefn, Object data) {
|
||||
if (data instanceof String) {
|
||||
data = Time.valueOf((String) data);
|
||||
}
|
||||
return super.convertTime(column, fieldDefn, data);
|
||||
}
|
||||
|
||||
protected Object convertTimestamp(Column column, Field fieldDefn, Object data) {
|
||||
if (data instanceof String) {
|
||||
if ("mysql".equalsIgnoreCase(compatibleMode)) {
|
||||
data = Timestamp.valueOf(((String) data).trim());
|
||||
} else {
|
||||
data = resolveTimestampStringAsInstant((String) data);
|
||||
}
|
||||
}
|
||||
if (adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode) {
|
||||
if (getTimePrecision(column) <= 3) {
|
||||
return convertTimestampToEpochMillis(column, fieldDefn, data);
|
||||
}
|
||||
if (getTimePrecision(column) <= 6) {
|
||||
return convertTimestampToEpochMicros(column, fieldDefn, data);
|
||||
}
|
||||
return convertTimestampToEpochNanos(column, fieldDefn, data);
|
||||
}
|
||||
return convertTimestampToEpochMillisAsDate(column, fieldDefn, data);
|
||||
}
|
||||
|
||||
protected Instant resolveTimestampStringAsInstant(String dateText) {
|
||||
LocalDateTime dateTime;
|
||||
if (dateText.indexOf(" AM") > 0 || dateText.indexOf(" PM") > 0) {
|
||||
dateTime = LocalDateTime.from(TIMESTAMP_AM_PM_SHORT_FORMATTER.parse(dateText.trim()));
|
||||
} else {
|
||||
dateTime = LocalDateTime.from(TIMESTAMP_FORMATTER.parse(dateText.trim()));
|
||||
}
|
||||
return dateTime.atZone(ZoneId.of(serverTimezone)).toInstant();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertString(Column column, Field fieldDefn, Object data) {
|
||||
if (data instanceof Clob) {
|
||||
try {
|
||||
Clob clob = (Clob) data;
|
||||
return clob.getSubString(1, (int) clob.length());
|
||||
} catch (SQLException e) {
|
||||
throw new RuntimeException("Couldn't convert value for column " + column.name(), e);
|
||||
}
|
||||
}
|
||||
if (data instanceof String) {
|
||||
String s = (String) data;
|
||||
if (EMPTY_CLOB_FUNCTION.equals(s)) {
|
||||
return column.isOptional() ? null : "";
|
||||
}
|
||||
}
|
||||
return super.convertString(column, fieldDefn, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertBinary(
|
||||
Column column,
|
||||
Field fieldDefn,
|
||||
Object data,
|
||||
CommonConnectorConfig.BinaryHandlingMode mode) {
|
||||
try {
|
||||
if (data instanceof Blob) {
|
||||
Blob blob = (Blob) data;
|
||||
data = blob.getBytes(1, Long.valueOf(blob.length()).intValue());
|
||||
}
|
||||
if (data instanceof String) {
|
||||
String str = (String) data;
|
||||
if (EMPTY_BLOB_FUNCTION.equals(str)) {
|
||||
data = column.isOptional() ? null : "";
|
||||
}
|
||||
}
|
||||
return super.convertBinary(column, fieldDefn, data, mode);
|
||||
} catch (SQLException e) {
|
||||
throw new RuntimeException("Couldn't convert value for column " + column.name(), e);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,101 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.oceanbase.source.offset;
|
||||
|
||||
import org.apache.flink.cdc.connectors.oceanbase.source.config.OceanBaseConnectorConfig;
|
||||
|
||||
import io.debezium.connector.common.BaseSourceInfo;
|
||||
import io.debezium.relational.TableId;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/** OceanBase source info. */
|
||||
public class OceanBaseSourceInfo extends BaseSourceInfo {
|
||||
|
||||
public static final String TENANT_KEY = "tenant";
|
||||
public static final String TRANSACTION_ID_KEY = "transaction_id";
|
||||
|
||||
private final String tenant;
|
||||
|
||||
private Instant sourceTime;
|
||||
private Set<TableId> tableIds;
|
||||
private String transactionId;
|
||||
|
||||
public OceanBaseSourceInfo(OceanBaseConnectorConfig config, String tenant) {
|
||||
super(config);
|
||||
this.tenant = tenant;
|
||||
}
|
||||
|
||||
public String tenant() {
|
||||
return tenant;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Instant timestamp() {
|
||||
return sourceTime;
|
||||
}
|
||||
|
||||
public void setSourceTime(Instant sourceTime) {
|
||||
this.sourceTime = sourceTime;
|
||||
}
|
||||
|
||||
public void beginTransaction(String transactionId) {
|
||||
this.transactionId = transactionId;
|
||||
}
|
||||
|
||||
public void commitTransaction() {
|
||||
this.transactionId = null;
|
||||
}
|
||||
|
||||
public String transactionId() {
|
||||
return transactionId;
|
||||
}
|
||||
|
||||
public void tableEvent(TableId tableId) {
|
||||
this.tableIds = Collections.singleton(tableId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String database() {
|
||||
return (tableIds != null) ? tableIds.iterator().next().catalog() : null;
|
||||
}
|
||||
|
||||
public String tableSchema() {
|
||||
return (tableIds == null || tableIds.isEmpty())
|
||||
? null
|
||||
: tableIds.stream()
|
||||
.filter(Objects::nonNull)
|
||||
.map(TableId::schema)
|
||||
.filter(Objects::nonNull)
|
||||
.distinct()
|
||||
.collect(Collectors.joining(","));
|
||||
}
|
||||
|
||||
public String table() {
|
||||
return (tableIds == null || tableIds.isEmpty())
|
||||
? null
|
||||
: tableIds.stream()
|
||||
.filter(Objects::nonNull)
|
||||
.map(TableId::table)
|
||||
.collect(Collectors.joining(","));
|
||||
}
|
||||
}
|
@ -0,0 +1,74 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.oceanbase.source.offset;
|
||||
|
||||
import io.debezium.connector.SourceInfoStructMaker;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
/** The {@link SourceInfoStructMaker} implementation for OceanBase. */
|
||||
public class OceanBaseSourceInfoStructMaker implements SourceInfoStructMaker<OceanBaseSourceInfo> {
|
||||
private final Schema schema;
|
||||
|
||||
public OceanBaseSourceInfoStructMaker() {
|
||||
this.schema =
|
||||
SchemaBuilder.struct()
|
||||
.field(OceanBaseSourceInfo.TABLE_NAME_KEY, Schema.STRING_SCHEMA)
|
||||
.field(OceanBaseSourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA)
|
||||
.field(OceanBaseSourceInfo.TENANT_KEY, Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field(OceanBaseSourceInfo.DATABASE_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field(OceanBaseSourceInfo.SCHEMA_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field(
|
||||
OceanBaseSourceInfo.TRANSACTION_ID_KEY,
|
||||
Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Schema schema() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Struct struct(OceanBaseSourceInfo sourceInfo) {
|
||||
Struct source = new Struct(schema);
|
||||
source.put(OceanBaseSourceInfo.TABLE_NAME_KEY, sourceInfo.table());
|
||||
|
||||
Instant timestamp = sourceInfo.timestamp();
|
||||
source.put(
|
||||
OceanBaseSourceInfo.TIMESTAMP_KEY,
|
||||
timestamp != null ? timestamp.toEpochMilli() : 0);
|
||||
|
||||
if (sourceInfo.tenant() != null) {
|
||||
source.put(OceanBaseSourceInfo.TENANT_KEY, sourceInfo.tenant());
|
||||
}
|
||||
if (sourceInfo.database() != null) {
|
||||
source.put(OceanBaseSourceInfo.DATABASE_NAME_KEY, sourceInfo.database());
|
||||
}
|
||||
if (sourceInfo.tableSchema() != null) {
|
||||
source.put(OceanBaseSourceInfo.SCHEMA_NAME_KEY, sourceInfo.tableSchema());
|
||||
}
|
||||
if (sourceInfo.transactionId() != null) {
|
||||
source.put(OceanBaseSourceInfo.TRANSACTION_ID_KEY, sourceInfo.transactionId());
|
||||
}
|
||||
return source;
|
||||
}
|
||||
}
|
@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.oceanbase.source.schema;
|
||||
|
||||
import org.apache.flink.cdc.connectors.oceanbase.source.config.OceanBaseConnectorConfig;
|
||||
import org.apache.flink.cdc.connectors.oceanbase.source.converter.OceanBaseValueConverters;
|
||||
|
||||
import io.debezium.relational.RelationalDatabaseSchema;
|
||||
import io.debezium.relational.TableSchemaBuilder;
|
||||
import io.debezium.relational.Tables;
|
||||
import io.debezium.schema.TopicSelector;
|
||||
|
||||
/** OceanBase database schema. */
|
||||
public class OceanBaseDatabaseSchema extends RelationalDatabaseSchema {
|
||||
|
||||
public OceanBaseDatabaseSchema(
|
||||
OceanBaseConnectorConfig connectorConfig,
|
||||
Tables.TableFilter tableFilter,
|
||||
boolean tableIdCaseInsensitive) {
|
||||
super(
|
||||
connectorConfig,
|
||||
TopicSelector.defaultSelector(
|
||||
connectorConfig,
|
||||
(tableId, prefix, delimiter) ->
|
||||
String.join(delimiter, prefix, tableId.identifier())),
|
||||
tableFilter,
|
||||
connectorConfig.getColumnFilter(),
|
||||
new TableSchemaBuilder(
|
||||
new OceanBaseValueConverters(connectorConfig),
|
||||
connectorConfig.schemaNameAdjustmentMode().createAdjuster(),
|
||||
connectorConfig.customConverterRegistry(),
|
||||
connectorConfig.getSourceInfoStructMaker().schema(),
|
||||
connectorConfig.getSanitizeFieldNames(),
|
||||
false),
|
||||
tableIdCaseInsensitive,
|
||||
connectorConfig.getKeyMapper());
|
||||
}
|
||||
}
|
@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.oceanbase.source.schema;
|
||||
|
||||
import org.apache.flink.cdc.connectors.oceanbase.source.connection.OceanBaseConnection;
|
||||
import org.apache.flink.util.FlinkRuntimeException;
|
||||
|
||||
import io.debezium.jdbc.JdbcConnection;
|
||||
import io.debezium.relational.Table;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.relational.Tables;
|
||||
import io.debezium.relational.history.TableChanges;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/** A component used to get schema by table path. */
|
||||
public class OceanBaseSchema {
|
||||
|
||||
private final Map<TableId, TableChanges.TableChange> schemasByTableId;
|
||||
|
||||
public OceanBaseSchema() {
|
||||
this.schemasByTableId = new HashMap<>();
|
||||
}
|
||||
|
||||
public TableChanges.TableChange getTableSchema(JdbcConnection connection, TableId tableId) {
|
||||
TableChanges.TableChange schema = schemasByTableId.get(tableId);
|
||||
if (schema == null) {
|
||||
schema = readTableSchema(connection, tableId);
|
||||
schemasByTableId.put(tableId, schema);
|
||||
}
|
||||
return schema;
|
||||
}
|
||||
|
||||
private TableChanges.TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
|
||||
OceanBaseConnection connection = (OceanBaseConnection) jdbc;
|
||||
Set<TableId> tableIdSet = new HashSet<>();
|
||||
tableIdSet.add(tableId);
|
||||
|
||||
final Map<TableId, TableChanges.TableChange> tableChangeMap = new HashMap<>();
|
||||
Tables tables = new Tables();
|
||||
tables.overwriteTable(tables.editOrCreateTable(tableId).create());
|
||||
|
||||
try {
|
||||
connection.readSchemaForCapturedTables(
|
||||
tables, tableId.catalog(), tableId.schema(), null, false, tableIdSet);
|
||||
Table table = tables.forTable(tableId);
|
||||
TableChanges.TableChange tableChange =
|
||||
new TableChanges.TableChange(TableChanges.TableChangeType.CREATE, table);
|
||||
tableChangeMap.put(tableId, tableChange);
|
||||
} catch (SQLException e) {
|
||||
throw new FlinkRuntimeException(
|
||||
String.format("Failed to read schema for table %s ", tableId), e);
|
||||
}
|
||||
|
||||
if (!tableChangeMap.containsKey(tableId)) {
|
||||
throw new FlinkRuntimeException(
|
||||
String.format("Can't obtain schema for table %s ", tableId));
|
||||
}
|
||||
|
||||
return tableChangeMap.get(tableId);
|
||||
}
|
||||
}
|
@ -1,57 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.oceanbase.table;
|
||||
|
||||
import org.apache.flink.cdc.common.annotation.Internal;
|
||||
import org.apache.flink.table.data.GenericRowData;
|
||||
import org.apache.flink.table.data.RowData;
|
||||
import org.apache.flink.table.data.utils.JoinedRowData;
|
||||
import org.apache.flink.util.Collector;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/** Emits a row with physical fields and metadata fields. */
|
||||
@Internal
|
||||
public class OceanBaseAppendMetadataCollector implements Collector<RowData>, Serializable {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private final OceanBaseMetadataConverter[] metadataConverters;
|
||||
|
||||
public transient OceanBaseRecord inputRecord;
|
||||
public transient Collector<RowData> outputCollector;
|
||||
|
||||
public OceanBaseAppendMetadataCollector(OceanBaseMetadataConverter[] metadataConverters) {
|
||||
this.metadataConverters = metadataConverters;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collect(RowData physicalRow) {
|
||||
GenericRowData metaRow = new GenericRowData(metadataConverters.length);
|
||||
for (int i = 0; i < metadataConverters.length; i++) {
|
||||
Object meta = metadataConverters[i].read(inputRecord);
|
||||
metaRow.setField(i, meta);
|
||||
}
|
||||
RowData outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow);
|
||||
outputCollector.collect(outRow);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// nothing to do
|
||||
}
|
||||
}
|
@ -1,37 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.oceanbase.table;
|
||||
|
||||
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
|
||||
import org.apache.flink.cdc.common.annotation.PublicEvolving;
|
||||
import org.apache.flink.util.Collector;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* The deserialization schema describes how to turn the OceanBase record into data types (Java/Scala
|
||||
* objects) that are processed by Flink.
|
||||
*
|
||||
* @param <T> The type created by the deserialization schema.
|
||||
*/
|
||||
@PublicEvolving
|
||||
public interface OceanBaseDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
|
||||
|
||||
/** Deserialize the OceanBase record, it is represented in {@link OceanBaseRecord}. */
|
||||
void deserialize(OceanBaseRecord record, Collector<T> out) throws Exception;
|
||||
}
|
@ -1,29 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.oceanbase.table;
|
||||
|
||||
import org.apache.flink.cdc.common.annotation.Internal;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/** A converter converts OceanBase record metadata into Flink internal data structures. */
|
||||
@FunctionalInterface
|
||||
@Internal
|
||||
public interface OceanBaseMetadataConverter extends Serializable {
|
||||
Object read(OceanBaseRecord record);
|
||||
}
|
@ -1,133 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.oceanbase.table;
|
||||
|
||||
import com.oceanbase.oms.logmessage.DataMessage;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/** An internal data structure representing record of OceanBase. */
|
||||
public class OceanBaseRecord implements Serializable {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private final SourceInfo sourceInfo;
|
||||
private final boolean isSnapshotRecord;
|
||||
private final Map<String, Object> jdbcFields;
|
||||
private final DataMessage.Record.Type opt;
|
||||
private final Map<String, Object> logMessageFieldsBefore;
|
||||
private final Map<String, Object> logMessageFieldsAfter;
|
||||
|
||||
public OceanBaseRecord(SourceInfo sourceInfo, Map<String, Object> jdbcFields) {
|
||||
this.sourceInfo = sourceInfo;
|
||||
this.isSnapshotRecord = true;
|
||||
this.jdbcFields = jdbcFields;
|
||||
this.opt = null;
|
||||
this.logMessageFieldsBefore = null;
|
||||
this.logMessageFieldsAfter = null;
|
||||
}
|
||||
|
||||
public OceanBaseRecord(
|
||||
SourceInfo sourceInfo,
|
||||
DataMessage.Record.Type opt,
|
||||
List<DataMessage.Record.Field> logMessageFieldList) {
|
||||
this.sourceInfo = sourceInfo;
|
||||
this.isSnapshotRecord = false;
|
||||
this.jdbcFields = null;
|
||||
this.opt = opt;
|
||||
this.logMessageFieldsBefore = new HashMap<>();
|
||||
this.logMessageFieldsAfter = new HashMap<>();
|
||||
for (DataMessage.Record.Field field : logMessageFieldList) {
|
||||
if (field.isPrev()) {
|
||||
logMessageFieldsBefore.put(field.getFieldname(), getFieldStringValue(field));
|
||||
} else {
|
||||
logMessageFieldsAfter.put(field.getFieldname(), getFieldStringValue(field));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String getFieldStringValue(DataMessage.Record.Field field) {
|
||||
if (field.getValue() == null) {
|
||||
return null;
|
||||
}
|
||||
String encoding = field.getEncoding();
|
||||
if ("binary".equalsIgnoreCase(encoding)) {
|
||||
return field.getValue().toString("utf8");
|
||||
}
|
||||
return field.getValue().toString(encoding);
|
||||
}
|
||||
|
||||
public SourceInfo getSourceInfo() {
|
||||
return sourceInfo;
|
||||
}
|
||||
|
||||
public boolean isSnapshotRecord() {
|
||||
return isSnapshotRecord;
|
||||
}
|
||||
|
||||
public Map<String, Object> getJdbcFields() {
|
||||
return jdbcFields;
|
||||
}
|
||||
|
||||
public DataMessage.Record.Type getOpt() {
|
||||
return opt;
|
||||
}
|
||||
|
||||
public Map<String, Object> getLogMessageFieldsBefore() {
|
||||
return logMessageFieldsBefore;
|
||||
}
|
||||
|
||||
public Map<String, Object> getLogMessageFieldsAfter() {
|
||||
return logMessageFieldsAfter;
|
||||
}
|
||||
|
||||
/** Information about the source of record. */
|
||||
public static class SourceInfo implements Serializable {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private final String tenant;
|
||||
private final String database;
|
||||
private final String table;
|
||||
private final long timestampS;
|
||||
|
||||
public SourceInfo(String tenant, String database, String table, long timestampS) {
|
||||
this.tenant = tenant;
|
||||
this.database = database;
|
||||
this.table = table;
|
||||
this.timestampS = timestampS;
|
||||
}
|
||||
|
||||
public String getTenant() {
|
||||
return tenant;
|
||||
}
|
||||
|
||||
public String getDatabase() {
|
||||
return database;
|
||||
}
|
||||
|
||||
public String getTable() {
|
||||
return table;
|
||||
}
|
||||
|
||||
public long getTimestampS() {
|
||||
return timestampS;
|
||||
}
|
||||
}
|
||||
}
|
@ -1,55 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.oceanbase.table;
|
||||
|
||||
import org.apache.flink.table.api.ValidationException;
|
||||
|
||||
/** Startup modes for the OceanBase CDC Consumer. */
|
||||
public enum StartupMode {
|
||||
/**
|
||||
* Performs an initial snapshot on the monitored database tables upon first startup, and
|
||||
* continue to read the commit log.
|
||||
*/
|
||||
INITIAL,
|
||||
|
||||
/**
|
||||
* Never to perform snapshot on the monitored database tables upon first startup, just read from
|
||||
* the end of the commit log which means only have the changes since the connector was started.
|
||||
*/
|
||||
LATEST_OFFSET,
|
||||
|
||||
/**
|
||||
* Never to perform snapshot on the monitored database tables upon first startup, and directly
|
||||
* read commit log from the specified timestamp.
|
||||
*/
|
||||
TIMESTAMP;
|
||||
|
||||
public static StartupMode getStartupMode(String modeString) {
|
||||
switch (modeString.toLowerCase()) {
|
||||
case "initial":
|
||||
return INITIAL;
|
||||
case "latest-offset":
|
||||
return LATEST_OFFSET;
|
||||
case "timestamp":
|
||||
return TIMESTAMP;
|
||||
default:
|
||||
throw new ValidationException(
|
||||
String.format("Invalid startup mode '%s'.", modeString));
|
||||
}
|
||||
}
|
||||
}
|
16
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/OceanBaseDeserializationRuntimeConverter.java → flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/utils/OceanBaseUtils.java
16
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/OceanBaseDeserializationRuntimeConverter.java → flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/utils/OceanBaseUtils.java
446
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseConnectorITCase.java → flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java
446
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseConnectorITCase.java → flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java
@ -0,0 +1,267 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.flink.cdc.connectors.oceanbase.table;
|
||||
|
||||
import org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestBase;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.table.api.EnvironmentSettings;
|
||||
import org.apache.flink.table.api.TableResult;
|
||||
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
|
||||
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
|
||||
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/** Integration tests for OceanBase Oracle mode table source. */
|
||||
@Ignore("Test ignored before oceanbase-xe docker image is available")
|
||||
@RunWith(Parameterized.class)
|
||||
public class OceanBaseOracleModeITCase extends OceanBaseTestBase {
|
||||
|
||||
private final StreamExecutionEnvironment env =
|
||||
StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
private final StreamTableEnvironment tEnv =
|
||||
StreamTableEnvironment.create(
|
||||
env, EnvironmentSettings.newInstance().inStreamingMode().build());
|
||||
|
||||
private final String schema;
|
||||
private final String configUrl;
|
||||
|
||||
public OceanBaseOracleModeITCase(
|
||||
String username,
|
||||
String password,
|
||||
String hostname,
|
||||
int port,
|
||||
String logProxyHost,
|
||||
int logProxyPort,
|
||||
String tenant,
|
||||
String schema,
|
||||
String configUrl) {
|
||||
super("oracle", username, password, hostname, port, logProxyHost, logProxyPort, tenant);
|
||||
this.schema = schema;
|
||||
this.configUrl = configUrl;
|
||||
}
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static List<Object[]> parameters() {
|
||||
return Collections.singletonList(
|
||||
new Object[] {
|
||||
"SYS@test",
|
||||
"123456",
|
||||
"127.0.0.1",
|
||||
2881,
|
||||
"127.0.0.1",
|
||||
2983,
|
||||
"test",
|
||||
"SYS",
|
||||
"http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster"
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String commonOptionsString() {
|
||||
return super.commonOptionsString() + " , " + " 'jdbc.driver' = 'com.oceanbase.jdbc.Driver'";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String logProxyOptionsString() {
|
||||
return super.logProxyOptionsString()
|
||||
+ " , "
|
||||
+ String.format(" 'config-url' = '%s'", configUrl);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Connection getJdbcConnection() throws SQLException {
|
||||
return DriverManager.getConnection(
|
||||
"jdbc:oceanbase://" + hostname + ":" + port + "/" + schema, username, password);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllDataTypes() throws Exception {
|
||||
initializeTable("column_type_test");
|
||||
String sourceDDL =
|
||||
String.format(
|
||||
"CREATE TABLE full_types ("
|
||||
+ " ID INT NOT NULL,"
|
||||
+ " VAL_VARCHAR STRING,"
|
||||
+ " VAL_VARCHAR2 STRING,"
|
||||
+ " VAL_NVARCHAR2 STRING,"
|
||||
+ " VAL_CHAR STRING,"
|
||||
+ " VAL_NCHAR STRING,"
|
||||
+ " VAL_BF FLOAT,"
|
||||
+ " VAL_BD DOUBLE,"
|
||||
+ " VAL_F FLOAT,"
|
||||
+ " VAL_F_10 FLOAT,"
|
||||
+ " VAL_NUM DECIMAL(10, 6),"
|
||||
+ " VAL_DP DOUBLE,"
|
||||
+ " VAL_R DECIMAL(38,2),"
|
||||
+ " VAL_DECIMAL DECIMAL(10, 6),"
|
||||
+ " VAL_NUMERIC DECIMAL(10, 6),"
|
||||
+ " VAL_NUM_VS DECIMAL(10, 3),"
|
||||
+ " VAL_INT DECIMAL(38,0),"
|
||||
+ " VAL_INTEGER DECIMAL(38,0),"
|
||||
+ " VAL_SMALLINT DECIMAL(38,0),"
|
||||
+ " VAL_NUMBER_38_NO_SCALE DECIMAL(38,0),"
|
||||
+ " VAL_NUMBER_38_SCALE_0 DECIMAL(38,0),"
|
||||
+ " VAL_NUMBER_1 BOOLEAN,"
|
||||
+ " VAL_NUMBER_2 TINYINT,"
|
||||
+ " VAL_NUMBER_4 SMALLINT,"
|
||||
+ " VAL_NUMBER_9 INT,"
|
||||
+ " VAL_NUMBER_18 BIGINT,"
|
||||
+ " VAL_NUMBER_2_NEGATIVE_SCALE TINYINT,"
|
||||
+ " VAL_NUMBER_4_NEGATIVE_SCALE SMALLINT,"
|
||||
+ " VAL_NUMBER_9_NEGATIVE_SCALE INT,"
|
||||
+ " VAL_NUMBER_18_NEGATIVE_SCALE BIGINT,"
|
||||
+ " VAL_NUMBER_36_NEGATIVE_SCALE DECIMAL(38,0),"
|
||||
+ " VAL_DATE TIMESTAMP,"
|
||||
+ " VAL_TS TIMESTAMP,"
|
||||
+ " VAL_TS_PRECISION2 TIMESTAMP(2),"
|
||||
+ " VAL_TS_PRECISION4 TIMESTAMP(4),"
|
||||
+ " VAL_TS_PRECISION9 TIMESTAMP(6),"
|
||||
+ " VAL_CLOB_INLINE STRING,"
|
||||
+ " VAL_BLOB_INLINE BYTES,"
|
||||
+ " PRIMARY KEY (ID) NOT ENFORCED"
|
||||
+ ") WITH ("
|
||||
+ initialOptionsString()
|
||||
+ ", "
|
||||
+ " 'table-list' = '%s'"
|
||||
+ ")",
|
||||
schema + ".FULL_TYPES");
|
||||
|
||||
String sinkDDL =
|
||||
"CREATE TABLE sink ("
|
||||
+ " ID INT,"
|
||||
+ " VAL_VARCHAR STRING,"
|
||||
+ " VAL_VARCHAR2 STRING,"
|
||||
+ " VAL_NVARCHAR2 STRING,"
|
||||
+ " VAL_CHAR STRING,"
|
||||
+ " VAL_NCHAR STRING,"
|
||||
+ " VAL_BF FLOAT,"
|
||||
+ " VAL_BD DOUBLE,"
|
||||
+ " VAL_F FLOAT,"
|
||||
+ " VAL_F_10 FLOAT,"
|
||||
+ " VAL_NUM DECIMAL(10, 6),"
|
||||
+ " VAL_DP DOUBLE,"
|
||||
+ " VAL_R DECIMAL(38,2),"
|
||||
+ " VAL_DECIMAL DECIMAL(10, 6),"
|
||||
+ " VAL_NUMERIC DECIMAL(10, 6),"
|
||||
+ " VAL_NUM_VS DECIMAL(10, 3),"
|
||||
+ " VAL_INT DECIMAL(38,0),"
|
||||
+ " VAL_INTEGER DECIMAL(38,0),"
|
||||
+ " VAL_SMALLINT DECIMAL(38,0),"
|
||||
+ " VAL_NUMBER_38_NO_SCALE DECIMAL(38,0),"
|
||||
+ " VAL_NUMBER_38_SCALE_0 DECIMAL(38,0),"
|
||||
+ " VAL_NUMBER_1 BOOLEAN,"
|
||||
+ " VAL_NUMBER_2 TINYINT,"
|
||||
+ " VAL_NUMBER_4 SMALLINT,"
|
||||
+ " VAL_NUMBER_9 INT,"
|
||||
+ " VAL_NUMBER_18 BIGINT,"
|
||||
+ " VAL_NUMBER_2_NEGATIVE_SCALE TINYINT,"
|
||||
+ " VAL_NUMBER_4_NEGATIVE_SCALE SMALLINT,"
|
||||
+ " VAL_NUMBER_9_NEGATIVE_SCALE INT,"
|
||||
+ " VAL_NUMBER_18_NEGATIVE_SCALE BIGINT,"
|
||||
+ " VAL_NUMBER_36_NEGATIVE_SCALE DECIMAL(38,0),"
|
||||
+ " VAL_DATE TIMESTAMP,"
|
||||
+ " VAL_TS TIMESTAMP,"
|
||||
+ " VAL_TS_PRECISION2 TIMESTAMP(2),"
|
||||
+ " VAL_TS_PRECISION4 TIMESTAMP(4),"
|
||||
+ " VAL_TS_PRECISION9 TIMESTAMP(6),"
|
||||
+ " VAL_CLOB_INLINE STRING,"
|
||||
+ " VAL_BLOB_INLINE STRING,"
|
||||
+ " PRIMARY KEY (ID) NOT ENFORCED"
|
||||
+ ") WITH ("
|
||||
+ " 'connector' = 'values',"
|
||||
+ " 'sink-insert-only' = 'false',"
|
||||
+ " 'sink-expected-messages-num' = '2'"
|
||||
+ ")";
|
||||
|
||||
tEnv.executeSql(sourceDDL);
|
||||
tEnv.executeSql(sinkDDL);
|
||||
|
||||
TableResult result =
|
||||
tEnv.executeSql(
|
||||
"INSERT INTO sink SELECT "
|
||||
+ " ID,"
|
||||
+ " VAL_VARCHAR,"
|
||||
+ " VAL_VARCHAR2,"
|
||||
+ " VAL_NVARCHAR2,"
|
||||
+ " VAL_CHAR,"
|
||||
+ " VAL_NCHAR,"
|
||||
+ " VAL_BF,"
|
||||
+ " VAL_BD,"
|
||||
+ " VAL_F,"
|
||||
+ " VAL_F_10,"
|
||||
+ " VAL_NUM,"
|
||||
+ " VAL_DP,"
|
||||
+ " VAL_R,"
|
||||
+ " VAL_DECIMAL,"
|
||||
+ " VAL_NUMERIC,"
|
||||
+ " VAL_NUM_VS,"
|
||||
+ " VAL_INT,"
|
||||
+ " VAL_INTEGER,"
|
||||
+ " VAL_SMALLINT,"
|
||||
+ " VAL_NUMBER_38_NO_SCALE,"
|
||||
+ " VAL_NUMBER_38_SCALE_0,"
|
||||
+ " VAL_NUMBER_1,"
|
||||
+ " VAL_NUMBER_2,"
|
||||
+ " VAL_NUMBER_4,"
|
||||
+ " VAL_NUMBER_9,"
|
||||
+ " VAL_NUMBER_18,"
|
||||
+ " VAL_NUMBER_2_NEGATIVE_SCALE,"
|
||||
+ " VAL_NUMBER_4_NEGATIVE_SCALE,"
|
||||
+ " VAL_NUMBER_9_NEGATIVE_SCALE,"
|
||||
+ " VAL_NUMBER_18_NEGATIVE_SCALE,"
|
||||
+ " VAL_NUMBER_36_NEGATIVE_SCALE,"
|
||||
+ " VAL_DATE,"
|
||||
+ " VAL_TS,"
|
||||
+ " VAL_TS_PRECISION2,"
|
||||
+ " VAL_TS_PRECISION4,"
|
||||
+ " VAL_TS_PRECISION9,"
|
||||
+ " VAL_CLOB_INLINE,"
|
||||
+ " DECODE(VAL_BLOB_INLINE, 'UTF-8')"
|
||||
+ " FROM full_types");
|
||||
|
||||
waitForSinkSize("sink", 1);
|
||||
|
||||
try (Connection connection = getJdbcConnection();
|
||||
Statement statement = connection.createStatement()) {
|
||||
statement.execute(
|
||||
"UPDATE FULL_TYPES SET VAL_TS = '2022-10-30 12:34:56.12545' WHERE id=1;");
|
||||
}
|
||||
|
||||
waitForSinkSize("sink", 2);
|
||||
|
||||
List<String> expected =
|
||||
Arrays.asList(
|
||||
"+I(1,vc2,vc2,nvc2,c ,nc ,1.1,2.22,3.33,8.888,4.444400,5.555,6.66,1234.567891,1234.567891,77.323,1,22,333,4444,5555,true,99,9999,999999999,999999999999999999,90,9900,999999990,999999999999999900,99999999999999999999999999999999999900,2022-10-30T00:00,2022-10-30T12:34:56.007890,2022-10-30T12:34:56.130,2022-10-30T12:34:56.125500,2022-10-30T12:34:56.125457,col_clob,col_blob)",
|
||||
"+U(1,vc2,vc2,nvc2,c ,nc ,1.1,2.22,3.33,8.888,4.444400,5.555,6.66,1234.567891,1234.567891,77.323,1,22,333,4444,5555,true,99,9999,999999999,999999999999999999,90,9900,999999990,999999999999999900,99999999999999999999999999999999999900,2022-10-30T00:00,2022-10-30T12:34:56.125450,2022-10-30T12:34:56.130,2022-10-30T12:34:56.125500,2022-10-30T12:34:56.125457,col_clob,col_blob)");
|
||||
|
||||
List<String> actual = TestValuesTableFactory.getRawResults("sink");
|
||||
assertContainsInAnyOrder(expected, actual);
|
||||
result.getJobClient().get().cancel().get();
|
||||
}
|
||||
}
|
@ -1,42 +0,0 @@
|
||||
-- Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
-- contributor license agreements. See the NOTICE file distributed with
|
||||
-- this work for additional information regarding copyright ownership.
|
||||
-- The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
-- (the "License"); you may not use this file except in compliance with
|
||||
-- the License. You may obtain a copy of the License at
|
||||
--
|
||||
-- http://www.apache.org/licenses/LICENSE-2.0
|
||||
--
|
||||
-- Unless required by applicable law or agreed to in writing, software
|
||||
-- distributed under the License is distributed on an "AS IS" BASIS,
|
||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
-- See the License for the specific language governing permissions and
|
||||
-- limitations under the License.
|
||||
|
||||
-- ----------------------------------------------------------------------------------------------------------------
|
||||
-- DATABASE: inventory_meta
|
||||
-- ----------------------------------------------------------------------------------------------------------------
|
||||
|
||||
CREATE DATABASE inventory_meta;
|
||||
USE inventory_meta;
|
||||
|
||||
-- Create and populate our products using a single insert with many rows
|
||||
CREATE TABLE products
|
||||
(
|
||||
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
||||
name VARCHAR(255) NOT NULL DEFAULT 'flink',
|
||||
description VARCHAR(512),
|
||||
weight DECIMAL(20, 10)
|
||||
);
|
||||
ALTER TABLE products AUTO_INCREMENT = 101;
|
||||
|
||||
INSERT INTO products
|
||||
VALUES (default, "scooter", "Small 2-wheel scooter", 3.14),
|
||||
(default, "car battery", "12V car battery", 8.1),
|
||||
(default, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3", 0.8),
|
||||
(default, "hammer", "12oz carpenter's hammer", 0.75),
|
||||
(default, "hammer", "14oz carpenter's hammer", 0.875),
|
||||
(default, "hammer", "16oz carpenter's hammer", 1.0),
|
||||
(default, "rocks", "box of assorted rocks", 5.3),
|
||||
(default, "jacket", "water resistent black wind breaker", 0.1),
|
||||
(default, "spare tire", "24 inch spare tire", 22.2);
|
@ -0,0 +1,17 @@
|
||||
-- Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
-- contributor license agreements. See the NOTICE file distributed with
|
||||
-- this work for additional information regarding copyright ownership.
|
||||
-- The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
-- (the "License"); you may not use this file except in compliance with
|
||||
-- the License. You may obtain a copy of the License at
|
||||
--
|
||||
-- http://www.apache.org/licenses/LICENSE-2.0
|
||||
--
|
||||
-- Unless required by applicable law or agreed to in writing, software
|
||||
-- distributed under the License is distributed on an "AS IS" BASIS,
|
||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
-- See the License for the specific language governing permissions and
|
||||
-- limitations under the License.
|
||||
|
||||
-- Set the root user password of test tenant
|
||||
ALTER USER root IDENTIFIED BY '123456';
|
@ -0,0 +1,70 @@
|
||||
-- Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
-- contributor license agreements. See the NOTICE file distributed with
|
||||
-- this work for additional information regarding copyright ownership.
|
||||
-- The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
-- (the "License"); you may not use this file except in compliance with
|
||||
-- the License. You may obtain a copy of the License at
|
||||
--
|
||||
-- http://www.apache.org/licenses/LICENSE-2.0
|
||||
--
|
||||
-- Unless required by applicable law or agreed to in writing, software
|
||||
-- distributed under the License is distributed on an "AS IS" BASIS,
|
||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
-- See the License for the specific language governing permissions and
|
||||
-- limitations under the License.
|
||||
|
||||
CREATE TABLE FULL_TYPES (
|
||||
ID NUMBER(9) NOT NULL,
|
||||
VAL_VARCHAR VARCHAR2(1000),
|
||||
VAL_VARCHAR2 VARCHAR2(1000),
|
||||
VAL_NVARCHAR2 NVARCHAR2(1000),
|
||||
VAL_CHAR CHAR(3),
|
||||
VAL_NCHAR NCHAR(3),
|
||||
VAL_BF BINARY_FLOAT,
|
||||
VAL_BD BINARY_DOUBLE,
|
||||
VAL_F FLOAT,
|
||||
VAL_F_10 FLOAT(10),
|
||||
VAL_NUM NUMBER(10, 6),
|
||||
VAL_DP FLOAT,
|
||||
VAL_R FLOAT(63),
|
||||
VAL_DECIMAL NUMBER(10, 6),
|
||||
VAL_NUMERIC NUMBER(10, 6),
|
||||
VAL_NUM_VS NUMBER,
|
||||
VAL_INT NUMBER,
|
||||
VAL_INTEGER NUMBER,
|
||||
VAL_SMALLINT NUMBER,
|
||||
VAL_NUMBER_38_NO_SCALE NUMBER(38),
|
||||
VAL_NUMBER_38_SCALE_0 NUMBER(38),
|
||||
VAL_NUMBER_1 NUMBER(1),
|
||||
VAL_NUMBER_2 NUMBER(2),
|
||||
VAL_NUMBER_4 NUMBER(4),
|
||||
VAL_NUMBER_9 NUMBER(9),
|
||||
VAL_NUMBER_18 NUMBER(18),
|
||||
VAL_NUMBER_2_NEGATIVE_SCALE NUMBER(1, -1),
|
||||
VAL_NUMBER_4_NEGATIVE_SCALE NUMBER(2, -2),
|
||||
VAL_NUMBER_9_NEGATIVE_SCALE NUMBER(8, -1),
|
||||
VAL_NUMBER_18_NEGATIVE_SCALE NUMBER(16, -2),
|
||||
VAL_NUMBER_36_NEGATIVE_SCALE NUMBER(36, -2),
|
||||
VAL_DATE DATE,
|
||||
VAL_TS TIMESTAMP(6),
|
||||
VAL_TS_PRECISION2 TIMESTAMP(2),
|
||||
VAL_TS_PRECISION4 TIMESTAMP(4),
|
||||
VAL_TS_PRECISION9 TIMESTAMP(6),
|
||||
VAL_CLOB_INLINE CLOB,
|
||||
VAL_BLOB_INLINE BLOB,
|
||||
primary key (ID)
|
||||
);
|
||||
|
||||
INSERT INTO FULL_TYPES VALUES (
|
||||
1, 'vc2', 'vc2', 'nvc2', 'c', 'nc',
|
||||
1.1, 2.22, 3.33, 8.888, 4.4444, 5.555, 6.66, 1234.567891, 1234.567891, 77.323,
|
||||
1, 22, 333, 4444, 5555, 1, 99, 9999, 999999999, 999999999999999999,
|
||||
94, 9949, 999999994, 999999999999999949, 99999999999999999999999999999999999949,
|
||||
TO_DATE('2022-10-30', 'yyyy-mm-dd'),
|
||||
TO_TIMESTAMP('2022-10-30 12:34:56.00789', 'yyyy-mm-dd HH24:MI:SS.FF5'),
|
||||
TO_TIMESTAMP('2022-10-30 12:34:56.12545', 'yyyy-mm-dd HH24:MI:SS.FF5'),
|
||||
TO_TIMESTAMP('2022-10-30 12:34:56.12545', 'yyyy-mm-dd HH24:MI:SS.FF5'),
|
||||
TO_TIMESTAMP('2022-10-30 12:34:56.125456789', 'yyyy-mm-dd HH24:MI:SS.FF9'),
|
||||
TO_CLOB ('col_clob'),
|
||||
utl_raw.cast_to_raw ('col_blob')
|
||||
);
|
Loading…
Reference in New Issue