diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSource.java index cf50d5b84..4828e95ea 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSource.java @@ -28,8 +28,6 @@ import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory import com.ververica.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter; import com.ververica.cdc.debezium.table.DebeziumChangelogMode; -import java.time.ZoneId; - /** A {@link DataSource} for mysql cdc connector. */ @Internal public class MySqlDataSource implements DataSource { @@ -46,9 +44,7 @@ public class MySqlDataSource implements DataSource { public EventSourceProvider getEventSourceProvider() { MySqlEventDeserializer deserializer = new MySqlEventDeserializer( - DebeziumChangelogMode.ALL, - ZoneId.of(sourceConfig.getServerTimeZone()), - sourceConfig.isIncludeSchemaChanges()); + DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges()); MySqlSource source = new MySqlSource<>( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlEventDeserializer.java index 867e9a926..ec80d14fe 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlEventDeserializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlEventDeserializer.java @@ -37,7 +37,6 @@ import org.apache.kafka.connect.source.SourceRecord; import java.io.IOException; import java.nio.ByteBuffer; -import java.time.ZoneId; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -63,10 +62,8 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema { private transient CustomMySqlAntlrDdlParser customParser; public MySqlEventDeserializer( - DebeziumChangelogMode changelogMode, - ZoneId serverTimeZone, - boolean includeSchemaChanges) { - super(new MySqlSchemaDataTypeInference(), changelogMode, serverTimeZone); + DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) { + super(new MySqlSchemaDataTypeInference(), changelogMode); this.includeSchemaChanges = includeSchemaChanges; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumEventDeserializationSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumEventDeserializationSchema.java index b6f6ddb06..f32adf57c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumEventDeserializationSchema.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumEventDeserializationSchema.java @@ -57,8 +57,6 @@ import org.slf4j.LoggerFactory; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.Collections; import java.util.List; import java.util.Map; @@ -83,16 +81,10 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve /** Changelog Mode to use for encoding changes in Flink internal data structure. */ protected final DebeziumChangelogMode changelogMode; - /** The session time zone in database server. */ - protected final ZoneId serverTimeZone; - public DebeziumEventDeserializationSchema( - SchemaDataTypeInference schemaDataTypeInference, - DebeziumChangelogMode changelogMode, - ZoneId serverTimeZone) { + SchemaDataTypeInference schemaDataTypeInference, DebeziumChangelogMode changelogMode) { this.schemaDataTypeInference = schemaDataTypeInference; this.changelogMode = changelogMode; - this.serverTimeZone = serverTimeZone; } @Override @@ -322,8 +314,11 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve return TimestampData.fromMillis(nano / 1000_000, (int) (nano % 1000_000)); } } - LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone); - return TimestampData.fromLocalDateTime(localDateTime); + throw new IllegalArgumentException( + "Unable to convert to TIMESTAMP from unexpected value '" + + dbzObj + + "' of type " + + dbzObj.getClass().getName()); } protected Object convertToLocalTimeZoneTimestamp(Object dbzObj, Schema schema) { @@ -334,7 +329,7 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve return LocalZonedTimestampData.fromInstant(instant); } throw new IllegalArgumentException( - "Unable to convert to TimestampData from unexpected value '" + "Unable to convert to TIMESTAMP_LTZ from unexpected value '" + dbzObj + "' of type " + dbzObj.getClass().getName());