|
|
|
@ -106,7 +106,7 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void addSplitsBack(List<MySqlSplit> splits, int subtaskId) {
|
|
|
|
|
LOG.debug("MySQL Source Enumerator adds splits back: {}", splits);
|
|
|
|
|
LOG.debug("The enumerator adds splits back: {}", splits);
|
|
|
|
|
splitAssigner.addSplits(splits);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -123,7 +123,8 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
|
|
|
|
|
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
|
|
|
|
|
if (sourceEvent instanceof FinishedSnapshotSplitsReportEvent) {
|
|
|
|
|
LOG.info(
|
|
|
|
|
"The enumerator receives finished split offsets {} from subtask {}.",
|
|
|
|
|
"The enumerator under {} receives finished split offsets {} from subtask {}.",
|
|
|
|
|
splitAssigner.getAssignerStatus(),
|
|
|
|
|
sourceEvent,
|
|
|
|
|
subtaskId);
|
|
|
|
|
FinishedSnapshotSplitsReportEvent reportEvent =
|
|
|
|
@ -200,7 +201,7 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
|
|
|
|
|
final MySqlSplit mySqlSplit = split.get();
|
|
|
|
|
context.assignSplit(mySqlSplit, nextAwaiting);
|
|
|
|
|
awaitingReader.remove();
|
|
|
|
|
LOG.info("Assign split {} to subtask {}", mySqlSplit, nextAwaiting);
|
|
|
|
|
LOG.info("The enumerator assigns split {} to subtask {}", mySqlSplit, nextAwaiting);
|
|
|
|
|
} else {
|
|
|
|
|
// there is no available splits by now, skip assigning
|
|
|
|
|
requestBinlogSplitUpdateIfNeed();
|
|
|
|
@ -235,6 +236,9 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
|
|
|
|
|
private void requestBinlogSplitUpdateIfNeed() {
|
|
|
|
|
if (isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) {
|
|
|
|
|
for (int subtaskId : getRegisteredReader()) {
|
|
|
|
|
LOG.info(
|
|
|
|
|
"The enumerator requests subtask {} to update the binlog split after newly added table.",
|
|
|
|
|
subtaskId);
|
|
|
|
|
context.sendEventToSourceReader(subtaskId, new BinlogSplitUpdateRequestEvent());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -247,9 +251,9 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
|
|
|
|
|
splitAssigner.getFinishedSplitInfos();
|
|
|
|
|
if (finishedSnapshotSplitInfos.isEmpty()) {
|
|
|
|
|
LOG.error(
|
|
|
|
|
"The assigner offer empty finished split information, this should not happen");
|
|
|
|
|
"The assigner offers empty finished split information, this should not happen");
|
|
|
|
|
throw new FlinkRuntimeException(
|
|
|
|
|
"The assigner offer empty finished split information, this should not happen");
|
|
|
|
|
"The assigner offers empty finished split information, this should not happen");
|
|
|
|
|
}
|
|
|
|
|
binlogSplitMeta =
|
|
|
|
|
Lists.partition(
|
|
|
|
@ -269,7 +273,7 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
|
|
|
|
|
context.sendEventToSourceReader(subTask, metadataEvent);
|
|
|
|
|
} else {
|
|
|
|
|
LOG.error(
|
|
|
|
|
"Received invalid request meta group id {}, the valid meta group id range is [0, {}]",
|
|
|
|
|
"The enumerator received invalid request meta group id {}, the valid meta group id range is [0, {}]",
|
|
|
|
|
requestMetaGroupId,
|
|
|
|
|
binlogSplitMeta.size() - 1);
|
|
|
|
|
}
|
|
|
|
|