|
|
|
@ -37,23 +37,35 @@ import java.util.HashMap;
|
|
|
|
|
public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
|
|
|
|
|
|
|
|
|
|
private static final long serialVersionUID = 1L;
|
|
|
|
|
private static final JsonConverter CONVERTER = new JsonConverter();
|
|
|
|
|
|
|
|
|
|
private transient JsonConverter jsonConverter;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Configuration whether to enable {@link JsonConverterConfig.SCHEMAS_ENABLE_CONFIG} to include
|
|
|
|
|
* schema in messages.
|
|
|
|
|
*/
|
|
|
|
|
private final Boolean includeSchema;
|
|
|
|
|
|
|
|
|
|
public JsonDebeziumDeserializationSchema() {
|
|
|
|
|
this(false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public JsonDebeziumDeserializationSchema(boolean includeSchema) {
|
|
|
|
|
final HashMap<String, Object> configs = new HashMap<>();
|
|
|
|
|
configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
|
|
|
|
|
configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, includeSchema);
|
|
|
|
|
CONVERTER.configure(configs);
|
|
|
|
|
public JsonDebeziumDeserializationSchema(Boolean includeSchema) {
|
|
|
|
|
this.includeSchema = includeSchema;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
|
|
|
|
|
if (jsonConverter == null) {
|
|
|
|
|
// initialize jsonConverter
|
|
|
|
|
jsonConverter = new JsonConverter();
|
|
|
|
|
final HashMap<String, Object> configs = new HashMap<>(2);
|
|
|
|
|
configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
|
|
|
|
|
configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, includeSchema);
|
|
|
|
|
jsonConverter.configure(configs);
|
|
|
|
|
}
|
|
|
|
|
byte[] bytes =
|
|
|
|
|
CONVERTER.fromConnectData(record.topic(), record.valueSchema(), record.value());
|
|
|
|
|
jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
|
|
|
|
|
out.collect(new String(bytes));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|