[cdc-connector][sqlserver] Expose ability to skip backfill in snapshot period for SqlServer CDC. (#2783)

This closes #2783.
pull/2793/head
Hongshun Wang 1 year ago committed by Leonard Xu
parent a6379358e4
commit f57d6f6e67
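For reference, a minimal sketch of how a user job enables the new option through the DataStream API (mirroring the integration test added below; connection settings are placeholders, and the deserializer parameter stands in for a real implementation such as the RowDataDebeziumDeserializeSchema built in the test):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder.SqlServerIncrementalSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

public class SkipBackfillExample {

    // Sketch only: host, port, and credentials are placeholders.
    static void run(DebeziumDeserializationSchema<RowData> deserializer) throws Exception {
        SqlServerIncrementalSource<RowData> source =
                SqlServerIncrementalSource.<RowData>builder()
                        .hostname("localhost")
                        .port(1433)
                        .username("sa")
                        .password("...")
                        .databaseList("customer")
                        .tableList("dbo.customers")
                        .deserializer(deserializer)
                        .skipSnapshotBackfill(true) // the option exposed by this commit
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "SqlServer CDC Source").print();
        env.execute("skip-backfill-example");
    }
}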

@@ -55,7 +55,8 @@ public abstract class JdbcSourceConfigFactory implements Factory<JdbcSourceConfi
protected int connectionPoolSize = JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue();
protected Properties dbzProperties;
protected String chunkKeyColumn;
protected boolean skipSnapshotBackfill;
protected boolean skipSnapshotBackfill =
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue();
/** Hostname of the database server. */
public JdbcSourceConfigFactory hostname(String hostname) {

@@ -212,6 +212,22 @@ public class SqlServerSourceBuilder<T> {
return this;
}
/**
* Whether to skip backfill in snapshot reading phase.
*
* <p>If backfill is skipped, changes on captured tables during the snapshot phase will be
* consumed later in the change log reading phase instead of being merged into the snapshot.
*
* <p>WARNING: Skipping backfill might lead to data inconsistency, because some change log
* events that happened within the snapshot phase might be replayed (only at-least-once
* semantics are guaranteed). For example, an update to a value that was already updated in
* the snapshot, or a delete of an entry that was already deleted in the snapshot. Such
* replayed change log events should be handled specially.
*/
public SqlServerSourceBuilder<T> skipSnapshotBackfill(boolean skipSnapshotBackfill) {
this.configFactory.skipSnapshotBackfill(skipSnapshotBackfill);
return this;
}
/**
* Build the {@link SqlServerIncrementalSource}.
*

@@ -53,7 +53,8 @@ public class SqlServerSourceConfig extends JdbcSourceConfig {
Duration connectTimeout,
int connectMaxRetries,
int connectionPoolSize,
String chunkKeyColumn) {
String chunkKeyColumn,
boolean skipSnapshotBackfill) {
super(
startupOptions,
databaseList,
@@ -78,7 +79,7 @@ public class SqlServerSourceConfig extends JdbcSourceConfig {
connectMaxRetries,
connectionPoolSize,
chunkKeyColumn,
true);
skipSnapshotBackfill);
}
@Override

@@ -100,6 +100,7 @@ public class SqlServerSourceConfigFactory extends JdbcSourceConfigFactory {
connectTimeout,
connectMaxRetries,
connectionPoolSize,
chunkKeyColumn);
chunkKeyColumn,
skipSnapshotBackfill);
}
}

@@ -40,6 +40,7 @@ import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNEC
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
@@ -137,6 +138,7 @@ public class SqlServerTableFactory implements DynamicTableSourceFactory {
String chunkKeyColumn =
config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null);
boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
if (enableParallelRead) {
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
@@ -171,7 +173,8 @@ public class SqlServerTableFactory implements DynamicTableSourceFactory {
distributionFactorUpper,
distributionFactorLower,
chunkKeyColumn,
closeIdleReaders);
closeIdleReaders,
skipSnapshotBackfill);
}
@Override
@@ -207,6 +210,7 @@ public class SqlServerTableFactory implements DynamicTableSourceFactory {
options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
return options;
}
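To SQL users, the option registered above surfaces as 'scan.incremental.snapshot.backfill.skip' (the same key the integration test DDL below sets). A hedged Table API sketch; the 'sqlserver-cdc' connector identifier and the connection option keys are assumed from the connector's conventions, and the schema and connection values are placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SkipBackfillSqlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Sketch only: schema and connection settings are placeholders.
        tEnv.executeSql(
                "CREATE TABLE customers ("
                        + " id BIGINT NOT NULL,"
                        + " name STRING,"
                        + " address STRING,"
                        + " phone_number STRING,"
                        + " PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'sqlserver-cdc',"
                        + " 'hostname' = 'localhost',"
                        + " 'port' = '1433',"
                        + " 'username' = 'sa',"
                        + " 'password' = '...',"
                        + " 'database-name' = 'customer',"
                        + " 'table-name' = 'dbo.customers',"
                        + " 'scan.incremental.snapshot.enabled' = 'true',"
                        + " 'scan.incremental.snapshot.backfill.skip' = 'true'" // the new option
                        + ")");
        tEnv.executeSql("SELECT * FROM customers").print();
    }
}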

@@ -78,6 +78,7 @@ public class SqlServerTableSource implements ScanTableSource, SupportsReadingMet
private final double distributionFactorLower;
private final String chunkKeyColumn;
private final boolean closeIdleReaders;
private final boolean skipSnapshotBackfill;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@@ -110,7 +111,8 @@ public class SqlServerTableSource implements ScanTableSource, SupportsReadingMet
double distributionFactorUpper,
double distributionFactorLower,
@Nullable String chunkKeyColumn,
boolean closeIdleReaders) {
boolean closeIdleReaders,
boolean skipSnapshotBackfill) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -134,6 +136,7 @@ public class SqlServerTableSource implements ScanTableSource, SupportsReadingMet
this.distributionFactorLower = distributionFactorLower;
this.chunkKeyColumn = chunkKeyColumn;
this.closeIdleReaders = closeIdleReaders;
this.skipSnapshotBackfill = skipSnapshotBackfill;
}
@Override
@@ -181,6 +184,7 @@ public class SqlServerTableSource implements ScanTableSource, SupportsReadingMet
.distributionFactorLower(distributionFactorLower)
.chunkKeyColumn(chunkKeyColumn)
.closeIdleReaders(closeIdleReaders)
.skipSnapshotBackfill(skipSnapshotBackfill)
.build();
return SourceProvider.of(sqlServerChangeEventSource);
} else {
@@ -240,7 +244,8 @@ public class SqlServerTableSource implements ScanTableSource, SupportsReadingMet
distributionFactorUpper,
distributionFactorLower,
chunkKeyColumn,
closeIdleReaders);
closeIdleReaders,
skipSnapshotBackfill);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@@ -277,7 +282,8 @@ public class SqlServerTableSource implements ScanTableSource, SupportsReadingMet
&& Objects.equals(distributionFactorUpper, that.distributionFactorUpper)
&& Objects.equals(distributionFactorLower, that.distributionFactorLower)
&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
&& Objects.equals(closeIdleReaders, that.closeIdleReaders);
&& Objects.equals(closeIdleReaders, that.closeIdleReaders)
&& Objects.equals(skipSnapshotBackfill, that.skipSnapshotBackfill);
}
@Override
@@ -305,7 +311,8 @@ public class SqlServerTableSource implements ScanTableSource, SupportsReadingMet
distributionFactorUpper,
distributionFactorLower,
chunkKeyColumn,
closeIdleReaders);
closeIdleReaders,
skipSnapshotBackfill);
}
@Override

@@ -17,13 +17,24 @@
package com.ververica.cdc.connectors.sqlserver.source;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook;
import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import com.ververica.cdc.connectors.sqlserver.source.config.SqlServerSourceConfig;
import com.ververica.cdc.connectors.sqlserver.source.dialect.SqlServerDialect;
import com.ververica.cdc.connectors.sqlserver.testutils.TestTable;
import io.debezium.jdbc.JdbcConnection;
import org.apache.commons.lang3.StringUtils;
import org.junit.Rule;
import org.junit.Test;
@@ -33,10 +44,16 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.catalog.Column.physical;
import static org.apache.flink.util.Preconditions.checkState;
import static org.testcontainers.containers.MSSQLServerContainer.MS_SQL_SERVER_PORT;
@@ -47,6 +64,9 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);
private static final int USE_POST_LOWWATERMARK_HOOK = 1;
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
@Test
public void testReadSingleTableWithSingleParallelism() throws Exception {
testSqlServerParallelSource(
@@ -90,6 +110,234 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
1, FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {"dbo.customers"});
}
@Test
public void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exception {
testSqlServerParallelSource(
DEFAULT_PARALLELISM,
FailoverType.TM,
FailoverPhase.SNAPSHOT,
new String[] {"dbo.customers"},
true);
}
@Test
public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
List<String> records = testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK);
List<String> expectedRecords =
Arrays.asList(
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[2000, user_21, Pittsburgh, 123567891234]",
"+I[15213, user_15213, Shanghai, 123567891234]");
// when backfill is enabled, the change log events between [snapshot, high_watermark) will
// be merged into the snapshot image
assertEqualsInAnyOrder(expectedRecords, records);
}
@Test
public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
List<String> records = testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK);
List<String> expectedRecords =
Arrays.asList(
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[2000, user_21, Pittsburgh, 123567891234]",
"+I[15213, user_15213, Shanghai, 123567891234]");
// when backfill is enabled, the change log events between [low_watermark, snapshot) will
// be merged into the snapshot image
assertEqualsInAnyOrder(expectedRecords, records);
}
@Test
public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
List<String> records = testBackfillWhenWritingEvents(true, 25, USE_PRE_HIGHWATERMARK_HOOK);
List<String> expectedRecords =
Arrays.asList(
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]",
"+I[15213, user_15213, Shanghai, 123567891234]",
"-U[2000, user_21, Shanghai, 123567891234]",
"+U[2000, user_21, Pittsburgh, 123567891234]",
"-D[1019, user_20, Shanghai, 123567891234]");
// when backfill is skipped, the change log events between (snapshot, high_watermark) will
// be emitted as stream events.
assertEqualsInAnyOrder(expectedRecords, records);
}
@Test
public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
List<String> records = testBackfillWhenWritingEvents(true, 25, USE_POST_LOWWATERMARK_HOOK);
List<String> expectedRecords =
Arrays.asList(
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[2000, user_21, Pittsburgh, 123567891234]",
"+I[15213, user_15213, Shanghai, 123567891234]",
"+I[15213, user_15213, Shanghai, 123567891234]",
"-U[2000, user_21, Shanghai, 123567891234]",
"+U[2000, user_21, Pittsburgh, 123567891234]",
"-D[1019, user_20, Shanghai, 123567891234]");
// when backfill is skipped, the change log events between (low_watermark, high_watermark)
// will still be emitted as stream events. This causes duplicate data: for example, user_20
// is deleted twice, and user_15213 is inserted twice.
assertEqualsInAnyOrder(expectedRecords, records);
}
private List<String> testBackfillWhenWritingEvents(
boolean skipSnapshotBackfill, int fetchSize, int hookType) throws Exception {
String databaseName = "customer";
initializeSqlServerTable(databaseName);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.setParallelism(1);
ResolvedSchema customersSchema =
new ResolvedSchema(
Arrays.asList(
physical("id", BIGINT().notNull()),
physical("name", STRING()),
physical("address", STRING()),
physical("phone_number", STRING())),
new ArrayList<>(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
TestTable customerTable = new TestTable(databaseName, "dbo", "customers", customersSchema);
String tableId = customerTable.getTableId();
SqlServerSourceBuilder.SqlServerIncrementalSource source =
SqlServerSourceBuilder.SqlServerIncrementalSource.<RowData>builder()
.hostname(MSSQL_SERVER_CONTAINER.getHost())
.port(MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT))
.username(MSSQL_SERVER_CONTAINER.getUsername())
.password(MSSQL_SERVER_CONTAINER.getPassword())
.databaseList(databaseName)
.tableList(getTableNameRegex(new String[] {"dbo.customers"}))
.deserializer(customerTable.getDeserializer())
.skipSnapshotBackfill(skipSnapshotBackfill)
.build();
// Perform some database operations via the hooks during the snapshot phase.
SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
String[] statements =
new String[] {
String.format(
"INSERT INTO %s VALUES (15213, 'user_15213', 'Shanghai', '123567891234')",
tableId),
String.format("UPDATE %s SET address='Pittsburgh' WHERE id=2000", tableId),
String.format("DELETE FROM %s WHERE id=1019", tableId)
};
SnapshotPhaseHook snapshotPhaseHook =
(sourceConfig, split) -> {
SqlServerDialect dialect =
new SqlServerDialect((SqlServerSourceConfig) sourceConfig);
JdbcConnection sqlServerConnection =
dialect.openJdbcConnection((JdbcSourceConfig) sourceConfig);
sqlServerConnection.execute(statements);
sqlServerConnection.commit();
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
if (hookType == USE_POST_LOWWATERMARK_HOOK) {
hooks.setPostLowWatermarkAction(snapshotPhaseHook);
} else if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
hooks.setPreHighWatermarkAction(snapshotPhaseHook);
}
source.setSnapshotHooks(hooks);
List<String> records = new ArrayList<>();
try (CloseableIterator<RowData> iterator =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
.executeAndCollect()) {
records = fetchRowData(iterator, fetchSize, customerTable::stringify);
env.close();
}
return records;
}
private void testSqlServerParallelSource(
FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables)
throws Exception {
@@ -103,6 +351,17 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
FailoverPhase failoverPhase,
String[] captureCustomerTables)
throws Exception {
testSqlServerParallelSource(
parallelism, failoverType, failoverPhase, captureCustomerTables, false);
}
private void testSqlServerParallelSource(
int parallelism,
FailoverType failoverType,
FailoverPhase failoverPhase,
String[] captureCustomerTables,
boolean skipSnapshotBackfill)
throws Exception {
String databaseName = "customer";
@@ -131,14 +390,16 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = 'true',"
+ " 'scan.incremental.snapshot.chunk.size' = '4'"
+ " 'scan.incremental.snapshot.chunk.size' = '4',"
+ " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+ ")",
MSSQL_SERVER_CONTAINER.getHost(),
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
MSSQL_SERVER_CONTAINER.getUsername(),
MSSQL_SERVER_CONTAINER.getPassword(),
databaseName,
getTableNameRegex(captureCustomerTables));
getTableNameRegex(captureCustomerTables),
skipSnapshotBackfill);
// first step: check the snapshot data
String[] snapshotForSingleTable =
@@ -239,6 +500,17 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
}
}
public static List<String> fetchRowData(
Iterator<RowData> iter, int size, Function<RowData, String> stringifier) {
List<RowData> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
RowData row = iter.next();
rows.add(row);
size--;
}
return rows.stream().map(stringifier).collect(Collectors.toList());
}
private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {

@@ -66,7 +66,7 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
SqlServerSourceConfigFactory sourceConfigFactory =
getConfigFactory(databaseName, new String[] {tableName}, 10);
SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfigFactory.create(0));
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfig);
String tableId = databaseName + "." + tableName;
String[] changingDataSql =
@@ -80,8 +80,15 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
};
SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
hooks.setPostHighWatermarkAction(
(dialect, split) -> executeSql(sourceConfig, changingDataSql));
hooks.setPostLowWatermarkAction(
(config, split) -> {
executeSql((SqlServerSourceConfig) config, changingDataSql);
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
SqlServerSourceFetchTaskContext sqlServerSourceFetchTaskContext =
new SqlServerSourceFetchTaskContext(
sourceConfig,
@@ -112,7 +119,7 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
List<String> actual =
readTableSnapshotSplits(
snapshotSplits, sqlServerSourceFetchTaskContext, 1, dataType);
snapshotSplits, sqlServerSourceFetchTaskContext, 1, dataType, hooks);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@@ -126,7 +133,7 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
SqlServerSourceConfigFactory sourceConfigFactory =
getConfigFactory(databaseName, new String[] {tableName}, 10);
SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfigFactory.create(0));
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfig);
String tableId = databaseName + "." + tableName;
String[] insertDataSql =
@@ -136,8 +143,15 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
};
SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
hooks.setPostHighWatermarkAction(
(dialect, split) -> executeSql(sourceConfig, insertDataSql));
hooks.setPreHighWatermarkAction(
(config, split) -> {
executeSql((SqlServerSourceConfig) config, insertDataSql);
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
SqlServerSourceFetchTaskContext sqlServerSourceFetchTaskContext =
new SqlServerSourceFetchTaskContext(
sourceConfig,
@@ -170,7 +184,7 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
List<String> actual =
readTableSnapshotSplits(
snapshotSplits, sqlServerSourceFetchTaskContext, 1, dataType);
snapshotSplits, sqlServerSourceFetchTaskContext, 1, dataType, hooks);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@@ -183,8 +197,8 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
SqlServerSourceConfigFactory sourceConfigFactory =
getConfigFactory(databaseName, new String[] {tableName}, 10);
SqlServerSourceConfig sqlServerSourceConfigs = sourceConfigFactory.create(0);
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfigFactory.create(0));
SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfig);
String tableId = databaseName + "." + tableName;
String[] deleteDataSql =
@@ -195,13 +209,20 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
hooks.setPostLowWatermarkAction(
(sourceConfig, split) -> executeSql(sqlServerSourceConfigs, deleteDataSql));
(config, split) -> {
executeSql((SqlServerSourceConfig) config, deleteDataSql);
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
SqlServerSourceFetchTaskContext sqlServerSourceFetchTaskContext =
new SqlServerSourceFetchTaskContext(
sqlServerSourceConfigs,
sourceConfig,
sqlServerDialect,
createSqlServerConnection(sqlServerSourceConfigs.getDbzConnectorConfig()),
createSqlServerConnection(sqlServerSourceConfigs.getDbzConnectorConfig()));
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()),
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()));
final DataType dataType =
DataTypes.ROW(
@@ -209,8 +230,7 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
List<SnapshotSplit> snapshotSplits =
getSnapshotSplits(sqlServerSourceConfigs, sqlServerDialect);
List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, sqlServerDialect);
String[] expected =
new String[] {
@@ -229,16 +249,6 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
private List<String> readTableSnapshotSplits(
List<SnapshotSplit> snapshotSplits,
SqlServerSourceFetchTaskContext taskContext,
int scanSplitsNum,
DataType dataType)
throws Exception {
return readTableSnapshotSplits(
snapshotSplits, taskContext, scanSplitsNum, dataType, SnapshotPhaseHooks.empty());
}
private List<String> readTableSnapshotSplits(
List<SnapshotSplit> snapshotSplits,
SqlServerSourceFetchTaskContext taskContext,

@@ -110,7 +110,8 @@ public class SqlServerTableFactoryTest {
JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND
.defaultValue(),
null,
false);
false,
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -126,6 +127,7 @@ public class SqlServerTableFactoryTest {
properties.put("connect.timeout", "45s");
properties.put("scan.incremental.snapshot.chunk.key-column", "testCol");
properties.put("scan.incremental.close-idle-reader.enabled", "true");
properties.put("scan.incremental.snapshot.backfill.skip", "true");
// validation for source
DynamicTableSource actualSource = createTableSource(SCHEMA, properties);
@@ -151,6 +153,7 @@ public class SqlServerTableFactoryTest {
40.5d,
0.01d,
"testCol",
true,
true);
assertEquals(expectedSource, actualSource);
}
@@ -191,7 +194,8 @@ public class SqlServerTableFactoryTest {
JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND
.defaultValue(),
"testCol",
true);
true,
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -230,7 +234,8 @@ public class SqlServerTableFactoryTest {
JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND
.defaultValue(),
null,
false);
false,
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =
Arrays.asList("op_ts", "database_name", "schema_name", "table_name");

@@ -0,0 +1,98 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.sqlserver.testutils;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import com.ververica.cdc.connectors.sqlserver.table.SqlServerDeserializationConverterFactory;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
/**
* Test utility for creating the converter, formatter, and deserializer of a table in the test database.
*/
public class TestTable {
private final String databaseName;
private final String tableName;
private final String schemaName;
private final ResolvedSchema schema;
// Lazily initialized components
private RowRowConverter rowRowConverter;
private RowDataDebeziumDeserializeSchema deserializer;
private RecordsFormatter recordsFormatter;
public TestTable(
String databaseName, String schemaName, String tableName, ResolvedSchema schema) {
this.databaseName = databaseName;
this.schemaName = schemaName;
this.tableName = tableName;
this.schema = schema;
}
public String getTableId() {
return String.format("%s.%s.%s", databaseName, schemaName, tableName);
}
public RowType getRowType() {
return (RowType) schema.toPhysicalRowDataType().getLogicalType();
}
public RowDataDebeziumDeserializeSchema getDeserializer() {
if (deserializer == null) {
deserializer =
RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType(getRowType())
.setResultTypeInfo(InternalTypeInfo.of(getRowType()))
.setUserDefinedConverterFactory(
SqlServerDeserializationConverterFactory.instance())
.build();
}
return deserializer;
}
public RowRowConverter getRowRowConverter() {
if (rowRowConverter == null) {
rowRowConverter = RowRowConverter.create(schema.toPhysicalRowDataType());
}
return rowRowConverter;
}
public RecordsFormatter getRecordsFormatter() {
if (recordsFormatter == null) {
recordsFormatter = new RecordsFormatter(schema.toPhysicalRowDataType());
}
return recordsFormatter;
}
public String stringify(RowData rowData) {
return getRowRowConverter().toExternal(rowData).toString();
}
public List<String> stringify(List<SourceRecord> sourceRecord) {
return getRecordsFormatter().format(sourceRecord);
}
}
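A brief usage sketch of this new utility, mirroring how SqlServerSourceITCase constructs it above (schema and names are illustrative):

import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.catalog.Column.physical;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;

import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;

import com.ververica.cdc.connectors.sqlserver.testutils.TestTable;

public class TestTableUsageSketch {
    public static void main(String[] args) {
        // Schema mirrors the "customers" table used in SqlServerSourceITCase.
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                physical("id", BIGINT().notNull()),
                                physical("name", STRING())),
                        new ArrayList<>(),
                        UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));

        TestTable customers = new TestTable("customer", "dbo", "customers", schema);
        System.out.println(customers.getTableId()); // prints "customer.dbo.customers"
        customers.getDeserializer(); // lazily builds a RowDataDebeziumDeserializeSchema
    }
}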