From 19f7afe6af8e7fb0f1f5e090da4c701dfef52633 Mon Sep 17 00:00:00 2001 From: joyCurry30 <149778446+joyCurry30@users.noreply.github.com> Date: Mon, 3 Jun 2024 16:37:30 +0800 Subject: [PATCH] [FLINK-35430][cdc-connector][kafka] Pass the time zone info to JsonSerializationSchema This closes #3377. --- .../connectors/kafka/json/ChangeLogJsonFormatFactory.java | 6 +++++- .../kafka/json/canal/CanalJsonSerializationSchema.java | 3 ++- .../json/debezium/DebeziumJsonSerializationSchema.java | 3 ++- .../cdc/connectors/kafka/sink/KafkaDataSinkFactory.java | 2 +- .../kafka/json/canal/CanalJsonSerializationSchemaTest.java | 6 +++++- .../json/debezium/DebeziumJsonSerializationSchemaTest.java | 6 +++++- 6 files changed, 20 insertions(+), 6 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java index 81c401804..cd2850316 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java @@ -27,6 +27,8 @@ import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonFormatOptions; import org.apache.flink.formats.json.JsonFormatOptionsUtil; +import java.time.ZoneId; + import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER; import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL; @@ -46,7 +48,7 @@ public class ChangeLogJsonFormatFactory { * @return The configured instance of {@link SerializationSchema}. */ public static SerializationSchema createSerializationSchema( - ReadableConfig formatOptions, JsonSerializationType type) { + ReadableConfig formatOptions, JsonSerializationType type, ZoneId zoneId) { TimestampFormat timestampFormat = JsonFormatOptionsUtil.getTimestampFormat(formatOptions); JsonFormatOptions.MapNullKeyMode mapNullKeyMode = JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions); @@ -62,6 +64,7 @@ public class ChangeLogJsonFormatFactory { timestampFormat, mapNullKeyMode, mapNullKeyLiteral, + zoneId, encodeDecimalAsPlainNumber); } case CANAL_JSON: @@ -70,6 +73,7 @@ public class ChangeLogJsonFormatFactory { timestampFormat, mapNullKeyMode, mapNullKeyLiteral, + zoneId, encodeDecimalAsPlainNumber); } default: diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java index eecd2b801..78548e31b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java @@ -81,12 +81,13 @@ public class CanalJsonSerializationSchema implements SerializationSchema TimestampFormat timestampFormat, JsonFormatOptions.MapNullKeyMode mapNullKeyMode, String mapNullKeyLiteral, + ZoneId zoneId, boolean encodeDecimalAsPlainNumber) { this.timestampFormat = timestampFormat; this.mapNullKeyMode = mapNullKeyMode; this.mapNullKeyLiteral = mapNullKeyLiteral; this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber; - this.zoneId = ZoneId.systemDefault(); + this.zoneId = zoneId; jsonSerializers = new HashMap<>(); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java index 3fa1d4ce1..2f305ce42 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java @@ -80,12 +80,13 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema(); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java index 9b117243f..f993d6325 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java @@ -62,7 +62,7 @@ public class KafkaDataSinkFactory implements DataSinkFactory { context.getFactoryConfiguration().get(KafkaDataSinkOptions.VALUE_FORMAT); SerializationSchema valueSerialization = ChangeLogJsonFormatFactory.createSerializationSchema( - configuration, jsonSerializationType); + configuration, jsonSerializationType, zoneId); final Properties kafkaProperties = new Properties(); Map allOptions = context.getFactoryConfiguration().toMap(); allOptions.keySet().stream() diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java index 52181fd6b..c6335e6d6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java @@ -40,6 +40,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.time.ZoneId; + /** Tests for {@link CanalJsonSerializationSchema}. */ public class CanalJsonSerializationSchemaTest { @@ -53,7 +55,9 @@ public class CanalJsonSerializationSchemaTest { .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false); SerializationSchema serializationSchema = ChangeLogJsonFormatFactory.createSerializationSchema( - new Configuration(), JsonSerializationType.CANAL_JSON); + new Configuration(), + JsonSerializationType.CANAL_JSON, + ZoneId.systemDefault()); serializationSchema.open(new MockInitializationContext()); // create table diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java index 7a47bf8bd..0be02b9b3 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java @@ -40,6 +40,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.time.ZoneId; + /** Tests for {@link DebeziumJsonSerializationSchema}. */ public class DebeziumJsonSerializationSchemaTest { @@ -53,7 +55,9 @@ public class DebeziumJsonSerializationSchemaTest { .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false); SerializationSchema serializationSchema = ChangeLogJsonFormatFactory.createSerializationSchema( - new Configuration(), JsonSerializationType.DEBEZIUM_JSON); + new Configuration(), + JsonSerializationType.DEBEZIUM_JSON, + ZoneId.systemDefault()); serializationSchema.open(new MockInitializationContext()); // create table Schema schema =