[sqlserver] Fix old change data that will be captured when the latest mode starts (#2176)

pull/2171/head^2
ehui 2 years ago committed by GitHub
parent 7504217968
commit 28df7a79ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -27,7 +27,9 @@ import java.util.Map;
public class LsnFactory extends OffsetFactory {
@Override
public Offset newOffset(Map<String, String> offset) {
return new LsnOffset(Lsn.valueOf(offset.get(SourceInfo.CHANGE_LSN_KEY)));
Lsn changeLsn = Lsn.valueOf(offset.get(SourceInfo.CHANGE_LSN_KEY));
Lsn commitLsn = Lsn.valueOf(offset.get(SourceInfo.COMMIT_LSN_KEY));
return new LsnOffset(changeLsn, commitLsn, null);
}
@Override

@ -52,6 +52,7 @@ public class SqlServerStreamFetchTask implements FetchTask<SourceSplitBase> {
public void execute(Context context) throws Exception {
SqlServerSourceFetchTaskContext sourceFetchContext =
(SqlServerSourceFetchTaskContext) context;
sourceFetchContext.getOffsetContext().preSnapshotCompletion();
taskRunning = true;
redoLogSplitReadTask =
new LsnSplitReadTask(

@ -192,14 +192,16 @@ public class SqlServerUtils {
offsetStrMap.put(
entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
}
return new LsnOffset(Lsn.valueOf(offsetStrMap.get(SourceInfo.CHANGE_LSN_KEY)));
Lsn changeLsn = Lsn.valueOf(offsetStrMap.get(SourceInfo.CHANGE_LSN_KEY));
Lsn commitLsn = Lsn.valueOf(offsetStrMap.get(SourceInfo.COMMIT_LSN_KEY));
return new LsnOffset(changeLsn, commitLsn, null);
}
/** Fetch current largest log sequence number (LSN) of the database. */
public static LsnOffset currentLsn(SqlServerConnection connection) {
try {
Lsn maxLsn = connection.getMaxLsn();
return new LsnOffset(maxLsn);
return new LsnOffset(maxLsn, maxLsn, null);
} catch (SQLException e) {
throw new FlinkRuntimeException(e.getMessage(), e);
}

@ -27,6 +27,8 @@ import com.ververica.cdc.connectors.sqlserver.SqlServerTestBase;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.sql.Connection;
import java.sql.SQLException;
@ -36,12 +38,14 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.testcontainers.containers.MSSQLServerContainer.MS_SQL_SERVER_PORT;
/** Integration tests for SqlServer Table source. */
@RunWith(Parameterized.class)
public class SqlServerConnectorITCase extends SqlServerTestBase {
private final StreamExecutionEnvironment env =
@ -52,11 +56,29 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {
@ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;
// enable the parallelismSnapshot (i.e: The new source OracleParallelSource)
private final boolean parallelismSnapshot;
public SqlServerConnectorITCase(boolean parallelismSnapshot) {
this.parallelismSnapshot = parallelismSnapshot;
}
@Parameterized.Parameters(name = "parallelismSnapshot: {0}")
public static Object[] parameters() {
return new Object[][] {new Object[] {false}, new Object[] {true}};
}
@Before
public void before() {
TestValuesTableFactory.clearAllData();
if (parallelismSnapshot) {
env.setParallelism(4);
env.enableCheckpointing(200);
} else {
env.setParallelism(1);
}
}
@Test
public void testConsumingAllEvents()
@ -75,6 +97,7 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
@ -82,6 +105,7 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
MSSQL_SERVER_CONTAINER.getUsername(),
MSSQL_SERVER_CONTAINER.getPassword(),
parallelismSnapshot,
"inventory",
"dbo.products");
String sinkDDL =
@ -160,6 +184,82 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {
result.getJobClient().get().cancel().get();
}
@Test
public void testStartupFromLatestOffset() throws Exception {
initializeSqlServerTable("inventory");
Connection connection = getJdbcConnection();
Statement statement = connection.createStatement();
// The following two change records will be discarded in the 'latest-offset' mode
statement.execute(
"INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute(
"INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('scooter','Big 2-wheel scooter ',5.18);");
Thread.sleep(5000L);
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " id INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3)"
+ ") WITH ("
+ " 'connector' = 'sqlserver-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = 'latest-offset'"
+ ")",
MSSQL_SERVER_CONTAINER.getHost(),
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
MSSQL_SERVER_CONTAINER.getUsername(),
MSSQL_SERVER_CONTAINER.getPassword(),
parallelismSnapshot,
"inventory",
"dbo.products");
String sinkDDL =
"CREATE TABLE sink "
+ " WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ") LIKE debezium_source (EXCLUDING OPTIONS)";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
// wait for the source startup, we don't have a better way to wait it, use sleep for now
do {
Thread.sleep(5000L);
} while (result.getJobClient().get().getJobStatus().get() != RUNNING);
Thread.sleep(30000L);
statement.execute(
"INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('hammer','18oz carpenters hammer',1.2);");
statement.execute(
"INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('scooter','Big 3-wheel scooter',5.20);");
waitForSinkSize("sink", 2);
String[] expected =
new String[] {
"112,hammer,18oz carpenters hammer,1.200",
"113,scooter,Big 3-wheel scooter,5.200"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
@Test
public void testAllTypes() throws Throwable {
initializeSqlServerTable("column_type_test");
@ -199,6 +299,7 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
@ -206,6 +307,7 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
MSSQL_SERVER_CONTAINER.getUsername(),
MSSQL_SERVER_CONTAINER.getPassword(),
parallelismSnapshot,
"column_type_test",
"dbo.full_types");
String sinkDDL =
@ -288,6 +390,7 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
@ -295,6 +398,7 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
MSSQL_SERVER_CONTAINER.getUsername(),
MSSQL_SERVER_CONTAINER.getPassword(),
parallelismSnapshot,
"inventory",
"dbo.products");

Loading…
Cancel
Save