From bdca0e328bce0aa2dc05153bc67da4c4875586fd Mon Sep 17 00:00:00 2001 From: Hongshun Wang <125648852+loserwang1024@users.noreply.github.com> Date: Thu, 25 Apr 2024 14:03:23 +0800 Subject: [PATCH] [FLINK-35128][cdc-connector][cdc-base] Re-calculate the starting changelog offset after the new table added (#3230) --- .../connectors/base/source/meta/split/StreamSplit.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java index f4143364a..7b0b048c4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java @@ -163,10 +163,18 @@ public class StreamSplit extends SourceSplitBase { // ------------------------------------------------------------------- public static StreamSplit appendFinishedSplitInfos( StreamSplit streamSplit, List splitInfos) { + // re-calculate the starting changelog offset after the new table added + Offset startingOffset = streamSplit.getStartingOffset(); + for (FinishedSnapshotSplitInfo splitInfo : splitInfos) { + if (splitInfo.getHighWatermark().isBefore(startingOffset)) { + startingOffset = splitInfo.getHighWatermark(); + } + } splitInfos.addAll(streamSplit.getFinishedSnapshotSplitInfos()); + return new StreamSplit( streamSplit.splitId, - streamSplit.getStartingOffset(), + startingOffset, streamSplit.getEndingOffset(), splitInfos, streamSplit.getTableSchemas(),