[hotfix][mongodb] Fix NullPointException when heartbeat option is enabled (#586)

pull/588/head
Jiabao Sun 3 years ago committed by GitHub
parent 39944c9ecd
commit 5a2a8b9261
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -23,6 +23,8 @@ import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SnapshotRecord;
import io.debezium.data.Envelope;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@ -30,6 +32,7 @@ import org.apache.kafka.connect.source.SourceTaskContext;
import org.bson.json.JsonReader;
import java.lang.reflect.Field;
import java.time.Instant;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@ -42,10 +45,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class MongoDBConnectorSourceTask extends SourceTask {
private static final String COPY_KEY = "copy";
private static final String TRUE = "true";
private static final Schema HEARTBEAT_VALUE_SCHEMA =
SchemaBuilder.struct()
.field(AbstractSourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA)
.build();
private final MongoSourceTask target;
private final Field isCopyingField;
@ -101,8 +107,9 @@ public class MongoDBConnectorSourceTask extends SourceTask {
List<SourceRecord> outSourceRecords = null;
if (sourceRecords != null && !sourceRecords.isEmpty()) {
outSourceRecords = new LinkedList<>();
for (SourceRecord current : sourceRecords) {
markRecordTimestamp(current);
for (SourceRecord sourceRecord : sourceRecords) {
SourceRecord current = markRecordTimestamp(sourceRecord);
if (isSnapshotRecord(current)) {
markSnapshotRecord(current);
if (currentLastSnapshotRecord != null) {
@ -154,7 +161,14 @@ public class MongoDBConnectorSourceTask extends SourceTask {
target.stop();
}
private void markRecordTimestamp(SourceRecord record) {
private SourceRecord markRecordTimestamp(SourceRecord record) {
if (isHeartbeatRecord(record)) {
return markTimestampForHeartbeatRecord(record);
}
return markTimestampForDataRecord(record);
}
private SourceRecord markTimestampForDataRecord(SourceRecord record) {
final Struct value = (Struct) record.value();
final Struct source = new Struct(value.schema().field(Envelope.FieldName.SOURCE).schema());
// It indicates the time that the change was made in the database. If the record is read
@ -168,6 +182,21 @@ public class MongoDBConnectorSourceTask extends SourceTask {
}
source.put(AbstractSourceInfo.TIMESTAMP_KEY, timestamp);
value.put(Envelope.FieldName.SOURCE, source);
return record;
}
private SourceRecord markTimestampForHeartbeatRecord(SourceRecord record) {
final Struct heartbeatValue = new Struct(HEARTBEAT_VALUE_SCHEMA);
heartbeatValue.put(AbstractSourceInfo.TIMESTAMP_KEY, Instant.now().toEpochMilli());
return new SourceRecord(
record.sourcePartition(),
record.sourceOffset(),
record.topic(),
record.keySchema(),
record.key(),
HEARTBEAT_VALUE_SCHEMA,
heartbeatValue);
}
private void markSnapshotRecord(SourceRecord record) {
@ -187,7 +216,11 @@ public class MongoDBConnectorSourceTask extends SourceTask {
}
private boolean isSnapshotRecord(SourceRecord sourceRecord) {
return TRUE.equals(sourceRecord.sourceOffset().get(COPY_KEY));
return TRUE.equals(sourceRecord.sourceOffset().get(MongoDBEnvelope.COPY_KEY_FIELD));
}
private boolean isHeartbeatRecord(SourceRecord sourceRecord) {
return TRUE.equals(sourceRecord.sourceOffset().get(MongoDBEnvelope.HEARTBEAT_KEY_FIELD));
}
private boolean isCopying() {

@ -37,4 +37,8 @@ public class MongoDBEnvelope {
public static final String NAMESPACE_DATABASE_FIELD = "db";
public static final String NAMESPACE_COLLECTION_FIELD = "coll";
public static final String COPY_KEY_FIELD = "copy";
public static final String HEARTBEAT_KEY_FIELD = "HEARTBEAT";
}

@ -86,7 +86,8 @@ public class MongoDBConnectorITCase extends MongoDBTestBase {
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database' = '%s',"
+ " 'collection' = '%s'"
+ " 'collection' = '%s',"
+ " 'heartbeat.interval.ms' = '1000'"
+ ")",
MONGODB_CONTAINER.getHostAndPort(),
FLINK_USER,

Loading…
Cancel
Save