tests: fetch table results with timeout set properly

yuxiqian 2 weeks ago
parent 414e3be054
commit 856c664014

@ -19,11 +19,19 @@ package org.apache.flink.cdc.common.utils;
import org.apache.flink.util.function.SupplierWithException;
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/** Some utility methods for creating repeated-checking test cases. */
public class TestCaseUtils {
@ -91,4 +99,78 @@ public class TestCaseUtils {
}
throw new RuntimeException("Timeout when waiting for state to be ready.");
}
/**
 * Fetches at most {@code size} entries from {@link Iterator} {@code iter} within {@code
 * DEFAULT_TIMEOUT}.
 *
 * <p>Convenience overload that delegates to {@link #fetch(Iterator, int, Duration)}.
 *
 * @param iter the iterator to drain; consumed on a background thread
 * @param size maximum number of entries to fetch
 * @return the fetched entries; may hold fewer than {@code size} elements if the iterator is
 *     exhausted or the default timeout elapses first
 * @throws InterruptedException if the waiting (caller) thread is interrupted
 */
public static <T> List<T> fetch(Iterator<T> iter, final int size) throws InterruptedException {
    return fetch(iter, size, DEFAULT_TIMEOUT);
}
/**
 * Fetches at most {@code size} entries from {@link Iterator} {@code iter}. <br>
 * It may return a list with less than {@code size} elements, if {@code iter} doesn't provide
 * results or {@code timeout} exceeds.
 *
 * @param iter the iterator to drain; consumed on a background daemon thread
 * @param size maximum number of entries to fetch
 * @param timeout maximum time to wait; {@code null} means wait indefinitely
 * @return the fetched entries, possibly fewer than {@code size} on timeout
 * @throws InterruptedException if the waiting (caller) thread is interrupted
 */
public static <T> List<T> fetch(Iterator<T> iter, final int size, @Nullable Duration timeout)
        throws InterruptedException {
    long deadline = Long.MAX_VALUE;
    if (timeout != null) {
        deadline = System.currentTimeMillis() + timeout.toMillis();
    }
    ConcurrentLinkedQueue<T> results = new ConcurrentLinkedQueue<>();
    AtomicReference<Throwable> fetchException = new AtomicReference<>();
    Thread thread =
            new Thread(
                    () -> {
                        try {
                            int remainingSize = size;
                            while (remainingSize > 0 && iter.hasNext()) {
                                T row = iter.next();
                                results.add(row);
                                remainingSize--;
                            }
                        } catch (Throwable t) {
                            fetchException.set(t);
                        }
                    });
    // Daemon thread: a fetcher stuck in iter.next() after the timeout must not keep the
    // JVM alive once the test finishes.
    thread.setDaemon(true);
    thread.start();
    while (true) {
        // Raise any exception thrown by the fetching thread
        Throwable t = fetchException.get();
        if (t != null) {
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            }
            // Checked throwables / errors would cause a ClassCastException if blindly cast
            // to RuntimeException, so wrap them while preserving the cause.
            throw new RuntimeException("Exception occurred while fetching results", t);
        }
        // Stop if fetching thread has exited
        if (!thread.isAlive()) {
            break;
        }
        // Stop waiting if deadline has arrived
        if (System.currentTimeMillis() > deadline) {
            thread.interrupt();
            break;
        }
        // join(ms) returns as soon as the fetcher exits, instead of oversleeping a fixed
        // interval; 100 ms keeps the deadline check responsive.
        thread.join(100L);
    }
    return new ArrayList<>(results);
}
/**
 * Fetches at most {@code size} entries from {@code iter} within the default timeout and maps
 * each one through {@code converter}.
 */
public static <S, T> List<T> fetchAndConvert(
        Iterator<S> iter, int size, Function<S, T> converter) throws InterruptedException {
    List<T> converted = new ArrayList<>();
    for (S entry : fetch(iter, size)) {
        converted.add(converter.apply(entry));
    }
    return converted;
}
/**
 * Fetches at most {@code size} entries from {@code iter} within {@code timeout} and maps each
 * one through {@code converter}.
 */
public static <S, T> List<T> fetchAndConvert(
        Iterator<S> iter, int size, Duration timeout, Function<S, T> converter)
        throws InterruptedException {
    List<T> converted = new ArrayList<>();
    for (S entry : fetch(iter, size, timeout)) {
        converted.add(converter.apply(entry));
    }
    return converted;
}
}

