[postgres] Close idle readers when snapshot finished (#2400)

(cherry picked from commit 07db169765)
pull/2694/head
wuzhenhua 1 year ago committed by Leonard Xu
parent d4461f2383
commit d26360010a

@ -215,6 +215,23 @@ public class PostgresSourceBuilder<T> {
return this;
}
/**
* scan.incremental.close-idle-reader.enabled
*
* <p>Whether to close idle readers at the end of the snapshot phase. This feature depends on
* FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be
* greater than or equal to 1.14, and the configuration <code>
* 'execution.checkpointing.checkpoints-after-tasks-finish.enabled'</code> needs to be set to
* true.
*
* <p>See more
* https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished.
*/
public PostgresSourceBuilder<T> closeIdleReaders(boolean closeIdleReaders) {
this.configFactory.closeIdleReaders(closeIdleReaders);
return this;
}
/**
* The deserializer used to convert from consumed {@link
* org.apache.kafka.connect.source.SourceRecord}.

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Properties;
import java.util.UUID;
import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished;
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.util.Preconditions.checkNotNull;
@ -51,6 +52,7 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
/** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */
@Override
public PostgresSourceConfig create(int subtaskId) {
checkSupportCheckpointsAfterTasksFinished(closeIdleReaders);
Properties props = new Properties();
props.setProperty("connector.class", PostgresConnector.class.getCanonicalName());
props.setProperty("plugin.name", pluginName);

@ -42,6 +42,7 @@ import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.PASSWO
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCHEMA_NAME;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.USERNAME;
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static com.ververica.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHANGELOG_MODE;
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHUNK_META_GROUP_SIZE;
@ -108,6 +109,8 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
String chunkKeyColumn =
config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null);
boolean closeIdlerReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
if (enableParallelRead) {
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
@ -148,7 +151,8 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
distributionFactorLower,
heartbeatInterval,
startupOptions,
chunkKeyColumn);
chunkKeyColumn,
closeIdlerReaders);
}
@Override
@ -187,6 +191,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
options.add(CONNECT_MAX_RETRIES);
options.add(CONNECTION_POOL_SIZE);
options.add(HEARTBEAT_INTERVAL);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
return options;
}

@ -81,6 +81,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
private final Duration heartbeatInterval;
private final StartupOptions startupOptions;
private final String chunkKeyColumn;
private final boolean closeIdleReaders;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@ -116,7 +117,8 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
double distributionFactorLower,
Duration heartbeatInterval,
StartupOptions startupOptions,
@Nullable String chunkKeyColumn) {
@Nullable String chunkKeyColumn,
boolean closeIdleReaders) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@ -144,6 +146,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
// Mutable attributes
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
this.closeIdleReaders = closeIdleReaders;
}
@Override
@ -202,6 +205,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
.startupOptions(startupOptions)
.chunkKeyColumn(chunkKeyColumn)
.heartbeatInterval(heartbeatInterval)
.closeIdleReaders(closeIdleReaders)
.build();
return SourceProvider.of(parallelSource);
} else {
@ -266,7 +270,8 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
distributionFactorLower,
heartbeatInterval,
startupOptions,
chunkKeyColumn);
chunkKeyColumn,
closeIdleReaders);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@ -306,7 +311,8 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
&& Objects.equals(distributionFactorLower, that.distributionFactorLower)
&& Objects.equals(heartbeatInterval, that.heartbeatInterval)
&& Objects.equals(startupOptions, that.startupOptions)
&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn);
&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
&& Objects.equals(closeIdleReaders, that.closeIdleReaders);
}
@Override
@ -337,7 +343,8 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
distributionFactorLower,
heartbeatInterval,
startupOptions,
chunkKeyColumn);
chunkKeyColumn,
closeIdleReaders);
}
@Override

@ -49,6 +49,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHUNK_META_GROUP_SIZE;
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECTION_POOL_SIZE;
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_MAX_RETRIES;
@ -110,6 +111,8 @@ public class PostgreSQLTableFactoryTest {
private static final String MY_SCHEMA = "public";
private static final String MY_SLOT_NAME = "flinktest";
private static final Properties PROPERTIES = new Properties();
private static final boolean SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT =
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue();
@Test
public void testCommonProperties() {
@ -142,7 +145,8 @@ public class PostgreSQLTableFactoryTest {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
HEARTBEAT_INTERVAL.defaultValue(),
StartupOptions.initial(),
null);
null,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
assertEquals(expectedSource, actualSource);
}
@ -182,7 +186,8 @@ public class PostgreSQLTableFactoryTest {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
HEARTBEAT_INTERVAL.defaultValue(),
StartupOptions.initial(),
null);
null,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
assertEquals(expectedSource, actualSource);
}
@ -222,7 +227,8 @@ public class PostgreSQLTableFactoryTest {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
HEARTBEAT_INTERVAL.defaultValue(),
StartupOptions.initial(),
null);
null,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =
Arrays.asList("op_ts", "database_name", "schema_name", "table_name");
@ -272,7 +278,8 @@ public class PostgreSQLTableFactoryTest {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
HEARTBEAT_INTERVAL.defaultValue(),
StartupOptions.initial(),
null);
null,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
assertEquals(expectedSource, actualSource);
}
@ -312,7 +319,8 @@ public class PostgreSQLTableFactoryTest {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
HEARTBEAT_INTERVAL.defaultValue(),
StartupOptions.latest(),
null);
null,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
assertEquals(expectedSource, actualSource);
}

Loading…
Cancel
Save