|
|
|
@ -63,7 +63,7 @@ import java.util.Map;
|
|
|
|
|
import java.util.function.Supplier;
|
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
import static com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent.WakeUpTarget.SNAPSHOT_READER;
|
|
|
|
|
import static com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent.WakeUpTarget.BINLOG_READER;
|
|
|
|
|
import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toNormalBinlogSplit;
|
|
|
|
|
import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toSuspendedBinlogSplit;
|
|
|
|
|
import static com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils.getNextMetaGroupId;
|
|
|
|
@ -246,9 +246,7 @@ public class MySqlSourceReader<T>
|
|
|
|
|
mySqlSourceReaderContext.setStopBinlogSplitReader();
|
|
|
|
|
} else if (sourceEvent instanceof WakeupReaderEvent) {
|
|
|
|
|
WakeupReaderEvent wakeupReaderEvent = (WakeupReaderEvent) sourceEvent;
|
|
|
|
|
if (wakeupReaderEvent.getTarget() == SNAPSHOT_READER) {
|
|
|
|
|
context.sendSplitRequest();
|
|
|
|
|
} else {
|
|
|
|
|
if (wakeupReaderEvent.getTarget() == BINLOG_READER) {
|
|
|
|
|
if (suspendedBinlogSplit != null) {
|
|
|
|
|
context.sendSourceEventToCoordinator(
|
|
|
|
|
new LatestFinishedSplitsSizeRequestEvent());
|
|
|
|
|