@ -38,10 +38,10 @@ import org.junit.rules.Timeout;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import static java.lang.String.format;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
import static org.testcontainers.containers.Db2Container.DB2_PORT;
/** IT tests for {@link Db2IncrementalSource}. */
@ -217,7 +217,8 @@ public class Db2SourceITCase extends Db2TestBase {
}
assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
expectedSnapshotData,
fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString));
// second step: check the change stream data
for (String tableId : captureCustomerTables) {
@ -250,7 +251,8 @@ public class Db2SourceITCase extends Db2TestBase {
expectedRedoLogsData.addAll(Arrays.asList(redoLogsForSingleTable));
}
assertEqualsInAnyOrder(
expectedRedoLogsData, fetchRows(iterator, expectedRedoLogsData.size()));
expectedRedoLogsData,
fetchAndConvert(iterator, expectedRedoLogsData.size(), Row::toString));
tableResult.getJobClient().get().cancel().get();
}
@ -275,16 +277,6 @@ public class Db2SourceITCase extends Db2TestBase {
}
}
/** Drains at most {@code size} rows from {@code iter}, rendering each via {@link Row#toString}. */
private static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> rendered = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        rendered.add(iter.next().toString());
    }
    return rendered;
}
/** The type of failover. */
protected enum FailoverType {
TM,

@ -56,11 +56,10 @@ import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInAnyOrder;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils.fetchRowData;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils.fetchRows;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils.triggerFailover;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.STRING;
@ -584,7 +583,7 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {
try (CloseableIterator<RowData> iterator =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
.executeAndCollect()) {
records = fetchRowData(iterator, fetchSize, customerTable::stringify);
records = fetchAndConvert(iterator, fetchSize, customerTable::stringify);
env.close();
}
return records;
@ -713,7 +712,8 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {
}
assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
expectedSnapshotData,
fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString));
// second step: check the change stream data
for (String collectionName : captureCustomerCollections) {
@ -747,7 +747,8 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {
for (int i = 0; i < captureCustomerCollections.length; i++) {
expectedChangeStreamData.addAll(Arrays.asList(changeEventsForSingleTable));
}
List<String> actualChangeStreamData = fetchRows(iterator, expectedChangeStreamData.size());
List<String> actualChangeStreamData =
fetchAndConvert(iterator, expectedChangeStreamData.size(), Row::toString);
assertEqualsInAnyOrder(expectedChangeStreamData, actualChangeStreamData);
tableResult.getJobClient().get().cancel().get();
}

@ -56,11 +56,10 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInAnyOrder;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils.fetchRowData;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils.fetchRows;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils.triggerFailover;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.STRING;
@ -496,7 +495,7 @@ public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
try (CloseableIterator<RowData> iterator =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
.executeAndCollect()) {
records = fetchRowData(iterator, fetchSize, customerTable::stringify);
records = fetchAndConvert(iterator, fetchSize, customerTable::stringify);
env.close();
}
return records;
@ -605,7 +604,8 @@ public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
}
assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
expectedSnapshotData,
fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString));
// second step: check the change stream data
for (String collectionName : captureCustomerCollections) {
@ -639,7 +639,8 @@ public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
for (int i = 0; i < captureCustomerCollections.length; i++) {
expectedChangeStreamData.addAll(Arrays.asList(changeEventsForSingleTable));
}
List<String> actualChangeStreamData = fetchRows(iterator, expectedChangeStreamData.size());
List<String> actualChangeStreamData =
fetchAndConvert(iterator, expectedChangeStreamData.size(), Row::toString);
assertEqualsInAnyOrder(expectedChangeStreamData, actualChangeStreamData);
tableResult.getJobClient().get().cancel().get();
}

@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.mongodb.table;
import org.apache.flink.cdc.common.utils.TestCaseUtils;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceTestBase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
@ -33,7 +34,6 @@ import org.junit.runners.Parameterized;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
@ -154,7 +154,8 @@ public class MongoDBTimeZoneITCase extends MongoDBSourceTestBase {
break;
}
List<String> actualSnapshot = fetchRows(iterator, expectedSnapshot.length);
List<String> actualSnapshot =
TestCaseUtils.fetchAndConvert(iterator, expectedSnapshot.length, Row::toString);
assertThat(actualSnapshot, containsInAnyOrder(expectedSnapshot));
result.getJobClient().get().cancel().get();
@ -217,19 +218,10 @@ public class MongoDBTimeZoneITCase extends MongoDBSourceTestBase {
break;
}
List<String> actualSnapshot = fetchRows(iterator, expectedSnapshot.length);
List<String> actualSnapshot =
TestCaseUtils.fetchAndConvert(iterator, expectedSnapshot.length, Row::toString);
assertThat(actualSnapshot, containsInAnyOrder(expectedSnapshot));
result.getJobClient().get().cancel().get();
}
/** Drains at most {@code size} rows from {@code iter}, rendering each via {@link Row#toString}. */
private static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> rendered = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        rendered.add(iter.next().toString());
    }
    return rendered;
}
}

@ -20,16 +20,9 @@ package org.apache.flink.cdc.connectors.mongodb.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.junit.Assert.fail;
@ -74,27 +67,6 @@ public class MongoDBTestUtils {
}
}
/**
 * Drains at most {@code size} rows from {@code iter}, rendering each through
 * {@code stringifier}.
 */
public static List<String> fetchRowData(
        Iterator<RowData> iter, int size, Function<RowData, String> stringifier) {
    List<String> rendered = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        rendered.add(stringifier.apply(iter.next()));
    }
    return rendered;
}
/** Drains at most {@code size} rows from {@code iter}, rendering each via {@link Row#toString}. */
public static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> rendered = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        rendered.add(iter.next().toString());
    }
    return rendered;
}
/** The type of failover. */
public enum FailoverType {
TM,

@ -39,13 +39,11 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetch;
import static org.junit.Assert.assertTrue;
/** Integration tests for the legacy {@link MySqlSource}. */
@ -112,7 +110,7 @@ public class LegacyMySqlSourceITCase extends LegacyMySqlTestBase {
waitForSnapshotStarted(snapshot);
assertTrue(
dataInJsonIsEquals(
fetchRows(snapshot, 1).get(0).toString(), expectSnapshot.toString()));
fetch(snapshot, 1).get(0).toString(), expectSnapshot.toString()));
try (Connection connection = fullTypesDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
@ -122,9 +120,7 @@ public class LegacyMySqlSourceITCase extends LegacyMySqlTestBase {
// check the binlog result
CloseableIterator<Row> binlog = result.collect();
JSONObject expectBinlog = expected.getJSONObject("expected_binlog");
assertTrue(
dataInJsonIsEquals(
fetchRows(binlog, 1).get(0).toString(), expectBinlog.toString()));
assertTrue(dataInJsonIsEquals(fetch(binlog, 1).get(0).toString(), expectBinlog.toString()));
result.getJobClient().get().cancel().get();
}
@ -136,17 +132,6 @@ public class LegacyMySqlSourceITCase extends LegacyMySqlTestBase {
testConsumingAllEventsWithJsonFormat(includeSchema, null, expectedFile);
}
/**
 * Drains at most {@code size} rows from {@code iter}, keeping only each row's first field
 * (the rowKind marker is intentionally ignored).
 */
private static List<Object> fetchRows(Iterator<Row> iter, int size) {
    List<Object> fields = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        // ignore rowKind marker
        fields.add(iter.next().getField(0));
    }
    return fields;
}
private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
while (!iterator.hasNext()) {
Thread.sleep(100);

@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.mysql.debezium.converters;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.common.utils.TestCaseUtils;
import org.apache.flink.cdc.connectors.mysql.MySqlValidatorTest;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceBuilder;
@ -60,7 +61,6 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
@ -227,7 +227,7 @@ public class MysqlDebeziumTimeConverterITCase {
return debeziumProperties;
}
private void checkData(TableResult tableResult) {
private void checkData(TableResult tableResult) throws Exception {
String[] snapshotForSingleTable =
new String[] {
"+I[1, 14:23:00, 2023-04-01 14:24:00, 2023-04-01, 14:25:00]",
@ -240,17 +240,8 @@ public class MysqlDebeziumTimeConverterITCase {
CloseableIterator<Row> collect = tableResult.collect();
tableResult.getJobClient().get().getJobID();
assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(collect, expectedSnapshotData.size()));
}
private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
rows.add(row.toString());
size--;
}
return rows;
expectedSnapshotData,
TestCaseUtils.fetchAndConvert(collect, expectedSnapshotData.size(), Row::toString));
}
protected MySqlContainer createMySqlContainer(String timezone) {

@ -36,7 +36,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.junit.After;
import org.junit.AfterClass;
@ -54,9 +53,7 @@ import java.io.IOException;
import java.io.PrintStream;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
@ -571,14 +568,4 @@ public class MySqlOnLineSchemaMigrationSourceITCase extends MySqlSourceTestBase
}
}
}
/** Drains at most {@code size} rows from {@code iter}, rendering each via {@link Row#toString}. */
private static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> rendered = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        rendered.add(iter.next().toString());
    }
    return rendered;
}
}

