|
|
|
@ -198,6 +198,64 @@ public class PostgresSourceITCase extends PostgresTestBase {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testDebeziumSlotDropOnStop() throws Exception {
|
|
|
|
|
String scanStartupMode = DEFAULT_SCAN_STARTUP_MODE;
|
|
|
|
|
customDatabase.createAndInitialize();
|
|
|
|
|
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
|
|
|
|
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
|
|
|
|
|
|
|
|
|
|
env.setParallelism(2);
|
|
|
|
|
env.enableCheckpointing(200L);
|
|
|
|
|
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
|
|
|
|
|
String sourceDDL =
|
|
|
|
|
format(
|
|
|
|
|
"CREATE TABLE customers ("
|
|
|
|
|
+ " id BIGINT NOT NULL,"
|
|
|
|
|
+ " name STRING,"
|
|
|
|
|
+ " address STRING,"
|
|
|
|
|
+ " phone_number STRING,"
|
|
|
|
|
+ " primary key (id) not enforced"
|
|
|
|
|
+ ") WITH ("
|
|
|
|
|
+ " 'connector' = 'postgres-cdc',"
|
|
|
|
|
+ " 'scan.incremental.snapshot.enabled' = 'true',"
|
|
|
|
|
+ " 'hostname' = '%s',"
|
|
|
|
|
+ " 'port' = '%s',"
|
|
|
|
|
+ " 'username' = '%s',"
|
|
|
|
|
+ " 'password' = '%s',"
|
|
|
|
|
+ " 'database-name' = '%s',"
|
|
|
|
|
+ " 'schema-name' = '%s',"
|
|
|
|
|
+ " 'table-name' = '%s',"
|
|
|
|
|
+ " 'scan.startup.mode' = '%s',"
|
|
|
|
|
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
|
|
|
|
|
+ " 'slot.name' = '%s', "
|
|
|
|
|
+ " 'debezium.slot.drop.on.stop' = 'true'"
|
|
|
|
|
+ ")",
|
|
|
|
|
customDatabase.getHost(),
|
|
|
|
|
customDatabase.getDatabasePort(),
|
|
|
|
|
customDatabase.getUsername(),
|
|
|
|
|
customDatabase.getPassword(),
|
|
|
|
|
customDatabase.getDatabaseName(),
|
|
|
|
|
SCHEMA_NAME,
|
|
|
|
|
"customers",
|
|
|
|
|
scanStartupMode,
|
|
|
|
|
getSlotName());
|
|
|
|
|
tEnv.executeSql(sourceDDL);
|
|
|
|
|
TableResult tableResult = tEnv.executeSql("select * from customers");
|
|
|
|
|
|
|
|
|
|
// first step: check the snapshot data
|
|
|
|
|
if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
|
|
|
|
|
checkSnapshotData(
|
|
|
|
|
tableResult, FailoverType.JM, FailoverPhase.STREAM, new String[] {"customers"});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// second step: check the stream data
|
|
|
|
|
checkStreamDataWithDDLDuringFailover(
|
|
|
|
|
tableResult, FailoverType.JM, FailoverPhase.STREAM, new String[] {"customers"});
|
|
|
|
|
|
|
|
|
|
tableResult.getJobClient().get().cancel().get();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void testPostgresParallelSource(
|
|
|
|
|
FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables)
|
|
|
|
|
throws Exception {
|
|
|
|
@ -371,6 +429,61 @@ public class PostgresSourceITCase extends PostgresTestBase {
|
|
|
|
|
assertTrue(!hasNextData(iterator));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void checkStreamDataWithDDLDuringFailover(
|
|
|
|
|
TableResult tableResult,
|
|
|
|
|
FailoverType failoverType,
|
|
|
|
|
FailoverPhase failoverPhase,
|
|
|
|
|
String[] captureCustomerTables)
|
|
|
|
|
throws Exception {
|
|
|
|
|
waitUntilJobRunning(tableResult);
|
|
|
|
|
CloseableIterator<Row> iterator = tableResult.collect();
|
|
|
|
|
JobID jobId = tableResult.getJobClient().get().getJobID();
|
|
|
|
|
|
|
|
|
|
for (String tableId : captureCustomerTables) {
|
|
|
|
|
makeFirstPartStreamEvents(
|
|
|
|
|
getConnection(),
|
|
|
|
|
customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// wait for the stream reading
|
|
|
|
|
Thread.sleep(2000L);
|
|
|
|
|
|
|
|
|
|
if (failoverPhase == FailoverPhase.STREAM) {
|
|
|
|
|
triggerFailover(
|
|
|
|
|
failoverType,
|
|
|
|
|
jobId,
|
|
|
|
|
miniClusterResource.getMiniCluster(),
|
|
|
|
|
() -> {
|
|
|
|
|
for (String tableId : captureCustomerTables) {
|
|
|
|
|
try {
|
|
|
|
|
makeSecondPartStreamEvents(
|
|
|
|
|
getConnection(),
|
|
|
|
|
customDatabase.getDatabaseName()
|
|
|
|
|
+ '.'
|
|
|
|
|
+ SCHEMA_NAME
|
|
|
|
|
+ '.'
|
|
|
|
|
+ tableId);
|
|
|
|
|
} catch (SQLException e) {
|
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
sleepMs(200);
|
|
|
|
|
});
|
|
|
|
|
waitUntilJobRunning(tableResult);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
List<String> expectedStreamData = new ArrayList<>();
|
|
|
|
|
for (int i = 0; i < captureCustomerTables.length; i++) {
|
|
|
|
|
expectedStreamData.addAll(firstPartStreamEvents);
|
|
|
|
|
expectedStreamData.addAll(secondPartStreamEvents);
|
|
|
|
|
}
|
|
|
|
|
// wait for the stream reading
|
|
|
|
|
Thread.sleep(2000L);
|
|
|
|
|
|
|
|
|
|
assertEqualsInAnyOrder(expectedStreamData, fetchRows(iterator, expectedStreamData.size()));
|
|
|
|
|
assertTrue(!hasNextData(iterator));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void sleepMs(long millis) {
|
|
|
|
|
try {
|
|
|
|
|
Thread.sleep(millis);
|
|
|
|
|