diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java index 27d30b26b..a74bf035e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java @@ -104,10 +104,24 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { + String.format(" 'rootserver-list' = '%s'", METADATA.getRsList()); } + /** + * Current OceanBase connector uses timestamp (in seconds) to mark the offset during the + * transition from {@code SNAPSHOT} to {@code STREAMING} mode. Thus, if some snapshot inserting + * events are too close to the transitioning offset, snapshot inserting events might be emitted + * multiple times.
+ * This could be safely removed after switching to incremental snapshot framework which provides + * Exactly-once guarantee. + */ + private void waitForTableInitialization() throws InterruptedException { + Thread.sleep(5000L); + } + @Test public void testTableList() throws Exception { inventoryDatabase = new UniqueDatabase(OB_SERVER, "inventory"); inventoryDatabase.createAndInitialize("mysql"); + waitForTableInitialization(); + String sourceDDL = String.format( "CREATE TABLE ob_source (" @@ -212,6 +226,8 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { public void testMetadataColumns() throws Exception { inventoryDatabase = new UniqueDatabase(OB_SERVER, "inventory"); inventoryDatabase.createAndInitialize("mysql"); + waitForTableInitialization(); + String sourceDDL = String.format( "CREATE TABLE ob_source (" @@ -297,6 +313,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { columnTypesDatabase = new UniqueDatabase(OB_SERVER, "column_type_test"); columnTypesDatabase.createAndInitialize("mysql"); + waitForTableInitialization(); String sourceDDL = String.format( @@ -488,6 +505,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { columnTypesDatabase = new UniqueDatabase(OB_SERVER, "column_type_test"); columnTypesDatabase.createAndInitialize("mysql"); + waitForTableInitialization(); String sourceDDL = String.format( @@ -559,6 +577,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { public void testSnapshotOnly() throws Exception { inventoryDatabase = new UniqueDatabase(OB_SERVER, "inventory"); inventoryDatabase.createAndInitialize("mysql"); + waitForTableInitialization(); String sourceDDL = String.format( @@ -611,7 +630,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { while (result.getJobClient().get().getJobStatus().get().equals(JobStatus.RUNNING)) { Thread.sleep(100); - // Waiting for job to quit, in case if + // Waiting for job to finish (SNAPSHOT job will end spontaneously) } } }