[FLINK-36326][source-connector/mysql] Fix auto scan newly-added table failure

This closes #3661.

Co-authored-by: Hang Ruan <ruanhang1993@hotmail.com>
pull/3737/head
yuxiqian 2 months ago committed by GitHub
parent 40db6d5eb7
commit 908949bc72
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -81,6 +81,8 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
@Nullable private Integer binlogSplitTaskId; @Nullable private Integer binlogSplitTaskId;
private boolean isBinlogSplitUpdateRequestAlreadySent = false;
public MySqlSourceEnumerator( public MySqlSourceEnumerator(
SplitEnumeratorContext<MySqlSplit> context, SplitEnumeratorContext<MySqlSplit> context,
MySqlSourceConfig sourceConfig, MySqlSourceConfig sourceConfig,
@ -273,7 +275,9 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
} }
private void requestBinlogSplitUpdateIfNeed() { private void requestBinlogSplitUpdateIfNeed() {
if (isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) { if (!isBinlogSplitUpdateRequestAlreadySent
&& isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) {
isBinlogSplitUpdateRequestAlreadySent = true;
for (int subtaskId : getRegisteredReader()) { for (int subtaskId : getRegisteredReader()) {
LOG.info( LOG.info(
"The enumerator requests subtask {} to update the binlog split after newly added table.", "The enumerator requests subtask {} to update the binlog split after newly added table.",

@ -78,6 +78,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static java.lang.String.format; import static java.lang.String.format;
import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
import static org.apache.flink.util.Preconditions.checkState; import static org.apache.flink.util.Preconditions.checkState;
/** IT tests to cover various newly added tables during capture process. */ /** IT tests to cover various newly added tables during capture process. */
@ -511,6 +512,12 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
temporaryFolder.delete(); temporaryFolder.delete();
} }
/**
 * Regression test for FLINK-36326: a newly added table that is still empty when the job
 * (re)starts must be captured correctly once rows are inserted after startup. Runs with
 * parallelism 1 and no extra source options; the two address tables are added one per round,
 * and (for rounds after the first) their data is inserted only after the job is running.
 */
@Test
public void testNewlyAddedEmptyTableAndInsertAfterJobStart() throws Exception {
testNewlyAddedTableOneByOneWithCreateBeforeStart(
1, new HashMap<>(), "address_hangzhou", "address_beijing");
}
/** Add a collect sink in the job. */ /** Add a collect sink in the job. */
protected CollectResultIterator<RowData> addCollectSink(DataStream<RowData> stream) { protected CollectResultIterator<RowData> addCollectSink(DataStream<RowData> stream) {
TypeSerializer<RowData> serializer = TypeSerializer<RowData> serializer =
@ -1108,4 +1115,144 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
} }
} }
} }
/**
 * Drives one job "round" per entry of {@code captureAddressTables}. In each round the next
 * table is CREATED before the job (re)starts from the previous round's savepoint; only the
 * first round's table is pre-populated — later tables start empty and receive their rows
 * AFTER the job is running, which is exactly the newly-added-empty-table scenario of
 * FLINK-36326. Each round asserts snapshot data, then binlog data, then (except for the
 * last round) takes a savepoint for the next round.
 *
 * @param parallelism job parallelism used for every round
 * @param sourceOptions extra options appended to the generated MySQL source DDL
 * @param captureAddressTables address tables added one per round, in order
 */
private void testNewlyAddedTableOneByOneWithCreateBeforeStart(
int parallelism, Map<String, String> sourceOptions, String... captureAddressTables)
throws Exception {
final TemporaryFolder temporaryFolder = new TemporaryFolder();
temporaryFolder.create();
final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
// null on the first round -> fresh start; later rounds restore from the savepoint below.
String finishedSavePointPath = null;
// Accumulates the expected sink contents across rounds.
List<String> fetchedDataList = new ArrayList<>();
for (int round = 0; round < captureAddressTables.length; round++) {
// Only round 0 seeds its table before the job starts; subsequent tables are created
// empty and filled after startup (see the !insertData branch further down).
boolean insertData = round == 0;
initialAddressTables(getConnection(), captureAddressTables, round, insertData);
// Capture list for this round = all tables added so far, including the new one.
String[] captureTablesThisRound =
Arrays.asList(captureAddressTables)
.subList(0, round + 1)
.toArray(new String[0]);
String newlyAddedTable = captureAddressTables[round];
StreamExecutionEnvironment env =
getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
env.setRestartStrategy(noRestart());
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String createTableStatement =
getCreateTableStatement(sourceOptions, captureTablesThisRound);
tEnv.executeSql(createTableStatement);
// Upsert "values" sink so retractions/updates can be observed by the assertions.
tEnv.executeSql(
"CREATE TABLE sink ("
+ " table_name STRING,"
+ " id BIGINT,"
+ " country STRING,"
+ " city STRING,"
+ " detail_address STRING,"
+ " primary key (city, id) not enforced"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ")");
TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
JobClient jobClient = tableResult.getJobClient().get();
// Give the job time to start and begin reading before inserting into the new table.
// NOTE(review): fixed sleep — assumes 3s is enough for job startup on CI.
Thread.sleep(3_000);
String tableName = captureAddressTables[round];
if (!insertData) {
// The FLINK-36326 case: rows arrive only after the job is already running.
insertData(
getConnection(),
customDatabase.getDatabaseName() + "." + tableName,
tableName.split("_")[1]);
}
// step 2: assert fetched snapshot data in this round
String cityName = newlyAddedTable.split("_")[1];
List<String> expectedSnapshotDataThisRound =
Arrays.asList(
format(
"+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
newlyAddedTable, cityName, cityName));
fetchedDataList.addAll(expectedSnapshotDataThisRound);
waitForUpsertSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
// step 3: make some binlog data for this round
makeFirstPartBinlogForAddressTable(getConnection(), newlyAddedTable);
makeSecondPartBinlogForAddressTable(getConnection(), newlyAddedTable);
// step 4: assert fetched binlog data in this round
// retract the old data with id 416874195632735147
fetchedDataList =
fetchedDataList.stream()
.filter(
r ->
!r.contains(
format(
"%s, 416874195632735147",
newlyAddedTable)))
.collect(Collectors.toList());
List<String> expectedBinlogUpsertDataThisRound =
Arrays.asList(
// add the new data with id 416874195632735147
format(
"+I[%s, 416874195632735147, CHINA, %s, %s West Town address 1]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 417022095255614380, China, %s, %s West Town address 4]",
newlyAddedTable, cityName, cityName));
// step 5: assert fetched binlog data in this round
fetchedDataList.addAll(expectedBinlogUpsertDataThisRound);
waitForUpsertSinkSize("sink", fetchedDataList.size());
// the result size of sink may arrive fetchedDataList.size() with old data, wait one
// checkpoint to wait retract old record and send new record
Thread.sleep(1000);
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
// step 6: trigger savepoint
if (round != captureAddressTables.length - 1) {
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
}
jobClient.cancel().get();
}
}
/**
 * Creates the address table for the given round inside a single transaction and, when
 * {@code insertData} is set, seeds it with the three standard rows. The connection is
 * always closed before returning.
 *
 * @param connection database connection; closed by this method
 * @param addressTables all capture tables; only {@code addressTables[round]} is created
 * @param round index of the table to create in this round
 * @param insertData whether to populate the new table before committing
 * @throws SQLException if the DDL, the seed insert, or the commit fails
 */
private void initialAddressTables(
JdbcConnection connection, String[] addressTables, int round, boolean insertData)
throws SQLException {
try {
connection.setAutoCommit(false);
final String table = addressTables[round];
final String qualifiedName = customDatabase.getDatabaseName() + "." + table;
// City name is the suffix of the table name, e.g. "address_beijing" -> "beijing".
final String city = table.split("_")[1];
final String ddl =
"CREATE TABLE "
+ qualifiedName
+ "("
+ " id BIGINT UNSIGNED NOT NULL PRIMARY KEY,"
+ " country VARCHAR(255) NOT NULL,"
+ " city VARCHAR(255) NOT NULL,"
+ " detail_address VARCHAR(1024)"
+ ");";
connection.execute(ddl);
if (insertData) {
insertData(connection, qualifiedName, city);
}
connection.commit();
} finally {
connection.close();
}
}
/**
 * Inserts the three fixed snapshot rows that the round assertions expect into
 * {@code tableId}, then releases the connection.
 *
 * @param connection database connection; closed by this method in all cases
 * @param tableId fully qualified table name to insert into
 * @param cityName city value substituted into every row
 * @throws SQLException if the insert fails
 */
private void insertData(JdbcConnection connection, String tableId, String cityName)
throws SQLException {
try {
final String dml =
format(
"INSERT INTO %s "
+ "VALUES (416874195632735147, 'China', '%s', '%s West Town address 1'),"
+ " (416927583791428523, 'China', '%s', '%s West Town address 2'),"
+ " (417022095255614379, 'China', '%s', '%s West Town address 3');",
tableId, cityName, cityName, cityName, cityName, cityName, cityName);
connection.execute(dml);
} finally {
connection.close();
}
}
} }

Loading…
Cancel
Save