@ -84,7 +84,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -97,12 +96,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
import static org.apache.flink.util.Preconditions.checkState;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -347,7 +347,7 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
// Check all snapshot records are sent with exactly-once semantics
assertEqualsInAnyOrder(
Arrays.asList(expectedSnapshotData),
fetchRowData(iterator, expectedSnapshotData.length));
fetchAndConvert(iterator, expectedSnapshotData.length, RowData::toString));
assertTrue(!hasNextData(iterator));
jobClient.cancel().get();
}
@ -638,7 +638,7 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
try (CloseableIterator<RowData> iterator =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
.executeAndCollect()) {
List<String> records = fetchRowData(iterator, fetchSize, customerTable::stringify);
List<String> records = fetchAndConvert(iterator, fetchSize, customerTable::stringify);
return records;
}
}
@ -693,7 +693,9 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
DataStreamSource<RowData> source =
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
try (CloseableIterator<RowData> iterator = source.executeAndCollect()) {
List<String> rows = fetchRowData(iterator, expectedChangelogAfterStart.size());
List<String> rows =
fetchAndConvert(
iterator, expectedChangelogAfterStart.size(), RowData::toString);
assertEqualsInAnyOrder(expectedChangelogAfterStart, rows);
}
}
@ -1027,7 +1029,8 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
}
assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
expectedSnapshotData,
fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString));
}
private void checkBinlogData(
@ -1064,8 +1067,10 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
expectedBinlogData.addAll(secondPartBinlogEvents);
}
assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size()));
assertTrue(!hasNextData(iterator));
assertEqualsInAnyOrder(
expectedBinlogData,
fetchAndConvert(iterator, expectedBinlogData.size(), Row::toString));
assertThat(iterator.hasNext()).isFalse();
}
private static List<String> convertRowDataToRowString(List<RowData> rows) {
@ -1090,37 +1095,6 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
.collect(Collectors.toList());
}
/** Drains at most {@code size} rows from {@code iter}, rendering each via {@link Row#toString}. */
private static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> rendered = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        rendered.add(iter.next().toString());
    }
    return rendered;
}
/**
 * Drains at most {@code size} rows from {@code iter}, rendering each through
 * {@code stringifier}.
 */
private List<String> fetchRowData(
        Iterator<RowData> iter, int size, Function<RowData, String> stringifier) {
    List<String> rendered = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        rendered.add(stringifier.apply(iter.next()));
    }
    return rendered;
}
/**
 * Drains at most {@code size} rows from {@code iter} and renders them via
 * {@link #convertRowDataToRowString(List)}.
 */
private static List<String> fetchRowData(Iterator<RowData> iter, int size) {
    List<RowData> collected = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        collected.add(iter.next());
    }
    return convertRowDataToRowString(collected);
}
private String getTableNameRegex(String[] captureCustomerTables) {
checkState(captureCustomerTables.length > 0);
if (captureCustomerTables.length == 1) {

@ -62,7 +62,6 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -79,6 +78,7 @@ import java.util.stream.Stream;
import static java.lang.String.format;
import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
import static org.apache.flink.util.Preconditions.checkState;
/** IT tests to cover various newly added tables during capture process. */
@ -487,7 +487,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
expectedCustomersResult.stream())
.collect(Collectors.toList())
: expectedCustomersResult;
List<String> rows = fetchRowData(iterator, expectedSnapshotResult.size());
List<String> rows =
fetchAndConvert(iterator, expectedSnapshotResult.size(), RowData::toString);
assertEqualsInAnyOrder(expectedSnapshotResult, rows);
// make binlog events
@ -503,7 +504,7 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
"UPDATE " + tableId + " SET address = 'Update2' where id = 103");
connection.commit();
}
rows = fetchRowData(iterator, expectedBinlogResult.size());
rows = fetchAndConvert(iterator, expectedBinlogResult.size(), RowData::toString);
assertEqualsInAnyOrder(expectedBinlogResult, rows);
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
@ -540,16 +541,6 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
return iterator;
}
/**
 * Drains at most {@code size} rows from {@code iter} and renders them via
 * {@link #convertRowDataToRowString(List)}.
 */
