[FLINK-36573][cdc-connector][mysql] Add table information of binlog offsets when checkpointing

pull/3656/head
Runkang He 5 months ago committed by herunkang.runking
parent dd69756d3f
commit f685ed7404

@ -530,7 +530,11 @@ public class MySqlSourceReader<T>
return;
}
BinlogOffset offset = split.asBinlogSplit().getStartingOffset();
LOG.info("Binlog offset on checkpoint {}: {}", checkpointId, offset);
LOG.info(
"Binlog offset for tables {} on checkpoint {}: {}",
split.asBinlogSplit().getTables(),
checkpointId,
offset);
}
}

@ -108,6 +108,16 @@ public class MySqlBinlogSplit extends MySqlSplit {
return totalFinishedSplitSize == finishedSnapshotSplitInfos.size();
}
public String getTables() {
String tables;
if (tableSchemas != null) {
tables = tableSchemas.keySet().toString();
} else {
tables = "[]";
}
return tables;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -142,10 +152,13 @@ public class MySqlBinlogSplit extends MySqlSplit {
@Override
public String toString() {
String tables = getTables();
return "MySqlBinlogSplit{"
+ "splitId='"
+ splitId
+ '\''
+ ", tables="
+ tables
+ ", offset="
+ startingOffset
+ ", endOffset="

Loading…
Cancel
Save