From ed980ef499160b8d37e1f411568884c2b2c6c2a5 Mon Sep 17 00:00:00 2001 From: vanliu Date: Fri, 12 Aug 2022 11:29:08 +0800 Subject: [PATCH] [mongodb] Allow mongo ARRAY to be converted to string type in Flink (#1475) --- ...MongoDBConnectorDeserializationSchema.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorDeserializationSchema.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorDeserializationSchema.java index 4da0aa733..226af61c8 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorDeserializationSchema.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorDeserializationSchema.java @@ -52,9 +52,14 @@ import org.bson.BsonRegularExpression; import org.bson.BsonTimestamp; import org.bson.BsonUndefined; import org.bson.BsonValue; +import org.bson.codecs.BsonArrayCodec; +import org.bson.codecs.EncoderContext; +import org.bson.json.JsonWriter; import org.bson.types.Decimal128; import java.io.Serializable; +import java.io.StringWriter; +import java.io.Writer; import java.lang.reflect.Array; import java.math.BigDecimal; import java.time.Instant; @@ -600,6 +605,28 @@ public class MongoDBConnectorDeserializationSchema return StringData.fromString( convertInstantToZonedDateTime(instant).format(ISO_OFFSET_DATE_TIME)); } + if (docObj.isArray()) { + // convert bson array to json string + Writer writer = new StringWriter(); + JsonWriter jsonArrayWriter = + new JsonWriter(writer) { + @Override + public void writeStartArray() { + doWriteStartArray(); + setState(State.VALUE); + } + + @Override + public void writeEndArray() { + doWriteEndArray(); + setState(getNextState()); + } + }; + + new BsonArrayCodec() + .encode(jsonArrayWriter, docObj.asArray(), EncoderContext.builder().build()); + return StringData.fromString(writer.toString()); + } if (docObj.isRegularExpression()) { BsonRegularExpression regex = docObj.asRegularExpression(); return StringData.fromString(