From 5a2a8b92613dc7181ca94294ac7f28191e3b2fea Mon Sep 17 00:00:00 2001 From: Jiabao Sun Date: Tue, 9 Nov 2021 14:02:17 +0800 Subject: [PATCH] [hotfix][mongodb] Fix NullPointerException when heartbeat option is enabled (#586) --- .../internal/MongoDBConnectorSourceTask.java | 45 ++++++++++++++++--- .../mongodb/internal/MongoDBEnvelope.java | 4 ++ .../mongodb/table/MongoDBConnectorITCase.java | 3 +- 3 files changed, 45 insertions(+), 7 deletions(-) diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java index 147f691f7..a045ecfd7 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java @@ -23,6 +23,8 @@ import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.SnapshotRecord; import io.debezium.data.Envelope; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; @@ -30,6 +32,7 @@ import org.apache.kafka.connect.source.SourceTaskContext; import org.bson.json.JsonReader; import java.lang.reflect.Field; +import java.time.Instant; import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -42,10 +45,13 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class MongoDBConnectorSourceTask extends SourceTask { - private static final String COPY_KEY = "copy"; - private static final String TRUE = "true"; + private static final Schema HEARTBEAT_VALUE_SCHEMA = + SchemaBuilder.struct() 
+ .field(AbstractSourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA) + .build(); + private final MongoSourceTask target; private final Field isCopyingField; @@ -101,8 +107,9 @@ public class MongoDBConnectorSourceTask extends SourceTask { List outSourceRecords = null; if (sourceRecords != null && !sourceRecords.isEmpty()) { outSourceRecords = new LinkedList<>(); - for (SourceRecord current : sourceRecords) { - markRecordTimestamp(current); + for (SourceRecord sourceRecord : sourceRecords) { + SourceRecord current = markRecordTimestamp(sourceRecord); + if (isSnapshotRecord(current)) { markSnapshotRecord(current); if (currentLastSnapshotRecord != null) { @@ -154,7 +161,14 @@ public class MongoDBConnectorSourceTask extends SourceTask { target.stop(); } - private void markRecordTimestamp(SourceRecord record) { + private SourceRecord markRecordTimestamp(SourceRecord record) { + if (isHeartbeatRecord(record)) { + return markTimestampForHeartbeatRecord(record); + } + return markTimestampForDataRecord(record); + } + + private SourceRecord markTimestampForDataRecord(SourceRecord record) { final Struct value = (Struct) record.value(); final Struct source = new Struct(value.schema().field(Envelope.FieldName.SOURCE).schema()); // It indicates the time that the change was made in the database. 
If the record is read @@ -168,6 +182,21 @@ public class MongoDBConnectorSourceTask extends SourceTask { } source.put(AbstractSourceInfo.TIMESTAMP_KEY, timestamp); value.put(Envelope.FieldName.SOURCE, source); + return record; + } + + private SourceRecord markTimestampForHeartbeatRecord(SourceRecord record) { + final Struct heartbeatValue = new Struct(HEARTBEAT_VALUE_SCHEMA); + heartbeatValue.put(AbstractSourceInfo.TIMESTAMP_KEY, Instant.now().toEpochMilli()); + + return new SourceRecord( + record.sourcePartition(), + record.sourceOffset(), + record.topic(), + record.keySchema(), + record.key(), + HEARTBEAT_VALUE_SCHEMA, + heartbeatValue); } private void markSnapshotRecord(SourceRecord record) { @@ -187,7 +216,11 @@ public class MongoDBConnectorSourceTask extends SourceTask { } private boolean isSnapshotRecord(SourceRecord sourceRecord) { - return TRUE.equals(sourceRecord.sourceOffset().get(COPY_KEY)); + return TRUE.equals(sourceRecord.sourceOffset().get(MongoDBEnvelope.COPY_KEY_FIELD)); + } + + private boolean isHeartbeatRecord(SourceRecord sourceRecord) { + return TRUE.equals(sourceRecord.sourceOffset().get(MongoDBEnvelope.HEARTBEAT_KEY_FIELD)); } private boolean isCopying() { diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java index 0e9aa99e0..dac95701c 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java @@ -37,4 +37,8 @@ public class MongoDBEnvelope { public static final String NAMESPACE_DATABASE_FIELD = "db"; public static final String NAMESPACE_COLLECTION_FIELD = "coll"; + + public static final String COPY_KEY_FIELD = "copy"; + + public static final String HEARTBEAT_KEY_FIELD = "HEARTBEAT"; } diff 
--git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java index 36f202e36..faf85f1b0 100644 --- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java +++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java @@ -86,7 +86,8 @@ public class MongoDBConnectorITCase extends MongoDBTestBase { + " 'username' = '%s'," + " 'password' = '%s'," + " 'database' = '%s'," - + " 'collection' = '%s'" + + " 'collection' = '%s'," + + " 'heartbeat.interval.ms' = '1000'" + ")", MONGODB_CONTAINER.getHostAndPort(), FLINK_USER,