From 2c557c6a780108b16c33d88fb127fd578ced44ad Mon Sep 17 00:00:00 2001
From: Hongshun Wang <125648852+loserwang1024@users.noreply.github.com>
Date: Tue, 19 Dec 2023 11:14:44 +0800
Subject: [PATCH] [cdc-connector][sqlserver][tests] Fix UT errors by correcting
 right output (#2864)

---
 .../source/MongoDBFullChangelogITCase.java    |  4 +-
 .../source/MongoDBParallelSourceITCase.java   |  9 +---
 .../mysql/source/MySqlSourceITCase.java       |  5 --
 .../oracle/source/OracleSourceITCase.java     |  7 +--
 .../postgres/source/PostgresSourceITCase.java |  7 +--
 .../source/SqlServerSourceITCase.java         | 46 +++++++++++--------
 6 files changed, 35 insertions(+), 43 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
index 86f936d6d..7b5b8bff1 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
@@ -398,7 +398,7 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {
         env.enableCheckpointing(1000);
         env.setParallelism(1);
 
-        ResolvedSchema customersSchame =
+        ResolvedSchema customersSchema =
                 new ResolvedSchema(
                         Arrays.asList(
                                 physical("cid", BIGINT().notNull()),
@@ -407,7 +407,7 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {
                                 physical("phone_number", STRING())),
                         new ArrayList<>(),
                         UniqueConstraint.primaryKey("pk", Collections.singletonList("cid")));
-        TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchame);
+        TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchema);
         MongoDBSource source =
                 new MongoDBSourceBuilder()
                         .hosts(CONTAINER.getHostAndPort())
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java
index 9ac86fbe4..8ea63d551 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java
@@ -302,7 +302,7 @@ public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
         env.enableCheckpointing(1000);
         env.setParallelism(1);
 
-        ResolvedSchema customersSchame =
+        ResolvedSchema customersSchema =
                 new ResolvedSchema(
                         Arrays.asList(
                                 physical("cid", BIGINT().notNull()),
@@ -311,7 +311,7 @@ public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
                                 physical("phone_number", STRING())),
                         new ArrayList<>(),
                         UniqueConstraint.primaryKey("pk", Collections.singletonList("cid")));
-        TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchame);
+        TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchema);
         MongoDBSource source =
                 new MongoDBSourceBuilder()
                         .hosts(CONTAINER.getHostAndPort())
@@ -345,11 +345,6 @@ public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
                     mongoCollection.updateOne(
                             Filters.eq("cid", 2000L), Updates.set("address", "Pittsburgh"));
                     mongoCollection.deleteOne(Filters.eq("cid", 1019L));
-                    try {
-                        Thread.sleep(500L);
-                    } catch (InterruptedException e) {
-                        throw new RuntimeException(e);
-                    }
                 };
 
         if (hookType == USE_POST_LOWWATERMARK_HOOK) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java
index 2ca4038d2..50bd27978 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java
@@ -528,11 +528,6 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
                     connection.setAutoCommit(false);
                     connection.execute(statements);
                     connection.commit();
-                    try {
-                        Thread.sleep(500L);
-                    } catch (InterruptedException e) {
-                        throw new RuntimeException(e);
-                    }
                 };
         if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
             hooks.setPreHighWatermarkAction(snapshotPhaseHook);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java
index ce1589321..c31b46342 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java
@@ -275,7 +275,7 @@ public class OracleSourceITCase extends OracleSourceTestBase {
         env.enableCheckpointing(200L);
         env.setParallelism(1);
 
-        ResolvedSchema customersSchame =
+        ResolvedSchema customersSchema =
                 new ResolvedSchema(
                         Arrays.asList(
                                 physical("ID", BIGINT().notNull()),
@@ -285,7 +285,7 @@ public class OracleSourceITCase extends OracleSourceTestBase {
                         new ArrayList<>(),
                         UniqueConstraint.primaryKey("pk", Collections.singletonList("ID")));
         TestTable customerTable =
-                new TestTable(ORACLE_DATABASE, ORACLE_SCHEMA, "CUSTOMERS", customersSchame);
+                new TestTable(ORACLE_DATABASE, ORACLE_SCHEMA, "CUSTOMERS", customersSchema);
         String tableId = customerTable.getTableId();
 
         OracleSourceBuilder.OracleIncrementalSource source =
@@ -326,9 +326,6 @@ public class OracleSourceITCase extends OracleSourceTestBase {
                     try (OracleConnection oracleConnection =
                             OracleConnectionUtils.createOracleConnection(configuration)) {
                         oracleConnection.execute(statements);
-                        Thread.sleep(500L);
-                    } catch (InterruptedException e) {
-                        throw new RuntimeException(e);
                     }
                 };
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java
index f4c364978..f5ce180e2 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java
@@ -482,7 +482,7 @@ public class PostgresSourceITCase extends PostgresTestBase {
         env.enableCheckpointing(1000);
         env.setParallelism(1);
 
-        ResolvedSchema customersSchame =
+        ResolvedSchema customersSchema =
                 new ResolvedSchema(
                         Arrays.asList(
                                 physical("id", BIGINT().notNull()),
@@ -492,7 +492,7 @@ public class PostgresSourceITCase extends PostgresTestBase {
                         new ArrayList<>(),
                         UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
         TestTable customerTable =
-                new TestTable(customDatabase, "customer", "customers", customersSchame);
+                new TestTable(customDatabase, "customer", "customers", customersSchema);
         String tableId = customerTable.getTableId();
 
         PostgresSourceBuilder.PostgresIncrementalSource source =
@@ -525,9 +525,6 @@ public class PostgresSourceITCase extends PostgresTestBase {
                     try (PostgresConnection postgresConnection = dialect.openJdbcConnection()) {
                         postgresConnection.execute(statements);
                         postgresConnection.commit();
-                        Thread.sleep(500L);
-                    } catch (InterruptedException e) {
-                        throw new RuntimeException(e);
                     }
                 };
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
index c563199ab..9e2a55e37 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java
@@ -123,7 +123,7 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
 
     @Test
     public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
-        List records = testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK);
+        List records = testBackfillWhenWritingEvents(false, 25, USE_PRE_HIGHWATERMARK_HOOK);
 
         List expectedRecords =
                 Arrays.asList(
@@ -146,17 +146,23 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
                         "+I[1016, user_17, Shanghai, 123567891234]",
                         "+I[1017, user_18, Shanghai, 123567891234]",
                         "+I[1018, user_19, Shanghai, 123567891234]",
-                        "+I[2000, user_21, Pittsburgh, 123567891234]",
-                        "+I[15213, user_15213, Shanghai, 123567891234]");
-        // when enable backfill, the wal log between [snapshot, high_watermark) will be
-        // applied as snapshot image
+                        "+I[1019, user_20, Shanghai, 123567891234]",
+                        "+I[2000, user_21, Shanghai, 123567891234]",
+                        "+I[15213, user_15213, Shanghai, 123567891234]",
+                        "-U[2000, user_21, Shanghai, 123567891234]",
+                        "+U[2000, user_21, Pittsburgh, 123567891234]",
+                        "-D[1019, user_20, Shanghai, 123567891234]");
+        // In the SQL Server database, because the capture process extracts change data from the
+        // transaction log, there is a built-in latency between the time that a change is committed
+        // to a source table and the time that the change appears within its associated change
+        // table. Then, in the streaming phase, the log that should be ignored is read again.
         assertEqualsInAnyOrder(expectedRecords, records);
     }
 
     @Test
     public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
-        List records = testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK);
+        List records = testBackfillWhenWritingEvents(false, 25, USE_POST_LOWWATERMARK_HOOK);
 
         List expectedRecords =
                 Arrays.asList(
@@ -180,9 +186,15 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
                         "+I[1017, user_18, Shanghai, 123567891234]",
                         "+I[1018, user_19, Shanghai, 123567891234]",
                         "+I[2000, user_21, Pittsburgh, 123567891234]",
-                        "+I[15213, user_15213, Shanghai, 123567891234]");
-        // when enable backfill, the wal log between [low_watermark, snapshot) will be applied
-        // as snapshot image
+                        "+I[15213, user_15213, Shanghai, 123567891234]",
+                        "+I[15213, user_15213, Shanghai, 123567891234]",
+                        "-U[2000, user_21, Shanghai, 123567891234]",
+                        "+U[2000, user_21, Pittsburgh, 123567891234]",
+                        "-D[1019, user_20, Shanghai, 123567891234]");
+        // In the SQL Server database, because the capture process extracts change data from the
+        // transaction log, there is a built-in latency between the time that a change is committed
+        // to a source table and the time that the change appears within its associated change
+        // table. Then, in the streaming phase, the log that should be ignored is read again.
         assertEqualsInAnyOrder(expectedRecords, records);
     }
 
@@ -272,7 +284,7 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
         env.enableCheckpointing(1000);
         env.setParallelism(1);
 
-        ResolvedSchema customersSchame =
+        ResolvedSchema customersSchema =
                 new ResolvedSchema(
                         Arrays.asList(
                                 physical("id", BIGINT().notNull()),
@@ -281,7 +293,7 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
                                 physical("phone_number", STRING())),
                         new ArrayList<>(),
                         UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
-        TestTable customerTable = new TestTable(databaseName, "dbo", "customers", customersSchame);
+        TestTable customerTable = new TestTable(databaseName, "dbo", "customers", customersSchema);
         String tableId = customerTable.getTableId();
 
         SqlServerSourceBuilder.SqlServerIncrementalSource source =
@@ -310,14 +322,10 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
                 (sourceConfig, split) -> {
                     SqlServerDialect dialect =
                             new SqlServerDialect((SqlServerSourceConfig) sourceConfig);
-                    JdbcConnection sqlServerConnection =
-                            dialect.openJdbcConnection((JdbcSourceConfig) sourceConfig);
-                    sqlServerConnection.execute(statements);
-                    sqlServerConnection.commit();
-                    try {
-                        Thread.sleep(5000L);
-                    } catch (InterruptedException e) {
-                        throw new RuntimeException(e);
+                    try (JdbcConnection sqlServerConnection =
+                            dialect.openJdbcConnection((JdbcSourceConfig) sourceConfig)) {
+                        sqlServerConnection.execute(statements);
+                        sqlServerConnection.commit();
                     }
                 };
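
The pattern the SQL Server hunk above converges on is a snapshot-phase hook that opens the dialect's JDBC connection in a try-with-resources block instead of leaving it open and sleeping. Below is a minimal sketch of that hook wiring, using only names that appear in the hunks (SnapshotPhaseHooks, SnapshotPhaseHook, SqlServerDialect, JdbcConnection, statements, hookType); imports, the surrounding test method, and the SnapshotPhaseHooks construction are assumed rather than taken verbatim from the patch.

    // Sketch of the snapshot-phase hook as it looks after this patch (illustrative, not a verbatim excerpt).
    SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
    SnapshotPhaseHook snapshotPhaseHook =
            (sourceConfig, split) -> {
                SqlServerDialect dialect =
                        new SqlServerDialect((SqlServerSourceConfig) sourceConfig);
                // try-with-resources closes the connection even if execute/commit throws,
                // replacing the removed unclosed connection and Thread.sleep(5000L).
                try (JdbcConnection sqlServerConnection =
                        dialect.openJdbcConnection((JdbcSourceConfig) sourceConfig)) {
                    sqlServerConnection.execute(statements);
                    sqlServerConnection.commit();
                }
            };
    if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
        hooks.setPreHighWatermarkAction(snapshotPhaseHook);
    }

The MySQL, Oracle, Postgres, and MongoDB hooks in this patch follow the same shape with their respective dialect and connection types, each dropping the removed Thread.sleep calls.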