[mysql] Rewrite the newly added table tests to cover more cases (#777)

This closes #777.
pull/842/head
Leonard Xu 3 years ago
parent c94791fd14
commit 8a9d821dc7

@ -20,21 +20,31 @@ package com.ververica.cdc.connectors.mysql.source;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import org.apache.commons.lang3.StringUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import java.lang.reflect.Method;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
@ -43,13 +53,18 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import static java.lang.String.format;
import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState;
/** IT tests for {@link MySqlSource}. */
public class MySqlSourceITCase extends MySqlSourceTestBase {
@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);
private final UniqueDatabase customDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
@ -121,93 +136,42 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
}
@Test
public void testJobManagerFailoverInBinlogPhaseWithNewlyAddTable() throws Exception {
customDatabase.createAndInitialize();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
String sourceDDL =
String.format(
"CREATE TABLE address ("
+ " id BIGINT NOT NULL,"
+ " country STRING,"
+ " city STRING,"
+ " detail_address STRING,"
+ " primary key (id) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'scan.incremental.snapshot.enabled' = 'true',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = 'address.*',"
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
+ " 'server-id' = '%s',"
+ " 'capture-new-tables' = 'true'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
customDatabase.getUsername(),
customDatabase.getPassword(),
customDatabase.getDatabaseName(),
getServerId());
// first step: check the snapshot data
String[] snapshotForSingleTable =
new String[] {
"+I[416874195632735147, China, Beijing, West Town address 1]",
"+I[416927583791428523, China, Beijing, West Town address 2]",
"+I[417022095255614379, China, Beijing, West Town address 3]",
"+I[417111867899200427, America, New York, East Town address 1]",
"+I[417271541558096811, America, New York, East Town address 2]",
"+I[417272886855938987, America, New York, East Town address 3]",
"+I[417420106184475563, Germany, Berlin, West Town address 1]",
"+I[418161258277847979, Germany, Berlin, West Town address 2]"
};
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from address");
CloseableIterator<Row> iterator = tableResult.collect();
JobID jobId = tableResult.getJobClient().get().getJobID();
List<String> expectedSnapshotData = Arrays.asList(snapshotForSingleTable);
assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
createNewTableAndInsertData(
getConnection(), customDatabase.getDatabaseName() + ".address1");
List<String> expectedBinlogData = expectedSnapshotData;
assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size()));
// after this fail, we hope to fetch address1 snapshot data
triggerFailover(
FailoverType.JM, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(200));
public void testNewlyAddedTableForExistsPipelineOnce() throws Exception {
testNewlyAddedTableOneByOne(
1, FailoverType.NONE, FailoverPhase.NEVER, "address_hangzhou", "address_beijing");
}
assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
@Test
public void testNewlyAddedTableForExistsPipelineTwice() throws Exception {
testNewlyAddedTableOneByOne(
DEFAULT_PARALLELISM,
FailoverType.NONE,
FailoverPhase.NEVER,
"address_hangzhou",
"address_beijing",
"address_shanghai");
}
String[] binlogForSingleTable =
new String[] {
"+I[826874195632735147, China, Beijing, West Town address 1]",
"+I[826927583791428523, China, Beijing, West Town address 2]",
"+I[827022095255614379, China, Beijing, West Town address 3]",
"+I[827111867899200427, America, New York, East Town address 1]",
"+I[827271541558096811, America, New York, East Town address 2]",
"+I[827272886855938987, America, New York, East Town address 3]",
"+I[827420106184475563, Germany, Berlin, West Town address 1]",
"+I[828161258277847979, Germany, Berlin, West Town address 2]"
};
expectedBinlogData = Arrays.asList(binlogForSingleTable);
@Test
public void testNewlyAddedTableForExistsPipelineSingleParallelism() throws Exception {
testNewlyAddedTableOneByOne(
1, FailoverType.NONE, FailoverPhase.NEVER, "address_hangzhou", "address_beijing");
}
insertDataToGenerateSomeBinlog(
getConnection(), customDatabase.getDatabaseName() + ".address1");
assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size()));
@Test
public void testJobManagerFailoverForNewlyAddedTable() throws Exception {
testNewlyAddedTableOneByOne(
DEFAULT_PARALLELISM,
FailoverType.JM,
FailoverPhase.SNAPSHOT,
"address_hangzhou",
"address_beijing");
}
tableResult.getJobClient().get().cancel().get();
@Test
public void testTaskManagerFailoverForNewlyAddedTable() throws Exception {
testNewlyAddedTableOneByOne(
1, FailoverType.TM, FailoverPhase.BINLOG, "address_hangzhou", "address_beijing");
}
private void testMySqlParallelSource(
@ -231,7 +195,7 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
String sourceDDL =
String.format(
format(
"CREATE TABLE customers ("
+ " id BIGINT NOT NULL,"
+ " name STRING,"
@ -255,7 +219,7 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
customDatabase.getUsername(),
customDatabase.getPassword(),
customDatabase.getDatabaseName(),
getTableName(captureCustomerTables),
getTableNameRegex(captureCustomerTables),
getServerId());
// first step: check the snapshot data
String[] snapshotForSingleTable =
@ -291,7 +255,7 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
}
// trigger checkpoint after some snapshot splits read finished
// trigger failover after some snapshot splits read finished
if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) {
triggerFailover(
failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(100));
@ -336,6 +300,188 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
tableResult.getJobClient().get().cancel().get();
}
private void testNewlyAddedTableOneByOne(
int parallelism,
FailoverType failoverType,
FailoverPhase failoverPhase,
String... captureAddressTables)
throws Exception {
// step 1: create mysql tables with initial data
TestValuesTableFactory.clearAllData();
customDatabase.createAndInitialize();
initialAddressTables(getConnection(), captureAddressTables);
final TemporaryFolder temporaryFolder = new TemporaryFolder();
temporaryFolder.create();
final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
// test newly added table one by one
String finishedSavePointPath = null;
List<String> fetchedDataList = new ArrayList<>();
for (int round = 0; round < captureAddressTables.length; round++) {
String[] captureTablesThisRound =
Arrays.asList(captureAddressTables)
.subList(0, round + 1)
.toArray(new String[0]);
StreamExecutionEnvironment env =
getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String createTableStatement = getCreateTableStatement(captureTablesThisRound);
tEnv.executeSql(createTableStatement);
tEnv.executeSql(
"CREATE TABLE sink ("
+ " table_name STRING,"
+ " id BIGINT,"
+ " country STRING,"
+ " city STRING,"
+ " detail_address STRING,"
+ " primary key (id) not enforced"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ")");
TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
JobClient jobClient = tableResult.getJobClient().get();
// step 2: assert fetched snapshot data in this round
String newlyAddedTable = captureAddressTables[round];
String cityName = newlyAddedTable.split("_")[1];
List<String> expectedSnapshotDataThisRound =
Arrays.asList(
format(
"+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
newlyAddedTable, cityName, cityName));
// trigger failover after some snapshot data read finished
if (failoverPhase == FailoverPhase.SNAPSHOT
&& TestValuesTableFactory.getRawResults("sink").size()
> fetchedDataList.size()) {
triggerFailover(
failoverType,
jobClient.getJobID(),
miniClusterResource.getMiniCluster(),
() -> sleepMs(100));
}
fetchedDataList.addAll(expectedSnapshotDataThisRound);
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
// step 3: make some binlog data for this round
makeFirstPartBinlogForAddressTable(getConnection(), newlyAddedTable);
if (failoverPhase == FailoverPhase.BINLOG) {
triggerFailover(
failoverType,
jobClient.getJobID(),
miniClusterResource.getMiniCluster(),
() -> sleepMs(100));
}
makeSecondPartBinlogForAddressTable(getConnection(), newlyAddedTable);
// step 4: assert fetched binlog data in this round
List<String> expectedBinlogDataThisRound =
Arrays.asList(
format(
"+U[%s, 416874195632735147, CHINA, %s, %s West Town address 1]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 417022095255614380, China, %s, %s West Town address 4]",
newlyAddedTable, cityName, cityName));
// step 5: assert fetched binlog data in this round
fetchedDataList.addAll(expectedBinlogDataThisRound);
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
// step 6: trigger savepoint
if (round != captureAddressTables.length - 1) {
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
}
jobClient.cancel().get();
}
}
private String getCreateTableStatement(String... captureTableNames) {
return format(
"CREATE TABLE address ("
+ " table_name STRING METADATA VIRTUAL,"
+ " id BIGINT NOT NULL,"
+ " country STRING,"
+ " city STRING,"
+ " detail_address STRING,"
+ " primary key (id) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'scan.incremental.snapshot.enabled' = 'true',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '2',"
+ " 'server-id' = '%s',"
+ " 'scan.newly-added-table.enabled' = 'true'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
customDatabase.getUsername(),
customDatabase.getPassword(),
customDatabase.getDatabaseName(),
getTableNameRegex(captureTableNames),
getServerId());
}
private StreamExecutionEnvironment getStreamExecutionEnvironment(
String finishedSavePointPath, int parallelism) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
if (finishedSavePointPath != null) {
// restore from savepoint
// hack for test to visit protected TestStreamEnvironment#getConfiguration() method
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class<?> clazz =
classLoader.loadClass(
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");
Method getConfigurationMethod = clazz.getDeclaredMethod("getConfiguration");
getConfigurationMethod.setAccessible(true);
Configuration configuration = (Configuration) getConfigurationMethod.invoke(env);
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
}
env.setParallelism(parallelism);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L));
return env;
}
private String triggerSavepointWithRetry(JobClient jobClient, String savepointDirectory)
throws ExecutionException, InterruptedException {
int retryTimes = 0;
// retry 600 times, it takes 100 milliseconds per time, at most retry 1 minute
while (retryTimes < 600) {
try {
return jobClient.triggerSavepoint(savepointDirectory).get();
} catch (Exception e) {
Optional<CheckpointException> exception =
ExceptionUtils.findThrowable(e, CheckpointException.class);
if (exception.isPresent()
&& exception.get().getMessage().contains("Checkpoint triggering task")) {
Thread.sleep(100);
retryTimes++;
} else {
throw e;
}
}
}
return null;
}
private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
@ -346,13 +492,13 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
return rows;
}
private String getTableName(String[] captureCustomerTables) {
private String getTableNameRegex(String[] captureCustomerTables) {
checkState(captureCustomerTables.length > 0);
if (captureCustomerTables.length == 1) {
return captureCustomerTables[0];
} else {
// pattern that matches multiple tables
return String.format("(%s)", StringUtils.join(captureCustomerTables, "|"));
return format("(%s)", StringUtils.join(captureCustomerTables, "|"));
}
}
@ -408,56 +554,66 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
}
}
private void createNewTableAndInsertData(JdbcConnection connection, String tableId)
private void initialAddressTables(JdbcConnection connection, String[] addressTables)
throws SQLException {
try {
connection.setAutoCommit(false);
connection.execute(
"CREATE TABLE "
+ tableId
+ "("
+ "id BIGINT UNSIGNED NOT NULL PRIMARY KEY,"
+ "country VARCHAR(255) NOT NULL,"
+ "city VARCHAR(255) NOT NULL,"
+ "detail_address VARCHAR(1024))");
for (String tableName : addressTables) {
// make initial data for given table
String tableId = customDatabase.getDatabaseName() + "." + tableName;
String cityName = tableName.split("_")[1];
connection.execute(
"CREATE TABLE "
+ tableId
+ "("
+ " id BIGINT UNSIGNED NOT NULL PRIMARY KEY,"
+ " country VARCHAR(255) NOT NULL,"
+ " city VARCHAR(255) NOT NULL,"
+ " detail_address VARCHAR(1024)"
+ ");");
connection.execute(
format(
"INSERT INTO %s "
+ "VALUES (416874195632735147, 'China', '%s', '%s West Town address 1'),"
+ " (416927583791428523, 'China', '%s', '%s West Town address 2'),"
+ " (417022095255614379, 'China', '%s', '%s West Town address 3');",
tableId, cityName, cityName, cityName, cityName, cityName,
cityName));
}
connection.commit();
} finally {
connection.close();
}
}
// insert some data
private void makeFirstPartBinlogForAddressTable(JdbcConnection connection, String tableName)
throws SQLException {
try {
connection.setAutoCommit(false);
// make binlog events for the first split
String tableId = customDatabase.getDatabaseName() + "." + tableName;
String cityName = tableName.split("_")[1];
connection.execute(
"INSERT INTO "
+ tableId
+ " VALUES(416874195632735147, 'China', 'Beijing', 'West Town address 1'),"
+ "(416927583791428523, 'China', 'Beijing', 'West Town address 2'),"
+ "(417022095255614379, 'China', 'Beijing', 'West Town address 3'),"
+ "(417111867899200427, 'America', 'New York', 'East Town address 1'),"
+ "(417271541558096811, 'America', 'New York', 'East Town address 2'),"
+ "(417272886855938987, 'America', 'New York', 'East Town address 3'),"
+ "(417420106184475563, 'Germany', 'Berlin', 'West Town address 1'),"
+ "(418161258277847979, 'Germany', 'Berlin', 'West Town address 2')");
format(
"UPDATE %s SET COUNTRY = 'CHINA' where id = 416874195632735147",
tableId));
connection.commit();
} finally {
connection.close();
}
}
private void insertDataToGenerateSomeBinlog(JdbcConnection connection, String tableId)
private void makeSecondPartBinlogForAddressTable(JdbcConnection connection, String tableName)
throws SQLException {
try {
connection.setAutoCommit(false);
// insert some data
// make binlog events for the second split
String tableId = customDatabase.getDatabaseName() + "." + tableName;
String cityName = tableName.split("_")[1];
connection.execute(
"INSERT INTO "
+ tableId
+ " VALUES(826874195632735147, 'China', 'Beijing', 'West Town address 1'),"
+ "(826927583791428523, 'China', 'Beijing', 'West Town address 2'),"
+ "(827022095255614379, 'China', 'Beijing', 'West Town address 3'),"
+ "(827111867899200427, 'America', 'New York', 'East Town address 1'),"
+ "(827271541558096811, 'America', 'New York', 'East Town address 2'),"
+ "(827272886855938987, 'America', 'New York', 'East Town address 3'),"
+ "(827420106184475563, 'Germany', 'Berlin', 'West Town address 1'),"
+ "(828161258277847979, 'Germany', 'Berlin', 'West Town address 2')");
format(
"INSERT INTO %s VALUES(417022095255614380, 'China','%s','%s West Town address 4')",
tableId, cityName, cityName));
connection.commit();
} finally {
connection.close();
@ -525,4 +681,22 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
afterFailAction.run();
miniCluster.startTaskManager();
}
private static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
while (sinkSize(sinkName) < expectedSize) {
Thread.sleep(100);
}
}
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
}
}
}
}

@ -239,6 +239,7 @@ public class MySqlTableSourceFactoryTest {
options.put("port", "3307");
options.put("server-id", "4321");
options.put("server-time-zone", "Asia/Shanghai");
options.put("scan.newly-added-table.enabled", "true");
options.put("debezium.snapshot.mode", "never");
DynamicTableSource actualSource = createTableSource(options);
@ -265,7 +266,8 @@ public class MySqlTableSourceFactoryTest {
CONNECTION_POOL_SIZE.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial());
StartupOptions.initial(),
true);
assertEquals(expectedSource, actualSource);
}

Loading…
Cancel
Save