From 856c664014344974379ad1946049ffed7d3d1d4a Mon Sep 17 00:00:00 2001
From: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Date: Thu, 23 Jan 2025 12:41:40 +0800
Subject: [PATCH] tests: fetch table results with timeout set properly

---
 .../flink/cdc/common/utils/TestCaseUtils.java | 82 +++++++++++++++++++
 .../db2/source/Db2SourceITCase.java | 18 ++--
 .../source/MongoDBFullChangelogITCase.java | 11 +--
 .../source/MongoDBParallelSourceITCase.java | 11 +--
 .../mongodb/table/MongoDBTimeZoneITCase.java | 18 ++--
 .../mongodb/utils/MongoDBTestUtils.java | 28 -------
 .../mysql/LegacyMySqlSourceITCase.java | 21 +----
 .../MysqlDebeziumTimeConverterITCase.java | 17 +---
 ...ySqlOnLineSchemaMigrationSourceITCase.java | 13 ---
 .../mysql/source/MySqlSourceITCase.java | 52 +++---------
 .../mysql/source/NewlyAddedTableITCase.java | 17 +---
 .../source/SpecificStartingOffsetITCase.java | 28 ++-----
 .../mysql/table/MySqlCompatibilityITCase.java | 20 ++---
 .../mysql/table/MySqlConnectorITCase.java | 57 +++++++------
 .../MySqlConnectorShardingTableITCase.java | 17 +---
 .../table/MySqlJsonArrayAsKeyIndexITCase.java | 19 +----
 ...MySqlOnLineSchemaMigrationTableITCase.java | 45 +++++-----
 .../mysql/table/MySqlTimezoneITCase.java | 19 ++---
 .../table/MysqlConnectorCharsetITCase.java | 21 ++---
 .../polardbx/PolardbxCharsetITCase.java | 8 +-
 .../polardbx/PolardbxSourceITCase.java | 13 +--
 .../polardbx/PolardbxSourceTestBase.java | 13 ---
 .../oracle/source/OracleSourceITCase.java | 35 ++------
 .../connectors/postgres/PostgresTestBase.java | 13 ---
 .../postgres/source/PostgresSourceITCase.java | 43 ++++------
 .../table/PostgreSQLConnectorITCase.java | 3 +-
 .../source/SqlServerSourceITCase.java | 34 ++------
 .../vitess/table/VitessConnectorITCase.java | 15 +---
 28 files changed, 263 insertions(+), 428 deletions(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/TestCaseUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/TestCaseUtils.java
index f179e779c..5f099ae60 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/TestCaseUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/TestCaseUtils.java
@@ -19,11 +19,19 @@ package org.apache.flink.cdc.common.utils;
 
 import org.apache.flink.util.function.SupplierWithException;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 /** Some utility methods for creating repeated-checking test cases. */
 public class TestCaseUtils {
@@ -91,4 +99,78 @@ public class TestCaseUtils {
         }
         throw new RuntimeException("Timeout when waiting for state to be ready.");
     }
+
+    /**
+     * Fetches at most {@code size} entries from {@link Iterator} {@code iter} within {@code
+     * DEFAULT_TIMEOUT}.
+     */
+    public static <T> List<T> fetch(Iterator<T> iter, final int size) throws InterruptedException {
+        return fetch(iter, size, DEFAULT_TIMEOUT);
+    }
+
+    /**
+     * Fetches at most {@code size} entries from {@link Iterator} {@code iter}.
+     * It may return a list with fewer than {@code size} elements if {@code iter} doesn't provide
+     * enough results or {@code timeout} expires.
+     */
+    public static <T> List<T> fetch(Iterator<T> iter, final int size, @Nullable Duration timeout)
+            throws InterruptedException {
+        long deadline = Long.MAX_VALUE;
+        if (timeout != null) {
+            deadline = System.currentTimeMillis() + timeout.toMillis();
+        }
+
+        ConcurrentLinkedQueue<T> results = new ConcurrentLinkedQueue<>();
+        AtomicReference<Throwable> fetchException = new AtomicReference<>();
+
+        Thread thread =
+                new Thread(
+                        () -> {
+                            try {
+                                int remainingSize = size;
+                                while (remainingSize > 0 && iter.hasNext()) {
+                                    T row = iter.next();
+                                    results.add(row);
+                                    remainingSize--;
+                                }
+                            } catch (Throwable t) {
+                                fetchException.set(t);
+                            }
+                        });
+
+        thread.start();
+
+        while (true) {
+            // Raise any exception thrown by the fetching thread
+            if (fetchException.get() != null) {
+                throw (RuntimeException) fetchException.get();
+            }
+
+            // Stop if fetching thread has exited
+            if (!thread.isAlive()) {
+                break;
+            }
+
+            // Stop waiting if deadline has arrived
+            if (System.currentTimeMillis() > deadline) {
+                thread.interrupt();
+                break;
+            }
+
+            Thread.sleep(1000L);
+        }
+
+        return new ArrayList<>(results);
+    }
+
+    public static <T, V> List<V> fetchAndConvert(
+            Iterator<T> iter, int size, Function<T, V> converter) throws InterruptedException {
+        return fetch(iter, size).stream().map(converter).collect(Collectors.toList());
+    }
+
+    public static <T, V> List<V> fetchAndConvert(
+            Iterator<T> iter, int size, Duration timeout, Function<T, V> converter)
+            throws InterruptedException {
+        return fetch(iter, size, timeout).stream().map(converter).collect(Collectors.toList());
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/source/Db2SourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/source/Db2SourceITCase.java
index c3221aed8..8fb34659b 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/source/Db2SourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/source/Db2SourceITCase.java
@@ -38,10 +38,10 @@ import org.junit.rules.Timeout;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 
 import static java.lang.String.format;
+import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
 import static org.testcontainers.containers.Db2Container.DB2_PORT;
 
 /** IT tests for {@link Db2IncrementalSource}.
*/ @@ -217,7 +217,8 @@ public class Db2SourceITCase extends Db2TestBase { } assertEqualsInAnyOrder( - expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size())); + expectedSnapshotData, + fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString)); // second step: check the change stream data for (String tableId : captureCustomerTables) { @@ -250,7 +251,8 @@ public class Db2SourceITCase extends Db2TestBase { expectedRedoLogsData.addAll(Arrays.asList(redoLogsForSingleTable)); } assertEqualsInAnyOrder( - expectedRedoLogsData, fetchRows(iterator, expectedRedoLogsData.size())); + expectedRedoLogsData, + fetchAndConvert(iterator, expectedRedoLogsData.size(), Row::toString)); tableResult.getJobClient().get().cancel().get(); } @@ -275,16 +277,6 @@ public class Db2SourceITCase extends Db2TestBase { } } - private static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } - /** The type of failover. */ protected enum FailoverType { TM, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java index eebaacafc..d3f5ddbc3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java @@ -56,11 +56,10 @@ import java.util.List; import java.util.Random; import java.util.stream.Collectors; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInAnyOrder; import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER; import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD; -import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils.fetchRowData; -import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils.fetchRows; import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils.triggerFailover; import static org.apache.flink.table.api.DataTypes.BIGINT; import static org.apache.flink.table.api.DataTypes.STRING; @@ -584,7 +583,7 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase { try (CloseableIterator iterator = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source") .executeAndCollect()) { - records = fetchRowData(iterator, fetchSize, customerTable::stringify); + records = fetchAndConvert(iterator, fetchSize, customerTable::stringify); env.close(); } return records; @@ -713,7 +712,8 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase { } assertEqualsInAnyOrder( - expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size())); + expectedSnapshotData, + fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString)); // second step: check the change stream data for (String collectionName : captureCustomerCollections) { @@ -747,7 +747,8 @@ public class 
MongoDBFullChangelogITCase extends MongoDBSourceTestBase { for (int i = 0; i < captureCustomerCollections.length; i++) { expectedChangeStreamData.addAll(Arrays.asList(changeEventsForSingleTable)); } - List actualChangeStreamData = fetchRows(iterator, expectedChangeStreamData.size()); + List actualChangeStreamData = + fetchAndConvert(iterator, expectedChangeStreamData.size(), Row::toString); assertEqualsInAnyOrder(expectedChangeStreamData, actualChangeStreamData); tableResult.getJobClient().get().cancel().get(); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java index 461bf76af..467d07b25 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java @@ -56,11 +56,10 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInAnyOrder; import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER; import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD; -import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils.fetchRowData; -import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils.fetchRows; import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils.triggerFailover; import static org.apache.flink.table.api.DataTypes.BIGINT; import static org.apache.flink.table.api.DataTypes.STRING; @@ -496,7 +495,7 @@ public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase { try (CloseableIterator iterator = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source") .executeAndCollect()) { - records = fetchRowData(iterator, fetchSize, customerTable::stringify); + records = fetchAndConvert(iterator, fetchSize, customerTable::stringify); env.close(); } return records; @@ -605,7 +604,8 @@ public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase { } assertEqualsInAnyOrder( - expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size())); + expectedSnapshotData, + fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString)); // second step: check the change stream data for (String collectionName : captureCustomerCollections) { @@ -639,7 +639,8 @@ public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase { for (int i = 0; i < captureCustomerCollections.length; i++) { expectedChangeStreamData.addAll(Arrays.asList(changeEventsForSingleTable)); } - List actualChangeStreamData = fetchRows(iterator, expectedChangeStreamData.size()); + List actualChangeStreamData = + fetchAndConvert(iterator, expectedChangeStreamData.size(), Row::toString); assertEqualsInAnyOrder(expectedChangeStreamData, actualChangeStreamData); tableResult.getJobClient().get().cancel().get(); } diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java index 96ab145a9..a3f49ad27 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.mongodb.table; +import org.apache.flink.cdc.common.utils.TestCaseUtils; import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceTestBase; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; @@ -33,7 +34,6 @@ import org.junit.runners.Parameterized; import java.time.ZoneId; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER; @@ -154,7 +154,8 @@ public class MongoDBTimeZoneITCase extends MongoDBSourceTestBase { break; } - List actualSnapshot = fetchRows(iterator, expectedSnapshot.length); + List actualSnapshot = + TestCaseUtils.fetchAndConvert(iterator, expectedSnapshot.length, Row::toString); assertThat(actualSnapshot, containsInAnyOrder(expectedSnapshot)); result.getJobClient().get().cancel().get(); @@ -217,19 +218,10 @@ public class MongoDBTimeZoneITCase extends MongoDBSourceTestBase { break; } - List actualSnapshot = fetchRows(iterator, expectedSnapshot.length); + List actualSnapshot = + TestCaseUtils.fetchAndConvert(iterator, expectedSnapshot.length, Row::toString); assertThat(actualSnapshot, containsInAnyOrder(expectedSnapshot)); result.getJobClient().get().cancel().get(); } - - private static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/utils/MongoDBTestUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/utils/MongoDBTestUtils.java index 7b826dd71..b1fa792a7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/utils/MongoDBTestUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/utils/MongoDBTestUtils.java @@ -20,16 +20,9 @@ package org.apache.flink.cdc.connectors.mongodb.utils; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.types.Row; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import 
java.util.stream.Collectors; import static org.junit.Assert.fail; @@ -74,27 +67,6 @@ public class MongoDBTestUtils { } } - public static List fetchRowData( - Iterator iter, int size, Function stringifier) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - RowData row = iter.next(); - rows.add(row); - size--; - } - return rows.stream().map(stringifier).collect(Collectors.toList()); - } - - public static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } - /** The type of failover. */ public enum FailoverType { TM, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/LegacyMySqlSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/LegacyMySqlSourceITCase.java index de7acd86a..434b5ec24 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/LegacyMySqlSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/LegacyMySqlSourceITCase.java @@ -39,13 +39,11 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.sql.Connection; import java.sql.Statement; -import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Objects; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetch; import static org.junit.Assert.assertTrue; /** Integration tests for the legacy {@link MySqlSource}. 
*/ @@ -112,7 +110,7 @@ public class LegacyMySqlSourceITCase extends LegacyMySqlTestBase { waitForSnapshotStarted(snapshot); assertTrue( dataInJsonIsEquals( - fetchRows(snapshot, 1).get(0).toString(), expectSnapshot.toString())); + fetch(snapshot, 1).get(0).toString(), expectSnapshot.toString())); try (Connection connection = fullTypesDatabase.getJdbcConnection(); Statement statement = connection.createStatement()) { statement.execute( @@ -122,9 +120,7 @@ public class LegacyMySqlSourceITCase extends LegacyMySqlTestBase { // check the binlog result CloseableIterator binlog = result.collect(); JSONObject expectBinlog = expected.getJSONObject("expected_binlog"); - assertTrue( - dataInJsonIsEquals( - fetchRows(binlog, 1).get(0).toString(), expectBinlog.toString())); + assertTrue(dataInJsonIsEquals(fetch(binlog, 1).get(0).toString(), expectBinlog.toString())); result.getJobClient().get().cancel().get(); } @@ -136,17 +132,6 @@ public class LegacyMySqlSourceITCase extends LegacyMySqlTestBase { testConsumingAllEventsWithJsonFormat(includeSchema, null, expectedFile); } - private static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - // ignore rowKind marker - rows.add(row.getField(0)); - size--; - } - return rows; - } - private static void waitForSnapshotStarted(CloseableIterator iterator) throws Exception { while (!iterator.hasNext()) { Thread.sleep(100); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java index 8adb92454..f562cc272 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.connectors.mysql.debezium.converters; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.cdc.common.utils.TestCaseUtils; import org.apache.flink.cdc.connectors.mysql.MySqlValidatorTest; import org.apache.flink.cdc.connectors.mysql.source.MySqlSource; import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceBuilder; @@ -60,7 +61,6 @@ import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Properties; @@ -227,7 +227,7 @@ public class MysqlDebeziumTimeConverterITCase { return debeziumProperties; } - private void checkData(TableResult tableResult) { + private void checkData(TableResult tableResult) throws Exception { String[] snapshotForSingleTable = new String[] { "+I[1, 14:23:00, 2023-04-01 14:24:00, 2023-04-01, 14:25:00]", @@ -240,17 +240,8 @@ public class MysqlDebeziumTimeConverterITCase { CloseableIterator collect = tableResult.collect(); tableResult.getJobClient().get().getJobID(); assertEqualsInAnyOrder( - expectedSnapshotData, fetchRows(collect, expectedSnapshotData.size())); - } - - private static List fetchRows(Iterator iter, int size) { - 
List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; + expectedSnapshotData, + TestCaseUtils.fetchAndConvert(collect, expectedSnapshotData.size(), Row::toString)); } protected MySqlContainer createMySqlContainer(String timezone) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java index 848869783..b07a2baca 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java @@ -36,7 +36,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.types.Row; import org.junit.After; import org.junit.AfterClass; @@ -54,9 +53,7 @@ import java.io.IOException; import java.io.PrintStream; import java.sql.Connection; import java.sql.Statement; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.Random; @@ -571,14 +568,4 @@ public class MySqlOnLineSchemaMigrationSourceITCase extends MySqlSourceTestBase } } } - - private static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java index 7be090a8f..95340fef2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java @@ -84,7 +84,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -97,12 +96,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.Function; import java.util.stream.Collectors; import static java.lang.String.format; import static org.apache.flink.api.common.JobStatus.RUNNING; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; import static org.apache.flink.util.Preconditions.checkState; +import static 
org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -347,7 +347,7 @@ public class MySqlSourceITCase extends MySqlSourceTestBase { // Check all snapshot records are sent with exactly-once semantics assertEqualsInAnyOrder( Arrays.asList(expectedSnapshotData), - fetchRowData(iterator, expectedSnapshotData.length)); + fetchAndConvert(iterator, expectedSnapshotData.length, RowData::toString)); assertTrue(!hasNextData(iterator)); jobClient.cancel().get(); } @@ -638,7 +638,7 @@ public class MySqlSourceITCase extends MySqlSourceTestBase { try (CloseableIterator iterator = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source") .executeAndCollect()) { - List records = fetchRowData(iterator, fetchSize, customerTable::stringify); + List records = fetchAndConvert(iterator, fetchSize, customerTable::stringify); return records; } } @@ -693,7 +693,9 @@ public class MySqlSourceITCase extends MySqlSourceTestBase { DataStreamSource source = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source"); try (CloseableIterator iterator = source.executeAndCollect()) { - List rows = fetchRowData(iterator, expectedChangelogAfterStart.size()); + List rows = + fetchAndConvert( + iterator, expectedChangelogAfterStart.size(), RowData::toString); assertEqualsInAnyOrder(expectedChangelogAfterStart, rows); } } @@ -1027,7 +1029,8 @@ public class MySqlSourceITCase extends MySqlSourceTestBase { } assertEqualsInAnyOrder( - expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size())); + expectedSnapshotData, + fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString)); } private void checkBinlogData( @@ -1064,8 +1067,10 @@ public class MySqlSourceITCase extends MySqlSourceTestBase { expectedBinlogData.addAll(secondPartBinlogEvents); } - assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size())); - assertTrue(!hasNextData(iterator)); + assertEqualsInAnyOrder( + expectedBinlogData, + fetchAndConvert(iterator, expectedBinlogData.size(), Row::toString)); + assertThat(iterator.hasNext()).isFalse(); } private static List convertRowDataToRowString(List rows) { @@ -1090,37 +1095,6 @@ public class MySqlSourceITCase extends MySqlSourceTestBase { .collect(Collectors.toList()); } - private static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } - - private List fetchRowData( - Iterator iter, int size, Function stringifier) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - RowData row = iter.next(); - rows.add(row); - size--; - } - return rows.stream().map(stringifier).collect(Collectors.toList()); - } - - private static List fetchRowData(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - RowData row = iter.next(); - rows.add(row); - size--; - } - return convertRowDataToRowString(rows); - } - private String getTableNameRegex(String[] captureCustomerTables) { checkState(captureCustomerTables.length > 0); if (captureCustomerTables.length == 1) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java index 1f3fa0d1a..0c7011f4f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java @@ -62,7 +62,6 @@ import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -79,6 +78,7 @@ import java.util.stream.Stream; import static java.lang.String.format; import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; import static org.apache.flink.util.Preconditions.checkState; /** IT tests to cover various newly added tables during capture process. */ @@ -487,7 +487,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase { expectedCustomersResult.stream()) .collect(Collectors.toList()) : expectedCustomersResult; - List rows = fetchRowData(iterator, expectedSnapshotResult.size()); + List rows = + fetchAndConvert(iterator, expectedSnapshotResult.size(), RowData::toString); assertEqualsInAnyOrder(expectedSnapshotResult, rows); // make binlog events @@ -503,7 +504,7 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase { "UPDATE " + tableId + " SET address = 'Update2' where id = 103"); connection.commit(); } - rows = fetchRowData(iterator, expectedBinlogResult.size()); + rows = fetchAndConvert(iterator, expectedBinlogResult.size(), RowData::toString); assertEqualsInAnyOrder(expectedBinlogResult, rows); finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); @@ -540,16 +541,6 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase { return iterator; } - private List fetchRowData(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - RowData row = iter.next(); - rows.add(row); - size--; - } - return convertRowDataToRowString(rows); - } - private static List convertRowDataToRowString(List rows) { LinkedHashMap map = new LinkedHashMap<>(); map.put("id", 0); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java index 7a178842e..bfe8c473c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java @@ -62,18 +62,15 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.time.ZoneId; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Properties; 
import java.util.UUID; -import java.util.function.Function; -import java.util.stream.Collectors; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; import static org.assertj.core.api.Assertions.assertThat; /** Integration test for validating specifying starting offset. */ @@ -150,7 +147,7 @@ public class SpecificStartingOffsetITCase { // Execute job and validate results JobClient jobClient = env.executeAsync(); iterator.setJobClient(jobClient); - List rows = fetchRowData(iterator, 3, customers::stringify); + List rows = fetchAndConvert(iterator, 3, customers::stringify); assertThat(rows) .containsExactly( "+I[15213, Alice, Rome, 123456987]", @@ -176,7 +173,7 @@ public class SpecificStartingOffsetITCase { setupSavepoint(restoredEnv, savepointPath); JobClient restoredJobClient = restoredEnv.executeAsync(); iterator.setJobClient(restoredJobClient); - List rowsAfterRestored = fetchRowData(iterator, 2, customers::stringify); + List rowsAfterRestored = fetchAndConvert(iterator, 2, customers::stringify); assertThat(rowsAfterRestored) .containsExactly( "-U[15213, Alice, Rome, 123456987]", "+U[15213, Alicia, Rome, 123456987]"); @@ -225,7 +222,7 @@ public class SpecificStartingOffsetITCase { // Execute job and validate results JobClient jobClient = env.executeAsync(); iterator.setJobClient(jobClient); - List rows = fetchRowData(iterator, 3, customers::stringify); + List rows = fetchAndConvert(iterator, 3, customers::stringify); assertThat(rows) .containsExactly( "+I[15213, Alice, Rome, 123456987]", @@ -251,7 +248,7 @@ public class SpecificStartingOffsetITCase { setupSavepoint(restoredEnv, savepointPath); JobClient restoredJobClient = restoredEnv.executeAsync("snapshotSplitTest"); iterator.setJobClient(restoredJobClient); - List rowsAfterRestored = fetchRowData(iterator, 2, customers::stringify); + List rowsAfterRestored = fetchAndConvert(iterator, 2, customers::stringify); assertThat(rowsAfterRestored) .containsExactly( "-U[15213, Alice, Rome, 123456987]", "+U[15213, Alicia, Rome, 123456987]"); @@ -398,7 +395,7 @@ public class SpecificStartingOffsetITCase { // Execute job and validate results JobClient jobClient = env.executeAsync(); iterator.setJobClient(jobClient); - List rows = fetchRowData(iterator, 3, customers::stringify); + List rows = fetchAndConvert(iterator, 3, customers::stringify); assertThat(rows) .containsExactly( "+I[19613, Tom, NewYork, 123456987]", @@ -424,7 +421,7 @@ public class SpecificStartingOffsetITCase { setupSavepoint(restoredEnv, savepointPath); JobClient restoredJobClient = restoredEnv.executeAsync("snapshotSplitTest"); iterator.setJobClient(restoredJobClient); - List rowsAfterRestored = fetchRowData(iterator, 2, customers::stringify); + List rowsAfterRestored = fetchAndConvert(iterator, 2, customers::stringify); assertThat(rowsAfterRestored) .containsExactly( "-U[18213, Charlie, Paris, 123456987]", @@ -500,17 +497,6 @@ public class SpecificStartingOffsetITCase { return iterator; } - private List fetchRowData( - Iterator iter, int size, Function stringifier) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - RowData row = iter.next(); - rows.add(row); - size--; - } - return rows.stream().map(stringifier).collect(Collectors.toList()); - } - private static String buildMySqlConfigWithTimezone(File resourceDirectory, String timezone) { try { TemporaryFolder tempFolder = new TemporaryFolder(resourceDirectory); diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlCompatibilityITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlCompatibilityITCase.java index 23a4b25a8..6cb7f47c7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlCompatibilityITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlCompatibilityITCase.java @@ -44,16 +44,14 @@ import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.sql.Connection; import java.sql.Statement; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Iterator; -import java.util.List; import java.util.Objects; import java.util.Random; import java.util.UUID; import java.util.stream.Stream; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; import static org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInAnyOrder; import static org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInOrder; @@ -185,7 +183,8 @@ public class MySqlCompatibilityITCase { "+I[109, spare tire, 24 inch spare tire, 22.200]" }; assertEqualsInAnyOrder( - Arrays.asList(expectedSnapshot), fetchRows(iterator, expectedSnapshot.length)); + Arrays.asList(expectedSnapshot), + fetchAndConvert(iterator, expectedSnapshot.length, Row::toString)); try (Connection connection = testDatabase.getJdbcConnection(); Statement statement = connection.createStatement()) { @@ -218,7 +217,8 @@ public class MySqlCompatibilityITCase { }; assertEqualsInOrder( - Arrays.asList(expectedBinlog), fetchRows(iterator, expectedBinlog.length)); + Arrays.asList(expectedBinlog), + fetchAndConvert(iterator, expectedBinlog.length, Row::toString)); result.getJobClient().get().cancel().get(); mySqlContainer.stop(); } @@ -229,16 +229,6 @@ public class MySqlCompatibilityITCase { return serverId + "-" + (serverId + env.getParallelism()); } - private static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } - private String buildCustomMySqlConfig(MySqlVersion version, boolean enableGtid) { try { File folder = tempFolder.newFolder(String.valueOf(UUID.randomUUID())); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java index 799e96ffa..a96e4b26a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java @@ -57,7 +57,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.Random; @@ 
-65,6 +64,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.flink.api.common.JobStatus.RUNNING; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; import static org.apache.flink.cdc.connectors.mysql.LegacyMySqlSourceTest.currentMySqlLatestOffset; import static org.apache.flink.cdc.connectors.mysql.MySqlTestUtils.assertContainsErrorMsg; import static org.apache.flink.cdc.connectors.mysql.MySqlTestUtils.waitForJobStatus; @@ -473,7 +473,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { "+I[109, spare tire, 24 inch spare tire, 22.200]" }; assertEqualsInAnyOrder( - Arrays.asList(expectedSnapshot), fetchRows(iterator, expectedSnapshot.length)); + Arrays.asList(expectedSnapshot), + fetchAndConvert(iterator, expectedSnapshot.length, Row::toString)); try (Connection connection = inventoryDatabase.getJdbcConnection(); Statement statement = connection.createStatement()) { @@ -499,7 +500,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { "-D[111, scooter, Big 2-wheel scooter , 5.170]" }; assertEqualsInOrder( - Arrays.asList(expectedBinlog), fetchRows(iterator, expectedBinlog.length)); + Arrays.asList(expectedBinlog), + fetchAndConvert(iterator, expectedBinlog.length, Row::toString)); result.getJobClient().get().cancel().get(); } @@ -760,7 +762,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { + "]", }; - assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + assertEqualsInAnyOrder( + Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString)); result.getJobClient().get().cancel().get(); } @@ -830,7 +833,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { "+U[0, 1024, " + getIntegerSeqString(2, tableColumnCount) + "]" }; - assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + assertEqualsInAnyOrder( + Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString)); result.getJobClient().get().cancel().get(); } @@ -1096,7 +1100,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { "+U[111, scooter, Big 2-wheel scooter , 5.170]", "-D[111, scooter, Big 2-wheel scooter , 5.170]" }; - assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + assertEqualsInAnyOrder( + Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString)); result.getJobClient().get().cancel().get(); } @@ -1174,7 +1179,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { "+U[[4, 4, 4, 4, 4, 4, 4, 5], 2021-03-08, 50, 500, flink]", "-D[[4, 4, 4, 4, 4, 4, 4, 6], 2021-03-08, 30, 500, flink-sql]" }; - assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + assertEqualsInAnyOrder( + Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString)); result.getJobClient().get().cancel().get(); } @@ -1241,7 +1247,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { "+U[416927583791428523, China, Hangzhou, West Town address 2]", "+I[418257940021724075, Germany, Berlin, West Town address 3]" }; - assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + assertEqualsInAnyOrder( + Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString)); result.getJobClient().get().cancel().get(); } @@ -1307,7 +1314,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { "+U[103, user_3, 
Hangzhou, 123567891234]", "+I[110, newCustomer, Berlin, 12345678]" }; - assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + assertEqualsInAnyOrder( + Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString)); result.getJobClient().get().cancel().get(); customer3_0Database.dropDatabase(); } @@ -1381,7 +1389,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]" }; - assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + assertEqualsInAnyOrder( + Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString)); result.getJobClient().get().cancel().get(); } @@ -1519,7 +1528,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { + " tiny_un_c TINYINT UNSIGNED DEFAULT ' 28 '" + " );"); } - assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + assertEqualsInAnyOrder( + Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString)); jobClient.cancel().get(); } @@ -1588,7 +1598,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { statement.execute( "alter table default_value_test add column `int_test` INT DEFAULT ' 30 ';"); } - assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + assertEqualsInAnyOrder( + Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString)); jobClient.cancel().get(); } @@ -1986,7 +1997,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { "+I[123458.6789, KIND_003, user_3, my shopping cart]", "+I[123459.1234, KIND_004, user_4, null]" }; - assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + assertEqualsInAnyOrder( + Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString)); result.getJobClient().get().cancel().get(); } @@ -2051,7 +2063,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { "+I[E, 3, flink]", "+I[e, 4, flink]" }; - assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + assertEqualsInAnyOrder( + Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString)); result.getJobClient().get().cancel().get(); } @@ -2183,7 +2196,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { "+I[1, 127, 255, 255, 32767, 65535, 65535, 2023]", "+I[2, 127, 255, 255, 32767, 65535, 65535, 2024]" }; - assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + assertEqualsInAnyOrder( + Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString)); result.getJobClient().get().cancel().get(); } @@ -2255,16 +2269,6 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { } } - private static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } - private static void waitForSnapshotStarted(CloseableIterator iterator) throws Exception { while (!iterator.hasNext()) { Thread.sleep(100); @@ -2348,7 +2352,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { "+U[6, BAQEBAQEBAU=, 2021-03-08, 50, 500, flink]", "-D[7, BAQEBAQEBAY=, 2021-03-08, 30, 500, flink-sql]" }; - assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + 
assertEqualsInAnyOrder( + Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString)); result.getJobClient().get().cancel().get(); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorShardingTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorShardingTableITCase.java index 8ee1e9f28..2c7f17e62 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorShardingTableITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorShardingTableITCase.java @@ -41,13 +41,13 @@ import org.testcontainers.lifecycle.Startables; import java.sql.Connection; import java.sql.Statement; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.stream.Stream; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; + /** Integration tests for MySQL shardding tables. */ @RunWith(Parameterized.class) public class MySqlConnectorShardingTableITCase extends MySqlSourceTestBase { @@ -269,7 +269,8 @@ public class MySqlConnectorShardingTableITCase extends MySqlSourceTestBase { "+U[221, user_221, Shanghai, 123567891234, null, 20]", }; - assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + assertEqualsInAnyOrder( + Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString)); result.getJobClient().get().cancel().get(); } @@ -341,16 +342,6 @@ public class MySqlConnectorShardingTableITCase extends MySqlSourceTestBase { } } - private static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } - private static void waitForSnapshotStarted(CloseableIterator iterator) throws Exception { while (!iterator.hasNext()) { Thread.sleep(100); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlJsonArrayAsKeyIndexITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlJsonArrayAsKeyIndexITCase.java index 94d654e21..cdbbeed7e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlJsonArrayAsKeyIndexITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlJsonArrayAsKeyIndexITCase.java @@ -39,14 +39,12 @@ import org.testcontainers.lifecycle.Startables; import java.sql.Connection; import java.sql.Statement; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; -import java.util.List; import java.util.Random; import java.util.stream.Stream; import static org.apache.flink.api.common.JobStatus.RUNNING; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; /** Integration tests for MySQL Table source. 
*/ @RunWith(Parameterized.class) @@ -100,7 +98,7 @@ public class MySqlJsonArrayAsKeyIndexITCase extends MySqlSourceTestBase { } @Test - public void testJsonArrayAsKeyIndex() { + public void testJsonArrayAsKeyIndex() throws InterruptedException { UniqueDatabase jaakiDatabase = new UniqueDatabase(container, "json_array_as_key", TEST_USER, TEST_PASSWORD); jaakiDatabase.createAndInitialize(); @@ -161,7 +159,8 @@ public class MySqlJsonArrayAsKeyIndexITCase extends MySqlSourceTestBase { "+I[17]", "+I[18]", "+I[19]", "-D[19]", }; - assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + assertEqualsInAnyOrder( + Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString)); try { result.getJobClient().get().cancel().get(); @@ -170,16 +169,6 @@ public class MySqlJsonArrayAsKeyIndexITCase extends MySqlSourceTestBase { } } - private static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } - private String getServerId() { final Random random = new Random(); int serverId = random.nextInt(100) + 5400; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java index 92e5fc566..023b6ef49 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java @@ -52,9 +52,7 @@ import org.testcontainers.lifecycle.Startables; import java.io.IOException; import java.sql.Connection; import java.sql.Statement; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.Random; @@ -64,6 +62,7 @@ import java.util.stream.Stream; import static org.apache.flink.api.common.JobStatus.RUNNING; import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_INTERVAL; import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_TIMEOUT; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; /** * IT case for Evolving MySQL schema with gh-ost/pt-osc utility. 
See fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTimezoneITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTimezoneITCase.java index 7d994e800..711d8b4d5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTimezoneITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTimezoneITCase.java @@ -45,16 +45,15 @@ import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.sql.Connection; import java.sql.Statement; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Random; import java.util.UUID; import java.util.stream.Stream; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; import static org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInAnyOrder; import static org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInOrder; @@ -189,7 +188,8 @@ public class MySqlTimezoneITCase { "+I[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22]" }; assertEqualsInAnyOrder( - Arrays.asList(expectedSnapshot), fetchRows(iterator, expectedSnapshot.length)); + Arrays.asList(expectedSnapshot), + fetchAndConvert(iterator, expectedSnapshot.length, Row::toString)); try (Connection connection = fullTypesDatabase.getJdbcConnection(); Statement statement = connection.createStatement()) { @@ -205,7 +205,8 @@ public class MySqlTimezoneITCase { }; assertEqualsInOrder( - Arrays.asList(expectedBinlog), fetchRows(iterator, expectedBinlog.length)); + Arrays.asList(expectedBinlog), + fetchAndConvert(iterator, expectedBinlog.length, Row::toString)); result.getJobClient().get().cancel().get(); mySqlContainer.stop(); @@ -228,16 +229,6 @@ public class MySqlTimezoneITCase { return 0; } - private static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } - private String buildMySqlConfigWithTimezone(String timezone) { try { File folder = tempFolder.newFolder(String.valueOf(UUID.randomUUID())); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MysqlConnectorCharsetITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MysqlConnectorCharsetITCase.java index 197452389..a90cda6a2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MysqlConnectorCharsetITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MysqlConnectorCharsetITCase.java @@ -35,12 +35,11 @@ import 
org.junit.runners.Parameterized; import java.sql.Connection; import java.sql.Statement; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; -import java.util.List; import java.util.Random; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; + /** Test supporting different column charsets for MySQL Table source. */ @RunWith(Parameterized.class) public class MysqlConnectorCharsetITCase extends MySqlSourceTestBase { @@ -379,7 +378,8 @@ public class MysqlConnectorCharsetITCase extends MySqlSourceTestBase { CloseableIterator iterator = result.collect(); waitForSnapshotStarted(iterator); assertEqualsInAnyOrder( - Arrays.asList(snapshotExpected), fetchRows(iterator, snapshotExpected.length)); + Arrays.asList(snapshotExpected), + fetchAndConvert(iterator, snapshotExpected.length, Row::toString)); // test binlog phase try (Connection connection = charsetTestDatabase.getJdbcConnection(); @@ -387,7 +387,8 @@ public class MysqlConnectorCharsetITCase extends MySqlSourceTestBase { statement.execute(String.format("UPDATE %s SET table_id = table_id + 10;", testName)); } assertEqualsInAnyOrder( - Arrays.asList(binlogExpected), fetchRows(iterator, binlogExpected.length)); + Arrays.asList(binlogExpected), + fetchAndConvert(iterator, binlogExpected.length, Row::toString)); result.getJobClient().get().cancel().get(); } @@ -397,16 +398,6 @@ public class MysqlConnectorCharsetITCase extends MySqlSourceTestBase { return serverId + "-" + (serverId + env.getParallelism()); } - private static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } - private static void waitForSnapshotStarted(CloseableIterator iterator) throws Exception { while (!iterator.hasNext()) { Thread.sleep(100); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxCharsetITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxCharsetITCase.java index d4ecb5d97..b755db916 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxCharsetITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxCharsetITCase.java @@ -36,6 +36,8 @@ import java.sql.Connection; import java.sql.Statement; import java.util.Arrays; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; + /** Test supporting different column charsets for Polardbx. 
*/ @RunWith(Parameterized.class) public class PolardbxCharsetITCase extends PolardbxSourceTestBase { @@ -185,7 +187,8 @@ public class PolardbxCharsetITCase extends PolardbxSourceTestBase { CloseableIterator iterator = result.collect(); waitForSnapshotStarted(iterator); assertEqualsInAnyOrder( - Arrays.asList(snapshotExpected), fetchRows(iterator, snapshotExpected.length)); + Arrays.asList(snapshotExpected), + fetchAndConvert(iterator, snapshotExpected.length, Row::toString)); // test binlog phase try (Connection connection = getJdbcConnection(); @@ -196,7 +199,8 @@ public class PolardbxCharsetITCase extends PolardbxSourceTestBase { DATABASE, testName)); } assertEqualsInAnyOrder( - Arrays.asList(binlogExpected), fetchRows(iterator, binlogExpected.length)); + Arrays.asList(binlogExpected), + fetchAndConvert(iterator, binlogExpected.length, Row::toString)); result.getJobClient().get().cancel().get(); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceITCase.java index db87a98ac..32315d693 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceITCase.java @@ -36,6 +36,7 @@ import java.util.Arrays; import java.util.List; import static java.lang.String.format; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; /** * Database Polardbx supported the mysql protocol, but there are some different features in ddl. 
So @@ -108,7 +109,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase { expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable)); } - List realSnapshotData = fetchRows(iterator, expectedSnapshotData.size()); + List realSnapshotData = + fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString); assertEqualsInAnyOrder(expectedSnapshotData, realSnapshotData); // second step: check the sink data @@ -155,7 +157,7 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase { for (int i = 0; i < captureCustomerTables.length; i++) { expectedBinlogData.addAll(Arrays.asList(expectedBinlog)); } - List realBinlog = fetchRows(iterator, expectedBinlog.length); + List realBinlog = fetchAndConvert(iterator, expectedBinlog.length, Row::toString); assertEqualsInOrder(expectedBinlogData, realBinlog); tableResult.getJobClient().get().cancel().get(); } @@ -245,7 +247,7 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase { TableResult tableResult = tEnv.executeSql("select * from polardbx_full_types"); CloseableIterator iterator = tableResult.collect(); - List realSnapshotData = fetchRows(iterator, 1); + List realSnapshotData = fetchAndConvert(iterator, 1, Row::toString); String[] expectedSnapshotData = new String[] { "+I[100001, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 4294967295, 2147483647, " @@ -326,7 +328,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase { expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable)); } - List realSnapshotData = fetchRows(iterator, expectedSnapshotData.size()); + List realSnapshotData = + fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString); assertEqualsInAnyOrder(expectedSnapshotData, realSnapshotData); // second step: check the sink data @@ -375,7 +378,7 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase { "+I[7, 9999, 9999, 1007, 2022-01-17T00:00]", "-D[7, 9999, 9999, 1007, 2022-01-17T00:00]" }; - List realBinlog = fetchRows(iterator, expectedBinlog.length); + List realBinlog = fetchAndConvert(iterator, expectedBinlog.length, Row::toString); assertEqualsInAnyOrder(Arrays.asList(expectedBinlog), realBinlog); tableResult.getJobClient().get().cancel().get(); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceTestBase.java index 43c3c6783..e1df020d0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceTestBase.java @@ -19,7 +19,6 @@ package org.apache.flink.cdc.connectors.polardbx; import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.test.util.AbstractTestBase; -import org.apache.flink.types.Row; import com.github.dockerjava.api.model.ExposedPort; import com.github.dockerjava.api.model.PortBinding; @@ -42,9 +41,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.Random; import 
java.util.function.Function; @@ -160,16 +157,6 @@ public abstract class PolardbxSourceTestBase extends AbstractTestBase { } } - protected static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } - protected String getTableNameRegex(String[] captureCustomerTables) { checkState(captureCustomerTables.length > 0); if (captureCustomerTables.length == 1) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java index 1b0cba5f2..af9c276fc 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.oracle.source; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.cdc.common.utils.TestCaseUtils; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; import org.apache.flink.cdc.connectors.base.options.StartupOptions; import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook; @@ -52,12 +53,10 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Iterator; import java.util.List; -import java.util.function.Function; -import java.util.stream.Collectors; import static java.lang.String.format; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; import static org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.triggerFailover; import static org.apache.flink.table.api.DataTypes.BIGINT; import static org.apache.flink.table.api.DataTypes.STRING; @@ -490,7 +489,7 @@ public class OracleSourceITCase extends OracleSourceTestBase { try (CloseableIterator iterator = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source") .executeAndCollect()) { - records = fetchRowData(iterator, fetchSize, customerTable::stringify); + records = fetchAndConvert(iterator, fetchSize, customerTable::stringify); env.close(); } return records; @@ -617,7 +616,9 @@ public class OracleSourceITCase extends OracleSourceTestBase { LOG.info("snapshot data start"); assertEqualsInAnyOrder( - expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size())); + expectedSnapshotData, + TestCaseUtils.fetchAndConvert( + iterator, expectedSnapshotData.size(), Row::toString)); // second step: check the redo log data for (String tableId : captureCustomerTables) { @@ -650,7 +651,8 @@ public class OracleSourceITCase extends OracleSourceTestBase { expectedRedoLogData.addAll(Arrays.asList(redoLogForSingleTable)); } assertEqualsInAnyOrder( - expectedRedoLogData, fetchRows(iterator, expectedRedoLogData.size())); + expectedRedoLogData, + TestCaseUtils.fetchAndConvert(iterator, expectedRedoLogData.size(), Row::toString)); tableResult.getJobClient().get().cancel().get(); } @@ -675,27 
+677,6 @@ public class OracleSourceITCase extends OracleSourceTestBase { } } - private static List fetchRowData( - Iterator iter, int size, Function stringifier) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - RowData row = iter.next(); - rows.add(row); - size--; - } - return rows.stream().map(stringifier).collect(Collectors.toList()); - } - - private static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } - private String getTableNameRegex(String[] captureCustomerTables) { checkState(captureCustomerTables.length > 0); if (captureCustomerTables.length == 1) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java index c63dec1e7..7a3076485 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java @@ -22,7 +22,6 @@ import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConf import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase; import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.test.util.AbstractTestBase; -import org.apache.flink.types.Row; import io.debezium.config.Configuration; import io.debezium.connector.postgresql.connection.PostgresConnection; @@ -43,9 +42,7 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; @@ -224,16 +221,6 @@ public abstract class PostgresTestBase extends AbstractTestBase { return postgresSourceConfigFactory; } - public static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } - public static void assertEqualsInAnyOrder(List expected, List actual) { assertTrue(expected != null && actual != null); assertEqualsInOrder( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java index 7d2c6dde8..3b77e684e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.postgres.source; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import 
org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.cdc.common.utils.TestCaseUtils; import org.apache.flink.cdc.connectors.base.options.StartupOptions; import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook; import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks; @@ -59,15 +60,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; import java.util.stream.Collectors; import static java.lang.String.format; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; import static org.apache.flink.cdc.connectors.postgres.testutils.PostgresTestUtils.hasNextData; import static org.apache.flink.cdc.connectors.postgres.testutils.PostgresTestUtils.triggerFailover; import static org.apache.flink.cdc.connectors.postgres.testutils.PostgresTestUtils.waitUntilJobRunning; @@ -773,7 +773,7 @@ public class PostgresSourceITCase extends PostgresTestBase { try (CloseableIterator iterator = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source") .executeAndCollect()) { - records = fetchRowData(iterator, fetchSize, customerTable::stringify); + records = fetchAndConvert(iterator, fetchSize, customerTable::stringify); env.close(); } return records; @@ -943,7 +943,9 @@ public class PostgresSourceITCase extends PostgresTestBase { } assertEqualsInAnyOrder( - expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size())); + expectedSnapshotData, + TestCaseUtils.fetchAndConvert( + iterator, expectedSnapshotData.size(), Row::toString)); } private void checkStreamData( @@ -984,7 +986,9 @@ public class PostgresSourceITCase extends PostgresTestBase { // wait for the stream reading Thread.sleep(2000L); - assertEqualsInAnyOrder(expectedStreamData, fetchRows(iterator, expectedStreamData.size())); + assertEqualsInAnyOrder( + expectedStreamData, + TestCaseUtils.fetchAndConvert(iterator, expectedStreamData.size(), Row::toString)); assertTrue(!hasNextData(iterator)); } @@ -1053,7 +1057,9 @@ public class PostgresSourceITCase extends PostgresTestBase { // wait for the stream reading Thread.sleep(2000L); - assertEqualsInAnyOrder(expectedStreamData, fetchRows(iterator, expectedStreamData.size())); + assertEqualsInAnyOrder( + expectedStreamData, + TestCaseUtils.fetchAndConvert(iterator, expectedStreamData.size(), Row::toString)); assertTrue(!hasNextData(iterator)); } @@ -1109,7 +1115,9 @@ public class PostgresSourceITCase extends PostgresTestBase { // wait for the stream reading Thread.sleep(2000L); - assertEqualsInAnyOrder(expectedStreamData, fetchRows(iterator, expectedStreamData.size())); + assertEqualsInAnyOrder( + expectedStreamData, + TestCaseUtils.fetchAndConvert(iterator, expectedStreamData.size(), Row::toString)); assertTrue(!hasNextData(iterator)); } @@ -1130,27 +1138,6 @@ public class PostgresSourceITCase extends PostgresTestBase { } } - private static List fetchRowData( - Iterator iter, int size, Function stringifier) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - RowData row = iter.next(); - rows.add(row); - size--; - } - return rows.stream().map(stringifier).collect(Collectors.toList()); - } - - public static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - 
while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } - /** * Make some changes on the specified customer table. Changelog in string could be accessed by * {@link #firstPartStreamEvents}. diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index 4b355cac0..96f0b0db8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -45,6 +45,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -784,7 +785,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { expected.addAll(Arrays.asList("-U[1, a]", "+U[1, null]")); CloseableIterator iterator = tableResult.collect(); - assertEqualsInAnyOrder(expected, fetchRows(iterator, expected.size())); + assertEqualsInAnyOrder(expected, fetchAndConvert(iterator, expected.size(), Row::toString)); tableResult.getJobClient().get().cancel().get(); RowUtils.USE_LEGACY_TO_STRING = true; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java index e8b231aac..132e38c57 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java @@ -47,12 +47,10 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Iterator; import java.util.List; -import java.util.function.Function; -import java.util.stream.Collectors; import static java.lang.String.format; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert; import static org.apache.flink.table.api.DataTypes.BIGINT; import static org.apache.flink.table.api.DataTypes.STRING; import static org.apache.flink.table.catalog.Column.physical; @@ -368,7 +366,7 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase { try (CloseableIterator iterator = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source") .executeAndCollect()) { - records = fetchRowData(iterator, fetchSize, customerTable::stringify); + records = fetchAndConvert(iterator, fetchSize, customerTable::stringify); env.close(); } return records; @@ -493,7 +491,8 @@ public class SqlServerSourceITCase extends 
SqlServerSourceTestBase {
 
         LOG.info("snapshot data start");
         assertEqualsInAnyOrder(
-                expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
+                expectedSnapshotData,
+                fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString));
 
         // second step: check the change stream data
         for (String tableId : captureCustomerTables) {
@@ -525,7 +524,9 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
         for (int i = 0; i < captureCustomerTables.length; i++) {
             expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable));
         }
-        assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size()));
+        assertEqualsInAnyOrder(
+                expectedBinlogData,
+                fetchAndConvert(iterator, expectedBinlogData.size(), Row::toString));
 
         tableResult.getJobClient().get().cancel().get();
     }
@@ -550,27 +551,6 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
         }
     }
 
-    public static List<String> fetchRowData(
-            Iterator<RowData> iter, int size, Function<RowData, String> stringifier) {
-        List<RowData> rows = new ArrayList<>(size);
-        while (size > 0 && iter.hasNext()) {
-            RowData row = iter.next();
-            rows.add(row);
-            size--;
-        }
-        return rows.stream().map(stringifier).collect(Collectors.toList());
-    }
-
-    private static List<String> fetchRows(Iterator<Row> iter, int size) {
-        List<String> rows = new ArrayList<>(size);
-        while (size > 0 && iter.hasNext()) {
-            Row row = iter.next();
-            rows.add(row.toString());
-            size--;
-        }
-        return rows;
-    }
-
     private String getTableNameRegex(String[] captureCustomerTables) {
         checkState(captureCustomerTables.length > 0);
         if (captureCustomerTables.length == 1) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-vitess-cdc/src/test/java/org/apache/flink/cdc/connectors/vitess/table/VitessConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-vitess-cdc/src/test/java/org/apache/flink/cdc/connectors/vitess/table/VitessConnectorITCase.java
index c660fa67e..d720deaa5 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-vitess-cdc/src/test/java/org/apache/flink/cdc/connectors/vitess/table/VitessConnectorITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-vitess-cdc/src/test/java/org/apache/flink/cdc/connectors/vitess/table/VitessConnectorITCase.java
@@ -32,13 +32,12 @@ import org.junit.Test;
 
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -207,21 +206,11 @@ public class VitessConnectorITCase extends VitessTestBase {
                         "-U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true]",
                         "+U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Bye World, abc, 123.102, 404.4443, 123.4567, 346, true]");
 
-        List<String> actual = fetchRows(result.collect(), expected.size());
+        List<String> actual = fetchAndConvert(result.collect(), expected.size(), Row::toString);
         assertEquals(expected, actual);
         result.getJobClient().get().cancel().get();
     }
 
-    private static List<String> fetchRows(Iterator<Row> iter, int size) {
-        List<String> rows = new ArrayList<>(size);
-        while (size > 0 && iter.hasNext()) {
-            Row row = iter.next();
-            rows.add(row.toString());
-            size--;
-        }
-        return rows;
-    }
-
     public static void assertEqualsInAnyOrder(List<String> actual, List<String> expected) {
         assertTrue(actual != null && expected != null);
         assertEquals(