[cdc-connector][base] Support skip snapshot backfill for incremental snapshot connector

pull/2793/head
Hongshun Wang 1 year ago committed by Leonard Xu
parent 13be14fe3f
commit 4e248fad33

@ -34,6 +34,7 @@ public abstract class BaseSourceConfig implements SourceConfig {
protected final double distributionFactorLower;
protected final boolean includeSchemaChanges;
protected final boolean closeIdleReaders;
protected final boolean skipSnapshotBackfill;
// --------------------------------------------------------------------------------------------
// Debezium Configurations
@ -49,6 +50,7 @@ public abstract class BaseSourceConfig implements SourceConfig {
double distributionFactorLower,
boolean includeSchemaChanges,
boolean closeIdleReaders,
boolean skipSnapshotBackfill,
Properties dbzProperties,
Configuration dbzConfiguration) {
this.startupOptions = startupOptions;
@ -58,6 +60,7 @@ public abstract class BaseSourceConfig implements SourceConfig {
this.distributionFactorLower = distributionFactorLower;
this.includeSchemaChanges = includeSchemaChanges;
this.closeIdleReaders = closeIdleReaders;
this.skipSnapshotBackfill = skipSnapshotBackfill;
this.dbzProperties = dbzProperties;
this.dbzConfiguration = dbzConfiguration;
}
@ -102,4 +105,9 @@ public abstract class BaseSourceConfig implements SourceConfig {
public Configuration getDbzConfiguration() {
return Configuration.from(dbzProperties);
}
@Override
public boolean isSkipSnapshotBackfill() {
return skipSnapshotBackfill;
}
}

@ -68,7 +68,8 @@ public abstract class JdbcSourceConfig extends BaseSourceConfig {
Duration connectTimeout,
int connectMaxRetries,
int connectionPoolSize,
String chunkKeyColumn) {
String chunkKeyColumn,
boolean skipSnapshotBackfill) {
super(
startupOptions,
splitSize,
@ -77,6 +78,7 @@ public abstract class JdbcSourceConfig extends BaseSourceConfig {
distributionFactorLower,
includeSchemaChanges,
closeIdleReaders,
skipSnapshotBackfill,
dbzProperties,
dbzConfiguration);
this.driverClassName = driverClassName;

@ -55,6 +55,7 @@ public abstract class JdbcSourceConfigFactory implements Factory<JdbcSourceConfi
protected int connectionPoolSize = JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue();
protected Properties dbzProperties;
protected String chunkKeyColumn;
protected boolean skipSnapshotBackfill;
/** Integer port number of the database server. */
public JdbcSourceConfigFactory hostname(String hostname) {
@ -224,6 +225,10 @@ public abstract class JdbcSourceConfigFactory implements Factory<JdbcSourceConfi
return this;
}
public void skipSnapshotBackfill(boolean skipSnapshotBackfill) {
this.skipSnapshotBackfill = skipSnapshotBackfill;
}
@Override
public abstract JdbcSourceConfig create(int subtask);
}

@ -35,6 +35,8 @@ public interface SourceConfig extends Serializable {
boolean isCloseIdleReaders();
boolean isSkipSnapshotBackfill();
/** Factory for the {@code SourceConfig}. */
@FunctionalInterface
interface Factory<C extends SourceConfig> extends Serializable {

@ -121,4 +121,12 @@ public class SourceOptions {
"Whether to close idle readers at the end of the snapshot phase. This feature depends on "
+ "FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be "
+ "greater than or equal to 1.14 when enabling this feature.");
@Experimental
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP =
ConfigOptions.key("scan.incremental.snapshot.backfill.skip")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in binlog reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some binlog events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed binlog events should be handled specially.");
}

@ -217,6 +217,11 @@ public class IncrementalSource<T, C extends SourceConfig>
offsetFactory);
}
/**
* Set snapshot hooks only for test. The SnapshotPhaseHook should be serializable
*
* @param snapshotHooks
*/
@VisibleForTesting
public void setSnapshotHooks(SnapshotPhaseHooks snapshotHooks) {
this.snapshotHooks = snapshotHooks;

@ -71,7 +71,17 @@ public abstract class AbstractScanFetchTask implements FetchTask {
if (snapshotPhaseHooks.getPreHighWatermarkAction() != null) {
snapshotPhaseHooks.getPreHighWatermarkAction().accept(sourceConfig, snapshotSplit);
}
Offset highWatermark = dialect.displayCurrentOffset(sourceConfig);
// Directly set HW = LW if backfill is skipped. Binlog events created during snapshot
// phase could be processed later in binlog reading phase.
//
// Note that this behaviour downgrades the delivery guarantee to at-least-once. We can't
// promise that the snapshot is exactly the view of the table at low watermark moment,
// so binlog events created during snapshot might be replayed later in binlog reading
// phase.
Offset highWatermark =
context.getSourceConfig().isSkipSnapshotBackfill()
? lowWatermark
: dialect.displayCurrentOffset(sourceConfig);
LOG.info(
"Snapshot step 3 - Determining high watermark {} for split {}",
highWatermark,

@ -81,7 +81,8 @@ public class MySqlSourceConfig extends JdbcSourceConfig {
connectTimeout,
connectMaxRetries,
connectionPoolSize,
null);
null,
true);
}
@Override

@ -184,6 +184,11 @@ public class MongoDBSourceConfig implements SourceConfig {
return disableCursorTimeout;
}
@Override
public boolean isSkipSnapshotBackfill() {
return skipSnapshotBackfill;
}
@Override
public boolean equals(Object o) {
if (this == o) {

@ -85,7 +85,8 @@ public class OracleSourceConfig extends JdbcSourceConfig {
connectTimeout,
connectMaxRetries,
connectionPoolSize,
chunkKeyColumn);
chunkKeyColumn,
true);
this.url = url;
}

@ -85,7 +85,8 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
connectTimeout,
connectMaxRetries,
connectionPoolSize,
chunkKeyColumn);
chunkKeyColumn,
true);
this.subtaskId = subtaskId;
}

