fix: mysql ci failure

pull/3884/head
yuxiqian 1 week ago
parent 9e98c49c29
commit 3d77dfc907

@ -96,7 +96,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static org.apache.flink.api.common.JobStatus.RUNNING;
@ -347,7 +346,10 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
// Check all snapshot records are sent with exactly-once semantics
assertEqualsInAnyOrder(
Arrays.asList(expectedSnapshotData),
fetchAndConvert(iterator, expectedSnapshotData.length, RowData::toString));
fetchAndConvert(
iterator,
expectedSnapshotData.length,
MySqlSourceITCase::convertRowDataToRowString));
assertTrue(!hasNextData(iterator));
jobClient.cancel().get();
}
@ -1073,26 +1075,19 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
assertThat(iterator.hasNext()).isFalse();
}
private static List<String> convertRowDataToRowString(List<RowData> rows) {
private static String convertRowDataToRowString(RowData row) {
LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
map.put("id", 0);
map.put("name", 1);
map.put("address", 2);
map.put("phone_number", 3);
return rows.stream()
.map(
row ->
RowUtils.createRowWithNamedPositions(
row.getRowKind(),
new Object[] {
row.getLong(0),
row.getString(1),
row.getString(2),
row.getString(3)
},
map)
.toString())
.collect(Collectors.toList());
return RowUtils.createRowWithNamedPositions(
row.getRowKind(),
new Object[] {
row.getLong(0), row.getString(1), row.getString(2), row.getString(3)
},
map)
.toString();
}
private String getTableNameRegex(String[] captureCustomerTables) {

@ -488,7 +488,10 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
.collect(Collectors.toList())
: expectedCustomersResult;
List<String> rows =
fetchAndConvert(iterator, expectedSnapshotResult.size(), RowData::toString);
fetchAndConvert(
iterator,
expectedSnapshotResult.size(),
NewlyAddedTableITCase::convertRowDataToRowString);
assertEqualsInAnyOrder(expectedSnapshotResult, rows);
// make binlog events
@ -504,7 +507,11 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
"UPDATE " + tableId + " SET address = 'Update2' where id = 103");
connection.commit();
}
rows = fetchAndConvert(iterator, expectedBinlogResult.size(), RowData::toString);
rows =
fetchAndConvert(
iterator,
expectedBinlogResult.size(),
NewlyAddedTableITCase::convertRowDataToRowString);
assertEqualsInAnyOrder(expectedBinlogResult, rows);
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
@ -541,28 +548,24 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
return iterator;
}
private static List<String> convertRowDataToRowString(List<RowData> rows) {
private static String convertRowDataToRowString(RowData row) {
LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
map.put("id", 0);
map.put("name", 1);
map.put("address", 2);
map.put("phone_number", 3);
map.put("_table_name", 4);
return rows.stream()
.map(
row ->
RowUtils.createRowWithNamedPositions(
row.getRowKind(),
new Object[] {
row.getLong(0),
row.getString(1),
row.getString(2),
row.getString(3),
row.getString(4)
},
map)
.toString())
.collect(Collectors.toList());
return RowUtils.createRowWithNamedPositions(
row.getRowKind(),
new Object[] {
row.getLong(0),
row.getString(1),
row.getString(2),
row.getString(3),
row.getString(4)
},
map)
.toString();
}
private void testRemoveTablesOneByOne(

@ -169,8 +169,8 @@ public class PolardbxCharsetITCase extends PolardbxSourceTestBase {
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ ")",
testName,
HOST_NAME,
PORT,
getHost(),
getPort(),
USER_NAME,
PASSWORD,
DATABASE,
@ -188,7 +188,7 @@ public class PolardbxCharsetITCase extends PolardbxSourceTestBase {
waitForSnapshotStarted(iterator);
assertEqualsInAnyOrder(
Arrays.asList(snapshotExpected),
fetchAndConvert(iterator, snapshotExpected.length, Row::toString));
fetchAndConvert(iterator, snapshotExpected.length, WAITING_TIMEOUT, Row::toString));
// test binlog phase
try (Connection connection = getJdbcConnection();
@ -200,7 +200,7 @@ public class PolardbxCharsetITCase extends PolardbxSourceTestBase {
}
assertEqualsInAnyOrder(
Arrays.asList(binlogExpected),
fetchAndConvert(iterator, binlogExpected.length, Row::toString));
fetchAndConvert(iterator, binlogExpected.length, WAITING_TIMEOUT, Row::toString));
result.getJobClient().get().cancel().get();
}

@ -84,8 +84,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s'"
+ ")",
HOST_NAME,
PORT,
getHost(),
getPort(),
USER_NAME,
PASSWORD,
DATABASE,
@ -110,7 +110,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
}
List<String> realSnapshotData =
fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString);
fetchAndConvert(
iterator, expectedSnapshotData.size(), WAITING_TIMEOUT, Row::toString);
assertEqualsInAnyOrder(expectedSnapshotData, realSnapshotData);
// second step: check the sink data
@ -157,7 +158,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
for (int i = 0; i < captureCustomerTables.length; i++) {
expectedBinlogData.addAll(Arrays.asList(expectedBinlog));
}
List<String> realBinlog = fetchAndConvert(iterator, expectedBinlog.length, Row::toString);
List<String> realBinlog =
fetchAndConvert(iterator, expectedBinlog.length, WAITING_TIMEOUT, Row::toString);
assertEqualsInOrder(expectedBinlogData, realBinlog);
tableResult.getJobClient().get().cancel().get();
}
@ -236,8 +238,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s'"
+ ")",
HOST_NAME,
PORT,
getHost(),
getPort(),
USER_NAME,
PASSWORD,
DATABASE,
@ -247,7 +249,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
TableResult tableResult = tEnv.executeSql("select * from polardbx_full_types");
CloseableIterator<Row> iterator = tableResult.collect();
List<String> realSnapshotData = fetchAndConvert(iterator, 1, Row::toString);
List<String> realSnapshotData =
fetchAndConvert(iterator, 1, WAITING_TIMEOUT, Row::toString);
String[] expectedSnapshotData =
new String[] {
"+I[100001, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 4294967295, 2147483647, "
@ -303,8 +306,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s'"
+ ")",
HOST_NAME,
PORT,
getHost(),
getPort(),
USER_NAME,
PASSWORD,
DATABASE,
@ -329,7 +332,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
}
List<String> realSnapshotData =
fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString);
fetchAndConvert(
iterator, expectedSnapshotData.size(), WAITING_TIMEOUT, Row::toString);
assertEqualsInAnyOrder(expectedSnapshotData, realSnapshotData);
// second step: check the sink data
@ -378,7 +382,8 @@ public class PolardbxSourceITCase extends PolardbxSourceTestBase {
"+I[7, 9999, 9999, 1007, 2022-01-17T00:00]",
"-D[7, 9999, 9999, 1007, 2022-01-17T00:00]"
};
List<String> realBinlog = fetchAndConvert(iterator, expectedBinlog.length, Row::toString);
List<String> realBinlog =
fetchAndConvert(iterator, expectedBinlog.length, WAITING_TIMEOUT, Row::toString);
assertEqualsInAnyOrder(Arrays.asList(expectedBinlog), realBinlog);
tableResult.getJobClient().get().cancel().get();
}

