|
|
|
@ -31,10 +31,9 @@ import io.debezium.connector.mysql.MySqlDatabaseSchema;
|
|
|
|
|
import io.debezium.connector.mysql.MySqlOffsetContext;
|
|
|
|
|
import io.debezium.connector.mysql.MySqlValueConverters;
|
|
|
|
|
import io.debezium.pipeline.EventDispatcher;
|
|
|
|
|
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
|
|
|
|
|
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
|
|
|
|
|
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
|
|
|
|
|
import io.debezium.pipeline.spi.ChangeRecordEmitter;
|
|
|
|
|
import io.debezium.pipeline.spi.OffsetContext;
|
|
|
|
|
import io.debezium.pipeline.spi.SnapshotResult;
|
|
|
|
|
import io.debezium.relational.Column;
|
|
|
|
|
import io.debezium.relational.RelationalSnapshotChangeEventSource;
|
|
|
|
@ -62,7 +61,8 @@ import java.util.Calendar;
|
|
|
|
|
import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.currentBinlogOffset;
|
|
|
|
|
|
|
|
|
|
/** Task to read snapshot split of table. */
|
|
|
|
|
public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSource {
|
|
|
|
|
public class MySqlSnapshotSplitReadTask
|
|
|
|
|
extends AbstractSnapshotChangeEventSource<MySqlOffsetContext> {
|
|
|
|
|
|
|
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(MySqlSnapshotSplitReadTask.class);
|
|
|
|
|
|
|
|
|
@ -75,22 +75,19 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
|
|
|
|
|
private final EventDispatcherImpl<TableId> dispatcher;
|
|
|
|
|
private final Clock clock;
|
|
|
|
|
private final MySqlSnapshotSplit snapshotSplit;
|
|
|
|
|
private final MySqlOffsetContext offsetContext;
|
|
|
|
|
private final TopicSelector<TableId> topicSelector;
|
|
|
|
|
private final SnapshotProgressListener snapshotProgressListener;
|
|
|
|
|
private final SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;
|
|
|
|
|
|
|
|
|
|
public MySqlSnapshotSplitReadTask(
|
|
|
|
|
MySqlConnectorConfig connectorConfig,
|
|
|
|
|
MySqlOffsetContext previousOffset,
|
|
|
|
|
SnapshotProgressListener snapshotProgressListener,
|
|
|
|
|
SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics,
|
|
|
|
|
MySqlDatabaseSchema databaseSchema,
|
|
|
|
|
MySqlConnection jdbcConnection,
|
|
|
|
|
EventDispatcherImpl<TableId> dispatcher,
|
|
|
|
|
TopicSelector<TableId> topicSelector,
|
|
|
|
|
Clock clock,
|
|
|
|
|
MySqlSnapshotSplit snapshotSplit) {
|
|
|
|
|
super(connectorConfig, previousOffset, snapshotProgressListener);
|
|
|
|
|
this.offsetContext = previousOffset;
|
|
|
|
|
super(connectorConfig, snapshotChangeEventSourceMetrics);
|
|
|
|
|
this.connectorConfig = connectorConfig;
|
|
|
|
|
this.databaseSchema = databaseSchema;
|
|
|
|
|
this.jdbcConnection = jdbcConnection;
|
|
|
|
@ -98,13 +95,15 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
|
|
|
|
|
this.clock = clock;
|
|
|
|
|
this.snapshotSplit = snapshotSplit;
|
|
|
|
|
this.topicSelector = topicSelector;
|
|
|
|
|
this.snapshotProgressListener = snapshotProgressListener;
|
|
|
|
|
this.snapshotChangeEventSourceMetrics = snapshotChangeEventSourceMetrics;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {
|
|
|
|
|
public SnapshotResult<MySqlOffsetContext> execute(
|
|
|
|
|
ChangeEventSourceContext context, MySqlOffsetContext previousOffset)
|
|
|
|
|
throws InterruptedException {
|
|
|
|
|
SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset);
|
|
|
|
|
final SnapshotContext ctx;
|
|
|
|
|
final SnapshotContext<MySqlOffsetContext> ctx;
|
|
|
|
|
try {
|
|
|
|
|
ctx = prepare(context);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
@ -112,7 +111,7 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
|
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
return doExecute(context, ctx, snapshottingTask);
|
|
|
|
|
return doExecute(context, previousOffset, ctx, snapshottingTask);
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
LOG.warn("Snapshot was interrupted before completion");
|
|
|
|
|
throw e;
|
|
|
|
@ -122,17 +121,21 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected SnapshotResult doExecute(
|
|
|
|
|
protected SnapshotResult<MySqlOffsetContext> doExecute(
|
|
|
|
|
ChangeEventSourceContext context,
|
|
|
|
|
SnapshotContext snapshotContext,
|
|
|
|
|
MySqlOffsetContext previousOffset,
|
|
|
|
|
SnapshotContext<MySqlOffsetContext> snapshotContext,
|
|
|
|
|
SnapshottingTask snapshottingTask)
|
|
|
|
|
throws Exception {
|
|
|
|
|
final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
|
|
|
|
|
(RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
|
|
|
|
|
ctx.offset = offsetContext;
|
|
|
|
|
final RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlOffsetContext>
|
|
|
|
|
ctx =
|
|
|
|
|
(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<
|
|
|
|
|
MySqlOffsetContext>)
|
|
|
|
|
snapshotContext;
|
|
|
|
|
ctx.offset = previousOffset;
|
|
|
|
|
final SignalEventDispatcher signalEventDispatcher =
|
|
|
|
|
new SignalEventDispatcher(
|
|
|
|
|
offsetContext.getPartition(),
|
|
|
|
|
previousOffset.getPartition(),
|
|
|
|
|
topicSelector.topicNameFor(snapshotSplit.getTableId()),
|
|
|
|
|
dispatcher.getQueue());
|
|
|
|
|
|
|
|
|
@ -163,7 +166,7 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
|
|
|
|
|
protected SnapshottingTask getSnapshottingTask(MySqlOffsetContext offsetContext) {
|
|
|
|
|
return new SnapshottingTask(false, true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -246,7 +249,7 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
|
|
|
|
|
rows,
|
|
|
|
|
snapshotSplit.splitId(),
|
|
|
|
|
Strings.duration(stop - exportStart));
|
|
|
|
|
snapshotProgressListener.rowsScanned(table.id(), rows);
|
|
|
|
|
snapshotChangeEventSourceMetrics.rowsScanned(table.id(), rows);
|
|
|
|
|
logTimer = getTableScanLogTimer();
|
|
|
|
|
}
|
|
|
|
|
dispatcher.dispatchSnapshotEvent(
|
|
|
|
|