private List<String> fetchRowData(Iterator<RowData> iter, int size) {
    List<RowData> collected = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        collected.add(iter.next());
    }
    return convertRowDataToRowString(collected);
}
private static List<String> convertRowDataToRowString(List<RowData> rows) {
LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
map.put("id", 0);

@ -62,18 +62,15 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
import static org.assertj.core.api.Assertions.assertThat;
/** Integration test for validating specifying starting offset. */
@ -150,7 +147,7 @@ public class SpecificStartingOffsetITCase {
// Execute job and validate results
JobClient jobClient = env.executeAsync();
iterator.setJobClient(jobClient);
List<String> rows = fetchRowData(iterator, 3, customers::stringify);
List<String> rows = fetchAndConvert(iterator, 3, customers::stringify);
assertThat(rows)
.containsExactly(
"+I[15213, Alice, Rome, 123456987]",
@ -176,7 +173,7 @@ public class SpecificStartingOffsetITCase {
setupSavepoint(restoredEnv, savepointPath);
JobClient restoredJobClient = restoredEnv.executeAsync();
iterator.setJobClient(restoredJobClient);
List<String> rowsAfterRestored = fetchRowData(iterator, 2, customers::stringify);
List<String> rowsAfterRestored = fetchAndConvert(iterator, 2, customers::stringify);
assertThat(rowsAfterRestored)
.containsExactly(
"-U[15213, Alice, Rome, 123456987]", "+U[15213, Alicia, Rome, 123456987]");
@ -225,7 +222,7 @@ public class SpecificStartingOffsetITCase {
// Execute job and validate results
JobClient jobClient = env.executeAsync();
iterator.setJobClient(jobClient);
List<String> rows = fetchRowData(iterator, 3, customers::stringify);
List<String> rows = fetchAndConvert(iterator, 3, customers::stringify);
assertThat(rows)
.containsExactly(
"+I[15213, Alice, Rome, 123456987]",
@ -251,7 +248,7 @@ public class SpecificStartingOffsetITCase {
setupSavepoint(restoredEnv, savepointPath);
JobClient restoredJobClient = restoredEnv.executeAsync("snapshotSplitTest");
iterator.setJobClient(restoredJobClient);
List<String> rowsAfterRestored = fetchRowData(iterator, 2, customers::stringify);
List<String> rowsAfterRestored = fetchAndConvert(iterator, 2, customers::stringify);
assertThat(rowsAfterRestored)
.containsExactly(
"-U[15213, Alice, Rome, 123456987]", "+U[15213, Alicia, Rome, 123456987]");
@ -398,7 +395,7 @@ public class SpecificStartingOffsetITCase {
// Execute job and validate results
JobClient jobClient = env.executeAsync();
iterator.setJobClient(jobClient);
List<String> rows = fetchRowData(iterator, 3, customers::stringify);
List<String> rows = fetchAndConvert(iterator, 3, customers::stringify);
assertThat(rows)
.containsExactly(
"+I[19613, Tom, NewYork, 123456987]",
@ -424,7 +421,7 @@ public class SpecificStartingOffsetITCase {
setupSavepoint(restoredEnv, savepointPath);
JobClient restoredJobClient = restoredEnv.executeAsync("snapshotSplitTest");
iterator.setJobClient(restoredJobClient);
List<String> rowsAfterRestored = fetchRowData(iterator, 2, customers::stringify);
List<String> rowsAfterRestored = fetchAndConvert(iterator, 2, customers::stringify);
assertThat(rowsAfterRestored)
.containsExactly(
"-U[18213, Charlie, Paris, 123456987]",
@ -500,17 +497,6 @@ public class SpecificStartingOffsetITCase {
return iterator;
}
/**
 * Drains at most {@code size} rows from {@code iter}, rendering each through
 * {@code stringifier}.
 */
private List<String> fetchRowData(
        Iterator<RowData> iter, int size, Function<RowData, String> stringifier) {
    List<String> rendered = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        rendered.add(stringifier.apply(iter.next()));
    }
    return rendered;
}
private static String buildMySqlConfigWithTimezone(File resourceDirectory, String timezone) {
try {
TemporaryFolder tempFolder = new TemporaryFolder(resourceDirectory);

@ -44,16 +44,14 @@ import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Stream;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInAnyOrder;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInOrder;
@ -185,7 +183,8 @@ public class MySqlCompatibilityITCase {
"+I[109, spare tire, 24 inch spare tire, 22.200]"
};
assertEqualsInAnyOrder(
Arrays.asList(expectedSnapshot), fetchRows(iterator, expectedSnapshot.length));
Arrays.asList(expectedSnapshot),
fetchAndConvert(iterator, expectedSnapshot.length, Row::toString));
try (Connection connection = testDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
@ -218,7 +217,8 @@ public class MySqlCompatibilityITCase {
};
assertEqualsInOrder(
Arrays.asList(expectedBinlog), fetchRows(iterator, expectedBinlog.length));
Arrays.asList(expectedBinlog),
fetchAndConvert(iterator, expectedBinlog.length, Row::toString));
result.getJobClient().get().cancel().get();
mySqlContainer.stop();
}
@ -229,16 +229,6 @@ public class MySqlCompatibilityITCase {
return serverId + "-" + (serverId + env.getParallelism());
}
/** Drains at most {@code size} rows from {@code iter}, rendering each via {@link Row#toString}. */
private static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> rendered = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        rendered.add(iter.next().toString());
    }
    return rendered;
}
private String buildCustomMySqlConfig(MySqlVersion version, boolean enableGtid) {
try {
File folder = tempFolder.newFolder(String.valueOf(UUID.randomUUID()));

@ -57,7 +57,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Random;
@ -65,6 +64,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
import static org.apache.flink.cdc.connectors.mysql.LegacyMySqlSourceTest.currentMySqlLatestOffset;
import static org.apache.flink.cdc.connectors.mysql.MySqlTestUtils.assertContainsErrorMsg;
import static org.apache.flink.cdc.connectors.mysql.MySqlTestUtils.waitForJobStatus;
@ -473,7 +473,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+I[109, spare tire, 24 inch spare tire, 22.200]"
};
assertEqualsInAnyOrder(
Arrays.asList(expectedSnapshot), fetchRows(iterator, expectedSnapshot.length));
Arrays.asList(expectedSnapshot),
fetchAndConvert(iterator, expectedSnapshot.length, Row::toString));
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
@ -499,7 +500,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"-D[111, scooter, Big 2-wheel scooter , 5.170]"
};
assertEqualsInOrder(
Arrays.asList(expectedBinlog), fetchRows(iterator, expectedBinlog.length));
Arrays.asList(expectedBinlog),
fetchAndConvert(iterator, expectedBinlog.length, Row::toString));
result.getJobClient().get().cancel().get();
}
@ -760,7 +762,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
+ "]",
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString));
result.getJobClient().get().cancel().get();
}
@ -830,7 +833,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+U[0, 1024, " + getIntegerSeqString(2, tableColumnCount) + "]"
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString));
result.getJobClient().get().cancel().get();
}
@ -1096,7 +1100,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+U[111, scooter, Big 2-wheel scooter , 5.170]",
"-D[111, scooter, Big 2-wheel scooter , 5.170]"
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString));
result.getJobClient().get().cancel().get();
}
@ -1174,7 +1179,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+U[[4, 4, 4, 4, 4, 4, 4, 5], 2021-03-08, 50, 500, flink]",
"-D[[4, 4, 4, 4, 4, 4, 4, 6], 2021-03-08, 30, 500, flink-sql]"
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString));
result.getJobClient().get().cancel().get();
}
@ -1241,7 +1247,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+U[416927583791428523, China, Hangzhou, West Town address 2]",
"+I[418257940021724075, Germany, Berlin, West Town address 3]"
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString));
result.getJobClient().get().cancel().get();
}
@ -1307,7 +1314,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+U[103, user_3, Hangzhou, 123567891234]",
"+I[110, newCustomer, Berlin, 12345678]"
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString));
result.getJobClient().get().cancel().get();
customer3_0Database.dropDatabase();
}
@ -1381,7 +1389,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]"
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString));
result.getJobClient().get().cancel().get();
}
@ -1519,7 +1528,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
+ " tiny_un_c TINYINT UNSIGNED DEFAULT ' 28 '"
+ " );");
}
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString));
jobClient.cancel().get();
}
@ -1588,7 +1598,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
statement.execute(
"alter table default_value_test add column `int_test` INT DEFAULT ' 30 ';");
}
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString));
jobClient.cancel().get();
}
@ -1986,7 +1997,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+I[123458.6789, KIND_003, user_3, my shopping cart]",
"+I[123459.1234, KIND_004, user_4, null]"
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString));
result.getJobClient().get().cancel().get();
}
@ -2051,7 +2063,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+I[E, 3, flink]",
"+I[e, 4, flink]"
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString));
result.getJobClient().get().cancel().get();
}
@ -2183,7 +2196,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+I[1, 127, 255, 255, 32767, 65535, 65535, 2023]",
"+I[2, 127, 255, 255, 32767, 65535, 65535, 2024]"
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString));
result.getJobClient().get().cancel().get();
}
@ -2255,16 +2269,6 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
}
}
/** Drains up to {@code size} rows from {@code iter} and stringifies each one via {@link Row#toString()}. */
private static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> stringifiedRows = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        stringifiedRows.add(iter.next().toString());
    }
    return stringifiedRows;
}
private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
while (!iterator.hasNext()) {
Thread.sleep(100);
@ -2348,7 +2352,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+U[6, BAQEBAQEBAU=, 2021-03-08, 50, 500, flink]",
"-D[7, BAQEBAQEBAY=, 2021-03-08, 30, 500, flink-sql]"
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString));
result.getJobClient().get().cancel().get();
}
}

