[mongodb] Fix problem of cannot look up fields on non-struct type (#588)

pull/598/head
Jiabao Sun 3 years ago committed by GitHub
parent 86c9c2a191
commit 15c7247b52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -33,7 +33,6 @@ import org.bson.json.JsonReader;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.time.Instant; import java.time.Instant;
import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -101,12 +100,10 @@ public class MongoDBConnectorSourceTask extends SourceTask {
@Override @Override
public List<SourceRecord> poll() throws InterruptedException { public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> sourceRecords = target.poll(); List<SourceRecord> sourceRecords = target.poll();
List<SourceRecord> outSourceRecords = new LinkedList<>();
if (isInSnapshotPhase) { if (isInSnapshotPhase) {
// Step1. Snapshot Phase // Step1. Snapshot Phase
List<SourceRecord> outSourceRecords = null;
if (sourceRecords != null && !sourceRecords.isEmpty()) { if (sourceRecords != null && !sourceRecords.isEmpty()) {
outSourceRecords = new LinkedList<>();
for (SourceRecord sourceRecord : sourceRecords) { for (SourceRecord sourceRecord : sourceRecords) {
SourceRecord current = markRecordTimestamp(sourceRecord); SourceRecord current = markRecordTimestamp(sourceRecord);
@ -136,24 +133,22 @@ public class MongoDBConnectorSourceTask extends SourceTask {
// then exit the snapshot phase. // then exit the snapshot phase.
if (!isCopying()) { if (!isCopying()) {
if (currentLastSnapshotRecord != null) { if (currentLastSnapshotRecord != null) {
outSourceRecords = outSourceRecords.add(
Collections.singletonList(
markLastSnapshotRecordOfAll(currentLastSnapshotRecord)); markLastSnapshotRecordOfAll(currentLastSnapshotRecord));
currentLastSnapshotRecord = null; currentLastSnapshotRecord = null;
} }
isInSnapshotPhase = false; isInSnapshotPhase = false;
} }
} }
return outSourceRecords;
} else { } else {
// Step2. Change Streaming Phase // Step2. Change Streaming Phase
if (sourceRecords != null && !sourceRecords.isEmpty()) { if (sourceRecords != null && !sourceRecords.isEmpty()) {
for (SourceRecord current : sourceRecords) { for (SourceRecord current : sourceRecords) {
markRecordTimestamp(current); outSourceRecords.add(markRecordTimestamp(current));
} }
} }
return sourceRecords;
} }
return outSourceRecords;
} }
@Override @Override

Loading…
Cancel
Save