[mysql] Split snapshot chunks asynchronously (#931)

pull/960/head
ehui 3 years ago committed by GitHub
parent 7b16dc2b98
commit 41a5463437
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -21,6 +21,8 @@ package com.ververica.cdc.connectors.mysql.source.assigners;
import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.schema.MySqlSchema; import com.ververica.cdc.connectors.mysql.schema.MySqlSchema;
import com.ververica.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState; import com.ververica.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
@ -42,10 +44,13 @@ import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables; import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables;
@ -68,13 +73,16 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
private final Map<String, BinlogOffset> splitFinishedOffsets; private final Map<String, BinlogOffset> splitFinishedOffsets;
private final MySqlSourceConfig sourceConfig; private final MySqlSourceConfig sourceConfig;
private final int currentParallelism; private final int currentParallelism;
private final LinkedList<TableId> remainingTables; private final List<TableId> remainingTables;
private final boolean isRemainingTablesCheckpointed; private final boolean isRemainingTablesCheckpointed;
private AssignerStatus assignerStatus; private AssignerStatus assignerStatus;
private ChunkSplitter chunkSplitter; private ChunkSplitter chunkSplitter;
private boolean isTableIdCaseSensitive; private boolean isTableIdCaseSensitive;
private ExecutorService executor;
private Object lock;
@Nullable private Long checkpointIdToFinish; @Nullable private Long checkpointIdToFinish;
public MySqlSnapshotSplitAssigner( public MySqlSnapshotSplitAssigner(
@ -126,17 +134,18 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
this.sourceConfig = sourceConfig; this.sourceConfig = sourceConfig;
this.currentParallelism = currentParallelism; this.currentParallelism = currentParallelism;
this.alreadyProcessedTables = alreadyProcessedTables; this.alreadyProcessedTables = alreadyProcessedTables;
this.remainingSplits = remainingSplits; this.remainingSplits = new CopyOnWriteArrayList<>(remainingSplits);
this.assignedSplits = assignedSplits; this.assignedSplits = assignedSplits;
this.splitFinishedOffsets = splitFinishedOffsets; this.splitFinishedOffsets = splitFinishedOffsets;
this.assignerStatus = assignerStatus; this.assignerStatus = assignerStatus;
this.remainingTables = new LinkedList<>(remainingTables); this.remainingTables = new CopyOnWriteArrayList<>(remainingTables);
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed; this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
this.isTableIdCaseSensitive = isTableIdCaseSensitive; this.isTableIdCaseSensitive = isTableIdCaseSensitive;
} }
@Override @Override
public void open() { public void open() {
lock = new Object();
chunkSplitter = createChunkSplitter(sourceConfig, isTableIdCaseSensitive); chunkSplitter = createChunkSplitter(sourceConfig, isTableIdCaseSensitive);
// the legacy state didn't snapshot remaining tables, discovery remaining table here // the legacy state didn't snapshot remaining tables, discovery remaining table here
@ -152,6 +161,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
} }
} }
captureNewlyAddedTables(); captureNewlyAddedTables();
startAsynchronouslySplit();
} }
private void captureNewlyAddedTables() { private void captureNewlyAddedTables() {
@ -180,25 +190,54 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
} }
} }
private void startAsynchronouslySplit() {
if (!remainingTables.isEmpty()) {
if (executor == null) {
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
this.executor = Executors.newSingleThreadExecutor(threadFactory);
}
executor.submit(
() -> {
Iterator<TableId> iterator = remainingTables.iterator();
while (iterator.hasNext()) {
TableId nextTable = iterator.next();
// split the given table into chunks (snapshot splits)
Collection<MySqlSnapshotSplit> splits =
chunkSplitter.generateSplits(nextTable);
synchronized (lock) {
remainingSplits.addAll(splits);
remainingTables.remove(nextTable);
lock.notify();
}
}
});
}
}
@Override @Override
public Optional<MySqlSplit> getNext() { public Optional<MySqlSplit> getNext() {
if (!remainingSplits.isEmpty()) { synchronized (lock) {
// return remaining splits firstly if (!remainingSplits.isEmpty()) {
Iterator<MySqlSnapshotSplit> iterator = remainingSplits.iterator(); // return remaining splits firstly
MySqlSnapshotSplit split = iterator.next(); Iterator<MySqlSnapshotSplit> iterator = remainingSplits.iterator();
iterator.remove(); MySqlSnapshotSplit split = iterator.next();
assignedSplits.put(split.splitId(), split); remainingSplits.remove(split);
return Optional.of(split); assignedSplits.put(split.splitId(), split);
} else { addAlreadyProcessedTablesIfNotExists(split.getTableId());
// it's turn for next table return Optional.of(split);
TableId nextTable = remainingTables.pollFirst(); } else if (!remainingTables.isEmpty()) {
if (nextTable != null) { try {
// split the given table into chunks (snapshot splits) // wait for the asynchronous split to complete
Collection<MySqlSnapshotSplit> splits = chunkSplitter.generateSplits(nextTable); lock.wait();
remainingSplits.addAll(splits); } catch (InterruptedException e) {
alreadyProcessedTables.add(nextTable); throw new FlinkRuntimeException(
"InterruptedException while waiting for asynchronously snapshot split");
}
return getNext(); return getNext();
} else { } else {
closeExecutorService();
return Optional.empty(); return Optional.empty();
} }
} }
@ -319,7 +358,21 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
} }
@Override @Override
public void close() {} public void close() {
closeExecutorService();
}
private void closeExecutorService() {
if (executor != null) {
executor.shutdown();
}
}
private void addAlreadyProcessedTablesIfNotExists(TableId tableId) {
if (!alreadyProcessedTables.contains(tableId)) {
alreadyProcessedTables.add(tableId);
}
}
/** Indicates there is no more splits available in this assigner. */ /** Indicates there is no more splits available in this assigner. */
public boolean noMoreSplits() { public boolean noMoreSplits() {

Loading…
Cancel
Save