@ -41,13 +41,13 @@ import org.testcontainers.lifecycle.Startables;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
/** Integration tests for MySQL sharding tables. */
@RunWith(Parameterized.class)
public class MySqlConnectorShardingTableITCase extends MySqlSourceTestBase {
@ -269,7 +269,8 @@ public class MySqlConnectorShardingTableITCase extends MySqlSourceTestBase {
"+U[221, user_221, Shanghai, 123567891234, null, 20]",
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString));
result.getJobClient().get().cancel().get();
}
@ -341,16 +342,6 @@ public class MySqlConnectorShardingTableITCase extends MySqlSourceTestBase {
}
}
/** Drains up to {@code size} rows from {@code iter} and stringifies each one via {@link Row#toString()}. */
private static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> stringifiedRows = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        stringifiedRows.add(iter.next().toString());
    }
    return stringifiedRows;
}
private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
while (!iterator.hasNext()) {
Thread.sleep(100);

@ -39,14 +39,12 @@ import org.testcontainers.lifecycle.Startables;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
/** Integration tests for MySQL Table source. */
@RunWith(Parameterized.class)
@ -100,7 +98,7 @@ public class MySqlJsonArrayAsKeyIndexITCase extends MySqlSourceTestBase {
}
@Test
public void testJsonArrayAsKeyIndex() {
public void testJsonArrayAsKeyIndex() throws InterruptedException {
UniqueDatabase jaakiDatabase =
new UniqueDatabase(container, "json_array_as_key", TEST_USER, TEST_PASSWORD);
jaakiDatabase.createAndInitialize();
@ -161,7 +159,8 @@ public class MySqlJsonArrayAsKeyIndexITCase extends MySqlSourceTestBase {
"+I[17]", "+I[18]", "+I[19]", "-D[19]",
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchAndConvert(iterator, expected.length, Row::toString));
try {
result.getJobClient().get().cancel().get();
@ -170,16 +169,6 @@ public class MySqlJsonArrayAsKeyIndexITCase extends MySqlSourceTestBase {
}
}
/** Drains up to {@code size} rows from {@code iter} and stringifies each one via {@link Row#toString()}. */
private static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> stringifiedRows = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        stringifiedRows.add(iter.next().toString());
    }
    return stringifiedRows;
}
private String getServerId() {
final Random random = new Random();
int serverId = random.nextInt(100) + 5400;

@ -52,9 +52,7 @@ import org.testcontainers.lifecycle.Startables;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
@ -64,6 +62,7 @@ import java.util.stream.Stream;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_INTERVAL;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_TIMEOUT;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
/**
* IT case for Evolving MySQL schema with gh-ost/pt-osc utility. See <a
@ -227,7 +226,9 @@ public class MySqlOnLineSchemaMigrationTableITCase extends MySqlSourceTestBase {
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]"
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected),
fetchAndConvert(iterator, expected.length, Row::toString));
}
// Wait for a little while until we're in Binlog streaming mode.
@ -261,7 +262,9 @@ public class MySqlOnLineSchemaMigrationTableITCase extends MySqlSourceTestBase {
new String[] {
"+I[10000, Alice, Beijing, 123567891234]",
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected),
fetchAndConvert(iterator, expected.length, Row::toString));
}
{
@ -291,7 +294,9 @@ public class MySqlOnLineSchemaMigrationTableITCase extends MySqlSourceTestBase {
new String[] {
"+I[10001, Bob, Chongqing, 123567891234]",
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected),
fetchAndConvert(iterator, expected.length, Row::toString));
}
{
@ -321,7 +326,9 @@ public class MySqlOnLineSchemaMigrationTableITCase extends MySqlSourceTestBase {
new String[] {
"+I[10002, Cicada, Urumqi, 123567891234]",
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected),
fetchAndConvert(iterator, expected.length, Row::toString));
}
}
@ -395,7 +402,9 @@ public class MySqlOnLineSchemaMigrationTableITCase extends MySqlSourceTestBase {
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]"
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected),
fetchAndConvert(iterator, expected.length, Row::toString));
}
// Wait for a little while until we're in Binlog streaming mode.
@ -429,7 +438,9 @@ public class MySqlOnLineSchemaMigrationTableITCase extends MySqlSourceTestBase {
new String[] {
"+I[10000, Alice, Beijing, 123567891234]",
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected),
fetchAndConvert(iterator, expected.length, Row::toString));
}
LOG.info("Step 3: Evolve schema with pt-osc - MODIFY COLUMN");
@ -458,7 +469,9 @@ public class MySqlOnLineSchemaMigrationTableITCase extends MySqlSourceTestBase {
new String[] {
"+I[10001, Bob, Chongqing, 123567891234]",
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected),
fetchAndConvert(iterator, expected.length, Row::toString));
}
LOG.info("Step 4: Evolve schema with pt-osc - DROP COLUMN");
@ -487,7 +500,9 @@ public class MySqlOnLineSchemaMigrationTableITCase extends MySqlSourceTestBase {
new String[] {
"+I[10002, Cicada, Urumqi, 123567891234]",
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
assertEqualsInAnyOrder(
Arrays.asList(expected),
fetchAndConvert(iterator, expected.length, Row::toString));
}
}
@ -585,14 +600,4 @@ public class MySqlOnLineSchemaMigrationTableITCase extends MySqlSourceTestBase {
}
}
}
/** Drains up to {@code size} rows from {@code iter} and stringifies each one via {@link Row#toString()}. */
private static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> stringifiedRows = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        stringifiedRows.add(iter.next().toString());
    }
    return stringifiedRows;
}
}

