From 7da3eaef77dc5fa60c691cea183fbbf015ad4215 Mon Sep 17 00:00:00 2001 From: Hongshun Wang <125648852+loserwang1024@users.noreply.github.com> Date: Fri, 10 Nov 2023 17:50:08 +0800 Subject: [PATCH] [postgres-cdc] Add tests for latest-offset startup strategy (#2527) --- .../postgres/source/PostgresSourceITCase.java | 268 ++++++++++-------- .../postgres/testutils/UniqueDatabase.java | 13 + .../src/test/resources/ddl/customer.sql | 1 + 3 files changed, 169 insertions(+), 113 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java index be1ad5fb0..84ac4ead6 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java @@ -38,6 +38,8 @@ import org.apache.commons.lang3.StringUtils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.sql.SQLException; import java.util.ArrayList; @@ -62,6 +64,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** IT tests for {@link PostgresSourceBuilder.PostgresIncrementalSource}. */ +@RunWith(Parameterized.class) public class PostgresSourceITCase extends PostgresTestBase { private static final String DEFAULT_SCAN_STARTUP_MODE = "initial"; @@ -71,6 +74,8 @@ public class PostgresSourceITCase extends PostgresTestBase { private static final String DB_NAME_PREFIX = "postgres"; private static final String SCHEMA_NAME = "customer"; + private final String scanStartupMode; + @Rule public final Timeout timeoutPerTest = Timeout.seconds(300); @Rule @@ -101,6 +106,15 @@ public class PostgresSourceITCase extends PostgresTestBase { "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]"); + public PostgresSourceITCase(String scanStartupMode) { + this.scanStartupMode = scanStartupMode; + } + + @Parameterized.Parameters(name = "scanStartupMode: {0}") + public static Object[] parameters() { + return new Object[][] {new Object[] {"initial"}, new Object[] {"latest-offset"}}; + } + /** Second part stream events, which is made by {@link #makeSecondPartStreamEvents}. */ private final List secondPartStreamEvents = Arrays.asList( @@ -178,82 +192,101 @@ public class PostgresSourceITCase extends PostgresTestBase { } @Test - public void testConsumingTableWithoutPrimaryKey() { - try { + public void testConsumingTableWithoutPrimaryKey() throws Exception { + if (scanStartupMode == DEFAULT_SCAN_STARTUP_MODE) { + try { + testPostgresParallelSource( + 1, + scanStartupMode, + FailoverType.NONE, + FailoverPhase.NEVER, + new String[] {"customers_no_pk"}, + RestartStrategies.noRestart()); + } catch (Exception e) { + assertTrue( + ExceptionUtils.findThrowableWithMessage( + e, + String.format( + "Incremental snapshot for tables requires primary key, but table %s doesn't have primary key", + SCHEMA_NAME + ".customers_no_pk")) + .isPresent()); + } + } else { testPostgresParallelSource( 1, - DEFAULT_SCAN_STARTUP_MODE, + scanStartupMode, FailoverType.NONE, FailoverPhase.NEVER, new String[] {"customers_no_pk"}, RestartStrategies.noRestart()); - } catch (Exception e) { - assertTrue( - ExceptionUtils.findThrowableWithMessage( - e, - String.format( - "Incremental snapshot for tables requires primary key, but table %s doesn't have primary key", - SCHEMA_NAME + ".customers_no_pk")) - .isPresent()); } } @Test public void testDebeziumSlotDropOnStop() throws Exception { - String scanStartupMode = DEFAULT_SCAN_STARTUP_MODE; - customDatabase.createAndInitialize(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - - env.setParallelism(2); - env.enableCheckpointing(200L); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); - String sourceDDL = - format( - "CREATE TABLE customers (" - + " id BIGINT NOT NULL," - + " name STRING," - + " address STRING," - + " phone_number STRING," - + " primary key (id) not enforced" - + ") WITH (" - + " 'connector' = 'postgres-cdc'," - + " 'scan.incremental.snapshot.enabled' = 'true'," - + " 'hostname' = '%s'," - + " 'port' = '%s'," - + " 'username' = '%s'," - + " 'password' = '%s'," - + " 'database-name' = '%s'," - + " 'schema-name' = '%s'," - + " 'table-name' = '%s'," - + " 'scan.startup.mode' = '%s'," - + " 'scan.incremental.snapshot.chunk.size' = '100'," - + " 'slot.name' = '%s', " - + " 'debezium.slot.drop.on.stop' = 'true'" - + ")", - customDatabase.getHost(), - customDatabase.getDatabasePort(), - customDatabase.getUsername(), - customDatabase.getPassword(), - customDatabase.getDatabaseName(), - SCHEMA_NAME, - "customers", - scanStartupMode, - getSlotName()); - tEnv.executeSql(sourceDDL); - TableResult tableResult = tEnv.executeSql("select * from customers"); - - // first step: check the snapshot data - if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) { - checkSnapshotData( + String slotName = getSlotName(); + try { + customDatabase.createAndInitialize(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + env.setParallelism(2); + env.enableCheckpointing(200L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + String sourceDDL = + format( + "CREATE TABLE customers (" + + " id BIGINT NOT NULL," + + " name STRING," + + " address STRING," + + " phone_number STRING," + + " primary key (id) not enforced" + + ") WITH (" + + " 'connector' = 'postgres-cdc'," + + " 'scan.incremental.snapshot.enabled' = 'true'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'schema-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.startup.mode' = '%s'," + + " 'scan.incremental.snapshot.chunk.size' = '100'," + + " 'slot.name' = '%s', " + + " 'debezium.slot.drop.on.stop' = 'true'" + + ")", + customDatabase.getHost(), + customDatabase.getDatabasePort(), + customDatabase.getUsername(), + customDatabase.getPassword(), + customDatabase.getDatabaseName(), + SCHEMA_NAME, + "customers", + scanStartupMode, + slotName); + tEnv.executeSql(sourceDDL); + TableResult tableResult = tEnv.executeSql("select * from customers"); + + // first step: check the snapshot data + if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) { + checkSnapshotData( + tableResult, + FailoverType.JM, + FailoverPhase.STREAM, + new String[] {"customers"}); + } + + // second step: check the stream data + checkStreamDataWithDDLDuringFailover( tableResult, FailoverType.JM, FailoverPhase.STREAM, new String[] {"customers"}); - } - - // second step: check the stream data - checkStreamDataWithDDLDuringFailover( - tableResult, FailoverType.JM, FailoverPhase.STREAM, new String[] {"customers"}); - tableResult.getJobClient().get().cancel().get(); + tableResult.getJobClient().get().cancel().get(); + // sleep 1000ms to wait until connections are closed. + Thread.sleep(1000L); + } finally { + customDatabase.removeSlot(slotName); + } } private void testPostgresParallelSource( @@ -271,7 +304,7 @@ public class PostgresSourceITCase extends PostgresTestBase { throws Exception { testPostgresParallelSource( parallelism, - DEFAULT_SCAN_STARTUP_MODE, + scanStartupMode, failoverType, failoverPhase, captureCustomerTables, @@ -286,56 +319,64 @@ public class PostgresSourceITCase extends PostgresTestBase { String[] captureCustomerTables, RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) throws Exception { - customDatabase.createAndInitialize(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - - env.setParallelism(parallelism); - env.enableCheckpointing(200L); - env.setRestartStrategy(restartStrategyConfiguration); - String sourceDDL = - format( - "CREATE TABLE customers (" - + " id BIGINT NOT NULL," - + " name STRING," - + " address STRING," - + " phone_number STRING," - + " primary key (id) not enforced" - + ") WITH (" - + " 'connector' = 'postgres-cdc'," - + " 'scan.incremental.snapshot.enabled' = 'true'," - + " 'hostname' = '%s'," - + " 'port' = '%s'," - + " 'username' = '%s'," - + " 'password' = '%s'," - + " 'database-name' = '%s'," - + " 'schema-name' = '%s'," - + " 'table-name' = '%s'," - + " 'scan.startup.mode' = '%s'," - + " 'scan.incremental.snapshot.chunk.size' = '100'," - + " 'slot.name' = '%s'" - + ")", - customDatabase.getHost(), - customDatabase.getDatabasePort(), - customDatabase.getUsername(), - customDatabase.getPassword(), - customDatabase.getDatabaseName(), - SCHEMA_NAME, - getTableNameRegex(captureCustomerTables), - scanStartupMode, - getSlotName()); - tEnv.executeSql(sourceDDL); - TableResult tableResult = tEnv.executeSql("select * from customers"); - - // first step: check the snapshot data - if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) { - checkSnapshotData(tableResult, failoverType, failoverPhase, captureCustomerTables); + String slotName = getSlotName(); + try { + customDatabase.createAndInitialize(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + env.setParallelism(parallelism); + env.enableCheckpointing(200L); + env.setRestartStrategy(restartStrategyConfiguration); + String sourceDDL = + format( + "CREATE TABLE customers (" + + " id BIGINT NOT NULL," + + " name STRING," + + " address STRING," + + " phone_number STRING," + + " primary key (id) not enforced" + + ") WITH (" + + " 'connector' = 'postgres-cdc'," + + " 'scan.incremental.snapshot.enabled' = 'true'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'schema-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.startup.mode' = '%s'," + + " 'scan.incremental.snapshot.chunk.size' = '100'," + + " 'slot.name' = '%s'" + + ")", + customDatabase.getHost(), + customDatabase.getDatabasePort(), + customDatabase.getUsername(), + customDatabase.getPassword(), + customDatabase.getDatabaseName(), + SCHEMA_NAME, + getTableNameRegex(captureCustomerTables), + scanStartupMode, + slotName); + tEnv.executeSql(sourceDDL); + TableResult tableResult = tEnv.executeSql("select * from customers"); + + // first step: check the snapshot data + if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) { + checkSnapshotData(tableResult, failoverType, failoverPhase, captureCustomerTables); + } + + // second step: check the stream data + checkStreamData(tableResult, failoverType, failoverPhase, captureCustomerTables); + + tableResult.getJobClient().get().cancel().get(); + + // sleep 1000ms to wait until connections are closed. + Thread.sleep(1000L); + } finally { + customDatabase.removeSlot(slotName); } - - // second step: check the stream data - checkStreamData(tableResult, failoverType, failoverPhase, captureCustomerTables); - - tableResult.getJobClient().get().cancel().get(); } private void checkSnapshotData( @@ -448,6 +489,7 @@ public class PostgresSourceITCase extends PostgresTestBase { // wait for the stream reading Thread.sleep(2000L); + // update database during stream fail over period if (failoverPhase == FailoverPhase.STREAM) { triggerFailover( failoverType, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/testutils/UniqueDatabase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/testutils/UniqueDatabase.java index 9b83f05c8..5c6de1abe 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/testutils/UniqueDatabase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/testutils/UniqueDatabase.java @@ -154,6 +154,19 @@ public class UniqueDatabase { } } + /** + * Drop slot from database. + * + * @param slotName + */ + public void removeSlot(String slotName) throws SQLException { + String sql = String.format("SELECT pg_drop_replication_slot('%s')", slotName); + try (Connection connection = PostgresTestBase.getJdbcConnection(container, databaseName); + Statement statement = connection.createStatement()) { + statement.execute(sql); + } + } + private String convertSQL(final String sql) { return sql.replace("$DBNAME$", schemaName); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/customer.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/customer.sql index c9602509c..5c83b8e18 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/customer.sql +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/customer.sql @@ -108,3 +108,4 @@ VALUES (101,'user_1','Shanghai','123567891234'), (1018,'user_19','Shanghai','123567891234'), (1019,'user_20','Shanghai','123567891234'), (2000,'user_21','Shanghai','123567891234'); +ALTER TABLE customers_no_pk REPLICA IDENTITY FULL; \ No newline at end of file