[mongodb] Add disableCursorTimeout option (#2332)

pull/2343/head
yuxiqian 2 years ago committed by GitHub
parent 883b6846ad
commit a8dd4d33cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -277,6 +277,13 @@ upstart 流需要一个唯一的密钥,所以我们必须声明 `_id` 作为
<td>Boolean</td>
<td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。</td>
</tr>
<tr>
<td>scan.cursor.no-timeout</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>MongoDB 服务端通常会将空闲时间超过 10 分钟的 cursor 关闭,来节省内存开销。将这个参数设置为 true 可以防止 cursor 因为读取时间过长或者背压导致的空闲而关闭。仅在增量快照模式下生效。</td>
</tr>
</tbody>
</table>
</div>

@ -283,6 +283,13 @@ Connector Options
<td>Boolean</td>
<td>Whether to close idle readers at the end of the snapshot phase. This feature requires Flink version 1.14 or later, and 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' must be set to true.</td>
</tr>
<tr>
<td>scan.cursor.no-timeout</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>MongoDB server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to true to prevent cursors from being closed when they sit idle because of long reads or backpressure. Only takes effect when incremental snapshot is enabled.</td>
</tr>
</tbody>
</table>
</div>

@ -206,6 +206,15 @@ public class MongoDBSourceBuilder<T> {
return this;
}
/**
* Sets whether the cursor timeout is disabled during the snapshot phase. Defaults to true. Set
* this to {@code false} only when the MongoDB server doesn't support the noCursorTimeout option.
*
* @param disableCursorTimeout the flag value forwarded to the underlying config factory
* @return this builder, to allow call chaining
*/
public MongoDBSourceBuilder<T> disableCursorTimeout(boolean disableCursorTimeout) {
this.configFactory.disableCursorTimeout(disableCursorTimeout);
return this;
}
/**
* The deserializer used to convert from consumed {@link
* org.apache.kafka.connect.source.SourceRecord}.

@ -50,6 +50,7 @@ public class MongoDBSourceConfig implements SourceConfig {
private final int splitSizeMB;
private final boolean closeIdleReaders;
private final boolean enableFullDocPrePostImage;
private final boolean disableCursorTimeout;
MongoDBSourceConfig(
String scheme,
@ -68,7 +69,8 @@ public class MongoDBSourceConfig implements SourceConfig {
int splitMetaGroupSize,
int splitSizeMB,
boolean closeIdleReaders,
boolean enableFullDocPrePostImage) {
boolean enableFullDocPrePostImage,
boolean disableCursorTimeout) {
this.scheme = checkNotNull(scheme);
this.hosts = checkNotNull(hosts);
this.username = username;
@ -87,6 +89,7 @@ public class MongoDBSourceConfig implements SourceConfig {
this.splitSizeMB = splitSizeMB;
this.closeIdleReaders = closeIdleReaders;
this.enableFullDocPrePostImage = enableFullDocPrePostImage;
this.disableCursorTimeout = disableCursorTimeout;
}
public String getScheme() {
@ -170,6 +173,10 @@ public class MongoDBSourceConfig implements SourceConfig {
return enableFullDocPrePostImage;
}
/** Returns the {@code disableCursorTimeout} flag this config was constructed with. */
public boolean disableCursorTimeout() {
return disableCursorTimeout;
}
@Override
public boolean equals(Object o) {
if (this == o) {

@ -60,6 +60,7 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>
private Integer splitSizeMB = SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue();
private boolean closeIdleReaders = false;
private boolean enableFullDocPrePostImage = false;
private boolean disableCursorTimeout = true;
/** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
public MongoDBSourceConfigFactory scheme(String scheme) {
@ -231,6 +232,15 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>
return this;
}
/**
* Sets whether the <code>noCursorTimeout</code> config is passed when creating a MongoDB cursor.
* Defaults to true.
*
* @param disableCursorTimeout the flag value stored on this factory
* @return this factory, to allow call chaining
*/
public MongoDBSourceConfigFactory disableCursorTimeout(boolean disableCursorTimeout) {
this.disableCursorTimeout = disableCursorTimeout;
return this;
}
/** Creates a new {@link MongoDBSourceConfig} for the given subtask {@code subtaskId}. */
@Override
public MongoDBSourceConfig create(int subtaskId) {
@ -252,6 +262,7 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>
splitMetaGroupSize,
splitSizeMB,
closeIdleReaders,
enableFullDocPrePostImage);
enableFullDocPrePostImage,
disableCursorTimeout);
}
}

@ -141,4 +141,11 @@ public class MongoDBSourceOptions {
.defaultValue(false)
.withDescription(
"Scan full mode changelog. Only available when MongoDB >= 6.0. Defaults to false.");
/** Boolean config option keyed {@code scan.cursor.no-timeout}; defaults to {@code true}. */
public static final ConfigOption<Boolean> SCAN_NO_CURSOR_TIMEOUT =
ConfigOptions.key("scan.cursor.no-timeout")
.booleanType()
.defaultValue(true)
.withDescription(
"MongoDB server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to true to prevent that.");
}