@ -17,12 +17,10 @@
package org.apache.flink.cdc.connectors.polardbx;
import org.apache.flink.cdc.common.utils.TestCaseUtils;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import org.apache.commons.lang3.StringUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -61,36 +59,37 @@ import static org.junit.Assert.assertTrue;
public abstract class PolardbxSourceTestBase extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(PolardbxSourceTestBase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
protected static final Integer PORT = 8527;
protected static final String HOST_NAME = "127.0.0.1";
protected static final String USER_NAME = "polardbx_root";
protected static final String PASSWORD = "123456";
private static final String IMAGE_VERSION = "2.1.0";
private static final DockerImageName POLARDBX_IMAGE =
DockerImageName.parse("polardbx/polardb-x:" + IMAGE_VERSION);
protected static final Integer INNER_PORT = 8527;
protected static final String USER_NAME = "polardbx_root";
protected static final String PASSWORD = "123456";
protected static final Duration WAITING_TIMEOUT = Duration.ofMinutes(1);
protected static final GenericContainer POLARDBX_CONTAINER =
new GenericContainer<>(POLARDBX_IMAGE)
.withExposedPorts(PORT)
.withExposedPorts(INNER_PORT)
.withLogConsumer(new Slf4jLogConsumer(LOG))
.withStartupTimeout(Duration.ofMinutes(3))
.withCreateContainerCmdModifier(
c ->
c.withPortBindings(
new PortBinding(
Ports.Binding.bindPort(PORT),
new ExposedPort(PORT))));
.withStartupTimeout(Duration.ofMinutes(3));
protected static String getHost() {
return POLARDBX_CONTAINER.getHost();
}
protected static int getPort() {
return POLARDBX_CONTAINER.getMappedPort(INNER_PORT);
}
@BeforeClass
public static void startContainers() throws InterruptedException {
// no need to start container when the port 8527 is listening
if (!checkConnection()) {
LOG.info("Polardbx connection is not valid, so try to start containers...");
Startables.deepStart(Stream.of(POLARDBX_CONTAINER)).join();
LOG.info("Containers are started.");
// here should wait 10s that make sure the polardbx is ready
Thread.sleep(10 * 1000);
}
public static void startContainers() {
Startables.deepStart(Stream.of(POLARDBX_CONTAINER)).join();
LOG.info("Containers are started.");
TestCaseUtils.repeatedCheck(
PolardbxSourceTestBase::checkConnection, WAITING_TIMEOUT, Duration.ofSeconds(1));
}
@AfterClass
@ -101,7 +100,7 @@ public abstract class PolardbxSourceTestBase extends AbstractTestBase {
}
protected static String getJdbcUrl() {
return String.format("jdbc:mysql://%s:%s", HOST_NAME, PORT);
return String.format("jdbc:mysql://%s:%s", getHost(), getPort());
}
protected static Connection getJdbcConnection() throws SQLException {

Loading…
Cancel
Save