|
|
|
@ -163,10 +163,18 @@ public class StreamSplit extends SourceSplitBase {
|
|
|
|
|
// -------------------------------------------------------------------
|
|
|
|
|
public static StreamSplit appendFinishedSplitInfos(
|
|
|
|
|
StreamSplit streamSplit, List<FinishedSnapshotSplitInfo> splitInfos) {
|
|
|
|
|
// re-calculate the starting changelog offset after the new table added
|
|
|
|
|
Offset startingOffset = streamSplit.getStartingOffset();
|
|
|
|
|
for (FinishedSnapshotSplitInfo splitInfo : splitInfos) {
|
|
|
|
|
if (splitInfo.getHighWatermark().isBefore(startingOffset)) {
|
|
|
|
|
startingOffset = splitInfo.getHighWatermark();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
splitInfos.addAll(streamSplit.getFinishedSnapshotSplitInfos());
|
|
|
|
|
|
|
|
|
|
return new StreamSplit(
|
|
|
|
|
streamSplit.splitId,
|
|
|
|
|
streamSplit.getStartingOffset(),
|
|
|
|
|
startingOffset,
|
|
|
|
|
streamSplit.getEndingOffset(),
|
|
|
|
|
splitInfos,
|
|
|
|
|
streamSplit.getTableSchemas(),
|
|
|
|
|