[cdc-base] Close idle readers when snapshot finished (#2202)

* [cdc-base] Close idle readers when snapshot finished
Jiabao Sun, committed by GitHub
parent d9ea355752
commit afefe40944

@ -33,6 +33,7 @@ public abstract class BaseSourceConfig implements SourceConfig {
protected final double distributionFactorUpper;
protected final double distributionFactorLower;
protected final boolean includeSchemaChanges;
protected final boolean closeIdleReaders;
// --------------------------------------------------------------------------------------------
// Debezium Configurations
@ -47,6 +48,7 @@ public abstract class BaseSourceConfig implements SourceConfig {
double distributionFactorUpper,
double distributionFactorLower,
boolean includeSchemaChanges,
boolean closeIdleReaders,
Properties dbzProperties,
Configuration dbzConfiguration) {
this.startupOptions = startupOptions;
@ -55,6 +57,7 @@ public abstract class BaseSourceConfig implements SourceConfig {
this.distributionFactorUpper = distributionFactorUpper;
this.distributionFactorLower = distributionFactorLower;
this.includeSchemaChanges = includeSchemaChanges;
this.closeIdleReaders = closeIdleReaders;
this.dbzProperties = dbzProperties;
this.dbzConfiguration = dbzConfiguration;
}
@ -87,6 +90,11 @@ public abstract class BaseSourceConfig implements SourceConfig {
return includeSchemaChanges;
}
@Override
public boolean isCloseIdleReaders() {
return closeIdleReaders;
}
public Properties getDbzProperties() {
return dbzProperties;
}

@ -53,6 +53,7 @@ public abstract class JdbcSourceConfig extends BaseSourceConfig {
double distributionFactorUpper,
double distributionFactorLower,
boolean includeSchemaChanges,
boolean closeIdleReaders,
Properties dbzProperties,
Configuration dbzConfiguration,
String driverClassName,
@ -73,6 +74,7 @@ public abstract class JdbcSourceConfig extends BaseSourceConfig {
distributionFactorUpper,
distributionFactorLower,
includeSchemaChanges,
closeIdleReaders,
dbzProperties,
dbzConfiguration);
this.driverClassName = driverClassName;

@ -42,6 +42,7 @@ public abstract class JdbcSourceConfigFactory implements Factory<JdbcSourceConfi
protected List<String> tableList;
protected StartupOptions startupOptions = StartupOptions.initial();
protected boolean includeSchemaChanges = false;
protected boolean closeIdleReaders = false;
protected double distributionFactorUpper =
SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue();
protected double distributionFactorLower =
@ -209,6 +210,21 @@ public abstract class JdbcSourceConfigFactory implements Factory<JdbcSourceConfi
return this;
}
/**
* Whether to close idle readers at the end of the snapshot phase. This feature depends on
* FLIP-147: Support Checkpoints After Tasks Finished. The Flink version is required to be
* greater than or equal to 1.14, and the configuration <code>
* 'execution.checkpointing.checkpoints-after-tasks-finish.enabled'</code> needs to be set to
* true.
*
* <p>See more at
* https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished.
*/
public JdbcSourceConfigFactory closeIdleReaders(boolean closeIdleReaders) {
this.closeIdleReaders = closeIdleReaders;
return this;
}
@Override
public abstract JdbcSourceConfig create(int subtask);
}

@ -34,6 +34,8 @@ public interface SourceConfig extends Serializable {
boolean isIncludeSchemaChanges();
boolean isCloseIdleReaders();
/** Factory for the {@code SourceConfig}. */
@FunctionalInterface
interface Factory<C extends SourceConfig> extends Serializable {

@ -16,6 +16,7 @@
package com.ververica.cdc.connectors.base.options;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
@ -111,4 +112,14 @@ public class SourceOptions {
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the query for splitting would happen when it is uneven."
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
@Experimental
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =
ConfigOptions.key("scan.incremental.close-idle-reader.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to close idle readers at the end of the snapshot phase. This feature depends on "
+ "FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be "
+ "greater than or equal to 1.14 when enabling this feature.");
}
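Note: a minimal Table API sketch (not part of this commit) of how the new option is used together with the FLIP-147 switch; the connection parameters and table name are placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CloseIdleReaderSqlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // The option only takes effect when checkpoints after finished tasks are enabled.
        tEnv.getConfig().getConfiguration().setString(
                "execution.checkpointing.checkpoints-after-tasks-finish.enabled", "true");
        tEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval", "3s");
        tEnv.executeSql(
                "CREATE TABLE products_source ("
                        + " `id` INT NOT NULL,"
                        + " name STRING,"
                        + " PRIMARY KEY (`id`) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'mysql-cdc',"
                        + " 'hostname' = 'localhost',"
                        + " 'port' = '3306',"
                        + " 'username' = 'flinkuser',"
                        + " 'password' = 'flinkpw',"
                        + " 'database-name' = 'inventory',"
                        + " 'table-name' = 'products_source',"
                        + " 'scan.incremental.close-idle-reader.enabled' = 'true'"
                        + ")");
    }
}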

@ -170,6 +170,11 @@ public class HybridSplitAssigner<C extends SourceConfig> implements SplitAssigne
snapshotSplitAssigner.notifyCheckpointComplete(checkpointId);
}
@Override
public boolean isStreamSplitAssigned() {
return isStreamSplitAssigned;
}
@Override
public void close() {
snapshotSplitAssigner.close();

@ -55,6 +55,11 @@ public interface SplitAssigner {
*/
boolean waitingForFinishedSplits();
/** Whether the split assigner has finished assigning the stream split. */
default boolean isStreamSplitAssigned() {
throw new UnsupportedOperationException("Assigning a stream split is not supported.");
}
/**
* Gets the finished splits' information. This is useful metadata to generate a stream split
* that considers finished snapshot splits.

@ -117,6 +117,11 @@ public class StreamSplitAssigner implements SplitAssigner {
// nothing to do
}
@Override
public boolean isStreamSplitAssigned() {
return isStreamSplitAssigned;
}
@Override
public void close() {}

@ -163,6 +163,14 @@ public class IncrementalSourceEnumerator
continue;
}
if (splitAssigner.isStreamSplitAssigned() && sourceConfig.isCloseIdleReaders()) {
// close idle readers when snapshot phase finished.
context.signalNoMoreSplits(nextAwaiting);
awaitingReader.remove();
LOG.info("Close idle reader of subtask {}", nextAwaiting);
continue;
}
Optional<SourceSplitBase> split = splitAssigner.getNext();
if (split.isPresent()) {
final SourceSplitBase sourceSplit = split.get();

@ -0,0 +1,43 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.base.utils;
import org.apache.flink.runtime.util.EnvironmentInformation;
/** Utilities for environment information at runtime. */
public class EnvironmentUtils {
private EnvironmentUtils() {}
private static final VersionComparable FLINK_1_14 = VersionComparable.fromVersionString("1.14");
public static VersionComparable runtimeFlinkVersion() {
return VersionComparable.fromVersionString(EnvironmentInformation.getVersion());
}
public static boolean supportCheckpointsAfterTasksFinished() {
return runtimeFlinkVersion().newerThanOrEqualTo(FLINK_1_14);
}
public static void checkSupportCheckpointsAfterTasksFinished(boolean closeIdleReaders) {
if (closeIdleReaders && !supportCheckpointsAfterTasksFinished()) {
throw new UnsupportedOperationException(
"The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true. But the current version is "
+ runtimeFlinkVersion());
}
}
}
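Note: a hedged sketch (not part of the commit) showing how this guard behaves at runtime; the flag value is illustrative:

import com.ververica.cdc.connectors.base.utils.EnvironmentUtils;

public class EnvironmentUtilsExample {
    public static void main(String[] args) {
        System.out.println("Runtime Flink version: " + EnvironmentUtils.runtimeFlinkVersion());
        System.out.println("FLIP-147 supported: " + EnvironmentUtils.supportCheckpointsAfterTasksFinished());
        // Passing true throws UnsupportedOperationException on Flink < 1.14;
        // passing false is always a no-op.
        EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished(true);
    }
}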

@ -0,0 +1,107 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.base.utils;
/** Used to compare version numbers at runtime. */
public class VersionComparable implements Comparable<VersionComparable> {
private int majorVersion;
private int minorVersion;
private int patchVersion;
private String versionString;
private VersionComparable(String versionString) {
this.versionString = versionString;
try {
int pos = versionString.indexOf('-');
String numberPart = versionString;
if (pos > 0) {
numberPart = versionString.substring(0, pos);
}
String[] versions = numberPart.split("\\.");
this.majorVersion = Integer.parseInt(versions[0]);
this.minorVersion = Integer.parseInt(versions[1]);
if (versions.length == 3) {
this.patchVersion = Integer.parseInt(versions[2]);
}
} catch (Exception e) {
throw new IllegalArgumentException(
String.format("Can not recognize version %s.", versionString));
}
}
public int getMajorVersion() {
return majorVersion;
}
public int getMinorVersion() {
return minorVersion;
}
public int getPatchVersion() {
return patchVersion;
}
public static VersionComparable fromVersionString(String versionString) {
return new VersionComparable(versionString);
}
@Override
public int compareTo(VersionComparable version) {
if (equalTo(version)) {
return 0;
} else if (newerThan(version)) {
return 1;
} else {
return -1;
}
}
public boolean equalTo(VersionComparable version) {
return majorVersion == version.majorVersion
&& minorVersion == version.minorVersion
&& patchVersion == version.patchVersion;
}
public boolean newerThan(VersionComparable version) {
// Compare major, then minor, then patch version numbers in order.
if (majorVersion != version.majorVersion) {
return majorVersion > version.majorVersion;
}
if (minorVersion != version.minorVersion) {
return minorVersion > version.minorVersion;
}
return patchVersion > version.patchVersion;
}
public boolean newerThanOrEqualTo(VersionComparable version) {
return newerThan(version) || equalTo(version);
}
@Override
public String toString() {
return versionString;
}
}
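Note: a quick demonstration of the comparison semantics above (expected results assume the corrected newerThan); the version strings are illustrative, and any suffix after '-' such as '-SNAPSHOT' is ignored by the parser:

import com.ververica.cdc.connectors.base.utils.VersionComparable;

public class VersionComparableDemo {
    public static void main(String[] args) {
        VersionComparable v114 = VersionComparable.fromVersionString("1.14");
        VersionComparable v1132 = VersionComparable.fromVersionString("1.13.2");
        VersionComparable snapshot = VersionComparable.fromVersionString("1.16-SNAPSHOT");

        System.out.println(v1132.newerThanOrEqualTo(v114)); // false: 1.13.2 < 1.14
        System.out.println(snapshot.newerThanOrEqualTo(v114)); // true: 1.16 >= 1.14
        System.out.println(v114.compareTo(v1132)); // 1: 1.14 > 1.13.2
    }
}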

@ -205,6 +205,21 @@ public class MySqlSourceBuilder<T> {
return this;
}
/**
* Whether to close idle readers at the end of the snapshot phase. This feature depends on
* FLIP-147: Support Checkpoints After Tasks Finished. The Flink version is required to be
* greater than or equal to 1.14, and the configuration <code>
* 'execution.checkpointing.checkpoints-after-tasks-finish.enabled'</code> needs to be set to
* true.
*
* <p>See more at
* https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished.
*/
public MySqlSourceBuilder<T> closeIdleReaders(boolean closeIdleReaders) {
this.configFactory.closeIdleReaders(closeIdleReaders);
return this;
}
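Note: a hedged end-to-end DataStream sketch of the new builder flag (hostname, credentials, and table names are placeholders, not part of this commit); it mirrors the MongoDB example test further below by enabling the FLIP-147 switch on the environment:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class CloseIdleReadersExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source =
                MySqlSource.<String>builder()
                        .hostname("localhost")
                        .port(3306)
                        .databaseList("inventory")
                        .tableList("inventory.products")
                        .username("flinkuser")
                        .password("flinkpw")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        // Release readers that become idle once the snapshot phase finishes.
                        .closeIdleReaders(true)
                        .build();

        // Required by FLIP-147; without it, finished readers would block checkpointing.
        Configuration config = new Configuration();
        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.enableCheckpointing(3000);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source").print();
        env.execute("Close idle readers example");
    }
}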
/**
* The deserializer used to convert from consumed {@link
* org.apache.kafka.connect.source.SourceRecord}.

@ -45,6 +45,7 @@ public class MySqlSourceConfig extends JdbcSourceConfig {
double distributionFactorUpper,
double distributionFactorLower,
boolean includeSchemaChanges,
boolean closeIdleReaders,
Properties dbzProperties,
Configuration dbzConfiguration,
String driverClassName,
@ -66,6 +67,7 @@ public class MySqlSourceConfig extends JdbcSourceConfig {
distributionFactorUpper,
distributionFactorLower,
includeSchemaChanges,
closeIdleReaders,
dbzProperties,
dbzConfiguration,
driverClassName,

@ -24,6 +24,7 @@ import io.debezium.connector.mysql.MySqlConnectorConfig;
import java.util.Properties;
import java.util.UUID;
import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** A factory to initialize {@link MySqlSourceConfig}. */
@ -47,6 +48,7 @@ public class MySqlSourceConfigFactory extends JdbcSourceConfigFactory {
/** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */
public MySqlSourceConfig create(int subtaskId) {
checkSupportCheckpointsAfterTasksFinished(closeIdleReaders);
Properties props = new Properties();
// hard code server name, because we don't need to distinguish it, docs:
// Logical name that identifies and provides a namespace for the particular
@ -113,6 +115,7 @@ public class MySqlSourceConfigFactory extends JdbcSourceConfigFactory {
distributionFactorUpper,
distributionFactorLower,
includeSchemaChanges,
closeIdleReaders,
props,
dbzConfiguration,
driverClassName,

@ -46,6 +46,7 @@ import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.supportCheckpointsAfterTasksFinished;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.MONGODB_PORT;
@ -141,6 +142,7 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment {
List<String> sqlLines =
Arrays.asList(
"SET 'execution.checkpointing.interval' = '3s';",
"SET 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';",
"CREATE TABLE products_source (",
" _id STRING NOT NULL,",
" name STRING,",
@ -156,7 +158,10 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment {
" 'password' = '" + FLINK_USER_PASSWORD + "',",
" 'collection' = 'products',",
" 'heartbeat.interval.ms' = '1000',",
" 'scan.incremental.snapshot.enabled' = '" + parallelismSnapshot + "'",
" 'scan.incremental.snapshot.enabled' = '" + parallelismSnapshot + "',",
" 'scan.incremental.close-idle-reader.enabled' = '"
+ supportCheckpointsAfterTasksFinished()
+ "'",
");",
"CREATE TABLE mongodb_products_sink (",
" `id` STRING NOT NULL,",

@ -32,6 +32,8 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.supportCheckpointsAfterTasksFinished;
/** End-to-end tests for mysql-cdc connector uber jar. */
public class MySqlE2eITCase extends FlinkContainerTestEnvironment {
@ -44,6 +46,7 @@ public class MySqlE2eITCase extends FlinkContainerTestEnvironment {
List<String> sqlLines =
Arrays.asList(
"SET 'execution.checkpointing.interval' = '3s';",
"SET 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';",
"CREATE TABLE products_source (",
" `id` INT NOT NULL,",
" name STRING,",
@ -63,7 +66,10 @@ public class MySqlE2eITCase extends FlinkContainerTestEnvironment {
" 'table-name' = 'products_source',",
" 'server-time-zone' = 'UTC',",
" 'server-id' = '5800-5900',",
" 'scan.incremental.snapshot.chunk.size' = '4'",
" 'scan.incremental.snapshot.chunk.size' = '4',",
" 'scan.incremental.close-idle-reader.enabled' = '"
+ supportCheckpointsAfterTasksFinished()
+ "'",
");",
"CREATE TABLE products_sink (",
" `id` INT NOT NULL,",

@ -183,6 +183,23 @@ public class MongoDBSourceBuilder<T> {
return this;
}
/**
* Whether to close idle readers at the end of the snapshot phase, corresponding to the
* option <code>scan.incremental.close-idle-reader.enabled</code>.
*
* <p>This feature depends on FLIP-147: Support Checkpoints After Tasks Finished. The Flink
* version is required to be greater than or equal to 1.14, and the configuration <code>
* 'execution.checkpointing.checkpoints-after-tasks-finish.enabled'</code> needs to be set to
* true.
*
* <p>See more at
* https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished.
*/
public MongoDBSourceBuilder<T> closeIdleReaders(boolean closeIdleReaders) {
this.configFactory.closeIdleReaders(closeIdleReaders);
return this;
}
/**
* The deserializer used to convert from consumed {@link
* org.apache.kafka.connect.source.SourceRecord}.
@ -198,7 +215,6 @@ public class MongoDBSourceBuilder<T> {
* @return a MongoDBParallelSource with the settings made for this builder.
*/
public MongoDBSource<T> build() {
configFactory.validate();
return new MongoDBSource<>(configFactory, checkNotNull(deserializer));
}
}

@ -48,6 +48,7 @@ public class MongoDBSourceConfig implements SourceConfig {
private final int heartbeatIntervalMillis;
private final int splitMetaGroupSize;
private final int splitSizeMB;
private final boolean closeIdleReaders;
MongoDBSourceConfig(
String scheme,
@ -64,7 +65,8 @@ public class MongoDBSourceConfig implements SourceConfig {
StartupOptions startupOptions,
int heartbeatIntervalMillis,
int splitMetaGroupSize,
int splitSizeMB) {
int splitSizeMB,
boolean closeIdleReaders) {
this.scheme = checkNotNull(scheme);
this.hosts = checkNotNull(hosts);
this.username = username;
@ -81,6 +83,7 @@ public class MongoDBSourceConfig implements SourceConfig {
this.heartbeatIntervalMillis = heartbeatIntervalMillis;
this.splitMetaGroupSize = splitMetaGroupSize;
this.splitSizeMB = splitSizeMB;
this.closeIdleReaders = closeIdleReaders;
}
public String getScheme() {
@ -155,6 +158,11 @@ public class MongoDBSourceConfig implements SourceConfig {
return false;
}
@Override
public boolean isCloseIdleReaders() {
return closeIdleReaders;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -172,6 +180,7 @@ public class MongoDBSourceConfig implements SourceConfig {
&& heartbeatIntervalMillis == that.heartbeatIntervalMillis
&& splitMetaGroupSize == that.splitMetaGroupSize
&& splitSizeMB == that.splitSizeMB
&& closeIdleReaders == that.closeIdleReaders
&& Objects.equals(scheme, that.scheme)
&& Objects.equals(hosts, that.hosts)
&& Objects.equals(username, that.username)
@ -198,6 +207,7 @@ public class MongoDBSourceConfig implements SourceConfig {
startupOptions,
heartbeatIntervalMillis,
splitMetaGroupSize,
splitSizeMB);
splitSizeMB,
closeIdleReaders);
}
}

@ -25,6 +25,7 @@ import java.util.Arrays;
import java.util.List;
import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
@ -52,11 +53,12 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>
private Integer batchSize = BATCH_SIZE.defaultValue();
private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS.defaultValue();
private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE.defaultValue();
private Boolean updateLookup = true;
private boolean updateLookup = true;
private StartupOptions startupOptions = StartupOptions.initial();
private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue();
private Integer splitMetaGroupSize = CHUNK_META_GROUP_SIZE.defaultValue();
private Integer splitSizeMB = SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue();
private boolean closeIdleReaders = false;
/** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
public MongoDBSourceConfigFactory scheme(String scheme) {
@ -202,14 +204,25 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>
return this;
}
/** Validate required options. */
public void validate() {
checkNotNull(hosts, "hosts must be provided");
/**
* Whether to close idle readers at the end of the snapshot phase. This feature depends on
* FLIP-147: Support Checkpoints After Tasks Finished. The Flink version is required to be
* greater than or equal to 1.14, and the configuration <code>
* 'execution.checkpointing.checkpoints-after-tasks-finish.enabled'</code> needs to be set to
* true.
*
* <p>See more at
* https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished.
*/
public MongoDBSourceConfigFactory closeIdleReaders(boolean closeIdleReaders) {
this.closeIdleReaders = closeIdleReaders;
return this;
}
/** Creates a new {@link MongoDBSourceConfig} for the given subtask {@code subtaskId}. */
@Override
public MongoDBSourceConfig create(int subtaskId) {
checkSupportCheckpointsAfterTasksFinished(closeIdleReaders);
return new MongoDBSourceConfig(
scheme,
hosts,
@ -225,6 +238,7 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>
startupOptions,
heartbeatIntervalMillis,
splitMetaGroupSize,
splitSizeMB);
splitSizeMB,
closeIdleReaders);
}
}

@ -79,6 +79,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
private final boolean enableParallelRead;
private final Integer splitMetaGroupSize;
private final Integer splitSizeMB;
private final boolean closeIdleReaders;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@ -108,7 +109,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
ZoneId localTimeZone,
boolean enableParallelRead,
@Nullable Integer splitMetaGroupSize,
@Nullable Integer splitSizeMB) {
@Nullable Integer splitSizeMB,
boolean closeIdleReaders) {
this.physicalSchema = physicalSchema;
this.scheme = checkNotNull(scheme);
this.hosts = checkNotNull(hosts);
@ -129,6 +131,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
this.enableParallelRead = enableParallelRead;
this.splitMetaGroupSize = splitMetaGroupSize;
this.splitSizeMB = splitSizeMB;
this.closeIdleReaders = closeIdleReaders;
}
@Override
@ -178,6 +181,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
MongoDBSource.<RowData>builder()
.scheme(scheme)
.hosts(hosts)
.closeIdleReaders(closeIdleReaders)
.deserializer(deserializer);
Optional.ofNullable(databaseList).ifPresent(builder::databaseList);
@ -271,7 +275,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
localTimeZone,
enableParallelRead,
splitMetaGroupSize,
splitSizeMB);
splitSizeMB,
closeIdleReaders);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@ -305,7 +310,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
&& Objects.equals(splitMetaGroupSize, that.splitMetaGroupSize)
&& Objects.equals(splitSizeMB, that.splitSizeMB)
&& Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys);
&& Objects.equals(metadataKeys, that.metadataKeys)
&& Objects.equals(closeIdlerReaders, that.closeIdlerReaders);
}
@Override
@ -330,7 +336,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
splitMetaGroupSize,
splitSizeMB,
producedDataType,
metadataKeys);
metadataKeys,
closeIdleReaders);
}
@Override

@ -33,6 +33,7 @@ import java.util.HashSet;
import java.util.Set;
import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COLLECTION;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.CONNECTION_OPTIONS;
@ -92,6 +93,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
: ZoneId.of(zoneId);
boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
boolean enableCloseIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
int splitSizeMB = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB);
int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
@ -121,7 +123,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
localTimeZone,
enableParallelRead,
splitMetaGroupSize,
splitSizeMB);
splitSizeMB,
enableCloseIdleReaders);
}
private void checkPrimaryKey(UniqueConstraint pk, String message) {
@ -160,6 +163,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB);
options.add(CHUNK_META_GROUP_SIZE);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
return options;
}
}

@ -17,6 +17,7 @@
package com.ververica.cdc.connectors.mongodb.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
@ -25,6 +26,7 @@ import org.junit.Test;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH;
/** Example Tests for {@link MongoDBSource}. */
public class MongoDBParallelSourceExampleTest extends MongoDBSourceTestBase {
@ -42,9 +44,13 @@ public class MongoDBParallelSourceExampleTest extends MongoDBSourceTestBase {
.username(FLINK_USER)
.password(FLINK_USER_PASSWORD)
.deserializer(new JsonDebeziumDeserializationSchema())
.closeIdleReaders(true)
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration config = new Configuration();
config.set(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
// enable checkpoint
env.enableCheckpointing(3000);
// set the source parallelism to 2

@ -45,6 +45,7 @@ import java.util.HashMap;
import java.util.Map;
import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
@ -103,6 +104,8 @@ public class MongoDBTableFactoryTest {
private static final int SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT =
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue();
private static final int CHUNK_META_GROUP_SIZE_DEFAULT = CHUNK_META_GROUP_SIZE.defaultValue();
private static final boolean SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT =
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue();
@Test
public void testCommonProperties() {
@ -129,7 +132,8 @@ public class MongoDBTableFactoryTest {
LOCAL_TIME_ZONE,
SCAN_INCREMENTAL_SNAPSHOT_ENABLED_DEFAULT,
CHUNK_META_GROUP_SIZE_DEFAULT,
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT);
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
assertEquals(expectedSource, actualSource);
}
@ -147,6 +151,7 @@ public class MongoDBTableFactoryTest {
options.put("scan.incremental.snapshot.enabled", "true");
options.put("chunk-meta.group.size", "1001");
options.put("scan.incremental.snapshot.chunk.size.mb", "10");
options.put("scan.incremental.close-idle-reader.enabled", "true");
DynamicTableSource actualSource = createTableSource(SCHEMA, options);
MongoDBTableSource expectedSource =
@ -168,7 +173,8 @@ public class MongoDBTableFactoryTest {
LOCAL_TIME_ZONE,
true,
1001,
10);
10,
true);
assertEquals(expectedSource, actualSource);
}
@ -203,7 +209,8 @@ public class MongoDBTableFactoryTest {
LOCAL_TIME_ZONE,
SCAN_INCREMENTAL_SNAPSHOT_ENABLED_DEFAULT,
CHUNK_META_GROUP_SIZE_DEFAULT,
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT);
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");

@ -237,6 +237,21 @@ public class MySqlSourceBuilder<T> {
return this;
}
/**
* Whether to close idle readers at the end of the snapshot phase. This feature depends on
* FLIP-147: Support Checkpoints After Tasks Finished. The Flink version is required to be
* greater than or equal to 1.14, and the configuration <code>
* 'execution.checkpointing.checkpoints-after-tasks-finish.enabled'</code> needs to be set to
* true.
*
* <p>See more at
* https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished.
*/
public MySqlSourceBuilder<T> closeIdleReaders(boolean closeIdleReaders) {
this.configFactory.closeIdleReaders(closeIdleReaders);
return this;
}
/**
* Build the {@link MySqlSource}.
*

@ -129,6 +129,11 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
return snapshotSplitAssigner.waitingForFinishedSplits();
}
@Override
public boolean isStreamSplitAssigned() {
return isBinlogSplitAssigned;
}
@Override
public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
return snapshotSplitAssigner.getFinishedSplitInfos();

@ -53,6 +53,11 @@ public interface MySqlSplitAssigner {
*/
boolean waitingForFinishedSplits();
/** Whether the split assigner has finished assigning the stream split. */
default boolean isStreamSplitAssigned() {
throw new UnsupportedOperationException("Assigning a stream split is not supported.");
}
/**
* Gets the finished splits' information. This is useful metadata to generate a binlog split
* that considers finished snapshot splits.

@ -57,6 +57,7 @@ public class MySqlSourceConfig implements Serializable {
private final double distributionFactorLower;
private final boolean includeSchemaChanges;
private final boolean scanNewlyAddedTableEnabled;
private final boolean closeIdleReaders;
private final Properties jdbcProperties;
private final Map<ObjectPath, String> chunkKeyColumns;
@ -87,6 +88,7 @@ public class MySqlSourceConfig implements Serializable {
double distributionFactorLower,
boolean includeSchemaChanges,
boolean scanNewlyAddedTableEnabled,
boolean closeIdleReaders,
Properties dbzProperties,
Properties jdbcProperties,
Map<ObjectPath, String> chunkKeyColumns) {
@ -109,6 +111,7 @@ public class MySqlSourceConfig implements Serializable {
this.distributionFactorLower = distributionFactorLower;
this.includeSchemaChanges = includeSchemaChanges;
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
this.closeIdleReaders = closeIdleReaders;
this.dbzProperties = checkNotNull(dbzProperties);
this.dbzConfiguration = Configuration.from(dbzProperties);
this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration);
@ -193,6 +196,10 @@ public class MySqlSourceConfig implements Serializable {
return scanNewlyAddedTableEnabled;
}
public boolean isCloseIdleReaders() {
return closeIdleReaders;
}
public Properties getDbzProperties() {
return dbzProperties;
}

@ -42,6 +42,7 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static com.ververica.cdc.connectors.mysql.source.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** A factory to construct {@link MySqlSourceConfig}. */
@ -71,6 +72,7 @@ public class MySqlSourceConfigFactory implements Serializable {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
private boolean includeSchemaChanges = false;
private boolean scanNewlyAddedTableEnabled = false;
private boolean closeIdleReaders = false;
private Properties jdbcProperties;
private Duration heartbeatInterval = HEARTBEAT_INTERVAL.defaultValue();
private Properties dbzProperties;
@ -260,8 +262,24 @@ public class MySqlSourceConfigFactory implements Serializable {
return this;
}
/**
* Whether to close idle readers at the end of the snapshot phase. This feature depends on
* FLIP-147: Support Checkpoints After Tasks Finished. The Flink version is required to be
* greater than or equal to 1.14, and the configuration <code>
* 'execution.checkpointing.checkpoints-after-tasks-finish.enabled'</code> needs to be set to
* true.
*
* <p>See more at
* https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished.
*/
public MySqlSourceConfigFactory closeIdleReaders(boolean closeIdleReaders) {
this.closeIdleReaders = closeIdleReaders;
return this;
}
/** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */
public MySqlSourceConfig createConfig(int subtaskId) {
checkSupportCheckpointsAfterTasksFinished(closeIdleReaders);
Properties props = new Properties();
// hard code server name, because we don't need to distinguish it, docs:
// Logical name that identifies and provides a namespace for the particular
@ -342,6 +360,7 @@ public class MySqlSourceConfigFactory implements Serializable {
distributionFactorLower,
includeSchemaChanges,
scanNewlyAddedTableEnabled,
closeIdleReaders,
props,
jdbcProperties,
chunkKeyColumns);

@ -245,4 +245,14 @@ public class MySqlSourceOptions {
"The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table."
+ "By default, the chunk key is the first column of the primary key."
+ "This column must be a column of the primary key.");
@Experimental
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =
ConfigOptions.key("scan.incremental.close-idle-reader.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to close idle readers at the end of the snapshot phase. This feature depends on "
+ "FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be "
+ "greater than or equal to 1.14 when enabling this feature.");
}

@ -187,6 +187,14 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
continue;
}
if (splitAssigner.isStreamSplitAssigned() && sourceConfig.isCloseIdleReaders()) {
// close idle readers when snapshot phase finished.
context.signalNoMoreSplits(nextAwaiting);
awaitingReader.remove();
LOG.info("Close idle reader of subtask {}", nextAwaiting);
continue;
}
Optional<MySqlSplit> split = splitAssigner.getNext();
if (split.isPresent()) {
final MySqlSplit mySqlSplit = split.get();

@ -0,0 +1,43 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.utils;
import org.apache.flink.runtime.util.EnvironmentInformation;
/** Utilities for environment information at runtime. */
public class EnvironmentUtils {
private EnvironmentUtils() {}
private static final VersionComparable FLINK_1_14 = VersionComparable.fromVersionString("1.14");
public static VersionComparable runtimeFlinkVersion() {
return VersionComparable.fromVersionString(EnvironmentInformation.getVersion());
}
public static boolean supportCheckpointsAfterTasksFinished() {
return runtimeFlinkVersion().newerThanOrEqualTo(FLINK_1_14);
}
public static void checkSupportCheckpointsAfterTasksFinished(boolean closeIdleReaders) {
if (closeIdleReaders && !supportCheckpointsAfterTasksFinished()) {
throw new UnsupportedOperationException(
"The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true. But the current version is "
+ runtimeFlinkVersion());
}
}
}

@ -0,0 +1,107 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.utils;
/** Used to compare version numbers at runtime. */
public class VersionComparable implements Comparable<VersionComparable> {
private int majorVersion;
private int minorVersion;
private int patchVersion;
private String versionString;
private VersionComparable(String versionString) {
this.versionString = versionString;
try {
int pos = versionString.indexOf('-');
String numberPart = versionString;
if (pos > 0) {
numberPart = versionString.substring(0, pos);
}
String[] versions = numberPart.split("\\.");
this.majorVersion = Integer.parseInt(versions[0]);
this.minorVersion = Integer.parseInt(versions[1]);
if (versions.length == 3) {
this.patchVersion = Integer.parseInt(versions[2]);
}
} catch (Exception e) {
throw new IllegalArgumentException(
String.format("Can not recognize version %s.", versionString));
}
}
public int getMajorVersion() {
return majorVersion;
}
public int getMinorVersion() {
return minorVersion;
}
public int getPatchVersion() {
return patchVersion;
}
public static VersionComparable fromVersionString(String versionString) {
return new VersionComparable(versionString);
}
@Override
public int compareTo(VersionComparable version) {
if (equalTo(version)) {
return 0;
} else if (newerThan(version)) {
return 1;
} else {
return -1;
}
}
public boolean equalTo(VersionComparable version) {
return majorVersion == version.majorVersion
&& minorVersion == version.minorVersion
&& patchVersion == version.patchVersion;
}
public boolean newerThan(VersionComparable version) {
// Compare major, then minor, then patch version numbers in order.
if (majorVersion != version.majorVersion) {
return majorVersion > version.majorVersion;
}
if (minorVersion != version.minorVersion) {
return minorVersion > version.minorVersion;
}
return patchVersion > version.patchVersion;
}
public boolean newerThanOrEqualTo(VersionComparable version) {
return newerThan(version) || equalTo(version);
}
@Override
public String toString() {
return versionString;
}
}

@ -77,6 +77,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final double distributionFactorLower;
private final StartupOptions startupOptions;
private final boolean scanNewlyAddedTableEnabled;
private final boolean closeIdleReaders;
private final Properties jdbcProperties;
private final Duration heartbeatInterval;
private final String chunkKeyColumn;
@ -113,6 +114,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
double distributionFactorLower,
StartupOptions startupOptions,
boolean scanNewlyAddedTableEnabled,
boolean closeIdleReaders,
Properties jdbcProperties,
Duration heartbeatInterval,
@Nullable String chunkKeyColumn) {
@ -137,6 +139,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
this.distributionFactorLower = distributionFactorLower;
this.startupOptions = startupOptions;
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
this.closeIdleReaders = closeIdleReaders;
this.jdbcProperties = jdbcProperties;
// Mutable attributes
this.producedDataType = physicalSchema.toPhysicalRowDataType();
@ -190,6 +193,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
.startupOptions(startupOptions)
.deserializer(deserializer)
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.closeIdleReaders(closeIdleReaders)
.jdbcProperties(jdbcProperties)
.heartbeatInterval(heartbeatInterval)
.chunkKeyColumn(new ObjectPath(database, tableName), chunkKeyColumn)
@ -270,6 +274,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
distributionFactorLower,
startupOptions,
scanNewlyAddedTableEnabled,
closeIdleReaders,
jdbcProperties,
heartbeatInterval,
chunkKeyColumn);
@ -295,6 +300,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
&& distributionFactorUpper == that.distributionFactorUpper
&& distributionFactorLower == that.distributionFactorLower
&& scanNewlyAddedTableEnabled == that.scanNewlyAddedTableEnabled
&& closeIdleReaders == that.closeIdleReaders
&& Objects.equals(physicalSchema, that.physicalSchema)
&& Objects.equals(hostname, that.hostname)
&& Objects.equals(database, that.database)
@ -341,6 +347,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
producedDataType,
metadataKeys,
scanNewlyAddedTableEnabled,
closeIdleReaders,
jdbcProperties,
heartbeatInterval,
chunkKeyColumn);

@ -56,6 +56,7 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.HOSTNAME;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PASSWORD;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PORT;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
@ -119,6 +120,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null);
boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
if (enableParallelRead) {
validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn);
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
@ -154,6 +157,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
distributionFactorLower,
startupOptions,
scanNewlyAddedTableEnabled,
closeIdleReaders,
JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()),
heartbeatInterval,
chunkKeyColumn);
@ -198,6 +202,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(CONNECT_MAX_RETRIES);
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(HEARTBEAT_INTERVAL);
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
return options;

@ -121,6 +121,7 @@ public class MySqlTableSourceFactoryTest {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial(),
false,
false,
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null);
@ -165,6 +166,7 @@ public class MySqlTableSourceFactoryTest {
0.01d,
StartupOptions.initial(),
false,
false,
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
"testCol");
@ -205,6 +207,7 @@ public class MySqlTableSourceFactoryTest {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial(),
false,
false,
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null);
@ -243,6 +246,7 @@ public class MySqlTableSourceFactoryTest {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.latest(),
false,
false,
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null);
@ -260,6 +264,7 @@ public class MySqlTableSourceFactoryTest {
options.put("jdbc.properties.useSSL", "false");
options.put("heartbeat.interval", "15213ms");
options.put("scan.incremental.snapshot.chunk.key-column", "testCol");
options.put("scan.incremental.close-idle-reader.enabled", "true");
DynamicTableSource actualSource = createTableSource(options);
Properties dbzProperties = new Properties();
@ -289,6 +294,7 @@ public class MySqlTableSourceFactoryTest {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial(),
true,
true,
jdbcProperties,
Duration.ofMillis(15213),
"testCol");
@ -333,6 +339,7 @@ public class MySqlTableSourceFactoryTest {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.specificOffset(offsetFile, offsetPos),
false,
false,
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null);
@ -369,6 +376,7 @@ public class MySqlTableSourceFactoryTest {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial(),
false,
false,
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null);
@ -406,6 +414,7 @@ public class MySqlTableSourceFactoryTest {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.earliest(),
false,
false,
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null);
@ -444,6 +453,7 @@ public class MySqlTableSourceFactoryTest {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.timestamp(0L),
false,
false,
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null);
@ -480,6 +490,7 @@ public class MySqlTableSourceFactoryTest {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.latest(),
false,
false,
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null);
@ -521,6 +532,7 @@ public class MySqlTableSourceFactoryTest {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial(),
false,
false,
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null);

@ -203,6 +203,21 @@ public class OracleSourceBuilder<T> {
return this;
}
/**
* Whether to close idle readers at the end of the snapshot phase. This feature depends on
* FLIP-147: Support Checkpoints After Tasks Finished. The Flink version is required to be
* greater than or equal to 1.14, and the configuration <code>
* 'execution.checkpointing.checkpoints-after-tasks-finish.enabled'</code> needs to be set to
* true.
*
* <p>See more at
* https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished.
*/
public OracleSourceBuilder<T> closeIdleReaders(boolean closeIdleReaders) {
this.configFactory.closeIdleReaders(closeIdleReaders);
return this;
}
/**
* The deserializer used to convert from consumed {@link
* org.apache.kafka.connect.source.SourceRecord}.

@ -47,6 +47,7 @@ public class OracleSourceConfig extends JdbcSourceConfig {
double distributionFactorUpper,
double distributionFactorLower,
boolean includeSchemaChanges,
boolean closeIdleReaders,
Properties dbzProperties,
Configuration dbzConfiguration,
String driverClassName,
@ -70,6 +71,7 @@ public class OracleSourceConfig extends JdbcSourceConfig {
distributionFactorUpper,
distributionFactorLower,
includeSchemaChanges,
closeIdleReaders,
dbzProperties,
dbzConfiguration,
driverClassName,

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Properties;
import java.util.UUID;
import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** A factory to initialize {@link OracleSourceConfig}. */
@ -58,6 +59,7 @@ public class OracleSourceConfigFactory extends JdbcSourceConfigFactory {
/** Creates a new {@link OracleSourceConfig} for the given subtask {@code subtaskId}. */
public OracleSourceConfig create(int subtaskId) {
checkSupportCheckpointsAfterTasksFinished(closeIdleReaders);
Properties props = new Properties();
props.setProperty("connector.class", OracleConnector.class.getCanonicalName());
// Logical name that identifies and provides a namespace for the particular Oracle
@ -114,6 +116,7 @@ public class OracleSourceConfigFactory extends JdbcSourceConfigFactory {
distributionFactorUpper,
distributionFactorLower,
includeSchemaChanges,
closeIdleReaders,
props,
dbzConfiguration,
DRIVER_ClASS_NAME,

@ -77,6 +77,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
private final double distributionFactorUpper;
private final double distributionFactorLower;
private final String chunkKeyColumn;
private final boolean closeIdleReaders;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@ -109,7 +110,8 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
int connectionPoolSize,
double distributionFactorUpper,
double distributionFactorLower,
@Nullable String chunkKeyColumn) {
@Nullable String chunkKeyColumn,
boolean closeIdleReaders) {
this.physicalSchema = physicalSchema;
this.url = url;
this.port = port;
@ -133,6 +135,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
this.distributionFactorUpper = distributionFactorUpper;
this.distributionFactorLower = distributionFactorLower;
this.chunkKeyColumn = chunkKeyColumn;
this.closeIdleReaders = closeIdleReaders;
}
@Override
@ -178,6 +181,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
.connectMaxRetries(connectMaxRetries)
.distributionFactorUpper(distributionFactorUpper)
.distributionFactorLower(distributionFactorLower)
.closeIdleReaders(closeIdleReaders)
.build();
return SourceProvider.of(oracleChangeEventSource);
@ -241,7 +245,8 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
connectionPoolSize,
distributionFactorUpper,
distributionFactorLower,
chunkKeyColumn);
chunkKeyColumn,
closeIdleReaders);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@ -278,7 +283,8 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
&& Objects.equals(connectionPoolSize, that.connectionPoolSize)
&& Objects.equals(distributionFactorUpper, that.distributionFactorUpper)
&& Objects.equals(distributionFactorLower, that.distributionFactorLower)
&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn);
&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
&& Objects.equals(closeIdleReaders, that.closeIdleReaders);
}
@Override
@ -306,7 +312,8 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
connectionPoolSize,
distributionFactorUpper,
distributionFactorLower,
chunkKeyColumn);
chunkKeyColumn,
closeIdleReaders);
}
@Override

@ -39,6 +39,7 @@ import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNEC
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.DATABASE_NAME;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.HOSTNAME;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.PASSWORD;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SERVER_TIME_ZONE;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME;
@ -93,6 +94,8 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null);
String serverTimezone = config.get(SERVER_TIME_ZONE);
boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
if (enableParallelRead) {
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
@ -126,7 +129,8 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
connectionPoolSize,
distributionFactorUpper,
distributionFactorLower,
chunkKeyColumn);
chunkKeyColumn,
closeIdleReaders);
}
@Override
@ -162,6 +166,7 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
return options;
}

@ -118,7 +118,9 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND
.defaultValue(),
null);
null,
JdbcSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED
.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -152,7 +154,8 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND
.defaultValue(),
null);
null,
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -190,7 +193,8 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND
.defaultValue(),
null);
null,
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -214,6 +218,8 @@ public class OracleTableSourceFactoryTest {
String.valueOf(chunkSize));
options.put(SourceOptions.CHUNK_META_GROUP_SIZE.key(), String.valueOf(splitMetaGroupSize));
options.put(SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.key(), String.valueOf(fetchSize));
options.put(SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.key(), "true");
options.put(
JdbcSourceOptions.CONNECT_TIMEOUT.key(),
String.format("%ds", connectTimeout.getSeconds()));
@ -251,7 +257,8 @@ public class OracleTableSourceFactoryTest {
connectPoolSize,
distributionFactorUpper,
distributionFactorLower,
null);
null,
true);
assertEquals(expectedSource, actualSource);
}
@ -286,7 +293,8 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND
.defaultValue(),
null);
null,
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -321,7 +329,8 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND
.defaultValue(),
null);
null,
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -360,7 +369,8 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND
.defaultValue(),
null);
null,
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =
Arrays.asList("op_ts", "database_name", "table_name", "schema_name");

@ -41,6 +41,7 @@ public class SqlServerSourceConfig extends JdbcSourceConfig {
double distributionFactorUpper,
double distributionFactorLower,
boolean includeSchemaChanges,
boolean closeIdleReaders,
Properties dbzProperties,
Configuration dbzConfiguration,
String driverClassName,
@ -63,6 +64,7 @@ public class SqlServerSourceConfig extends JdbcSourceConfig {
distributionFactorUpper,
distributionFactorLower,
includeSchemaChanges,
closeIdleReaders,
dbzProperties,
dbzConfiguration,
driverClassName,

@ -26,6 +26,7 @@ import java.util.List;
import java.util.Properties;
import java.util.UUID;
import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Factory for creating {@link SqlServerSourceConfig}. */
@ -47,6 +48,7 @@ public class SqlServerSourceConfigFactory extends JdbcSourceConfigFactory {
@Override
public SqlServerSourceConfig create(int subtask) {
checkSupportCheckpointsAfterTasksFinished(closeIdleReaders);
Properties props = new Properties();
props.setProperty("connector.class", SqlServerConnector.class.getCanonicalName());
@ -98,6 +100,7 @@ public class SqlServerSourceConfigFactory extends JdbcSourceConfigFactory {
distributionFactorUpper,
distributionFactorLower,
includeSchemaChanges,
closeIdleReaders,
props,
dbzConfiguration,
DRIVER_ClASS_NAME,
