|
|
|
@ -276,9 +276,9 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Optional<MySqlSplit> getNext() {
|
|
|
|
|
checkSplitterErrors();
|
|
|
|
|
waitTableDiscoveryReady();
|
|
|
|
|
synchronized (lock) {
|
|
|
|
|
checkSplitterErrors();
|
|
|
|
|
if (!remainingSplits.isEmpty()) {
|
|
|
|
|
// return remaining splits firstly
|
|
|
|
|
Iterator<MySqlSchemalessSnapshotSplit> iterator = remainingSplits.iterator();
|
|
|
|
@ -496,7 +496,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
|
|
|
|
|
|
|
|
|
|
private void splitChunksForRemainingTables() {
|
|
|
|
|
try {
|
|
|
|
|
// restore from a checkpoint and start to split the table from the previous checkpoint
|
|
|
|
|
// restore from a checkpoint and start to split the table from the previous
|
|
|
|
|
// checkpoint
|
|
|
|
|
if (chunkSplitter.hasNextChunk()) {
|
|
|
|
|
LOG.info(
|
|
|
|
|
"Start splitting remaining chunks for table {}",
|
|
|
|
@ -509,13 +510,13 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
|
|
|
|
|
splitTable(nextTable);
|
|
|
|
|
}
|
|
|
|
|
} catch (Throwable e) {
|
|
|
|
|
if (uncaughtSplitterException == null) {
|
|
|
|
|
uncaughtSplitterException = e;
|
|
|
|
|
} else {
|
|
|
|
|
uncaughtSplitterException.addSuppressed(e);
|
|
|
|
|
}
|
|
|
|
|
// Release the potential waiting getNext() call
|
|
|
|
|
synchronized (lock) {
|
|
|
|
|
if (uncaughtSplitterException == null) {
|
|
|
|
|
uncaughtSplitterException = e;
|
|
|
|
|
} else {
|
|
|
|
|
uncaughtSplitterException.addSuppressed(e);
|
|
|
|
|
}
|
|
|
|
|
// Release the potential waiting getNext() call
|
|
|
|
|
lock.notify();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|