[pipeline-connector][mysql] Remove unnecessary serverTimeZone in DebeziumEventDeserializationSchema (#2816)

pull/2821/head
Jiabao Sun 1 year ago committed by GitHub
parent 18fb0d8b7f
commit 7d7c1af553
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -28,8 +28,6 @@ import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory
import com.ververica.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter;
import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
import java.time.ZoneId;
/** A {@link DataSource} for mysql cdc connector. */
@Internal
public class MySqlDataSource implements DataSource {
@ -46,9 +44,7 @@ public class MySqlDataSource implements DataSource {
public EventSourceProvider getEventSourceProvider() {
MySqlEventDeserializer deserializer =
new MySqlEventDeserializer(
DebeziumChangelogMode.ALL,
ZoneId.of(sourceConfig.getServerTimeZone()),
sourceConfig.isIncludeSchemaChanges());
DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges());
MySqlSource<Event> source =
new MySqlSource<>(

@ -37,7 +37,6 @@ import org.apache.kafka.connect.source.SourceRecord;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -63,10 +62,8 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
private transient CustomMySqlAntlrDdlParser customParser;
public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode,
ZoneId serverTimeZone,
boolean includeSchemaChanges) {
super(new MySqlSchemaDataTypeInference(), changelogMode, serverTimeZone);
DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) {
super(new MySqlSchemaDataTypeInference(), changelogMode);
this.includeSchemaChanges = includeSchemaChanges;
}

@ -57,8 +57,6 @@ import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -83,16 +81,10 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve
/** Changelog Mode to use for encoding changes in Flink internal data structure. */
protected final DebeziumChangelogMode changelogMode;
/** The session time zone in database server. */
protected final ZoneId serverTimeZone;
public DebeziumEventDeserializationSchema(
SchemaDataTypeInference schemaDataTypeInference,
DebeziumChangelogMode changelogMode,
ZoneId serverTimeZone) {
SchemaDataTypeInference schemaDataTypeInference, DebeziumChangelogMode changelogMode) {
this.schemaDataTypeInference = schemaDataTypeInference;
this.changelogMode = changelogMode;
this.serverTimeZone = serverTimeZone;
}
@Override
@ -322,8 +314,11 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve
return TimestampData.fromMillis(nano / 1000_000, (int) (nano % 1000_000));
}
}
LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
return TimestampData.fromLocalDateTime(localDateTime);
throw new IllegalArgumentException(
"Unable to convert to TIMESTAMP from unexpected value '"
+ dbzObj
+ "' of type "
+ dbzObj.getClass().getName());
}
protected Object convertToLocalTimeZoneTimestamp(Object dbzObj, Schema schema) {
@ -334,7 +329,7 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve
return LocalZonedTimestampData.fromInstant(instant);
}
throw new IllegalArgumentException(
"Unable to convert to TimestampData from unexpected value '"
"Unable to convert to TIMESTAMP_LTZ from unexpected value '"
+ dbzObj
+ "' of type "
+ dbzObj.getClass().getName());

Loading…
Cancel
Save