[FLINK-34688][cdc-connector][mysql] Make scan newly table trigger condition strictly

This closes #3519.
pull/1709/head
Hongshun Wang 6 months ago committed by GitHub
parent 07446d1f9d
commit 4bf5a395a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -203,7 +203,8 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
private void captureNewlyAddedTables() {
if (sourceConfig.isScanNewlyAddedTableEnabled()
&& !sourceConfig.getStartupOptions().isSnapshotOnly()) {
&& !sourceConfig.getStartupOptions().isSnapshotOnly()
&& AssignerStatus.isAssigningFinished(assignerStatus)) {
try {
// check whether we got newly added tables
final List<TableId> currentCapturedTables =

@ -215,7 +215,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
private void captureNewlyAddedTables() {
// Don't scan newly added table in snapshot mode.
if (sourceConfig.isScanNewlyAddedTableEnabled()
&& !sourceConfig.getStartupOptions().isSnapshotOnly()) {
&& !sourceConfig.getStartupOptions().isSnapshotOnly()
&& AssignerStatus.isAssigningFinished(assignerStatus)) {
// check whether we got newly added tables
try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) {
final List<TableId> currentCapturedTables =

@ -484,7 +484,7 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
}
@Test
public void testScanNewlyAddedTableStartFromCheckpoint() {
public void testScanNewlyAddedTableStartFromInitialAssigningFinishedCheckpoint() {
List<String> expected =
Arrays.asList(
"customers_sparse_dist [109] null",
@ -492,7 +492,24 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
"customers_even_dist [10] [18]",
"customers_even_dist [18] null",
"customer_card_single_line null null");
assertEquals(expected, getTestAssignSnapshotSplitsFromCheckpoint());
assertEquals(
expected,
getTestAssignSnapshotSplitsFromCheckpoint(
AssignerStatus.INITIAL_ASSIGNING_FINISHED));
}
@Test
public void testScanNewlyAddedTableStartFromNewlyAddedAssigningSnapshotFinishedCheckpoint() {
List<String> expected =
Arrays.asList(
"customers_sparse_dist [109] null",
"customers_even_dist null [10]",
"customers_even_dist [10] [18]",
"customers_even_dist [18] null");
assertEquals(
expected,
getTestAssignSnapshotSplitsFromCheckpoint(
AssignerStatus.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED));
}
private List<String> getTestAssignSnapshotSplits(
@ -536,7 +553,7 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
return getSplitsFromAssigner(assigner);
}
private List<String> getTestAssignSnapshotSplitsFromCheckpoint() {
private List<String> getTestAssignSnapshotSplitsFromCheckpoint(AssignerStatus assignerStatus) {
TableId newTable =
TableId.parse(customerDatabase.getDatabaseName() + ".customer_card_single_line");
TableId processedTable =
@ -619,7 +636,7 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
assignedSplits,
new HashMap<>(),
splitFinishedOffsets,
AssignerStatus.INITIAL_ASSIGNING,
assignerStatus,
remainingTables,
false,
true,

Loading…
Cancel
Save