[postgres-cdc] Add tests for latest-offset startup strategy (#2527)

pull/2687/head
Hongshun Wang 1 year ago committed by GitHub
parent de45676faa
commit 7da3eaef77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -38,6 +38,8 @@ import org.apache.commons.lang3.StringUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.sql.SQLException;
import java.util.ArrayList;
@ -62,6 +64,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** IT tests for {@link PostgresSourceBuilder.PostgresIncrementalSource}. */
@RunWith(Parameterized.class)
public class PostgresSourceITCase extends PostgresTestBase {
private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
@ -71,6 +74,8 @@ public class PostgresSourceITCase extends PostgresTestBase {
private static final String DB_NAME_PREFIX = "postgres";
private static final String SCHEMA_NAME = "customer";
private final String scanStartupMode;
@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);
@Rule
@ -101,6 +106,15 @@ public class PostgresSourceITCase extends PostgresTestBase {
"-U[103, user_3, Hangzhou, 123567891234]",
"+U[103, user_3, Shanghai, 123567891234]");
public PostgresSourceITCase(String scanStartupMode) {
this.scanStartupMode = scanStartupMode;
}
@Parameterized.Parameters(name = "scanStartupMode: {0}")
public static Object[] parameters() {
return new Object[][] {new Object[] {"initial"}, new Object[] {"latest-offset"}};
}
/** Second part stream events, which is made by {@link #makeSecondPartStreamEvents}. */
private final List<String> secondPartStreamEvents =
Arrays.asList(
@ -178,82 +192,101 @@ public class PostgresSourceITCase extends PostgresTestBase {
}
@Test
public void testConsumingTableWithoutPrimaryKey() {
try {
public void testConsumingTableWithoutPrimaryKey() throws Exception {
if (scanStartupMode == DEFAULT_SCAN_STARTUP_MODE) {
try {
testPostgresParallelSource(
1,
scanStartupMode,
FailoverType.NONE,
FailoverPhase.NEVER,
new String[] {"customers_no_pk"},
RestartStrategies.noRestart());
} catch (Exception e) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
e,
String.format(
"Incremental snapshot for tables requires primary key, but table %s doesn't have primary key",
SCHEMA_NAME + ".customers_no_pk"))
.isPresent());
}
} else {
testPostgresParallelSource(
1,
DEFAULT_SCAN_STARTUP_MODE,
scanStartupMode,
FailoverType.NONE,
FailoverPhase.NEVER,
new String[] {"customers_no_pk"},
RestartStrategies.noRestart());
} catch (Exception e) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
e,
String.format(
"Incremental snapshot for tables requires primary key, but table %s doesn't have primary key",
SCHEMA_NAME + ".customers_no_pk"))
.isPresent());
}
}
@Test
public void testDebeziumSlotDropOnStop() throws Exception {
String scanStartupMode = DEFAULT_SCAN_STARTUP_MODE;
customDatabase.createAndInitialize();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(2);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
String sourceDDL =
format(
"CREATE TABLE customers ("
+ " id BIGINT NOT NULL,"
+ " name STRING,"
+ " address STRING,"
+ " phone_number STRING,"
+ " primary key (id) not enforced"
+ ") WITH ("
+ " 'connector' = 'postgres-cdc',"
+ " 'scan.incremental.snapshot.enabled' = 'true',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
+ " 'slot.name' = '%s', "
+ " 'debezium.slot.drop.on.stop' = 'true'"
+ ")",
customDatabase.getHost(),
customDatabase.getDatabasePort(),
customDatabase.getUsername(),
customDatabase.getPassword(),
customDatabase.getDatabaseName(),
SCHEMA_NAME,
"customers",
scanStartupMode,
getSlotName());
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from customers");
// first step: check the snapshot data
if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
checkSnapshotData(
String slotName = getSlotName();
try {
customDatabase.createAndInitialize();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(2);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
String sourceDDL =
format(
"CREATE TABLE customers ("
+ " id BIGINT NOT NULL,"
+ " name STRING,"
+ " address STRING,"
+ " phone_number STRING,"
+ " primary key (id) not enforced"
+ ") WITH ("
+ " 'connector' = 'postgres-cdc',"
+ " 'scan.incremental.snapshot.enabled' = 'true',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
+ " 'slot.name' = '%s', "
+ " 'debezium.slot.drop.on.stop' = 'true'"
+ ")",
customDatabase.getHost(),
customDatabase.getDatabasePort(),
customDatabase.getUsername(),
customDatabase.getPassword(),
customDatabase.getDatabaseName(),
SCHEMA_NAME,
"customers",
scanStartupMode,
slotName);
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from customers");
// first step: check the snapshot data
if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
checkSnapshotData(
tableResult,
FailoverType.JM,
FailoverPhase.STREAM,
new String[] {"customers"});
}
// second step: check the stream data
checkStreamDataWithDDLDuringFailover(
tableResult, FailoverType.JM, FailoverPhase.STREAM, new String[] {"customers"});
}
// second step: check the stream data
checkStreamDataWithDDLDuringFailover(
tableResult, FailoverType.JM, FailoverPhase.STREAM, new String[] {"customers"});
tableResult.getJobClient().get().cancel().get();
tableResult.getJobClient().get().cancel().get();
// sleep 1000ms to wait until connections are closed.
Thread.sleep(1000L);
} finally {
customDatabase.removeSlot(slotName);
}
}
private void testPostgresParallelSource(
@ -271,7 +304,7 @@ public class PostgresSourceITCase extends PostgresTestBase {
throws Exception {
testPostgresParallelSource(
parallelism,
DEFAULT_SCAN_STARTUP_MODE,
scanStartupMode,
failoverType,
failoverPhase,
captureCustomerTables,
@ -286,56 +319,64 @@ public class PostgresSourceITCase extends PostgresTestBase {
String[] captureCustomerTables,
RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration)
throws Exception {
customDatabase.createAndInitialize();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(parallelism);
env.enableCheckpointing(200L);
env.setRestartStrategy(restartStrategyConfiguration);
String sourceDDL =
format(
"CREATE TABLE customers ("
+ " id BIGINT NOT NULL,"
+ " name STRING,"
+ " address STRING,"
+ " phone_number STRING,"
+ " primary key (id) not enforced"
+ ") WITH ("
+ " 'connector' = 'postgres-cdc',"
+ " 'scan.incremental.snapshot.enabled' = 'true',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
+ " 'slot.name' = '%s'"
+ ")",
customDatabase.getHost(),
customDatabase.getDatabasePort(),
customDatabase.getUsername(),
customDatabase.getPassword(),
customDatabase.getDatabaseName(),
SCHEMA_NAME,
getTableNameRegex(captureCustomerTables),
scanStartupMode,
getSlotName());
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from customers");
// first step: check the snapshot data
if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
checkSnapshotData(tableResult, failoverType, failoverPhase, captureCustomerTables);
String slotName = getSlotName();
try {
customDatabase.createAndInitialize();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(parallelism);
env.enableCheckpointing(200L);
env.setRestartStrategy(restartStrategyConfiguration);
String sourceDDL =
format(
"CREATE TABLE customers ("
+ " id BIGINT NOT NULL,"
+ " name STRING,"
+ " address STRING,"
+ " phone_number STRING,"
+ " primary key (id) not enforced"
+ ") WITH ("
+ " 'connector' = 'postgres-cdc',"
+ " 'scan.incremental.snapshot.enabled' = 'true',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
+ " 'slot.name' = '%s'"
+ ")",
customDatabase.getHost(),
customDatabase.getDatabasePort(),
customDatabase.getUsername(),
customDatabase.getPassword(),
customDatabase.getDatabaseName(),
SCHEMA_NAME,
getTableNameRegex(captureCustomerTables),
scanStartupMode,
slotName);
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from customers");
// first step: check the snapshot data
if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
checkSnapshotData(tableResult, failoverType, failoverPhase, captureCustomerTables);
}
// second step: check the stream data
checkStreamData(tableResult, failoverType, failoverPhase, captureCustomerTables);
tableResult.getJobClient().get().cancel().get();
// sleep 1000ms to wait until connections are closed.
Thread.sleep(1000L);
} finally {
customDatabase.removeSlot(slotName);
}
// second step: check the stream data
checkStreamData(tableResult, failoverType, failoverPhase, captureCustomerTables);
tableResult.getJobClient().get().cancel().get();
}
private void checkSnapshotData(
@ -448,6 +489,7 @@ public class PostgresSourceITCase extends PostgresTestBase {
// wait for the stream reading
Thread.sleep(2000L);
// update database during stream fail over period
if (failoverPhase == FailoverPhase.STREAM) {
triggerFailover(
failoverType,

@ -154,6 +154,19 @@ public class UniqueDatabase {
}
}
/**
* Drop slot from database.
*
* @param slotName
*/
public void removeSlot(String slotName) throws SQLException {
String sql = String.format("SELECT pg_drop_replication_slot('%s')", slotName);
try (Connection connection = PostgresTestBase.getJdbcConnection(container, databaseName);
Statement statement = connection.createStatement()) {
statement.execute(sql);
}
}
private String convertSQL(final String sql) {
return sql.replace("$DBNAME$", schemaName);
}

@ -108,3 +108,4 @@ VALUES (101,'user_1','Shanghai','123567891234'),
(1018,'user_19','Shanghai','123567891234'),
(1019,'user_20','Shanghai','123567891234'),
(2000,'user_21','Shanghai','123567891234');
ALTER TABLE customers_no_pk REPLICA IDENTITY FULL;
Loading…
Cancel
Save