From 5ec35a840f24b4a33f1a0a5b8de1922051add70c Mon Sep 17 00:00:00 2001 From: mincwang <33626973+mincwang@users.noreply.github.com> Date: Wed, 3 Nov 2021 13:50:05 +0800 Subject: [PATCH] [common] Fix NPE of JsonConverter when job run on cluster(#470) --- .../JsonDebeziumDeserializationSchema.java | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/JsonDebeziumDeserializationSchema.java b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/JsonDebeziumDeserializationSchema.java index de74b8caa..a677922b3 100644 --- a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/JsonDebeziumDeserializationSchema.java +++ b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/JsonDebeziumDeserializationSchema.java @@ -37,23 +37,35 @@ import java.util.HashMap; public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema { private static final long serialVersionUID = 1L; - private static final JsonConverter CONVERTER = new JsonConverter(); + + private transient JsonConverter jsonConverter; + + /** + * Configuration whether to enable {@link JsonConverterConfig.SCHEMAS_ENABLE_CONFIG} to include + * schema in messages. + */ + private final Boolean includeSchema; public JsonDebeziumDeserializationSchema() { this(false); } - public JsonDebeziumDeserializationSchema(boolean includeSchema) { - final HashMap configs = new HashMap<>(); - configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()); - configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, includeSchema); - CONVERTER.configure(configs); + public JsonDebeziumDeserializationSchema(Boolean includeSchema) { + this.includeSchema = includeSchema; } @Override public void deserialize(SourceRecord record, Collector out) throws Exception { + if (jsonConverter == null) { + // initialize jsonConverter + jsonConverter = new JsonConverter(); + final HashMap configs = new HashMap<>(2); + configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()); + configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, includeSchema); + jsonConverter.configure(configs); + } byte[] bytes = - CONVERTER.fromConnectData(record.topic(), record.valueSchema(), record.value()); + jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value()); out.collect(new String(bytes)); }