|
|
|
@ -22,6 +22,7 @@ import org.apache.flink.util.FlinkRuntimeException;
|
|
|
|
|
|
|
|
|
|
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
|
|
|
|
|
|
|
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
|
|
|
|
|
import com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask;
|
|
|
|
|
import com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask;
|
|
|
|
|
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
|
|
|
|
@ -114,42 +115,25 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
|
|
|
|
|
SnapshotResult snapshotResult =
|
|
|
|
|
splitSnapshotReadTask.execute(sourceContext);
|
|
|
|
|
|
|
|
|
|
final MySqlBinlogSplit appendBinlogSplit = createBinlogSplit(sourceContext);
|
|
|
|
|
final MySqlOffsetContext mySqlOffsetContext =
|
|
|
|
|
statefulTaskContext.getOffsetContext();
|
|
|
|
|
mySqlOffsetContext.setBinlogStartPoint(
|
|
|
|
|
appendBinlogSplit.getStartingOffset().getFilename(),
|
|
|
|
|
appendBinlogSplit.getStartingOffset().getPosition());
|
|
|
|
|
final MySqlBinlogSplit backfillBinlogSplit =
|
|
|
|
|
createBackfillBinlogSplit(sourceContext);
|
|
|
|
|
// optimization that skip the binlog read when the low watermark equals high
|
|
|
|
|
// watermark
|
|
|
|
|
final boolean binlogBackfillRequired =
|
|
|
|
|
backfillBinlogSplit
|
|
|
|
|
.getEndingOffset()
|
|
|
|
|
.isBefore(backfillBinlogSplit.getStartingOffset());
|
|
|
|
|
if (!binlogBackfillRequired) {
|
|
|
|
|
dispatchHighWatermark(backfillBinlogSplit);
|
|
|
|
|
currentTaskRunning = false;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// execute binlog read task
|
|
|
|
|
if (snapshotResult.isCompletedOrSkipped()) {
|
|
|
|
|
// we should only capture events for the current table,
|
|
|
|
|
// otherwise, we may can't find corresponding schema
|
|
|
|
|
Configuration dezConf =
|
|
|
|
|
statefulTaskContext
|
|
|
|
|
.getDezConf()
|
|
|
|
|
.edit()
|
|
|
|
|
.with(
|
|
|
|
|
"table.whitelist",
|
|
|
|
|
currentSnapshotSplit.getTableId())
|
|
|
|
|
.build();
|
|
|
|
|
// task to read binlog for current split
|
|
|
|
|
MySqlBinlogSplitReadTask splitBinlogReadTask =
|
|
|
|
|
new MySqlBinlogSplitReadTask(
|
|
|
|
|
new MySqlConnectorConfig(dezConf),
|
|
|
|
|
mySqlOffsetContext,
|
|
|
|
|
statefulTaskContext.getConnection(),
|
|
|
|
|
statefulTaskContext.getDispatcher(),
|
|
|
|
|
statefulTaskContext.getErrorHandler(),
|
|
|
|
|
StatefulTaskContext.getClock(),
|
|
|
|
|
statefulTaskContext.getTaskContext(),
|
|
|
|
|
(MySqlStreamingChangeEventSourceMetrics)
|
|
|
|
|
statefulTaskContext
|
|
|
|
|
.getStreamingChangeEventSourceMetrics(),
|
|
|
|
|
statefulTaskContext
|
|
|
|
|
.getTopicSelector()
|
|
|
|
|
.getPrimaryTopic(),
|
|
|
|
|
appendBinlogSplit);
|
|
|
|
|
splitBinlogReadTask.execute(
|
|
|
|
|
final MySqlBinlogSplitReadTask backfillBinlogReadTask =
|
|
|
|
|
createBackfillBinlogReadTask(backfillBinlogSplit);
|
|
|
|
|
backfillBinlogReadTask.execute(
|
|
|
|
|
new SnapshotBinlogSplitChangeEventSourceContextImpl());
|
|
|
|
|
} else {
|
|
|
|
|
readException =
|
|
|
|
@ -170,7 +154,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private MySqlBinlogSplit createBinlogSplit(
|
|
|
|
|
private MySqlBinlogSplit createBackfillBinlogSplit(
|
|
|
|
|
SnapshotSplitChangeEventSourceContextImpl sourceContext) {
|
|
|
|
|
return new MySqlBinlogSplit(
|
|
|
|
|
currentSnapshotSplit.splitId(),
|
|
|
|
@ -181,6 +165,49 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
|
|
|
|
|
currentSnapshotSplit.getTableSchemas());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private MySqlBinlogSplitReadTask createBackfillBinlogReadTask(
|
|
|
|
|
MySqlBinlogSplit backfillBinlogSplit) {
|
|
|
|
|
final MySqlOffsetContext.Loader loader =
|
|
|
|
|
new MySqlOffsetContext.Loader(statefulTaskContext.getConnectorConfig());
|
|
|
|
|
final MySqlOffsetContext mySqlOffsetContext =
|
|
|
|
|
(MySqlOffsetContext)
|
|
|
|
|
loader.load(backfillBinlogSplit.getStartingOffset().getOffset());
|
|
|
|
|
// we should only capture events for the current table,
|
|
|
|
|
// otherwise, we may can't find corresponding schema
|
|
|
|
|
Configuration dezConf =
|
|
|
|
|
statefulTaskContext
|
|
|
|
|
.getDezConf()
|
|
|
|
|
.edit()
|
|
|
|
|
.with("table.whitelist", currentSnapshotSplit.getTableId())
|
|
|
|
|
.build();
|
|
|
|
|
// task to read binlog and backfill for current split
|
|
|
|
|
return new MySqlBinlogSplitReadTask(
|
|
|
|
|
new MySqlConnectorConfig(dezConf),
|
|
|
|
|
mySqlOffsetContext,
|
|
|
|
|
statefulTaskContext.getConnection(),
|
|
|
|
|
statefulTaskContext.getDispatcher(),
|
|
|
|
|
statefulTaskContext.getErrorHandler(),
|
|
|
|
|
StatefulTaskContext.getClock(),
|
|
|
|
|
statefulTaskContext.getTaskContext(),
|
|
|
|
|
(MySqlStreamingChangeEventSourceMetrics)
|
|
|
|
|
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
|
|
|
|
|
statefulTaskContext.getTopicSelector().getPrimaryTopic(),
|
|
|
|
|
backfillBinlogSplit);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void dispatchHighWatermark(MySqlBinlogSplit backFillBinlogSplit)
|
|
|
|
|
throws InterruptedException {
|
|
|
|
|
final SignalEventDispatcher signalEventDispatcher =
|
|
|
|
|
new SignalEventDispatcher(
|
|
|
|
|
statefulTaskContext.getOffsetContext().getPartition(),
|
|
|
|
|
statefulTaskContext.getTopicSelector().getPrimaryTopic(),
|
|
|
|
|
statefulTaskContext.getDispatcher().getQueue());
|
|
|
|
|
signalEventDispatcher.dispatchWatermarkEvent(
|
|
|
|
|
backFillBinlogSplit,
|
|
|
|
|
backFillBinlogSplit.getEndingOffset(),
|
|
|
|
|
SignalEventDispatcher.WatermarkKind.BINLOG_END);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean isFinished() {
|
|
|
|
|
return currentSnapshotSplit == null
|
|
|
|
|