@ -45,16 +45,15 @@ import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Stream;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInAnyOrder;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInOrder;
@ -189,7 +188,8 @@ public class MySqlTimezoneITCase {
"+I[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22]"
};
assertEqualsInAnyOrder(
Arrays.asList(expectedSnapshot), fetchRows(iterator, expectedSnapshot.length));
Arrays.asList(expectedSnapshot),
fetchAndConvert(iterator, expectedSnapshot.length, Row::toString));
try (Connection connection = fullTypesDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
@ -205,7 +205,8 @@ public class MySqlTimezoneITCase {
};
assertEqualsInOrder(
Arrays.asList(expectedBinlog), fetchRows(iterator, expectedBinlog.length));
Arrays.asList(expectedBinlog),
fetchAndConvert(iterator, expectedBinlog.length, Row::toString));
result.getJobClient().get().cancel().get();
mySqlContainer.stop();
@ -228,16 +229,6 @@ public class MySqlTimezoneITCase {
return 0;
}
/** Drains up to {@code size} rows from {@code iter} and stringifies each one via {@link Row#toString()}. */
private static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> stringifiedRows = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        stringifiedRows.add(iter.next().toString());
    }
    return stringifiedRows;
}
private String buildMySqlConfigWithTimezone(String timezone) {
try {
File folder = tempFolder.newFolder(String.valueOf(UUID.randomUUID()));

@ -35,12 +35,11 @@ import org.junit.runners.Parameterized;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
/** Test supporting different column charsets for MySQL Table source. */
@RunWith(Parameterized.class)
public class MysqlConnectorCharsetITCase extends MySqlSourceTestBase {
@ -379,7 +378,8 @@ public class MysqlConnectorCharsetITCase extends MySqlSourceTestBase {
CloseableIterator<Row> iterator = result.collect();
waitForSnapshotStarted(iterator);
assertEqualsInAnyOrder(
Arrays.asList(snapshotExpected), fetchRows(iterator, snapshotExpected.length));
Arrays.asList(snapshotExpected),
fetchAndConvert(iterator, snapshotExpected.length, Row::toString));
// test binlog phase
try (Connection connection = charsetTestDatabase.getJdbcConnection();
@ -387,7 +387,8 @@ public class MysqlConnectorCharsetITCase extends MySqlSourceTestBase {
statement.execute(String.format("UPDATE %s SET table_id = table_id + 10;", testName));
}
assertEqualsInAnyOrder(
Arrays.asList(binlogExpected), fetchRows(iterator, binlogExpected.length));
Arrays.asList(binlogExpected),
fetchAndConvert(iterator, binlogExpected.length, Row::toString));
result.getJobClient().get().cancel().get();
}
@ -397,16 +398,6 @@ public class MysqlConnectorCharsetITCase extends MySqlSourceTestBase {
return serverId + "-" + (serverId + env.getParallelism());
}
/** Drains up to {@code size} rows from {@code iter} and stringifies each one via {@link Row#toString()}. */
private static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> stringifiedRows = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        stringifiedRows.add(iter.next().toString());
    }
    return stringifiedRows;
}
private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
while (!iterator.hasNext()) {
Thread.sleep(100);

@ -36,6 +36,8 @@ import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
/** Test supporting different column charsets for Polardbx. */
@RunWith(Parameterized.class)
public class PolardbxCharsetITCase extends PolardbxSourceTestBase {
@ -185,7 +187,8 @@ public class PolardbxCharsetITCase extends PolardbxSourceTestBase {
CloseableIterator<Row> iterator = result.collect();
waitForSnapshotStarted(iterator);
assertEqualsInAnyOrder(
Arrays.asList(snapshotExpected), fetchRows(iterator, snapshotExpected.length));
Arrays.asList(snapshotExpected),
fetchAndConvert(iterator, snapshotExpected.length, Row::toString));
// test binlog phase
try (Connection connection = getJdbcConnection();
@ -196,7 +199,8 @@ public class PolardbxCharsetITCase extends PolardbxSourceTestBase {
DATABASE, testName));
}
assertEqualsInAnyOrder(
Arrays.asList(binlogExpected), fetchRows(iterator, binlogExpected.length));
Arrays.asList(binlogExpected),
fetchAndConvert(iterator, binlogExpected.length, Row::toString));
result.getJobClient().get().cancel().get();
}

