[cdc-base] Let all records of one snapshot split don't cross checkpoints (#1622)

This closes #1622.
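In short, the element type that travels through the source reader's hand-over queue changes from a single Debezium `SourceRecord` to a new batch type, `SourceRecords`. Each fetcher now wraps everything it produced for one snapshot split (low watermark, the normalized snapshot records, high watermark) into a single `SourceRecords` element, and `JdbcSourceRecordEmitter` drains a whole element per `emitRecord()` call. Since Flink's `SourceReaderBase` hands exactly one queue element to the emitter per `pollNext()` invocation, and checkpoint barriers are only processed between invocations, a checkpoint can no longer fall between the records of one snapshot split.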
Hang Ruan 2 years ago committed by GitHub
parent aac6e31718
commit 8df5b110ab

JdbcSourceRecordEmitter.java

@@ -22,6 +22,7 @@ import org.apache.flink.util.Collector;

 import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
 import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
 import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
 import com.ververica.cdc.connectors.base.source.reader.JdbcIncrementalSourceReader;
@@ -35,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;

 import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.getFetchTimestamp;
@@ -52,7 +54,7 @@ import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.isWatermarkEvent;
  * emit records rather than emit the records directly.
  */
 public class JdbcSourceRecordEmitter<T>
-        implements RecordEmitter<SourceRecord, T, SourceSplitState> {
+        implements RecordEmitter<SourceRecords, T, SourceSplitState> {

     private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceRecordEmitter.class);
     private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
@@ -78,6 +80,15 @@ public class JdbcSourceRecordEmitter<T>

     @Override
     public void emitRecord(
+            SourceRecords sourceRecords, SourceOutput<T> output, SourceSplitState splitState)
+            throws Exception {
+        final Iterator<SourceRecord> elementIterator = sourceRecords.iterator();
+        while (elementIterator.hasNext()) {
+            processElement(elementIterator.next(), output, splitState);
+        }
+    }
+
+    private void processElement(
             SourceRecord element, SourceOutput<T> output, SourceSplitState splitState)
             throws Exception {
         if (isWatermarkEvent(element)) {

JdbcIncrementalSource.java

@@ -45,6 +45,7 @@ import com.ververica.cdc.connectors.base.source.assigner.state.PendingSplitsState;
 import com.ververica.cdc.connectors.base.source.assigner.state.StreamPendingSplitsState;
 import com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator;
 import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
 import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
@@ -52,7 +53,6 @@ import com.ververica.cdc.connectors.base.source.reader.JdbcIncrementalSourceReader;
 import com.ververica.cdc.connectors.base.source.reader.JdbcSourceSplitReader;
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import io.debezium.relational.TableId;
-import org.apache.kafka.connect.source.SourceRecord;

 import java.util.List;
 import java.util.function.Supplier;
@@ -101,7 +101,7 @@ public class JdbcIncrementalSource<T>
     public SourceReader createReader(SourceReaderContext readerContext) {
         // create source config for the given subtask (e.g. unique server id)
         JdbcSourceConfig sourceConfig = configFactory.create(readerContext.getIndexOfSubtask());
-        FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue =
                 new FutureCompletingBlockingQueue<>();
         final SourceReaderMetrics sourceReaderMetrics =
                 new SourceReaderMetrics(readerContext.metricGroup());

ChangeEventRecords.java

@@ -18,8 +18,6 @@ package com.ververica.cdc.connectors.base.source.meta.split;

 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;

-import org.apache.kafka.connect.source.SourceRecord;
-
 import javax.annotation.Nullable;

 import java.util.Collections;
@@ -29,11 +27,11 @@ import java.util.Set;
 /**
  * An implementation of {@link RecordsWithSplitIds} which contains the records of one table split.
  */
-public final class ChangeEventRecords implements RecordsWithSplitIds<SourceRecord> {
+public final class ChangeEventRecords implements RecordsWithSplitIds<SourceRecords> {

     @Nullable private String splitId;
-    @Nullable private Iterator<SourceRecord> recordsForCurrentSplit;
-    @Nullable private final Iterator<SourceRecord> recordsForSplit;
+    @Nullable private Iterator<SourceRecords> recordsForCurrentSplit;
+    @Nullable private final Iterator<SourceRecords> recordsForSplit;
     private final Set<String> finishedSnapshotSplits;

     public ChangeEventRecords(
@@ -59,8 +57,8 @@ public final class ChangeEventRecords implements RecordsWithSplitIds<SourceRecord> {

     @Nullable
     @Override
-    public SourceRecord nextRecordFromSplit() {
-        final Iterator<SourceRecord> recordsForSplit = this.recordsForCurrentSplit;
+    public SourceRecords nextRecordFromSplit() {
+        final Iterator<SourceRecords> recordsForSplit = this.recordsForCurrentSplit;
         if (recordsForSplit != null) {
             if (recordsForSplit.hasNext()) {
                 return recordsForSplit.next();
@@ -78,7 +76,7 @@ public final class ChangeEventRecords implements RecordsWithSplitIds<SourceRecord> {
     }

     public static ChangeEventRecords forRecords(
-            final String splitId, final Iterator<SourceRecord> recordsForSplit) {
+            final String splitId, final Iterator<SourceRecords> recordsForSplit) {
         return new ChangeEventRecords(splitId, recordsForSplit, Collections.emptySet());
     }

SourceRecords.java (new file)

@@ -0,0 +1,47 @@
+/*
+ * Copyright 2022 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.base.source.meta.split;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/** Data structure to describe a set of {@link SourceRecord}. */
+public final class SourceRecords {
+
+    private final List<SourceRecord> sourceRecords;
+
+    public SourceRecords(List<SourceRecord> sourceRecords) {
+        this.sourceRecords = sourceRecords;
+    }
+
+    public List<SourceRecord> getSourceRecordList() {
+        return sourceRecords;
+    }
+
+    public Iterator<SourceRecord> iterator() {
+        return sourceRecords.iterator();
+    }
+
+    public static SourceRecords fromSingleRecord(SourceRecord record) {
+        final List<SourceRecord> records = new ArrayList<>();
+        records.add(record);
+        return new SourceRecords(records);
+    }
+}
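To make the intent concrete, here is a minimal usage sketch (a hypothetical helper class, not part of this commit) showing both sides of the new contract: the fetcher side wraps all records of one split into a single element, and the emitter side drains that element record by record.

package com.ververica.cdc.connectors.base.source.meta.split;

import org.apache.kafka.connect.source.SourceRecord;

import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch of how {@link SourceRecords} is used; not part of the commit. */
final class SourceRecordsUsageSketch {

    /** Fetcher side: one split's records become ONE hand-over queue element. */
    static SourceRecords wrapSplit(List<SourceRecord> recordsOfOneSplit) {
        return new SourceRecords(recordsOfOneSplit);
    }

    /**
     * Emitter side: a whole element is drained within a single emitRecord() call,
     * so no checkpoint barrier can land between the records of the split.
     */
    static void drain(SourceRecords batch) {
        final Iterator<SourceRecord> it = batch.iterator();
        while (it.hasNext()) {
            SourceRecord record = it.next();
            // deserialize `record` and push it to the SourceOutput here
        }
    }
}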

JdbcIncrementalSourceReader.java

@@ -39,6 +39,7 @@ import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
 import com.ververica.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
 import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
 import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplitState;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
@@ -46,7 +47,6 @@ import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
 import com.ververica.cdc.connectors.base.source.meta.split.StreamSplitState;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
-import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -68,7 +68,7 @@ import static org.apache.flink.util.Preconditions.checkState;

 @Experimental
 public class JdbcIncrementalSourceReader<T>
         extends SingleThreadMultiplexSourceReaderBase<
-                SourceRecord, T, SourceSplitBase, SourceSplitState> {
+                SourceRecords, T, SourceSplitBase, SourceSplitState> {

     private static final Logger LOG = LoggerFactory.getLogger(JdbcIncrementalSourceReader.class);
@@ -80,9 +80,9 @@ public class JdbcIncrementalSourceReader<T>
     private final JdbcDataSourceDialect dialect;

     public JdbcIncrementalSourceReader(
-            FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementQueue,
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementQueue,
             Supplier<JdbcSourceSplitReader> splitReaderSupplier,
-            RecordEmitter<SourceRecord, T, SourceSplitState> recordEmitter,
+            RecordEmitter<SourceRecords, T, SourceSplitState> recordEmitter,
             Configuration config,
             SourceReaderContext context,
             JdbcSourceConfig sourceConfig,

JdbcSourceSplitReader.java

@@ -25,6 +25,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
 import com.ververica.cdc.connectors.base.dialect.JdbcDataSourceDialect;
 import com.ververica.cdc.connectors.base.source.meta.split.ChangeEventRecords;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import com.ververica.cdc.connectors.base.source.reader.external.Fetcher;
 import com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
@@ -43,13 +44,13 @@ import java.util.Queue;
 /** Basic class read {@link SourceSplitBase} and return {@link SourceRecord}. */
 @Experimental
-public class JdbcSourceSplitReader implements SplitReader<SourceRecord, SourceSplitBase> {
+public class JdbcSourceSplitReader implements SplitReader<SourceRecords, SourceSplitBase> {

     private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitReader.class);

     private final Queue<SourceSplitBase> splits;
     private final int subtaskId;

-    @Nullable private Fetcher<SourceRecord, SourceSplitBase> currentFetcher;
+    @Nullable private Fetcher<SourceRecords, SourceSplitBase> currentFetcher;
     @Nullable private String currentSplitId;
     private final JdbcDataSourceDialect dataSourceDialect;
     private final JdbcSourceConfig sourceConfig;
@@ -63,9 +64,9 @@ public class JdbcSourceSplitReader implements SplitReader<SourceRecord, SourceSplitBase> {
     }

     @Override
-    public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
+    public RecordsWithSplitIds<SourceRecords> fetch() throws IOException {
         checkSplitOrStartNext();
-        Iterator<SourceRecord> dataIt = null;
+        Iterator<SourceRecords> dataIt = null;
         try {
             dataIt = currentFetcher.pollSplitRecords();
         } catch (InterruptedException e) {

JdbcSourceScanFetcher.java

@@ -21,6 +21,7 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;

 import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import io.debezium.connector.base.ChangeEventQueue;
 import io.debezium.pipeline.DataChangeEvent;
@@ -55,7 +56,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * Fetcher to fetch data from table split, the split is the snapshot split {@link SnapshotSplit}.
  */
-public class JdbcSourceScanFetcher implements Fetcher<SourceRecord, SourceSplitBase> {
+public class JdbcSourceScanFetcher implements Fetcher<SourceRecords, SourceSplitBase> {

     private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceScanFetcher.class);
@@ -115,7 +116,7 @@ public class JdbcSourceScanFetcher implements Fetcher<SourceRecord, SourceSplitBase> {

     @Nullable
     @Override
-    public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
+    public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
         checkReadException();

         if (hasNextElement.get()) {
@@ -168,7 +169,10 @@ public class JdbcSourceScanFetcher implements Fetcher<SourceRecord, SourceSplitBase> {
             normalizedRecords.add(lowWatermark);
             normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values()));
             normalizedRecords.add(highWatermark);
-            return normalizedRecords.iterator();
+
+            final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
+            sourceRecordsSet.add(new SourceRecords(normalizedRecords));
+            return sourceRecordsSet.iterator();
         }
         // the data has been polled, no more data
         reachEnd.compareAndSet(false, true);
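(A side note on the wrapping above, an observation rather than part of the commit: since exactly one SourceRecords element is produced per poll, Collections.singletonList(new SourceRecords(normalizedRecords)).iterator() would be an equivalent, more compact form; the explicit list simply mirrors the style used in the stream fetcher below.)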

JdbcSourceStreamFetcher.java

@@ -23,6 +23,7 @@ import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;

 import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
 import com.ververica.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
 import com.ververica.cdc.connectors.base.utils.SourceRecordUtils;
@@ -50,7 +51,7 @@ import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
 import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.splitKeyRangeContains;

 /** Fetcher to fetch data from table split, the split is the stream split {@link StreamSplit}. */
-public class JdbcSourceStreamFetcher implements Fetcher<SourceRecord, SourceSplitBase> {
+public class JdbcSourceStreamFetcher implements Fetcher<SourceRecords, SourceSplitBase> {

     private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceStreamFetcher.class);
     private final JdbcSourceFetchTaskContext taskContext;
@@ -102,7 +103,7 @@ public class JdbcSourceStreamFetcher implements Fetcher<SourceRecord, SourceSplitBase> {
     @Nullable
     @Override
-    public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
+    public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
         checkReadException();
         final List<SourceRecord> sourceRecords = new ArrayList<>();
         if (streamFetchTask.isRunning()) {
@@ -113,7 +114,9 @@ public class JdbcSourceStreamFetcher implements Fetcher<SourceRecord, SourceSplitBase> {
                 }
             }
         }
-        return sourceRecords.iterator();
+        List<SourceRecords> sourceRecordsSet = new ArrayList<>();
+        sourceRecordsSet.add(new SourceRecords(sourceRecords));
+        return sourceRecordsSet.iterator();
     }

     private void checkReadException() {
