diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoUtils.java index cbe191f4a..3c510812a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoUtils.java @@ -222,9 +222,17 @@ public class MongoUtils { } if (fullDocPrePostImage) { - // require both pre-image and post-image - changeStream.fullDocument(FullDocument.REQUIRED); - changeStream.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED); + if (StringUtils.isNotEmpty(database) && StringUtils.isNotEmpty(collection)) { + // require both pre-image and post-image records + changeStream.fullDocument(FullDocument.REQUIRED); + changeStream.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED); + } else { + // for RegEx limited namespaces, use WHEN_AVAILABLE option + // to avoid MongoDB complaining about missing pre- and post-image + // coming from irrelevant collections + changeStream.fullDocument(FullDocument.WHEN_AVAILABLE); + changeStream.fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE); + } } else if (updateLookup) { changeStream.fullDocument(FullDocument.UPDATE_LOOKUP); }