@ -36,6 +36,7 @@ import java.util.Arrays;
import java.util.List;
import static java.lang.String.format;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
/**
* Polardbx supports the MySQL protocol, but there are some different features in its DDL. So
@ -108,7 +109,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
}
List<String> realSnapshotData = fetchRows(iterator, expectedSnapshotData.size());
List<String> realSnapshotData =
fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString);
assertEqualsInAnyOrder(expectedSnapshotData, realSnapshotData);
// second step: check the sink data
@ -155,7 +157,7 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
for (int i = 0; i < captureCustomerTables.length; i++) {
expectedBinlogData.addAll(Arrays.asList(expectedBinlog));
}
List<String> realBinlog = fetchRows(iterator, expectedBinlog.length);
List<String> realBinlog = fetchAndConvert(iterator, expectedBinlog.length, Row::toString);
assertEqualsInOrder(expectedBinlogData, realBinlog);
tableResult.getJobClient().get().cancel().get();
}
@ -245,7 +247,7 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
TableResult tableResult = tEnv.executeSql("select * from polardbx_full_types");
CloseableIterator<Row> iterator = tableResult.collect();
List<String> realSnapshotData = fetchRows(iterator, 1);
List<String> realSnapshotData = fetchAndConvert(iterator, 1, Row::toString);
String[] expectedSnapshotData =
new String[] {
"+I[100001, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 4294967295, 2147483647, "
@ -326,7 +328,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
}
List<String> realSnapshotData = fetchRows(iterator, expectedSnapshotData.size());
List<String> realSnapshotData =
fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString);
assertEqualsInAnyOrder(expectedSnapshotData, realSnapshotData);
// second step: check the sink data
@ -375,7 +378,7 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
"+I[7, 9999, 9999, 1007, 2022-01-17T00:00]",
"-D[7, 9999, 9999, 1007, 2022-01-17T00:00]"
};
List<String> realBinlog = fetchRows(iterator, expectedBinlog.length);
List<String> realBinlog = fetchAndConvert(iterator, expectedBinlog.length, Row::toString);
assertEqualsInAnyOrder(Arrays.asList(expectedBinlog), realBinlog);
tableResult.getJobClient().get().cancel().get();
}

@ -19,7 +19,6 @@ package org.apache.flink.cdc.connectors.polardbx;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.PortBinding;
@ -42,9 +41,7 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.function.Function;
@ -160,16 +157,6 @@ public abstract class PolardbxSourceTestBase extends AbstractTestBase {
}
}
/** Drains up to {@code size} rows from {@code iter} and stringifies each one via {@link Row#toString()}. */
protected static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> stringifiedRows = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        stringifiedRows.add(iter.next().toString());
    }
    return stringifiedRows;
}
protected String getTableNameRegex(String[] captureCustomerTables) {
checkState(captureCustomerTables.length > 0);
if (captureCustomerTables.length == 1) {

@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.oracle.source;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cdc.common.utils.TestCaseUtils;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook;
@ -52,12 +53,10 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
import static org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.triggerFailover;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.STRING;
@ -490,7 +489,7 @@ public class OracleSourceITCase extends OracleSourceTestBase {
try (CloseableIterator<RowData> iterator =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
.executeAndCollect()) {
records = fetchRowData(iterator, fetchSize, customerTable::stringify);
records = fetchAndConvert(iterator, fetchSize, customerTable::stringify);
env.close();
}
return records;
@ -617,7 +616,9 @@ public class OracleSourceITCase extends OracleSourceTestBase {
LOG.info("snapshot data start");
assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
expectedSnapshotData,
TestCaseUtils.fetchAndConvert(
iterator, expectedSnapshotData.size(), Row::toString));
// second step: check the redo log data
for (String tableId : captureCustomerTables) {
@ -650,7 +651,8 @@ public class OracleSourceITCase extends OracleSourceTestBase {
expectedRedoLogData.addAll(Arrays.asList(redoLogForSingleTable));
}
assertEqualsInAnyOrder(
expectedRedoLogData, fetchRows(iterator, expectedRedoLogData.size()));
expectedRedoLogData,
TestCaseUtils.fetchAndConvert(iterator, expectedRedoLogData.size(), Row::toString));
tableResult.getJobClient().get().cancel().get();
}
@ -675,27 +677,6 @@ public class OracleSourceITCase extends OracleSourceTestBase {
}
}
/**
 * Drains up to {@code size} rows from {@code iter}, converting each to a string with the given
 * {@code stringifier}.
 */
private static List<String> fetchRowData(
        Iterator<RowData> iter, int size, Function<RowData, String> stringifier) {
    List<String> converted = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        converted.add(stringifier.apply(iter.next()));
    }
    return converted;
}
/** Drains up to {@code size} rows from {@code iter} and stringifies each one via {@link Row#toString()}. */
private static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> stringifiedRows = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        stringifiedRows.add(iter.next().toString());
    }
    return stringifiedRows;
}
private String getTableNameRegex(String[] captureCustomerTables) {
checkState(captureCustomerTables.length > 0);
if (captureCustomerTables.length == 1) {

@ -22,7 +22,6 @@ import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConf
import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.connection.PostgresConnection;
@ -43,9 +42,7 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -224,16 +221,6 @@ public abstract class PostgresTestBase extends AbstractTestBase {
return postgresSourceConfigFactory;
}
/** Drains up to {@code size} rows from {@code iter} and stringifies each one via {@link Row#toString()}. */
public static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> stringifiedRows = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        stringifiedRows.add(iter.next().toString());
    }
    return stringifiedRows;
}
public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEqualsInOrder(

@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.postgres.source;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cdc.common.utils.TestCaseUtils;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
@ -59,15 +60,14 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
import static org.apache.flink.cdc.connectors.postgres.testutils.PostgresTestUtils.hasNextData;
import static org.apache.flink.cdc.connectors.postgres.testutils.PostgresTestUtils.triggerFailover;
import static org.apache.flink.cdc.connectors.postgres.testutils.PostgresTestUtils.waitUntilJobRunning;
@ -773,7 +773,7 @@ public class PostgresSourceITCase extends PostgresTestBase {
try (CloseableIterator<RowData> iterator =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
.executeAndCollect()) {
records = fetchRowData(iterator, fetchSize, customerTable::stringify);
records = fetchAndConvert(iterator, fetchSize, customerTable::stringify);
env.close();
}
return records;
@ -943,7 +943,9 @@ public class PostgresSourceITCase extends PostgresTestBase {
}
assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
expectedSnapshotData,
TestCaseUtils.fetchAndConvert(
iterator, expectedSnapshotData.size(), Row::toString));
}
private void checkStreamData(
@ -984,7 +986,9 @@ public class PostgresSourceITCase extends PostgresTestBase {
// wait for the stream reading
Thread.sleep(2000L);
assertEqualsInAnyOrder(expectedStreamData, fetchRows(iterator, expectedStreamData.size()));
assertEqualsInAnyOrder(
expectedStreamData,
TestCaseUtils.fetchAndConvert(iterator, expectedStreamData.size(), Row::toString));
assertTrue(!hasNextData(iterator));
}
@ -1053,7 +1057,9 @@ public class PostgresSourceITCase extends PostgresTestBase {
// wait for the stream reading
Thread.sleep(2000L);
assertEqualsInAnyOrder(expectedStreamData, fetchRows(iterator, expectedStreamData.size()));
assertEqualsInAnyOrder(
expectedStreamData,
TestCaseUtils.fetchAndConvert(iterator, expectedStreamData.size(), Row::toString));
assertTrue(!hasNextData(iterator));
}
@ -1109,7 +1115,9 @@ public class PostgresSourceITCase extends PostgresTestBase {
// wait for the stream reading
Thread.sleep(2000L);
assertEqualsInAnyOrder(expectedStreamData, fetchRows(iterator, expectedStreamData.size()));
assertEqualsInAnyOrder(
expectedStreamData,
TestCaseUtils.fetchAndConvert(iterator, expectedStreamData.size(), Row::toString));
assertTrue(!hasNextData(iterator));
}
@ -1130,27 +1138,6 @@ public class PostgresSourceITCase extends PostgresTestBase {
}
}
/**
 * Drains up to {@code size} rows from {@code iter}, converting each to a string with the given
 * {@code stringifier}.
 */