@ -71,18 +71,22 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask {
public void execute(Context context) throws Exception {
PostgresSourceFetchTaskContext ctx = (PostgresSourceFetchTaskContext) context;
PostgresSourceConfig sourceConfig = (PostgresSourceConfig) context.getSourceConfig();
try {
// create slot here, because a slot can only read wal-log after its own creation.
createSlotForBackFillReadTask(
// if skip backfill, no need to create slot here
maybeCreateSlotForBackFillReadTask(
ctx.getConnection(),
ctx.getReplicationConnection(),
((PostgresSourceConfig) ctx.getSourceConfig()).getSlotNameForBackfillTask(),
ctx.getPluginName());
sourceConfig.getSlotNameForBackfillTask(),
ctx.getPluginName(),
sourceConfig.isSkipSnapshotBackfill());
super.execute(context);
} finally {
// remove slot after snapshot slit finish
dropSlotForBackFillReadTask(
(PostgresReplicationConnection) ctx.getReplicationConnection());
maybeDropSlotForBackFillReadTask(
(PostgresReplicationConnection) ctx.getReplicationConnection(),
sourceConfig.isSkipSnapshotBackfill());
}
}
@ -149,11 +153,17 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask {
* Create a slot before snapshot reading so that the slot can track the WAL log during the
* snapshot reading phase.
*/
private void createSlotForBackFillReadTask(
private void maybeCreateSlotForBackFillReadTask(
PostgresConnection jdbcConnection,
ReplicationConnection replicationConnection,
String slotName,
String pluginName) {
String pluginName,
boolean skipSnapshotBackfill) {
// if skip backfill, no need to create slot here
if (skipSnapshotBackfill) {
return;
}
try {
SlotState slotInfo = null;
try {
@ -180,10 +190,17 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask {
}
/** Drop slot for backfill task and close replication connection. */
private void dropSlotForBackFillReadTask(PostgresReplicationConnection replicationConnection) {
private void maybeDropSlotForBackFillReadTask(
PostgresReplicationConnection replicationConnection, boolean skipSnapshotBackfill) {
// if skip backfill, no need to create slot here
if (skipSnapshotBackfill) {
return;
}
try {
replicationConnection.close(true);
} catch (Throwable t) {
LOG.info("here exception occurs");
throw new FlinkRuntimeException(t);
}
}

@ -77,7 +77,8 @@ public class SqlServerSourceConfig extends JdbcSourceConfig {
connectTimeout,
connectMaxRetries,
connectionPoolSize,
chunkKeyColumn);
chunkKeyColumn,
true);
}
@Override

@ -183,7 +183,7 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
SqlServerSourceConfigFactory sourceConfigFactory =
getConfigFactory(databaseName, new String[] {tableName}, 10);
SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
SqlServerSourceConfig sqlServerSourceConfigs = sourceConfigFactory.create(0);
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfigFactory.create(0));
String tableId = databaseName + "." + tableName;
@ -195,13 +195,13 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
hooks.setPostLowWatermarkAction(
(dialect, split) -> executeSql(sourceConfig, deleteDataSql));
(sourceConfig, split) -> executeSql(sqlServerSourceConfigs, deleteDataSql));
SqlServerSourceFetchTaskContext sqlServerSourceFetchTaskContext =
new SqlServerSourceFetchTaskContext(
sourceConfig,
sqlServerSourceConfigs,
sqlServerDialect,
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()),
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()));
createSqlServerConnection(sqlServerSourceConfigs.getDbzConnectorConfig()),
createSqlServerConnection(sqlServerSourceConfigs.getDbzConnectorConfig()));
final DataType dataType =
DataTypes.ROW(
@ -209,7 +209,8 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, sqlServerDialect);
List<SnapshotSplit> snapshotSplits =
getSnapshotSplits(sqlServerSourceConfigs, sqlServerDialect);
String[] expected =
new String[] {

Loading…
Cancel
Save