[FLINK-35430][cdc-connector][kafka] Pass the time zone info to JsonSerializationSchema

This closes #3377.
pull/3386/head
joyCurry30 10 months ago committed by GitHub
parent 84ae0dc453
commit 19f7afe6af
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -27,6 +27,8 @@ import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonFormatOptionsUtil;
import java.time.ZoneId;
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
@ -46,7 +48,7 @@ public class ChangeLogJsonFormatFactory {
* @return The configured instance of {@link SerializationSchema}.
*/
public static SerializationSchema<Event> createSerializationSchema(
ReadableConfig formatOptions, JsonSerializationType type) {
ReadableConfig formatOptions, JsonSerializationType type, ZoneId zoneId) {
TimestampFormat timestampFormat = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
JsonFormatOptions.MapNullKeyMode mapNullKeyMode =
JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions);
@ -62,6 +64,7 @@ public class ChangeLogJsonFormatFactory {
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
zoneId,
encodeDecimalAsPlainNumber);
}
case CANAL_JSON:
@ -70,6 +73,7 @@ public class ChangeLogJsonFormatFactory {
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
zoneId,
encodeDecimalAsPlainNumber);
}
default:

@ -81,12 +81,13 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event>
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
ZoneId zoneId,
boolean encodeDecimalAsPlainNumber) {
this.timestampFormat = timestampFormat;
this.mapNullKeyMode = mapNullKeyMode;
this.mapNullKeyLiteral = mapNullKeyLiteral;
this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
this.zoneId = ZoneId.systemDefault();
this.zoneId = zoneId;
jsonSerializers = new HashMap<>();
}

@ -80,12 +80,13 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Even
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
ZoneId zoneId,
boolean encodeDecimalAsPlainNumber) {
this.timestampFormat = timestampFormat;
this.mapNullKeyMode = mapNullKeyMode;
this.mapNullKeyLiteral = mapNullKeyLiteral;
this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
this.zoneId = ZoneId.systemDefault();
this.zoneId = zoneId;
jsonSerializers = new HashMap<>();
}

@ -62,7 +62,7 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
context.getFactoryConfiguration().get(KafkaDataSinkOptions.VALUE_FORMAT);
SerializationSchema<Event> valueSerialization =
ChangeLogJsonFormatFactory.createSerializationSchema(
configuration, jsonSerializationType);
configuration, jsonSerializationType, zoneId);
final Properties kafkaProperties = new Properties();
Map<String, String> allOptions = context.getFactoryConfiguration().toMap();
allOptions.keySet().stream()

@ -40,6 +40,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.ZoneId;
/** Tests for {@link CanalJsonSerializationSchema}. */
public class CanalJsonSerializationSchemaTest {
@ -53,7 +55,9 @@ public class CanalJsonSerializationSchemaTest {
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
SerializationSchema<Event> serializationSchema =
ChangeLogJsonFormatFactory.createSerializationSchema(
new Configuration(), JsonSerializationType.CANAL_JSON);
new Configuration(),
JsonSerializationType.CANAL_JSON,
ZoneId.systemDefault());
serializationSchema.open(new MockInitializationContext());
// create table

@ -40,6 +40,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.ZoneId;
/** Tests for {@link DebeziumJsonSerializationSchema}. */
public class DebeziumJsonSerializationSchemaTest {
@ -53,7 +55,9 @@ public class DebeziumJsonSerializationSchemaTest {
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
SerializationSchema<Event> serializationSchema =
ChangeLogJsonFormatFactory.createSerializationSchema(
new Configuration(), JsonSerializationType.DEBEZIUM_JSON);
new Configuration(),
JsonSerializationType.DEBEZIUM_JSON,
ZoneId.systemDefault());
serializationSchema.open(new MockInitializationContext());
// create table
Schema schema =

Loading…
Cancel
Save