private static List<String> fetchRowData(
        Iterator<RowData> iter, int size, Function<RowData, String> stringifier) {
    List<String> converted = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        converted.add(stringifier.apply(iter.next()));
    }
    return converted;
}
/** Drains up to {@code size} rows from {@code iter} and stringifies each one via {@link Row#toString()}. */
public static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> stringifiedRows = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        stringifiedRows.add(iter.next().toString());
    }
    return stringifiedRows;
}
/**
* Make some changes on the specified customer table. Changelog in string could be accessed by
* {@link #firstPartStreamEvents}.

@ -45,6 +45,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@ -784,7 +785,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
expected.addAll(Arrays.asList("-U[1, a]", "+U[1, null]"));
CloseableIterator<Row> iterator = tableResult.collect();
assertEqualsInAnyOrder(expected, fetchRows(iterator, expected.size()));
assertEqualsInAnyOrder(expected, fetchAndConvert(iterator, expected.size(), Row::toString));
tableResult.getJobClient().get().cancel().get();
RowUtils.USE_LEGACY_TO_STRING = true;
}

@ -47,12 +47,10 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.catalog.Column.physical;
@ -368,7 +366,7 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
try (CloseableIterator<RowData> iterator =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
.executeAndCollect()) {
records = fetchRowData(iterator, fetchSize, customerTable::stringify);
records = fetchAndConvert(iterator, fetchSize, customerTable::stringify);
env.close();
}
return records;
@ -493,7 +491,8 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
LOG.info("snapshot data start");
assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
expectedSnapshotData,
fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString));
// second step: check the change stream data
for (String tableId : captureCustomerTables) {
@ -525,7 +524,9 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
for (int i = 0; i < captureCustomerTables.length; i++) {
expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable));
}
assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size()));
assertEqualsInAnyOrder(
expectedBinlogData,
fetchAndConvert(iterator, expectedBinlogData.size(), Row::toString));
tableResult.getJobClient().get().cancel().get();
}
@ -550,27 +551,6 @@ public class SqlServerSourceITCase extends SqlServerSourceTestBase {
}
}
/**
 * Drains up to {@code size} rows from {@code iter}, converting each to a string with the given
 * {@code stringifier}.
 */
public static List<String> fetchRowData(
        Iterator<RowData> iter, int size, Function<RowData, String> stringifier) {
    List<String> converted = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        converted.add(stringifier.apply(iter.next()));
    }
    return converted;
}
/** Drains up to {@code size} rows from {@code iter} and stringifies each one via {@link Row#toString()}. */
private static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> stringifiedRows = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        stringifiedRows.add(iter.next().toString());
    }
    return stringifiedRows;
}
private String getTableNameRegex(String[] captureCustomerTables) {
checkState(captureCustomerTables.length > 0);
if (captureCustomerTables.length == 1) {

@ -32,13 +32,12 @@ import org.junit.Test;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.fetchAndConvert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -207,21 +206,11 @@ public class VitessConnectorITCase extends VitessTestBase {
"-U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true]",
"+U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Bye World, abc, 123.102, 404.4443, 123.4567, 346, true]");
List<String> actual = fetchRows(result.collect(), expected.size());
List<String> actual = fetchAndConvert(result.collect(), expected.size(), Row::toString);
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();
}
/** Drains up to {@code size} rows from {@code iter} and stringifies each one via {@link Row#toString()}. */
private static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> stringifiedRows = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        stringifiedRows.add(iter.next().toString());
    }
    return stringifiedRows;
}
public static void assertEqualsInAnyOrder(List<String> actual, List<String> expected) {
assertTrue(actual != null && expected != null);
assertEquals(

Loading…
Cancel
Save