From 8df5b110ab083c4825b0103aaf903408ea9035ab Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Tue, 18 Oct 2022 17:26:22 +0800 Subject: [PATCH] [cdc-base] Let all records of one snapshot split don't cross checkpoints (#1622) This closes #1622. --- .../relational/JdbcSourceRecordEmitter.java | 13 ++++- .../base/source/JdbcIncrementalSource.java | 4 +- .../source/meta/split/ChangeEventRecords.java | 14 +++--- .../base/source/meta/split/SourceRecords.java | 47 +++++++++++++++++++ .../reader/JdbcIncrementalSourceReader.java | 8 ++-- .../source/reader/JdbcSourceSplitReader.java | 9 ++-- .../external/JdbcSourceScanFetcher.java | 10 ++-- .../external/JdbcSourceStreamFetcher.java | 9 ++-- 8 files changed, 89 insertions(+), 25 deletions(-) create mode 100644 flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceRecords.java diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceRecordEmitter.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceRecordEmitter.java index 823c0d9c8..ac98927c9 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceRecordEmitter.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceRecordEmitter.java @@ -22,6 +22,7 @@ import org.apache.flink.util.Collector; import com.ververica.cdc.connectors.base.source.meta.offset.Offset; import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory; +import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState; import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics; import com.ververica.cdc.connectors.base.source.reader.JdbcIncrementalSourceReader; @@ -35,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.getFetchTimestamp; @@ -52,7 +54,7 @@ import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.isWaterm * emit records rather than emit the records directly. */ public class JdbcSourceRecordEmitter - implements RecordEmitter { + implements RecordEmitter { private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceRecordEmitter.class); private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER = @@ -78,6 +80,15 @@ public class JdbcSourceRecordEmitter @Override public void emitRecord( + SourceRecords sourceRecords, SourceOutput output, SourceSplitState splitState) + throws Exception { + final Iterator elementIterator = sourceRecords.iterator(); + while (elementIterator.hasNext()) { + processElement(elementIterator.next(), output, splitState); + } + } + + private void processElement( SourceRecord element, SourceOutput output, SourceSplitState splitState) throws Exception { if (isWatermarkEvent(element)) { diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/JdbcIncrementalSource.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/JdbcIncrementalSource.java index e45e7a927..05faba0c9 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/JdbcIncrementalSource.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/JdbcIncrementalSource.java @@ -45,6 +45,7 @@ import com.ververica.cdc.connectors.base.source.assigner.state.PendingSplitsStat import com.ververica.cdc.connectors.base.source.assigner.state.StreamPendingSplitsState; import com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator; import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory; +import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer; import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics; @@ -52,7 +53,6 @@ import com.ververica.cdc.connectors.base.source.reader.JdbcIncrementalSourceRead import com.ververica.cdc.connectors.base.source.reader.JdbcSourceSplitReader; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import io.debezium.relational.TableId; -import org.apache.kafka.connect.source.SourceRecord; import java.util.List; import java.util.function.Supplier; @@ -101,7 +101,7 @@ public class JdbcIncrementalSource public SourceReader createReader(SourceReaderContext readerContext) { // create source config for the given subtask (e.g. unique server id) JdbcSourceConfig sourceConfig = configFactory.create(readerContext.getIndexOfSubtask()); - FutureCompletingBlockingQueue> elementsQueue = + FutureCompletingBlockingQueue> elementsQueue = new FutureCompletingBlockingQueue<>(); final SourceReaderMetrics sourceReaderMetrics = new SourceReaderMetrics(readerContext.metricGroup()); diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/ChangeEventRecords.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/ChangeEventRecords.java index 90546595e..e39cedd8a 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/ChangeEventRecords.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/ChangeEventRecords.java @@ -18,8 +18,6 @@ package com.ververica.cdc.connectors.base.source.meta.split; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; -import org.apache.kafka.connect.source.SourceRecord; - import javax.annotation.Nullable; import java.util.Collections; @@ -29,11 +27,11 @@ import java.util.Set; /** * An implementation of {@link RecordsWithSplitIds} which contains the records of one table split. */ -public final class ChangeEventRecords implements RecordsWithSplitIds { +public final class ChangeEventRecords implements RecordsWithSplitIds { @Nullable private String splitId; - @Nullable private Iterator recordsForCurrentSplit; - @Nullable private final Iterator recordsForSplit; + @Nullable private Iterator recordsForCurrentSplit; + @Nullable private final Iterator recordsForSplit; private final Set finishedSnapshotSplits; public ChangeEventRecords( @@ -59,8 +57,8 @@ public final class ChangeEventRecords implements RecordsWithSplitIds recordsForSplit = this.recordsForCurrentSplit; + public SourceRecords nextRecordFromSplit() { + final Iterator recordsForSplit = this.recordsForCurrentSplit; if (recordsForSplit != null) { if (recordsForSplit.hasNext()) { return recordsForSplit.next(); @@ -78,7 +76,7 @@ public final class ChangeEventRecords implements RecordsWithSplitIds recordsForSplit) { + final String splitId, final Iterator recordsForSplit) { return new ChangeEventRecords(splitId, recordsForSplit, Collections.emptySet()); } diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceRecords.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceRecords.java new file mode 100644 index 000000000..d0ac2794e --- /dev/null +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceRecords.java @@ -0,0 +1,47 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.base.source.meta.split; + +import org.apache.kafka.connect.source.SourceRecord; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** Data structure to describe a set of {@link SourceRecord}. */ +public final class SourceRecords { + + private final List sourceRecords; + + public SourceRecords(List sourceRecords) { + this.sourceRecords = sourceRecords; + } + + public List getSourceRecordList() { + return sourceRecords; + } + + public Iterator iterator() { + return sourceRecords.iterator(); + } + + public static SourceRecords fromSingleRecord(SourceRecord record) { + final List records = new ArrayList<>(); + records.add(record); + return new SourceRecords(records); + } +} diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcIncrementalSourceReader.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcIncrementalSourceReader.java index b6ffc74ee..d5ae961e5 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcIncrementalSourceReader.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcIncrementalSourceReader.java @@ -39,6 +39,7 @@ import com.ververica.cdc.connectors.base.source.meta.offset.Offset; import com.ververica.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo; import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit; import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplitState; +import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState; @@ -46,7 +47,6 @@ import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit; import com.ververica.cdc.connectors.base.source.meta.split.StreamSplitState; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges; -import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +68,7 @@ import static org.apache.flink.util.Preconditions.checkState; @Experimental public class JdbcIncrementalSourceReader extends SingleThreadMultiplexSourceReaderBase< - SourceRecord, T, SourceSplitBase, SourceSplitState> { + SourceRecords, T, SourceSplitBase, SourceSplitState> { private static final Logger LOG = LoggerFactory.getLogger(JdbcIncrementalSourceReader.class); @@ -80,9 +80,9 @@ public class JdbcIncrementalSourceReader private final JdbcDataSourceDialect dialect; public JdbcIncrementalSourceReader( - FutureCompletingBlockingQueue> elementQueue, + FutureCompletingBlockingQueue> elementQueue, Supplier splitReaderSupplier, - RecordEmitter recordEmitter, + RecordEmitter recordEmitter, Configuration config, SourceReaderContext context, JdbcSourceConfig sourceConfig, diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java index f3c50116a..9649e0f5c 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java @@ -25,6 +25,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import com.ververica.cdc.connectors.base.config.JdbcSourceConfig; import com.ververica.cdc.connectors.base.dialect.JdbcDataSourceDialect; import com.ververica.cdc.connectors.base.source.meta.split.ChangeEventRecords; +import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; import com.ververica.cdc.connectors.base.source.reader.external.Fetcher; import com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext; @@ -43,13 +44,13 @@ import java.util.Queue; /** Basic class read {@link SourceSplitBase} and return {@link SourceRecord}. */ @Experimental -public class JdbcSourceSplitReader implements SplitReader { +public class JdbcSourceSplitReader implements SplitReader { private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitReader.class); private final Queue splits; private final int subtaskId; - @Nullable private Fetcher currentFetcher; + @Nullable private Fetcher currentFetcher; @Nullable private String currentSplitId; private final JdbcDataSourceDialect dataSourceDialect; private final JdbcSourceConfig sourceConfig; @@ -63,9 +64,9 @@ public class JdbcSourceSplitReader implements SplitReader fetch() throws IOException { + public RecordsWithSplitIds fetch() throws IOException { checkSplitOrStartNext(); - Iterator dataIt = null; + Iterator dataIt = null; try { dataIt = currentFetcher.pollSplitRecords(); } catch (InterruptedException e) { diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceScanFetcher.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceScanFetcher.java index 41895285c..7a9e19d58 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceScanFetcher.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceScanFetcher.java @@ -21,6 +21,7 @@ import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit; +import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; import io.debezium.connector.base.ChangeEventQueue; import io.debezium.pipeline.DataChangeEvent; @@ -55,7 +56,7 @@ import static org.apache.flink.util.Preconditions.checkState; /** * Fetcher to fetch data from table split, the split is the snapshot split {@link SnapshotSplit}. */ -public class JdbcSourceScanFetcher implements Fetcher { +public class JdbcSourceScanFetcher implements Fetcher { private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceScanFetcher.class); @@ -115,7 +116,7 @@ public class JdbcSourceScanFetcher implements Fetcher pollSplitRecords() throws InterruptedException { + public Iterator pollSplitRecords() throws InterruptedException { checkReadException(); if (hasNextElement.get()) { @@ -168,7 +169,10 @@ public class JdbcSourceScanFetcher implements Fetcher sourceRecordsSet = new ArrayList<>(); + sourceRecordsSet.add(new SourceRecords(normalizedRecords)); + return sourceRecordsSet.iterator(); } // the data has been polled, no more data reachEnd.compareAndSet(false, true); diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java index bdbdc8f8a..3c026ac96 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java @@ -23,6 +23,7 @@ import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadF import com.ververica.cdc.connectors.base.source.meta.offset.Offset; import com.ververica.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo; +import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit; import com.ververica.cdc.connectors.base.utils.SourceRecordUtils; @@ -50,7 +51,7 @@ import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.isDataCh import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.splitKeyRangeContains; /** Fetcher to fetch data from table split, the split is the stream split {@link StreamSplit}. */ -public class JdbcSourceStreamFetcher implements Fetcher { +public class JdbcSourceStreamFetcher implements Fetcher { private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceStreamFetcher.class); private final JdbcSourceFetchTaskContext taskContext; @@ -102,7 +103,7 @@ public class JdbcSourceStreamFetcher implements Fetcher pollSplitRecords() throws InterruptedException { + public Iterator pollSplitRecords() throws InterruptedException { checkReadException(); final List sourceRecords = new ArrayList<>(); if (streamFetchTask.isRunning()) { @@ -113,7 +114,9 @@ public class JdbcSourceStreamFetcher implements Fetcher sourceRecordsSet = new ArrayList<>(); + sourceRecordsSet.add(new SourceRecords(sourceRecords)); + return sourceRecordsSet.iterator(); } private void checkReadException() {