@ -113,7 +113,7 @@ public class MongoDBScanFetchTask implements FetchTask<SourceSplitBase> {
.max((BsonDocument) snapshotSplit.getSplitEnd()[1])
.hint((BsonDocument) snapshotSplit.getSplitStart()[0])
.batchSize(sourceConfig.getBatchSize())
.noCursorTimeout(true)
.noCursorTimeout(sourceConfig.disableCursorTimeout())
.cursor();
BsonDocument keyDocument, valueDocument;

@ -81,6 +81,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
private final Integer splitSizeMB;
private final boolean closeIdlerReaders;
private final boolean enableFullDocPrePostImage;
private final boolean noCursorTimeout;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@ -112,7 +113,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
@Nullable Integer splitMetaGroupSize,
@Nullable Integer splitSizeMB,
boolean closeIdlerReaders,
boolean enableFullDocPrePostImage) {
boolean enableFullDocPrePostImage,
boolean noCursorTimeout) {
this.physicalSchema = physicalSchema;
this.scheme = checkNotNull(scheme);
this.hosts = checkNotNull(hosts);
@ -135,6 +137,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
this.splitSizeMB = splitSizeMB;
this.closeIdlerReaders = closeIdlerReaders;
this.enableFullDocPrePostImage = enableFullDocPrePostImage;
this.noCursorTimeout = noCursorTimeout;
}
@Override
@ -191,7 +194,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
.closeIdleReaders(closeIdlerReaders)
.scanFullChangelog(enableFullDocPrePostImage)
.startupOptions(startupOptions)
.deserializer(deserializer);
.deserializer(deserializer)
.disableCursorTimeout(noCursorTimeout);
Optional.ofNullable(databaseList).ifPresent(builder::databaseList);
Optional.ofNullable(collectionList).ifPresent(builder::collectionList);
@ -286,7 +290,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
splitMetaGroupSize,
splitSizeMB,
closeIdlerReaders,
enableFullDocPrePostImage);
enableFullDocPrePostImage,
noCursorTimeout);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@ -321,7 +326,9 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
&& Objects.equals(splitSizeMB, that.splitSizeMB)
&& Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys)
&& Objects.equals(closeIdlerReaders, that.closeIdlerReaders);
&& Objects.equals(closeIdlerReaders, that.closeIdlerReaders)
&& Objects.equals(enableFullDocPrePostImage, that.enableFullDocPrePostImage)
&& Objects.equals(noCursorTimeout, that.noCursorTimeout);
}
@Override
@ -347,7 +354,9 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
splitSizeMB,
producedDataType,
metadataKeys,
closeIdlerReaders);
closeIdlerReaders,
enableFullDocPrePostImage,
noCursorTimeout);
}
@Override

@ -51,6 +51,7 @@ import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOp
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_NO_CURSOR_TIMEOUT;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.USERNAME;
import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
@ -107,6 +108,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
boolean enableFullDocumentPrePostImage =
config.getOptional(FULL_DOCUMENT_PRE_POST_IMAGE).orElse(false);
boolean noCursorTimeout = config.getOptional(SCAN_NO_CURSOR_TIMEOUT).orElse(true);
ResolvedSchema physicalSchema =
getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present");
@ -134,7 +136,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
splitMetaGroupSize,
splitSizeMB,
enableCloseIdleReaders,
enableFullDocumentPrePostImage);
enableFullDocumentPrePostImage,
noCursorTimeout);
}
private void checkPrimaryKey(UniqueConstraint pk, String message) {
@ -207,6 +210,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
options.add(CHUNK_META_GROUP_SIZE);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(FULL_DOCUMENT_PRE_POST_IMAGE);
options.add(SCAN_NO_CURSOR_TIMEOUT);
return options;
}
}

@ -55,6 +55,7 @@ import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOp
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_NO_CURSOR_TIMEOUT;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction;
import static org.junit.Assert.assertEquals;
@ -110,6 +111,9 @@ public class MongoDBTableFactoryTest {
private static final boolean FULL_DOCUMENT_PRE_POST_IMAGE_ENABLED_DEFAULT =
FULL_DOCUMENT_PRE_POST_IMAGE.defaultValue();
private static final boolean SCAN_NO_CURSOR_TIMEOUT_DEFAULT =
SCAN_NO_CURSOR_TIMEOUT.defaultValue();
@Test
public void testCommonProperties() {
Map<String, String> properties = getAllOptions();
@ -137,7 +141,8 @@ public class MongoDBTableFactoryTest {
CHUNK_META_GROUP_SIZE_DEFAULT,
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
FULL_DOCUMENT_PRE_POST_IMAGE_ENABLED_DEFAULT);
FULL_DOCUMENT_PRE_POST_IMAGE_ENABLED_DEFAULT,
SCAN_NO_CURSOR_TIMEOUT_DEFAULT);
assertEquals(expectedSource, actualSource);
}
@ -158,6 +163,7 @@ public class MongoDBTableFactoryTest {
options.put("scan.incremental.snapshot.chunk.size.mb", "10");
options.put("scan.incremental.close-idle-reader.enabled", "true");
options.put("scan.full-changelog", "true");
options.put("scan.cursor.no-timeout", "false");
DynamicTableSource actualSource = createTableSource(SCHEMA, options);
MongoDBTableSource expectedSource =
@ -181,7 +187,8 @@ public class MongoDBTableFactoryTest {
1001,
10,
true,
true);
true,
false);
assertEquals(expectedSource, actualSource);
}
@ -218,7 +225,8 @@ public class MongoDBTableFactoryTest {
CHUNK_META_GROUP_SIZE_DEFAULT,
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
FULL_DOCUMENT_PRE_POST_IMAGE_ENABLED_DEFAULT);
FULL_DOCUMENT_PRE_POST_IMAGE_ENABLED_DEFAULT,
SCAN_NO_CURSOR_TIMEOUT_DEFAULT);
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");

Loading…
Cancel
Save