From e7964b160c1df3861571ba2199ba04d91d887ff2 Mon Sep 17 00:00:00 2001 From: gongzhongqiang Date: Tue, 21 Jan 2025 17:28:19 +0800 Subject: [PATCH] [FLINK-36086] Add EVENT_SERIAL_NO_KEY for record event processed in state for fix repeat read event after restart during one transaction --- .../sqlserver/SqlServerOffsetContext.java | 240 ++++++++++++++++++ .../sqlserver/source/offset/LsnFactory.java | 6 +- .../sqlserver/source/offset/LsnOffset.java | 2 +- 3 files changed, 246 insertions(+), 2 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java new file mode 100644 index 000000000..02547786c --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java @@ -0,0 +1,240 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import io.debezium.connector.SnapshotRecord; +import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; +import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.pipeline.txmetadata.TransactionContext; +import io.debezium.relational.TableId; +import io.debezium.schema.DataCollectionId; +import io.debezium.util.Collect; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; + +import java.time.Instant; +import java.util.Map; + +/** + * Copied from Debezium project(1.9.8.final) to modify the + * io.debezium.connector.sqlserver.SqlServerOffsetContext.Loader#load(java.util.Map) method. The + * original method is not able to get the eventSerialNo from the offset map which stores as string. + */ +public class SqlServerOffsetContext implements OffsetContext { + + private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; + + private final Schema sourceInfoSchema; + private final SourceInfo sourceInfo; + private boolean snapshotCompleted; + private final TransactionContext transactionContext; + private final IncrementalSnapshotContext incrementalSnapshotContext; + + /** The index of the current event within the current transaction. */ + private long eventSerialNo; + + public SqlServerOffsetContext( + SqlServerConnectorConfig connectorConfig, + TxLogPosition position, + boolean snapshot, + boolean snapshotCompleted, + long eventSerialNo, + TransactionContext transactionContext, + IncrementalSnapshotContext incrementalSnapshotContext) { + sourceInfo = new SourceInfo(connectorConfig); + + sourceInfo.setCommitLsn(position.getCommitLsn()); + sourceInfo.setChangeLsn(position.getInTxLsn()); + sourceInfoSchema = sourceInfo.schema(); + + this.snapshotCompleted = snapshotCompleted; + if (this.snapshotCompleted) { + postSnapshotCompletion(); + } else { + sourceInfo.setSnapshot(snapshot ? SnapshotRecord.TRUE : SnapshotRecord.FALSE); + } + this.eventSerialNo = eventSerialNo; + this.transactionContext = transactionContext; + this.incrementalSnapshotContext = incrementalSnapshotContext; + } + + public SqlServerOffsetContext( + SqlServerConnectorConfig connectorConfig, + TxLogPosition position, + boolean snapshot, + boolean snapshotCompleted) { + this( + connectorConfig, + position, + snapshot, + snapshotCompleted, + 1, + new TransactionContext(), + new SignalBasedIncrementalSnapshotContext<>()); + } + + @Override + public Map getOffset() { + if (sourceInfo.isSnapshot()) { + return Collect.hashMapOf( + SourceInfo.SNAPSHOT_KEY, + true, + SNAPSHOT_COMPLETED_KEY, + snapshotCompleted, + SourceInfo.COMMIT_LSN_KEY, + sourceInfo.getCommitLsn().toString()); + } else { + return incrementalSnapshotContext.store( + transactionContext.store( + Collect.hashMapOf( + SourceInfo.COMMIT_LSN_KEY, + sourceInfo.getCommitLsn().toString(), + SourceInfo.CHANGE_LSN_KEY, + sourceInfo.getChangeLsn() == null + ? null + : sourceInfo.getChangeLsn().toString(), + SourceInfo.EVENT_SERIAL_NO_KEY, + eventSerialNo))); + } + } + + @Override + public Schema getSourceInfoSchema() { + return sourceInfoSchema; + } + + @Override + public Struct getSourceInfo() { + return sourceInfo.struct(); + } + + public TxLogPosition getChangePosition() { + return TxLogPosition.valueOf(sourceInfo.getCommitLsn(), sourceInfo.getChangeLsn()); + } + + public long getEventSerialNo() { + return eventSerialNo; + } + + public void setChangePosition(TxLogPosition position, int eventCount) { + if (getChangePosition().equals(position)) { + eventSerialNo += eventCount; + } else { + eventSerialNo = eventCount; + } + sourceInfo.setCommitLsn(position.getCommitLsn()); + sourceInfo.setChangeLsn(position.getInTxLsn()); + sourceInfo.setEventSerialNo(eventSerialNo); + } + + @Override + public boolean isSnapshotRunning() { + return sourceInfo.isSnapshot() && !snapshotCompleted; + } + + public boolean isSnapshotCompleted() { + return snapshotCompleted; + } + + @Override + public void preSnapshotStart() { + sourceInfo.setSnapshot(SnapshotRecord.TRUE); + snapshotCompleted = false; + } + + @Override + public void preSnapshotCompletion() { + snapshotCompleted = true; + } + + @Override + public void postSnapshotCompletion() { + sourceInfo.setSnapshot(SnapshotRecord.FALSE); + } + + public static class Loader implements OffsetContext.Loader { + + private final SqlServerConnectorConfig connectorConfig; + + public Loader(SqlServerConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; + } + + @Override + public SqlServerOffsetContext load(Map offset) { + final Lsn changeLsn = Lsn.valueOf((String) offset.get(SourceInfo.CHANGE_LSN_KEY)); + final Lsn commitLsn = Lsn.valueOf((String) offset.get(SourceInfo.COMMIT_LSN_KEY)); + boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY)); + boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(SNAPSHOT_COMPLETED_KEY)); + + // only introduced in 0.10.Beta1, so it might be not present when upgrading from earlier + // versions + // if value is String, convert it to Long + long eventSerialNo; + + Object eventSerialNoObj = offset.get(SourceInfo.EVENT_SERIAL_NO_KEY); + if (eventSerialNoObj != null) { + if (eventSerialNoObj instanceof String) { + eventSerialNo = Long.parseLong((String) eventSerialNoObj); + } else { + eventSerialNo = (Long) eventSerialNoObj; + } + } else { + eventSerialNo = 0L; + } + + return new SqlServerOffsetContext( + connectorConfig, + TxLogPosition.valueOf(commitLsn, changeLsn), + snapshot, + snapshotCompleted, + eventSerialNo, + TransactionContext.load(offset), + SignalBasedIncrementalSnapshotContext.load(offset)); + } + } + + @Override + public String toString() { + return "SqlServerOffsetContext [" + + "sourceInfoSchema=" + + sourceInfoSchema + + ", sourceInfo=" + + sourceInfo + + ", snapshotCompleted=" + + snapshotCompleted + + ", eventSerialNo=" + + eventSerialNo + + "]"; + } + + @Override + public void markLastSnapshotRecord() { + sourceInfo.setSnapshot(SnapshotRecord.LAST); + } + + @Override + public void event(DataCollectionId tableId, Instant timestamp) { + sourceInfo.setSourceTime(timestamp); + sourceInfo.setTableId((TableId) tableId); + } + + @Override + public TransactionContext getTransactionContext() { + return transactionContext; + } + + @Override + public void incrementalSnapshotEvents() { + sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL); + } + + @Override + public IncrementalSnapshotContext getIncrementalSnapshotContext() { + return incrementalSnapshotContext; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/offset/LsnFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/offset/LsnFactory.java index 00bde8387..23c56c4bd 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/offset/LsnFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/offset/LsnFactory.java @@ -31,7 +31,11 @@ public class LsnFactory extends OffsetFactory { public Offset newOffset(Map offset) { Lsn changeLsn = Lsn.valueOf(offset.get(SourceInfo.CHANGE_LSN_KEY)); Lsn commitLsn = Lsn.valueOf(offset.get(SourceInfo.COMMIT_LSN_KEY)); - return new LsnOffset(changeLsn, commitLsn, null); + Long eventSerialNo = null; + if (offset.get(SourceInfo.EVENT_SERIAL_NO_KEY) != null) { + eventSerialNo = Long.valueOf(offset.get(SourceInfo.EVENT_SERIAL_NO_KEY)); + } + return new LsnOffset(changeLsn, commitLsn, eventSerialNo); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/offset/LsnOffset.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/offset/LsnOffset.java index 9f0433e6c..88160c568 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/offset/LsnOffset.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/offset/LsnOffset.java @@ -43,7 +43,7 @@ public class LsnOffset extends Offset { offsetMap.put(SourceInfo.COMMIT_LSN_KEY, commitScn.toString()); } if (eventSerialNo != null) { - offsetMap.put(SourceInfo.EVENT_SERIAL_NO_KEY, String.valueOf(eventSerialNo)); + offsetMap.put(SourceInfo.EVENT_SERIAL_NO_KEY, eventSerialNo.toString()); } this.offset = offsetMap;