[oracle] Properly support TIMESTAMP_LTZ type for oracle cdc connector

pull/1590/merge
Leonard Xu 2 years ago committed by Leonard Xu
parent b4ce19951f
commit 228fbe650a

@ -16,6 +16,7 @@
package com.ververica.cdc.connectors.oracle.table;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.LogicalType;
import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
@ -26,6 +27,7 @@ import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Optional;
@ -40,14 +42,14 @@ public class OracleDeserializationConverterFactory {
@Override
public Optional<DeserializationRuntimeConverter> createUserDefinedConverter(
LogicalType logicalType, ZoneId serverTimeZone) {
return wrapNumericConverter(createNumericConverter(logicalType));
return wrapNumericConverter(createNumericConverter(logicalType, serverTimeZone));
}
};
}
/** Creates a runtime converter which assuming input object is not null. */
private static Optional<DeserializationRuntimeConverter> createNumericConverter(
LogicalType type) {
LogicalType type, ZoneId serverTimeZone) {
switch (type.getTypeRoot()) {
case BOOLEAN:
return createBooleanConverter();
@ -63,12 +65,39 @@ public class OracleDeserializationConverterFactory {
return createFloatConverter();
case DOUBLE:
return createDoubleConverter();
// Debezium use io.debezium.time.ZonedTimestamp to map Oracle TIMESTAMP WITH LOCAL
// TIME ZONE type, the value is a string representation of a timestamp in UTC.
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return convertToLocalTimeZoneTimestamp();
default:
// fallback to default converter
return Optional.empty();
}
}
private static Optional<DeserializationRuntimeConverter> convertToLocalTimeZoneTimestamp() {
return Optional.of(
new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof String) {
String str = (String) dbzObj;
// TIMESTAMP_LTZ type is encoded in string type
Instant instant = Instant.parse(str);
return TimestampData.fromInstant(instant);
}
throw new IllegalArgumentException(
"Unable to convert to TimestampData from unexpected value '"
+ dbzObj
+ "' of type "
+ dbzObj.getClass().getName());
}
});
}
private static Optional<DeserializationRuntimeConverter> wrapNumericConverter(
Optional<DeserializationRuntimeConverter> converterOptional) {
return converterOptional.map(

@ -40,6 +40,7 @@ import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import javax.annotation.Nullable;
import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -277,7 +278,13 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
dbzProperties,
startupOptions,
producedDataType,
metadataKeys);
metadataKeys,
enableParallelRead,
splitSize,
fetchSize,
connectMaxRetries,
connectionPoolSize,
chunkKeyColumn);
}
@Override

@ -36,6 +36,7 @@ import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.DATABA
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.HOSTNAME;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.PASSWORD;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SERVER_TIME_ZONE;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.USERNAME;
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
@ -76,6 +77,9 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
String chunkKeyColumn =
config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null);
String serverTimezone = config.get(SERVER_TIME_ZONE);
if (enableParallelRead) {
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
@ -101,7 +105,7 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
fetchSize,
connectMaxRetries,
connectionPoolSize,
config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null));
chunkKeyColumn);
}
@Override

@ -39,6 +39,7 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -646,6 +647,8 @@ public class OracleConnectorITCase extends AbstractTestBase {
public void testAllDataTypes() throws Throwable {
OracleTestUtils.createAndInitialize(
OracleTestUtils.ORACLE_CONTAINER, "column_type_test.sql");
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
String sourceDDL =
String.format(
"CREATE TABLE full_types ("

Loading…
Cancel
Save