[mysql] Use the gtid set firstly when compare the binlog offset (#761)

Co-authored-by: camelusluo <camelusluo@tencent.com>
pull/1116/head
camelus 3 years ago committed by GitHub
parent 7efdc67c80
commit c0aca57a8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -28,6 +28,7 @@ import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.GtidSet;
import io.debezium.connector.mysql.MySqlChangeEventSourceMetricsFactory;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
@ -191,6 +192,59 @@ public class StatefulTaskContext {
}
private boolean isBinlogAvailable(MySqlOffsetContext offset) {
String gtidStr = offset.gtidSet();
if (gtidStr != null) {
return checkGtidSet(offset);
}
return checkBinlogFilename(offset);
}
private boolean checkGtidSet(MySqlOffsetContext offset) {
String gtidStr = offset.gtidSet();
if (gtidStr.trim().isEmpty()) {
return true; // start at beginning ...
}
String availableGtidStr = connection.knownGtidSet();
if (availableGtidStr == null || availableGtidStr.trim().isEmpty()) {
// Last offsets had GTIDs but the server does not use them ...
LOG.warn(
"Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled");
return false;
}
// GTIDs are enabled
GtidSet gtidSet = new GtidSet(gtidStr);
// Get the GTID set that is available in the server ...
GtidSet availableGtidSet = new GtidSet(availableGtidStr);
if (gtidSet.isContainedWithin(availableGtidSet)) {
LOG.info(
"MySQL current GTID set {} does contain the GTID set {} required by the connector.",
availableGtidSet,
gtidSet);
// The replication is concept of mysql master-slave replication protocol ...
final GtidSet gtidSetToReplicate =
connection.subtractGtidSet(availableGtidSet, gtidSet);
final GtidSet purgedGtidSet = connection.purgedGtidSet();
LOG.info("Server has already purged {} GTIDs", purgedGtidSet);
final GtidSet nonPurgedGtidSetToReplicate =
connection.subtractGtidSet(gtidSetToReplicate, purgedGtidSet);
LOG.info(
"GTID set {} known by the server but not processed yet, for replication are available only GTID set {}",
gtidSetToReplicate,
nonPurgedGtidSetToReplicate);
if (!gtidSetToReplicate.equals(nonPurgedGtidSetToReplicate)) {
LOG.warn("Some of the GTIDs needed to replicate have been already purged");
return false;
}
return true;
}
LOG.info("Connector last known GTIDs are {}, but MySQL has {}", gtidSet, availableGtidSet);
return false;
}
private boolean checkBinlogFilename(MySqlOffsetContext offset) {
String binlogFilename = offset.getSourceInfo().getString(BINLOG_FILENAME_OFFSET_KEY);
if (binlogFilename == null) {
return true; // start at current position

Loading…
Cancel
Save