[mysql] Support binlog reader merge optimization

This close 
pull/263/head
Leonard Xu committed by GitHub
parent 6231e990c0
commit 5786506394
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -70,6 +70,8 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySQLSpli
private MySQLSplit currentTableSplit; private MySQLSplit currentTableSplit;
// tableId -> List[splitKeyStart, splitKeyEnd, splitHighWatermark] // tableId -> List[splitKeyStart, splitKeyEnd, splitHighWatermark]
private Map<TableId, List<Tuple3<Object[], Object[], BinlogPosition>>> finishedSplitsInfo; private Map<TableId, List<Tuple3<Object[], Object[], BinlogPosition>>> finishedSplitsInfo;
// tableId -> the max splitHighWatermark
private Map<TableId, BinlogPosition> maxSplitHighWatermarkMap;
public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId) { public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId) {
this.statefulTaskContext = statefulTaskContext; this.statefulTaskContext = statefulTaskContext;
@ -149,6 +151,20 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySQLSpli
return sourceRecords.iterator(); return sourceRecords.iterator();
} }
@Override
public void close() {
try {
if (statefulTaskContext.getConnection() != null) {
statefulTaskContext.getConnection().close();
}
if (statefulTaskContext.getBinaryLogClient() != null) {
statefulTaskContext.getBinaryLogClient().disconnect();
}
} catch (Exception e) {
LOGGER.error("Close binlog reader error", e);
}
}
/** /**
* Returns the record should emit or not. * Returns the record should emit or not.
* *
@ -167,12 +183,16 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySQLSpli
private boolean shouldEmit(SourceRecord sourceRecord) { private boolean shouldEmit(SourceRecord sourceRecord) {
if (isDataChangeRecord(sourceRecord)) { if (isDataChangeRecord(sourceRecord)) {
TableId tableId = getTableId(sourceRecord); TableId tableId = getTableId(sourceRecord);
BinlogPosition position = getBinlogPosition(sourceRecord);
// aligned, all snapshot splits of the table has reached max highWatermark
if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
return true;
}
Object[] key = Object[] key =
getSplitKey( getSplitKey(
currentTableSplit.getSplitBoundaryType(), currentTableSplit.getSplitBoundaryType(),
sourceRecord, sourceRecord,
statefulTaskContext.getSchemaNameAdjuster()); statefulTaskContext.getSchemaNameAdjuster());
BinlogPosition position = getBinlogPosition(sourceRecord);
for (Tuple3<Object[], Object[], BinlogPosition> splitInfo : for (Tuple3<Object[], Object[], BinlogPosition> splitInfo :
finishedSplitsInfo.get(tableId)) { finishedSplitsInfo.get(tableId)) {
if (RecordUtils.splitKeyRangeContains(key, splitInfo.f0, splitInfo.f1) if (RecordUtils.splitKeyRangeContains(key, splitInfo.f0, splitInfo.f1)
@ -193,6 +213,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySQLSpli
currentTableSplit.getFinishedSplitsInfo(); currentTableSplit.getFinishedSplitsInfo();
Map<TableId, List<Tuple3<Object[], Object[], BinlogPosition>>> splitsInfoMap = Map<TableId, List<Tuple3<Object[], Object[], BinlogPosition>>> splitsInfoMap =
new HashMap<>(); new HashMap<>();
Map<TableId, BinlogPosition> tableIdBinlogPositionMap = new HashMap<>();
for (Tuple5<TableId, String, Object[], Object[], BinlogPosition> finishedSplitInfo : for (Tuple5<TableId, String, Object[], Object[], BinlogPosition> finishedSplitInfo :
finishedSplitsInfo) { finishedSplitsInfo) {
@ -201,7 +222,14 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySQLSpli
splitsInfoMap.getOrDefault(tableId, new ArrayList<>()); splitsInfoMap.getOrDefault(tableId, new ArrayList<>());
list.add(Tuple3.of(finishedSplitInfo.f2, finishedSplitInfo.f3, finishedSplitInfo.f4)); list.add(Tuple3.of(finishedSplitInfo.f2, finishedSplitInfo.f3, finishedSplitInfo.f4));
splitsInfoMap.put(tableId, list); splitsInfoMap.put(tableId, list);
BinlogPosition highWatermark = finishedSplitInfo.f4;
BinlogPosition maxHighWatermark = tableIdBinlogPositionMap.get(tableId);
if (maxHighWatermark == null || highWatermark.isAtOrBefore(maxHighWatermark)) {
tableIdBinlogPositionMap.put(tableId, highWatermark);
}
} }
this.finishedSplitsInfo = splitsInfoMap; this.finishedSplitsInfo = splitsInfoMap;
this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
} }
} }

@ -35,6 +35,9 @@ public interface DebeziumReader<T, Split> {
*/ */
void submitSplit(Split splitToRead); void submitSplit(Split splitToRead);
/** Close the reader and releases all resources. */
void close();
/** /**
* Reads records from MySQL. The method should return null when reaching the end of the split, * Reads records from MySQL. The method should return null when reaching the end of the split,
* the empty {@link Iterator} will be returned if the data of split is on pulling. * the empty {@link Iterator} will be returned if the data of split is on pulling.

@ -188,6 +188,20 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySQLSp
return null; return null;
} }
@Override
public void close() {
try {
if (statefulTaskContext.getConnection() != null) {
statefulTaskContext.getConnection().close();
}
if (statefulTaskContext.getBinaryLogClient() != null) {
statefulTaskContext.getBinaryLogClient().disconnect();
}
} catch (Exception e) {
LOGGER.error("Close snapshot reader error", e);
}
}
/** /**
* {@link ChangeEventSource.ChangeEventSourceContext} implementation that keeps low/high * {@link ChangeEventSource.ChangeEventSourceContext} implementation that keeps low/high
* watermark for each {@link MySQLSplit}. * watermark for each {@link MySQLSplit}.

@ -40,6 +40,7 @@ import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -77,7 +78,7 @@ public class MySQLSourceEnumerator implements SplitEnumerator<MySQLSplit, MySQLS
// when the MySQLSourceEnumerator restore, it may missed some report information from reader // when the MySQLSourceEnumerator restore, it may missed some report information from reader
// tell all readers what we have received and request readers report their finished splits // tell all readers what we have received and request readers report their finished splits
notifyReaderReceivedFinishedSplits(assignedSplits.keySet().toArray(new Integer[0])); notifyReaderReceivedFinishedSplits(assignedSplits.keySet().toArray(new Integer[0]));
notifyReaderReportFinishedSplits(assignedSplits.keySet().toArray(new Integer[0])); notifyReaderReportFinishedSplitsIfNeed(assignedSplits.keySet().toArray(new Integer[0]));
} }
@Override @Override
@ -86,34 +87,46 @@ public class MySQLSourceEnumerator implements SplitEnumerator<MySQLSplit, MySQLS
// assign snapshot split firstly // assign snapshot split firstly
if (split.isPresent()) { if (split.isPresent()) {
context.assignSplit(split.get(), subtaskId); context.assignSplit(split.get(), subtaskId);
// record assigned splits // record assigned splits
recordAssignedSplits(split.get(), subtaskId); recordAssignedSplits(split.get(), subtaskId);
LOGGER.info("Assign snapshot split {} for subtask {}", split.get(), subtaskId); LOGGER.info("Assign snapshot split {} for subtask {}", split.get(), subtaskId);
} return;
// no more snapshot split, try notify no more splits
else if (couldNotifyNoMoreSplits(subtaskId)) {
context.signalNoMoreSplits(subtaskId);
}
// no more snapshot split, try assign binlog split
else if (couldAssignBinlogSplit(subtaskId)) {
assignBinlogSplit(subtaskId);
LOGGER.info("Assign binlog split for subtask {}", subtaskId);
} else if (hasAssignedBinlogSplit(subtaskId)) {
context.signalNoMoreSplits(subtaskId);
LOGGER.info("No available split for subtask {}", subtaskId);
} else { } else {
// the binlog split can not assign due to snapshot splits report is incomplete // no more snapshot split, try assign binlog split
// tell reader report finished snapshot splits if (couldAssignBinlogSplit()) {
notifyReaderReportFinishedSplits(new Integer[] {subtaskId}); assignBinlogSplit(subtaskId);
LOGGER.info("Assign binlog split for subtask {}", subtaskId);
return;
}
// no more snapshot split, try notify no more splits
else if (couldNotifyNoMoreSplits(subtaskId)) {
context.signalNoMoreSplits(subtaskId);
LOGGER.info("No available split for subtask {}", subtaskId);
return;
}
// the binlog split may can not assign due to snapshot splits report is
// incomplete, tell reader report finished snapshot splits
notifyReaderReportFinishedSplitsIfNeed(new Integer[] {subtaskId});
} }
} }
private void notifyReaderReportFinishedSplits(Integer[] subtaskIds) { private void notifyReaderReportFinishedSplitsIfNeed(Integer[] subtaskIds) {
// call reader report finished snapshot // call reader report finished snapshot
for (int subtaskId : subtaskIds) { for (int subtaskId : subtaskIds) {
context.sendEventToSourceReader(subtaskId, new EnumeratorRequestReportEvent()); final List<MySQLSplit> assignedSplit =
LOGGER.info("The enumerator call subtask {} to report its finished splits.", subtaskId); assignedSplits.getOrDefault(subtaskId, new ArrayList<>());
final List<Tuple2<String, BinlogPosition>> ackSpitsForReader =
receiveFinishedSnapshotSplits.getOrDefault(subtaskId, new ArrayList<>());
int assignedSnapshotSplitSize =
assignedSplit.stream()
.filter(sqlSplit -> sqlSplit.getSplitKind() == MySQLSplitKind.SNAPSHOT)
.collect(Collectors.toList())
.size();
if (assignedSnapshotSplitSize > ackSpitsForReader.size()) {
context.sendEventToSourceReader(subtaskId, new EnumeratorRequestReportEvent());
LOGGER.info(
"The enumerator call subtask {} to report its finished splits.", subtaskId);
}
} }
} }
@ -172,43 +185,49 @@ public class MySQLSourceEnumerator implements SplitEnumerator<MySQLSplit, MySQLS
} }
private boolean couldNotifyNoMoreSplits(int subtaskId) { private boolean couldNotifyNoMoreSplits(int subtaskId) {
// the task may never be assigned split
final List<MySQLSplit> assignedSplit = assignedSplits.get(subtaskId); final List<MySQLSplit> assignedSplit = assignedSplits.get(subtaskId);
if (assignedSplit == null) { if (assignedSplit == null || hasAssignedBinlogSplit()) {
return true; return true;
} else { } else {
return false; return false;
} }
} }
private boolean hasAssignedBinlogSplit(int subtaskId) { private boolean hasAssignedBinlogSplit() {
final List<MySQLSplit> assignedSplit = assignedSplits.get(subtaskId); for (List<MySQLSplit> assignedSplit : assignedSplits.values()) {
if (assignedSplit != null) { if (assignedSplit != null) {
return assignedSplit.stream() return assignedSplit.stream()
.anyMatch(r -> r.getSplitKind().equals(MySQLSplitKind.BINLOG)); .anyMatch(r -> r.getSplitKind().equals(MySQLSplitKind.BINLOG));
} else { }
return false;
} }
return false;
} }
private boolean couldAssignBinlogSplit(int subtaskId) { private boolean couldAssignBinlogSplit() {
final List<MySQLSplit> assignedSplit = assignedSplits.get(subtaskId); final int assignedSnapshotSplit =
final List<Tuple2<String, BinlogPosition>> receiveSnapshotSplits = assignedSplits.values().stream()
receiveFinishedSnapshotSplits.get(subtaskId); .flatMap(Collection::stream)
if (assignedSplit != null .collect(Collectors.toList())
&& receiveSnapshotSplits != null .size();
&& assignedSplit.size() == receiveSnapshotSplits.size()) { final int receiveSnapshotSplits =
return true; receiveFinishedSnapshotSplits.values().stream()
} .flatMap(Collection::stream)
return false; .collect(Collectors.toList())
.size();
// All assigned snapshot splits have finished
return assignedSnapshotSplit == receiveSnapshotSplits;
} }
private void assignBinlogSplit(int subtaskId) { private void assignBinlogSplit(int requestTaskId) {
final List<MySQLSplit> assignedSnapshotSplit = final List<MySQLSplit> assignedSnapshotSplit =
assignedSplits.get(subtaskId).stream() assignedSplits.values().stream()
.flatMap(Collection::stream)
.sorted(Comparator.comparing(MySQLSplit::splitId)) .sorted(Comparator.comparing(MySQLSplit::splitId))
.collect(Collectors.toList()); .collect(Collectors.toList());
final List<Tuple2<String, BinlogPosition>> receiveSnapshotSplits = final List<Tuple2<String, BinlogPosition>> receiveSnapshotSplits =
receiveFinishedSnapshotSplits.get(subtaskId).stream() receiveFinishedSnapshotSplits.values().stream()
.flatMap(Collection::stream)
.sorted(Comparator.comparing(o -> o.f0)) .sorted(Comparator.comparing(o -> o.f0))
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -240,7 +259,7 @@ public class MySQLSourceEnumerator implements SplitEnumerator<MySQLSplit, MySQLS
new MySQLSplit( new MySQLSplit(
MySQLSplitKind.BINLOG, MySQLSplitKind.BINLOG,
lastSnapshotSplit.getTableId(), lastSnapshotSplit.getTableId(),
"binlog-split-" + subtaskId, "binlog-split-" + requestTaskId,
lastSnapshotSplit.getSplitBoundaryType(), lastSnapshotSplit.getSplitBoundaryType(),
null, null,
null, null,
@ -251,9 +270,9 @@ public class MySQLSourceEnumerator implements SplitEnumerator<MySQLSplit, MySQLS
snapshotSplits, snapshotSplits,
databaseHistory); databaseHistory);
// assign // assign
context.assignSplit(binlogSplit, subtaskId); context.assignSplit(binlogSplit, requestTaskId);
// record assigned splits // record assigned splits
recordAssignedSplits(binlogSplit, subtaskId); recordAssignedSplits(binlogSplit, requestTaskId);
} }
private void recordAssignedSplits(MySQLSplit split, int subtaskId) { private void recordAssignedSplits(MySQLSplit split, int subtaskId) {

@ -94,13 +94,22 @@ public class MySQLSplitReader implements SplitReader<SourceRecord, MySQLSplit> {
public void wakeUp() {} public void wakeUp() {}
@Override @Override
public void close() throws Exception {} public void close() throws Exception {
if (currentReader != null) {
LOGGER.info(
"Close current debezium reader {}",
currentReader.getClass().getCanonicalName());
currentReader.close();
currentSplitId = null;
}
}
private void checkSplitOrStartNext() throws IOException { private void checkSplitOrStartNext() throws IOException {
// the binlog reader should keep alive // the binlog reader should keep alive
if (currentReader != null && currentReader instanceof BinlogSplitReader) { if (currentReader != null && currentReader instanceof BinlogSplitReader) {
return; return;
} }
if (canAssignNextSplit()) { if (canAssignNextSplit()) {
final MySQLSplit nextSplit = splits.poll(); final MySQLSplit nextSplit = splits.poll();
if (nextSplit == null) { if (nextSplit == null) {
@ -119,10 +128,15 @@ public class MySQLSplitReader implements SplitReader<SourceRecord, MySQLSplit> {
currentReader.submitSplit(nextSplit); currentReader.submitSplit(nextSplit);
} else { } else {
// point from snapshot split to binlog split // point from snapshot split to binlog split
if (currentReader != null) {
LOGGER.info("It's turn to read binlog split, close current snapshot reader");
currentReader.close();
}
final MySqlConnection jdbcConnection = getConnection(config); final MySqlConnection jdbcConnection = getConnection(config);
final BinaryLogClient binaryLogClient = getBinaryClient(config); final BinaryLogClient binaryLogClient = getBinaryClient(config);
final StatefulTaskContext statefulTaskContext = final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(config, binaryLogClient, jdbcConnection); new StatefulTaskContext(config, binaryLogClient, jdbcConnection);
LOGGER.info("Create binlog reader");
currentReader = new BinlogSplitReader(statefulTaskContext, subtaskId); currentReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
currentReader.submitSplit(nextSplit); currentReader.submitSplit(nextSplit);
} }

@ -112,7 +112,14 @@ public class BinlogSplitReaderTest extends MySQLTestBase {
List<MySQLSplit> splits = getMySQLSplits(configuration, pkType); List<MySQLSplit> splits = getMySQLSplits(configuration, pkType);
String[] expected = String[] expected =
useIntegralTypeOptimization useIntegralTypeOptimization
? new String[] {} ? new String[] {
"+U[103, user_3, Hangzhou, 123567891234]",
"+U[103, user_3, Shanghai, 123567891234]",
"-D[102, user_2, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"-U[103, user_3, Hangzhou, 123567891234]",
"-U[103, user_3, Shanghai, 123567891234]"
}
: new String[] { : new String[] {
"+I[101, user_1, Shanghai, 123567891234]", "+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]",

Loading…
Cancel
Save