|
|
@ -123,7 +123,7 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
|
|
|
|
@Test
|
|
|
|
@Test
|
|
|
|
public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
|
|
|
|
public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
|
|
|
|
|
|
|
|
|
|
|
|
List<String> records = testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK);
|
|
|
|
List<String> records = testBackfillWhenWritingEvents(false, 25, USE_PRE_HIGHWATERMARK_HOOK);
|
|
|
|
|
|
|
|
|
|
|
|
List<String> expectedRecords =
|
|
|
|
List<String> expectedRecords =
|
|
|
|
Arrays.asList(
|
|
|
|
Arrays.asList(
|
|
|
@ -146,17 +146,23 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
|
|
|
|
"+I[1016, user_17, Shanghai, 123567891234]",
|
|
|
|
"+I[1016, user_17, Shanghai, 123567891234]",
|
|
|
|
"+I[1017, user_18, Shanghai, 123567891234]",
|
|
|
|
"+I[1017, user_18, Shanghai, 123567891234]",
|
|
|
|
"+I[1018, user_19, Shanghai, 123567891234]",
|
|
|
|
"+I[1018, user_19, Shanghai, 123567891234]",
|
|
|
|
"+I[2000, user_21, Pittsburgh, 123567891234]",
|
|
|
|
"+I[1019, user_20, Shanghai, 123567891234]",
|
|
|
|
"+I[15213, user_15213, Shanghai, 123567891234]");
|
|
|
|
"+I[2000, user_21, Shanghai, 123567891234]",
|
|
|
|
// when enable backfill, the wal log between [snapshot, high_watermark) will be
|
|
|
|
"+I[15213, user_15213, Shanghai, 123567891234]",
|
|
|
|
// applied as snapshot image
|
|
|
|
"-U[2000, user_21, Shanghai, 123567891234]",
|
|
|
|
|
|
|
|
"+U[2000, user_21, Pittsburgh, 123567891234]",
|
|
|
|
|
|
|
|
"-D[1019, user_20, Shanghai, 123567891234]");
|
|
|
|
|
|
|
|
// In sqlserver database, because the capture process extracts change data from the
|
|
|
|
|
|
|
|
// transaction log, there is a built-in latency between the time that a change is committed
|
|
|
|
|
|
|
|
// to a source table and the time that the change appears within its associated change
|
|
|
|
|
|
|
|
// table.Then in streaming phase, the log which should be ignored will be read again.
|
|
|
|
assertEqualsInAnyOrder(expectedRecords, records);
|
|
|
|
assertEqualsInAnyOrder(expectedRecords, records);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
@Test
|
|
|
|
public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
|
|
|
|
public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
|
|
|
|
|
|
|
|
|
|
|
|
List<String> records = testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK);
|
|
|
|
List<String> records = testBackfillWhenWritingEvents(false, 25, USE_POST_LOWWATERMARK_HOOK);
|
|
|
|
|
|
|
|
|
|
|
|
List<String> expectedRecords =
|
|
|
|
List<String> expectedRecords =
|
|
|
|
Arrays.asList(
|
|
|
|
Arrays.asList(
|
|
|
@ -180,9 +186,15 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
|
|
|
|
"+I[1017, user_18, Shanghai, 123567891234]",
|
|
|
|
"+I[1017, user_18, Shanghai, 123567891234]",
|
|
|
|
"+I[1018, user_19, Shanghai, 123567891234]",
|
|
|
|
"+I[1018, user_19, Shanghai, 123567891234]",
|
|
|
|
"+I[2000, user_21, Pittsburgh, 123567891234]",
|
|
|
|
"+I[2000, user_21, Pittsburgh, 123567891234]",
|
|
|
|
"+I[15213, user_15213, Shanghai, 123567891234]");
|
|
|
|
"+I[15213, user_15213, Shanghai, 123567891234]",
|
|
|
|
// when enable backfill, the wal log between [low_watermark, snapshot) will be applied
|
|
|
|
"+I[15213, user_15213, Shanghai, 123567891234]",
|
|
|
|
// as snapshot image
|
|
|
|
"-U[2000, user_21, Shanghai, 123567891234]",
|
|
|
|
|
|
|
|
"+U[2000, user_21, Pittsburgh, 123567891234]",
|
|
|
|
|
|
|
|
"-D[1019, user_20, Shanghai, 123567891234]");
|
|
|
|
|
|
|
|
// In sqlserver database, because the capture process extracts change data from the
|
|
|
|
|
|
|
|
// transaction log, there is a built-in latency between the time that a change is committed
|
|
|
|
|
|
|
|
// to a source table and the time that the change appears within its associated change
|
|
|
|
|
|
|
|
// table.Then in streaming phase, the log which should be ignored will be read again.
|
|
|
|
assertEqualsInAnyOrder(expectedRecords, records);
|
|
|
|
assertEqualsInAnyOrder(expectedRecords, records);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -272,7 +284,7 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
|
|
|
|
env.enableCheckpointing(1000);
|
|
|
|
env.enableCheckpointing(1000);
|
|
|
|
env.setParallelism(1);
|
|
|
|
env.setParallelism(1);
|
|
|
|
|
|
|
|
|
|
|
|
ResolvedSchema customersSchame =
|
|
|
|
ResolvedSchema customersSchema =
|
|
|
|
new ResolvedSchema(
|
|
|
|
new ResolvedSchema(
|
|
|
|
Arrays.asList(
|
|
|
|
Arrays.asList(
|
|
|
|
physical("id", BIGINT().notNull()),
|
|
|
|
physical("id", BIGINT().notNull()),
|
|
|
@ -281,7 +293,7 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
|
|
|
|
physical("phone_number", STRING())),
|
|
|
|
physical("phone_number", STRING())),
|
|
|
|
new ArrayList<>(),
|
|
|
|
new ArrayList<>(),
|
|
|
|
UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
|
|
|
|
UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
|
|
|
|
TestTable customerTable = new TestTable(databaseName, "dbo", "customers", customersSchame);
|
|
|
|
TestTable customerTable = new TestTable(databaseName, "dbo", "customers", customersSchema);
|
|
|
|
String tableId = customerTable.getTableId();
|
|
|
|
String tableId = customerTable.getTableId();
|
|
|
|
|
|
|
|
|
|
|
|
SqlServerSourceBuilder.SqlServerIncrementalSource source =
|
|
|
|
SqlServerSourceBuilder.SqlServerIncrementalSource source =
|
|
|
@ -310,14 +322,10 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
|
|
|
|
(sourceConfig, split) -> {
|
|
|
|
(sourceConfig, split) -> {
|
|
|
|
SqlServerDialect dialect =
|
|
|
|
SqlServerDialect dialect =
|
|
|
|
new SqlServerDialect((SqlServerSourceConfig) sourceConfig);
|
|
|
|
new SqlServerDialect((SqlServerSourceConfig) sourceConfig);
|
|
|
|
JdbcConnection sqlServerConnection =
|
|
|
|
try (JdbcConnection sqlServerConnection =
|
|
|
|
dialect.openJdbcConnection((JdbcSourceConfig) sourceConfig);
|
|
|
|
dialect.openJdbcConnection((JdbcSourceConfig) sourceConfig)) {
|
|
|
|
sqlServerConnection.execute(statements);
|
|
|
|
sqlServerConnection.execute(statements);
|
|
|
|
sqlServerConnection.commit();
|
|
|
|
sqlServerConnection.commit();
|
|
|
|
try {
|
|
|
|
|
|
|
|
Thread.sleep(5000L);
|
|
|
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|