From 228fbe650a35daa8cb2e03fabb0be0de1276b33f Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Sun, 6 Nov 2022 22:12:19 +0800 Subject: [PATCH] [oracle] Properly support TIMESTAMP_LTZ type for oracle cdc connector --- ...OracleDeserializationConverterFactory.java | 33 +++++++++++++++++-- .../oracle/table/OracleTableSource.java | 9 ++++- .../table/OracleTableSourceFactory.java | 6 +++- .../oracle/table/OracleConnectorITCase.java | 3 ++ 4 files changed, 47 insertions(+), 4 deletions(-) diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleDeserializationConverterFactory.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleDeserializationConverterFactory.java index bf7b73f75..362067476 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleDeserializationConverterFactory.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleDeserializationConverterFactory.java @@ -16,6 +16,7 @@ package com.ververica.cdc.connectors.oracle.table; +import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.LogicalType; import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter; @@ -26,6 +27,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import java.math.BigDecimal; +import java.time.Instant; import java.time.ZoneId; import java.util.Optional; @@ -40,14 +42,14 @@ public class OracleDeserializationConverterFactory { @Override public Optional createUserDefinedConverter( LogicalType logicalType, ZoneId serverTimeZone) { - return wrapNumericConverter(createNumericConverter(logicalType)); + return wrapNumericConverter(createNumericConverter(logicalType, serverTimeZone)); } }; } /** Creates a runtime converter which assuming input object is not null. */ private static Optional createNumericConverter( - LogicalType type) { + LogicalType type, ZoneId serverTimeZone) { switch (type.getTypeRoot()) { case BOOLEAN: return createBooleanConverter(); @@ -63,12 +65,39 @@ public class OracleDeserializationConverterFactory { return createFloatConverter(); case DOUBLE: return createDoubleConverter(); + // Debezium use io.debezium.time.ZonedTimestamp to map Oracle TIMESTAMP WITH LOCAL + // TIME ZONE type, the value is a string representation of a timestamp in UTC. + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return convertToLocalTimeZoneTimestamp(); default: // fallback to default converter return Optional.empty(); } } + private static Optional convertToLocalTimeZoneTimestamp() { + return Optional.of( + new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof String) { + String str = (String) dbzObj; + // TIMESTAMP_LTZ type is encoded in string type + Instant instant = Instant.parse(str); + return TimestampData.fromInstant(instant); + } + throw new IllegalArgumentException( + "Unable to convert to TimestampData from unexpected value '" + + dbzObj + + "' of type " + + dbzObj.getClass().getName()); + } + }); + } + private static Optional wrapNumericConverter( Optional converterOptional) { return converterOptional.map( diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java index 855366872..bf033bb72 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java @@ -40,6 +40,7 @@ import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema; import javax.annotation.Nullable; +import java.time.ZoneId; import java.util.Collections; import java.util.List; import java.util.Map; @@ -277,7 +278,13 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada dbzProperties, startupOptions, producedDataType, - metadataKeys); + metadataKeys, + enableParallelRead, + splitSize, + fetchSize, + connectMaxRetries, + connectionPoolSize, + chunkKeyColumn); } @Override diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java index f0e5e52a5..810478369 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java @@ -36,6 +36,7 @@ import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.DATABA import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.HOSTNAME; import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.PASSWORD; import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; +import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SERVER_TIME_ZONE; import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME; import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.USERNAME; import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; @@ -76,6 +77,9 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory { int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE); int connectMaxRetries = config.get(CONNECT_MAX_RETRIES); int connectionPoolSize = config.get(CONNECTION_POOL_SIZE); + String chunkKeyColumn = + config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null); + String serverTimezone = config.get(SERVER_TIME_ZONE); if (enableParallelRead) { validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); @@ -101,7 +105,7 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory { fetchSize, connectMaxRetries, connectionPoolSize, - config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null)); + chunkKeyColumn); } @Override diff --git a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java index c12d4bb8c..d694d45ad 100644 --- a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java +++ b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java @@ -39,6 +39,7 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; +import java.time.ZoneId; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -646,6 +647,8 @@ public class OracleConnectorITCase extends AbstractTestBase { public void testAllDataTypes() throws Throwable { OracleTestUtils.createAndInitialize( OracleTestUtils.ORACLE_CONTAINER, "column_type_test.sql"); + + tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); String sourceDDL = String.format( "CREATE TABLE full_types ("