[mongodb] Allow mongo ARRAY to be converted to string type in Flink (#1475)

pull/1478/head
vanliu 3 years ago committed by GitHub
parent 1b17a751cc
commit ed980ef499
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -52,9 +52,14 @@ import org.bson.BsonRegularExpression;
import org.bson.BsonTimestamp; import org.bson.BsonTimestamp;
import org.bson.BsonUndefined; import org.bson.BsonUndefined;
import org.bson.BsonValue; import org.bson.BsonValue;
import org.bson.codecs.BsonArrayCodec;
import org.bson.codecs.EncoderContext;
import org.bson.json.JsonWriter;
import org.bson.types.Decimal128; import org.bson.types.Decimal128;
import java.io.Serializable; import java.io.Serializable;
import java.io.StringWriter;
import java.io.Writer;
import java.lang.reflect.Array; import java.lang.reflect.Array;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.time.Instant; import java.time.Instant;
@ -600,6 +605,28 @@ public class MongoDBConnectorDeserializationSchema
return StringData.fromString( return StringData.fromString(
convertInstantToZonedDateTime(instant).format(ISO_OFFSET_DATE_TIME)); convertInstantToZonedDateTime(instant).format(ISO_OFFSET_DATE_TIME));
} }
if (docObj.isArray()) {
// convert bson array to json string
Writer writer = new StringWriter();
JsonWriter jsonArrayWriter =
new JsonWriter(writer) {
@Override
public void writeStartArray() {
doWriteStartArray();
setState(State.VALUE);
}
@Override
public void writeEndArray() {
doWriteEndArray();
setState(getNextState());
}
};
new BsonArrayCodec()
.encode(jsonArrayWriter, docObj.asArray(), EncoderContext.builder().build());
return StringData.fromString(writer.toString());
}
if (docObj.isRegularExpression()) { if (docObj.isRegularExpression()) {
BsonRegularExpression regex = docObj.asRegularExpression(); BsonRegularExpression regex = docObj.asRegularExpression();
return StringData.fromString( return StringData.fromString(

Loading…
Cancel
Save