From 15c7247b524c25d1a1b05f453aee8fd84adee870 Mon Sep 17 00:00:00 2001 From: Jiabao Sun Date: Wed, 10 Nov 2021 22:52:50 +0800 Subject: [PATCH] [mongodb] Fix problem of cannot look up fields on non-struct type (#588) --- .../internal/MongoDBConnectorSourceTask.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java index a045ecfd7..da08f15ce 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java @@ -33,7 +33,6 @@ import org.bson.json.JsonReader; import java.lang.reflect.Field; import java.time.Instant; -import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -101,12 +100,10 @@ public class MongoDBConnectorSourceTask extends SourceTask { @Override public List poll() throws InterruptedException { List sourceRecords = target.poll(); - + List outSourceRecords = new LinkedList<>(); if (isInSnapshotPhase) { // Step1. Snapshot Phase - List outSourceRecords = null; if (sourceRecords != null && !sourceRecords.isEmpty()) { - outSourceRecords = new LinkedList<>(); for (SourceRecord sourceRecord : sourceRecords) { SourceRecord current = markRecordTimestamp(sourceRecord); @@ -136,24 +133,22 @@ public class MongoDBConnectorSourceTask extends SourceTask { // then exit the snapshot phase. if (!isCopying()) { if (currentLastSnapshotRecord != null) { - outSourceRecords = - Collections.singletonList( - markLastSnapshotRecordOfAll(currentLastSnapshotRecord)); + outSourceRecords.add( + markLastSnapshotRecordOfAll(currentLastSnapshotRecord)); currentLastSnapshotRecord = null; } isInSnapshotPhase = false; } } - return outSourceRecords; } else { // Step2. Change Streaming Phase if (sourceRecords != null && !sourceRecords.isEmpty()) { for (SourceRecord current : sourceRecords) { - markRecordTimestamp(current); + outSourceRecords.add(markRecordTimestamp(current)); } } - return sourceRecords; } + return outSourceRecords; } @Override