[hotfix][sqlserver] Fix backfill stream task hang (#2374)

* [sqlserver] Fix backfill stream task hang

* [sqlserver] Fix backfill stream task hang
pull/2437/head
gongzhongqiang 2 years ago committed by GitHub
parent 8e7378d8f9
commit 6fa02ef477
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -274,7 +274,7 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDOS on SqlServer server
// may sleep awhile to avoid DDOS on SqlServer server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);

@ -47,12 +47,8 @@ public class LsnOffset extends Offset {
this.offset = offsetMap;
}
public LsnOffset(Lsn lsn) {
this(lsn, null, null);
}
public Lsn getLcn() {
return Lsn.valueOf(offset.get(SourceInfo.COMMIT_LSN_KEY));
public LsnOffset(Lsn changeLsn) {
this(changeLsn, null, null);
}
@Override
@ -69,8 +65,8 @@ public class LsnOffset extends Offset {
return -1;
}
Lsn lsn = this.getLcn();
Lsn targetLsn = that.getLcn();
Lsn lsn = Lsn.valueOf(this.offset.get(SourceInfo.COMMIT_LSN_KEY));
Lsn targetLsn = Lsn.valueOf(that.offset.get(SourceInfo.COMMIT_LSN_KEY));
if (targetLsn.isAvailable()) {
if (lsn.isAvailable()) {
return lsn.compareTo(targetLsn);

@ -23,7 +23,7 @@ import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset;
import com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask.LsnSplitReadTask;
import com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask.StreamSplitReadTask;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnection;
@ -118,7 +118,7 @@ public class SqlServerScanFetchTask implements FetchTask<SourceSplitBase> {
final SqlServerOffsetContext streamOffsetContext =
loader.load(backfillBinlogSplit.getStartingOffset().getOffset());
final LsnSplitReadTask backfillBinlogReadTask =
final StreamSplitReadTask backfillBinlogReadTask =
createBackFillLsnSplitReadTask(backfillBinlogSplit, sourceFetchContext);
backfillBinlogReadTask.execute(
new SnapshotBinlogSplitChangeEventSourceContext(),
@ -154,21 +154,27 @@ public class SqlServerScanFetchTask implements FetchTask<SourceSplitBase> {
WatermarkKind.END);
}
private LsnSplitReadTask createBackFillLsnSplitReadTask(
private StreamSplitReadTask createBackFillLsnSplitReadTask(
StreamSplit backfillBinlogSplit, SqlServerSourceFetchTaskContext context) {
// we should only capture events for the current table,
// otherwise, we may can't find corresponding schema
Configuration dezConf =
context.getSourceConfig()
.getDbzConfiguration()
context.getDbzConnectorConfig()
.getConfig()
.edit()
.with("table.include.list", split.getTableId().toString())
// table.include.list is schema.table format
.with(
"table.include.list",
new TableId(
null,
split.getTableId().schema(),
split.getTableId().table()))
// Disable heartbeat event in snapshot split fetcher
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
.build();
// task to read binlog and backfill for current split
return new LsnSplitReadTask(
context.getDbzConnectorConfig(),
return new StreamSplitReadTask(
new SqlServerConnectorConfig(dezConf),
context.getConnection(),
context.getMetaDataConnection(),
context.getDispatcher(),

@ -77,8 +77,8 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
* A separate connection for retrieving details of the schema changes; without it, adaptive
* buffering will not work.
*
* @link
* https://docs.microsoft.com/en-us/sql/connect/jdbc/using-adaptive-buffering?view=sql-server-2017#guidelines-for-using-adaptive-buffering
* <p>For more details, please refer to <a
* href="https://docs.microsoft.com/en-us/sql/connect/jdbc/using-adaptive-buffering?view=sql-server-2017#guidelines-for-using-adaptive-buffering">guidelines-for-using-adaptive-buffering</a>
*/
private final SqlServerConnection metaDataConnection;

@ -17,12 +17,14 @@
package com.ververica.cdc.connectors.sqlserver.source.reader.fetch;
import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset;
import io.debezium.DebeziumException;
import io.debezium.connector.sqlserver.Lsn;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.SqlServerDatabaseSchema;
@ -36,14 +38,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset.NO_STOPPING_OFFSET;
import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerUtils.getLsnPosition;
/** The task to work for fetching data of SqlServer table stream split . */
public class SqlServerStreamFetchTask implements FetchTask<SourceSplitBase> {
private final StreamSplit split;
private volatile boolean taskRunning = false;
private LsnSplitReadTask redoLogSplitReadTask;
private StreamSplitReadTask redoLogSplitReadTask;
public SqlServerStreamFetchTask(StreamSplit split) {
this.split = split;
@ -56,7 +57,7 @@ public class SqlServerStreamFetchTask implements FetchTask<SourceSplitBase> {
sourceFetchContext.getOffsetContext().preSnapshotCompletion();
taskRunning = true;
redoLogSplitReadTask =
new LsnSplitReadTask(
new StreamSplitReadTask(
sourceFetchContext.getDbzConnectorConfig(),
sourceFetchContext.getConnection(),
sourceFetchContext.getMetaDataConnection(),
@ -91,15 +92,15 @@ public class SqlServerStreamFetchTask implements FetchTask<SourceSplitBase> {
* A wrapped task to read all binlog for table and also supports read bounded (from lowWatermark
* to highWatermark) binlog.
*/
public static class LsnSplitReadTask extends SqlServerStreamingChangeEventSource {
public static class StreamSplitReadTask extends SqlServerStreamingChangeEventSource {
private static final Logger LOG = LoggerFactory.getLogger(LsnSplitReadTask.class);
private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class);
private final StreamSplit lsnSplit;
private final JdbcSourceEventDispatcher<SqlServerPartition> dispatcher;
private final ErrorHandler errorHandler;
private ChangeEventSourceContext context;
public LsnSplitReadTask(
public StreamSplitReadTask(
SqlServerConnectorConfig connectorConfig,
SqlServerConnection connection,
SqlServerConnection metadataConnection,
@ -121,26 +122,25 @@ public class SqlServerStreamFetchTask implements FetchTask<SourceSplitBase> {
}
@Override
public void afterHandleLsn(
SqlServerPartition partition, SqlServerOffsetContext offsetContext) {
public void afterHandleLsn(SqlServerPartition partition, Lsn toLsn) {
// check do we need to stop for fetch binlog for snapshot split.
if (isBoundedRead()) {
final LsnOffset currentRedoLogOffset = getLsnPosition(offsetContext.getOffset());
// reach the high watermark, the binlog fetcher should be finished
if (currentRedoLogOffset.isAtOrAfter(lsnSplit.getEndingOffset())) {
// send binlog end event
LsnOffset currentLsnOffset = new LsnOffset(null, toLsn, null);
Offset endingOffset = lsnSplit.getEndingOffset();
if (currentLsnOffset.isAtOrAfter(endingOffset)) {
// send streaming end event
try {
dispatcher.dispatchWatermarkEvent(
partition.getSourcePartition(),
lsnSplit,
currentRedoLogOffset,
currentLsnOffset,
WatermarkKind.END);
} catch (InterruptedException e) {
LOG.error("Send signal event error.", e);
errorHandler.setProducerThrowable(
new DebeziumException("Error processing binlog signal event", e));
}
// tell fetcher the binlog task finished
// tell fetcher the streaming task finished
((SqlServerScanFetchTask.SnapshotBinlogSplitChangeEventSourceContext) context)
.finished();
}

@ -38,11 +38,11 @@ import java.util.stream.Collectors;
/**
* Copied from Debezium project(1.9.7.final) to add method {@link
* SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, SqlServerOffsetContext)}.
* Also implemented {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext,
* SqlServerPartition, SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL
* Server change data capture functionality. A main loop polls database DDL change and change data
* tables and turns them into change events.
* SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, Lsn)}. Also implemented
* {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext, SqlServerPartition,
* SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL Server change data
* capture functionality. A main loop polls database DDL change and change data tables and turns
* them into change events.
*
* <p>The connector uses CDC functionality of SQL Server that is implemented as as a process that
* monitors source table and write changes from the table into the change table.
@ -198,7 +198,6 @@ public class SqlServerStreamingChangeEventSource
if (context.isRunning()) {
commitTransaction();
afterHandleLsn(partition, offsetContext);
final Lsn toLsn =
getToLsn(
dataConnection,
@ -449,6 +448,8 @@ public class SqlServerStreamingChangeEventSource
TxLogPosition.valueOf(toLsn));
// Terminate the transaction otherwise CDC could not be disabled for tables
dataConnection.rollback();
// Determine whether to continue streaming in sqlserver cdc snapshot phase
afterHandleLsn(partition, toLsn);
} catch (SQLException e) {
tablesSlot.set(
processErrorFromChangeTableQuery(databaseName, e, tablesSlot.get()));
@ -625,8 +626,7 @@ public class SqlServerStreamingChangeEventSource
}
/** expose control to the user to stop the connector. */
protected void afterHandleLsn(
SqlServerPartition partition, SqlServerOffsetContext offsetContext) {
protected void afterHandleLsn(SqlServerPartition partition, Lsn toLsn) {
// do nothing
}
}

Loading…
Cancel
Save