[FLINK-34688][cdc-connector][base] CDC framework split snapshot chunks asynchronously (#3510)

Hongshun Wang 2 months ago committed by GitHub
parent b50d1728e0
commit 12cf22f7b4

@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.base.dialect;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
@ -64,8 +65,11 @@ public interface DataSourceDialect<C extends SourceConfig> extends Serializable,
boolean isDataCollectionIdCaseSensitive(C sourceConfig);
/** Returns the {@link ChunkSplitter} which is used to split a collection into splits. */
@Deprecated
ChunkSplitter createChunkSplitter(C sourceConfig);
/**
* Returns the {@link ChunkSplitter} which is used to split a collection into splits, resuming
* from the given {@link ChunkSplitterState} when a table was in the middle of being split.
*/
ChunkSplitter createChunkSplitter(C sourceConfig, ChunkSplitterState chunkSplitterState);
/** The fetch task used to fetch data of a snapshot split or stream split. */
FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase);

@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.base.source.assigner;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.base.source.assigner.state.SnapshotPendingSplitsState;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
@ -31,6 +32,8 @@ import org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetri
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import org.slf4j.Logger;
@ -51,6 +54,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.connectors.base.source.assigner.AssignerStatus.INITIAL_ASSIGNING;
@ -73,7 +80,7 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
private final C sourceConfig;
private final int currentParallelism;
private final LinkedList<TableId> remainingTables;
private final List<TableId> remainingTables;
private final boolean isRemainingTablesCheckpointed;
private ChunkSplitter chunkSplitter;
@ -87,6 +94,10 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
private final Map<String, Long> splitFinishedCheckpointIds;
private static final long UNDEFINED_CHECKPOINT_ID = -1;
private final Object lock = new Object();
private ExecutorService splittingExecutorService;
private volatile Throwable uncaughtSplitterException;
public SnapshotSplitAssigner(
C sourceConfig,
int currentParallelism,
@ -108,7 +119,8 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
true,
dialect,
offsetFactory,
new ConcurrentHashMap<>());
new ConcurrentHashMap<>(),
ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
}
public SnapshotSplitAssigner(
@ -131,7 +143,8 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
checkpoint.isRemainingTablesCheckpointed(),
dialect,
offsetFactory,
new ConcurrentHashMap<>());
new ConcurrentHashMap<>(),
checkpoint.getChunkSplitterState());
}
private SnapshotSplitAssigner(
@ -148,7 +161,8 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
boolean isRemainingTablesCheckpointed,
DataSourceDialect<C> dialect,
OffsetFactory offsetFactory,
Map<String, Long> splitFinishedCheckpointIds) {
Map<String, Long> splitFinishedCheckpointIds,
ChunkSplitterState chunkSplitterState) {
this.sourceConfig = sourceConfig;
this.currentParallelism = currentParallelism;
this.alreadyProcessedTables = alreadyProcessedTables;
@ -167,19 +181,21 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
this.tableSchemas = tableSchemas;
this.splitFinishedOffsets = splitFinishedOffsets;
this.assignerStatus = assignerStatus;
this.remainingTables = new LinkedList<>(remainingTables);
this.remainingTables = new CopyOnWriteArrayList<>(remainingTables);
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
this.dialect = dialect;
this.offsetFactory = offsetFactory;
this.splitFinishedCheckpointIds = splitFinishedCheckpointIds;
chunkSplitter = createChunkSplitter(sourceConfig, dialect, chunkSplitterState);
}
@Override
public void open() {
chunkSplitter = dialect.createChunkSplitter(sourceConfig);
chunkSplitter.open();
discoveryCaptureTables();
captureNewlyAddedTables();
startAsynchronouslySplit();
}
private void discoveryCaptureTables() {
@ -319,45 +335,89 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
return numTablesPendingSnapshot;
}
@Override
public Optional<SourceSplitBase> getNext() {
if (!remainingSplits.isEmpty()) {
// return remaining splits firstly
Iterator<SchemalessSnapshotSplit> iterator = remainingSplits.iterator();
SchemalessSnapshotSplit split = iterator.next();
iterator.remove();
assignedSplits.put(split.splitId(), split);
enumeratorMetrics
.getTableMetrics(split.getTableId())
.finishProcessSplit(split.splitId());
return Optional.of(split.toSnapshotSplit(tableSchemas.get(split.getTableId())));
} else {
// it's turn for new table
TableId nextTable = remainingTables.pollFirst();
if (nextTable != null) {
// split the given table into chunks (snapshot splits)
Collection<SnapshotSplit> splits = chunkSplitter.generateSplits(nextTable);
final Map<TableId, TableChanges.TableChange> tableSchema = new HashMap<>();
if (!splits.isEmpty()) {
tableSchema.putAll(splits.iterator().next().getTableSchemas());
private void startAsynchronouslySplit() {
if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) {
if (splittingExecutorService == null) {
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
this.splittingExecutorService = Executors.newSingleThreadExecutor(threadFactory);
}
splittingExecutorService.submit(this::splitChunksForRemainingTables);
}
}
private void splitTable(TableId nextTable) {
LOG.info("Start splitting table {} into chunks...", nextTable);
long start = System.currentTimeMillis();
int chunkNum = 0;
boolean hasRecordSchema = false;
// split the given table into chunks (snapshot splits)
do {
synchronized (lock) {
Collection<SnapshotSplit> splits;
try {
splits = chunkSplitter.generateSplits(nextTable);
} catch (Exception e) {
throw new IllegalStateException(
"Error when splitting chunks for " + nextTable, e);
}
if (!hasRecordSchema && !splits.isEmpty()) {
hasRecordSchema = true;
tableSchemas.putAll(splits.iterator().next().getTableSchemas());
}
final List<SchemalessSnapshotSplit> schemalessSnapshotSplits =
splits.stream()
.map(SnapshotSplit::toSchemalessSnapshotSplit)
.collect(Collectors.toList());
chunkNum += splits.size();
remainingSplits.addAll(schemalessSnapshotSplits);
tableSchemas.putAll(tableSchema);
if (!alreadyProcessedTables.contains(nextTable)) {
enumeratorMetrics.startSnapshotTables(1);
}
alreadyProcessedTables.add(nextTable);
List<String> splitIds =
schemalessSnapshotSplits.stream()
.map(SchemalessSnapshotSplit::splitId)
.collect(Collectors.toList());
enumeratorMetrics.getTableMetrics(nextTable).addNewSplits(splitIds);
if (!chunkSplitter.hasNextChunk()) {
remainingTables.remove(nextTable);
}
lock.notify();
}
} while (chunkSplitter.hasNextChunk());
long end = System.currentTimeMillis();
LOG.info(
"Split table {} into {} chunks, time cost: {}ms.",
nextTable,
chunkNum,
end - start);
}
@Override
public Optional<SourceSplitBase> getNext() {
synchronized (lock) {
checkSplitterErrors();
if (!remainingSplits.isEmpty()) {
// return remaining splits firstly
Iterator<SchemalessSnapshotSplit> iterator = remainingSplits.iterator();
SchemalessSnapshotSplit split = iterator.next();
iterator.remove();
assignedSplits.put(split.splitId(), split);
addAlreadyProcessedTablesIfNotExists(split.getTableId());
enumeratorMetrics
.getTableMetrics(split.getTableId())
.finishProcessSplit(split.splitId());
return Optional.of(split.toSnapshotSplit(tableSchemas.get(split.getTableId())));
} else if (!remainingTables.isEmpty()) {
try {
// wait for the asynchronous split to complete
lock.wait();
} catch (InterruptedException e) {
throw new FlinkRuntimeException(
"InterruptedException while waiting for asynchronously snapshot split");
}
return getNext();
} else {
closeExecutorService();
return Optional.empty();
}
}
@ -462,7 +522,8 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
remainingTables,
isTableIdCaseSensitive,
true,
splitFinishedCheckpointIds);
splitFinishedCheckpointIds,
chunkSplitter.snapshotState(checkpointId));
// we need a complete checkpoint before mark this assigner to be finished, to wait for all
// records of snapshot splits are completely processed
if (checkpointIdToFinish == null
@ -510,9 +571,30 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
@Override
public void close() throws IOException {
closeExecutorService();
if (chunkSplitter != null) {
try {
chunkSplitter.close();
} catch (Exception e) {
LOG.warn("Fail to close the chunk splitter.");
}
}
dialect.close();
}
private void closeExecutorService() {
if (splittingExecutorService != null) {
splittingExecutorService.shutdown();
}
}
private void addAlreadyProcessedTablesIfNotExists(TableId tableId) {
if (!alreadyProcessedTables.contains(tableId)) {
alreadyProcessedTables.add(tableId);
enumeratorMetrics.startSnapshotTables(1);
}
}
@Override
public boolean noMoreSplits() {
return remainingTables.isEmpty() && remainingSplits.isEmpty();
@ -567,4 +649,53 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
private boolean allSnapshotSplitsFinished() {
return noMoreSplits() && assignedSplits.size() == splitFinishedOffsets.size();
}
private void splitChunksForRemainingTables() {
try {
// when restoring from a checkpoint, resume splitting the table that was in progress
// at the time of the checkpoint
if (chunkSplitter.hasNextChunk()) {
LOG.info(
"Start splitting remaining chunks for table {}",
chunkSplitter.getCurrentSplittingTableId());
splitTable(chunkSplitter.getCurrentSplittingTableId());
}
// split the remaining tables
for (TableId nextTable : remainingTables) {
splitTable(nextTable);
}
} catch (Throwable e) {
synchronized (lock) {
if (uncaughtSplitterException == null) {
uncaughtSplitterException = e;
} else {
uncaughtSplitterException.addSuppressed(e);
}
// Release the potential waiting getNext() call
lock.notify();
}
}
}
private void checkSplitterErrors() {
if (uncaughtSplitterException != null) {
throw new FlinkRuntimeException(
"Chunk splitting has encountered exception", uncaughtSplitterException);
}
}
private static ChunkSplitter createChunkSplitter(
SourceConfig sourceConfig,
DataSourceDialect dataSourceDialect,
ChunkSplitterState chunkSplitterState) {
TableId tableId = chunkSplitterState.getCurrentSplittingTableId();
return dataSourceDialect.createChunkSplitter(
sourceConfig,
!ChunkSplitterState.NO_SPLITTING_TABLE_STATE.equals(chunkSplitterState)
&& tableId != null
&& dataSourceDialect.isIncludeDataCollection(sourceConfig, tableId)
? chunkSplitterState
: ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
}
}
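To make the new threading model in SnapshotSplitAssigner easier to follow: chunk splitting now runs on a dedicated single-thread executor, while getNext() blocks on a shared lock until splits arrive or splitting finishes. The standalone sketch below reproduces that hand-off in isolation; class, field, and split names are illustrative only, and the logic is a simplified model of the pattern, not the actual assigner code.

import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Simplified model of the splitter-thread / getNext() hand-off; not the framework code. */
class AsyncSplitHandOffSketch {
    private final Object lock = new Object();
    private final Queue<String> remainingSplits = new ArrayDeque<>();
    private volatile boolean splittingFinished = false;
    private volatile Throwable splitterError;
    private final ExecutorService splittingExecutor = Executors.newSingleThreadExecutor();

    void open() {
        // Analogue of startAsynchronouslySplit(): splitting happens off the enumerator thread.
        splittingExecutor.submit(
                () -> {
                    try {
                        for (int chunk = 0; chunk < 10; chunk++) {
                            synchronized (lock) {
                                remainingSplits.add("split-" + chunk);
                                lock.notify(); // wake up a getNext() that is waiting for splits
                            }
                        }
                    } catch (Throwable t) {
                        splitterError = t; // analogue of uncaughtSplitterException
                    } finally {
                        synchronized (lock) {
                            splittingFinished = true;
                            lock.notify();
                        }
                    }
                });
    }

    Optional<String> getNext() throws InterruptedException {
        synchronized (lock) {
            // Wait until the splitting thread has produced a chunk or finished entirely.
            while (remainingSplits.isEmpty() && !splittingFinished) {
                lock.wait();
            }
            if (splitterError != null) {
                throw new IllegalStateException("Chunk splitting failed", splitterError);
            }
            return Optional.ofNullable(remainingSplits.poll());
        }
    }
}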

@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.base.source.assigner.splitter;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import io.debezium.relational.TableId;
@ -28,6 +29,36 @@ import java.util.Collection;
@Experimental
public interface ChunkSplitter {
/**
* Called to open the chunk splitter to acquire any resources, like threads or jdbc connections.
*/
void open();
/** Generates all snapshot splits (chunks) for the given data collection. */
Collection<SnapshotSplit> generateSplits(TableId tableId);
Collection<SnapshotSplit> generateSplits(TableId tableId) throws Exception;
/** Returns whether the splitter has more chunks for the current table. */
boolean hasNextChunk();
/**
* Creates a snapshot of the state of this chunk splitter, to be stored in a checkpoint.
*
* <p>This method takes the ID of the checkpoint for which the state is snapshotted. Most
* implementations should be able to ignore this parameter, because for the contents of the
* snapshot, it doesn't matter for which checkpoint it gets created. This parameter can be
* interesting for source connectors with external systems where those systems are themselves
* aware of checkpoints; for example in cases where the enumerator notifies that system about a
* specific checkpoint being triggered.
*
* @param checkpointId The ID of the checkpoint for which the snapshot is created.
* @return an object containing the state of the chunk splitter.
*/
ChunkSplitterState snapshotState(long checkpointId);
/** Returns the table id currently being split, or null if no table is being split. */
TableId getCurrentSplittingTableId();
/**
* Called to close the chunk splitter and release any resources, like threads or jdbc connections.
*/
void close() throws Exception;
}
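Putting the extended contract together, a caller would drive a splitter roughly as in the sketch below, assuming a splitter obtained from some dialect: open() first, then generateSplits() repeatedly while hasNextChunk() reports remaining work for the current table, snapshotState() at checkpoint time, and close() at the end. The driver class, helper method, and error handling are hypothetical, reduced to the essentials of the call order.

import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;

import io.debezium.relational.TableId;

import java.util.Collection;

/** Hypothetical driver showing the intended call order of the ChunkSplitter methods. */
class ChunkSplitterUsageSketch {

    static ChunkSplitterState splitOneTable(
            ChunkSplitter splitter, TableId table, long checkpointId) throws Exception {
        splitter.open();
        try {
            do {
                // An unevenly-sized table yields one chunk per call and keeps hasNextChunk() true;
                // an evenly-sized table yields all of its chunks in a single call.
                Collection<SnapshotSplit> splits = splitter.generateSplits(table);
                // ... hand the splits over to the assigner / enumerator here ...
            } while (splitter.hasNextChunk());
            // Capture where splitting stands so a restore can resume from this point.
            return splitter.snapshotState(checkpointId);
        } finally {
            splitter.close();
        }
    }
}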

@ -20,13 +20,14 @@ package org.apache.flink.cdc.connectors.base.source.assigner.splitter;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
@ -47,9 +48,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import static java.math.BigDecimal.ROUND_CEILING;
import static org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;
@ -60,55 +61,101 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
protected final JdbcSourceConfig sourceConfig;
protected final JdbcDataSourceDialect dialect;
public JdbcSourceChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
private final Object lock = new Object();
@Nullable private TableId currentSplittingTableId;
@Nullable private ChunkSplitterState.ChunkBound nextChunkStart;
@Nullable private Integer nextChunkId;
private JdbcConnection jdbcConnection;
private Table currentSplittingTable;
private TableChanges.TableChange currentSchema;
private Column splitColumn;
private RowType splitType;
private Object[] minMaxOfSplitColumn;
private long approximateRowCnt;
public JdbcSourceChunkSplitter(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dialect,
ChunkSplitterState chunkSplitterState) {
this(
sourceConfig,
dialect,
chunkSplitterState.getCurrentSplittingTableId(),
chunkSplitterState.getNextChunkStart(),
chunkSplitterState.getNextChunkId());
}
public JdbcSourceChunkSplitter(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dialect,
@Nullable TableId currentSplittingTableId,
@Nullable ChunkSplitterState.ChunkBound nextChunkStart,
@Nullable Integer nextChunkId) {
this.sourceConfig = sourceConfig;
this.dialect = dialect;
this.currentSplittingTableId = currentSplittingTableId;
this.nextChunkStart = nextChunkStart;
this.nextChunkId = nextChunkId;
}
@Override
public void open() {
this.jdbcConnection = dialect.openJdbcConnection(sourceConfig);
}
/** Generates all snapshot splits (chunks) for the given table. */
@Override
public Collection<SnapshotSplit> generateSplits(TableId tableId) {
try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
public Collection<SnapshotSplit> generateSplits(TableId tableId) throws Exception {
if (!hasNextChunk()) {
// split a new table.
analyzeTable(tableId);
Optional<List<SnapshotSplit>> evenlySplitChunks = trySplitAllEvenlySizedChunks(tableId);
if (evenlySplitChunks.isPresent()) {
return evenlySplitChunks.get();
} else {
synchronized (lock) {
this.currentSplittingTableId = tableId;
this.nextChunkStart = ChunkSplitterState.ChunkBound.START_BOUND;
this.nextChunkId = 0;
return Collections.singletonList(splitOneUnevenlySizedChunk(tableId));
}
}
} else {
Preconditions.checkState(
currentSplittingTableId.equals(tableId),
"Can not split a new table before the previous table splitting finish.");
if (currentSplittingTable == null) {
analyzeTable(currentSplittingTableId);
}
synchronized (lock) {
return Collections.singletonList(splitOneUnevenlySizedChunk(tableId));
}
}
}
LOG.info("Start splitting table {} into chunks...", tableId);
long start = System.currentTimeMillis();
@Override
public boolean hasNextChunk() {
return currentSplittingTableId != null;
}
Table table =
Objects.requireNonNull(dialect.queryTableSchema(jdbc, tableId)).getTable();
Column splitColumn = getSplitColumn(table, sourceConfig.getChunkKeyColumn());
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
}
@Override
public ChunkSplitterState snapshotState(long checkpointId) {
// don't create a new chunk while snapshotting state.
synchronized (lock) {
return new ChunkSplitterState(currentSplittingTableId, nextChunkStart, nextChunkId);
}
}
// convert chunks into splits
List<SnapshotSplit> splits = new ArrayList<>();
RowType splitType = getSplitType(splitColumn);
for (int i = 0; i < chunks.size(); i++) {
ChunkRange chunk = chunks.get(i);
SnapshotSplit split =
createSnapshotSplit(
jdbc,
tableId,
i,
splitType,
chunk.getChunkStart(),
chunk.getChunkEnd());
splits.add(split);
}
@Override
public TableId getCurrentSplittingTableId() {
return currentSplittingTableId;
}
long end = System.currentTimeMillis();
LOG.info(
"Split table {} into {} chunks, time cost: {}ms.",
tableId,
splits.size(),
end - start);
return splits;
} catch (Exception e) {
throw new FlinkRuntimeException(
String.format("Generate Splits for table %s error", tableId), e);
@Override
public void close() throws Exception {
if (jdbcConnection != null) {
jdbcConnection.close();
}
}
@ -273,44 +320,118 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
}
/**
* We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using
* evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request
* many queries and is not efficient.
* Tries to split the whole table into evenly-sized chunks in one pass, or returns empty if the
* table is not suitable for evenly-sized splitting.
*
* <p>A table can be split into evenly-sized or unevenly-sized chunks: evenly-sized chunks are
* much more efficient, while unevenly-sized chunks require many queries and are less efficient.
*/
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
private Optional<List<SnapshotSplit>> trySplitAllEvenlySizedChunks(TableId tableId) {
LOG.debug("Try evenly splitting table {} into chunks", tableId);
final Object min = minMaxOfSplitColumn[0];
final Object max = minMaxOfSplitColumn[1];
if (min == null || max == null || min.equals(max)) {
// empty table, or only one row, return full table scan as a chunk
return Collections.singletonList(ChunkRange.all());
return Optional.of(
createSnapshotSplit(tableId, Collections.singletonList(ChunkRange.all())));
}
final int chunkSize = sourceConfig.getSplitSize();
final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
if (isEvenlySplitColumn(splitColumn)) {
long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
double distributionFactor =
calculateDistributionFactor(tableId, min, max, approximateRowCnt);
final int dynamicChunkSize =
getDynamicChunkSize(tableId, splitColumn, min, max, chunkSize, approximateRowCnt);
if (dynamicChunkSize != -1) {
LOG.debug("finish evenly splitting table {} into chunks", tableId);
List<ChunkRange> chunks =
splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
return Optional.of(createSnapshotSplit(tableId, chunks));
} else {
LOG.debug("beginning unevenly splitting table {} into chunks", tableId);
return Optional.empty();
}
}
boolean dataIsEvenlyDistributed =
doubleCompare(distributionFactor, distributionFactorLower) >= 0
&& doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
/** Analyzes the metadata of the given table. */
private void analyzeTable(TableId tableId) {
try {
currentSchema = dialect.queryTableSchema(jdbcConnection, tableId);
currentSplittingTable = Objects.requireNonNull(currentSchema).getTable();
splitColumn = getSplitColumn(currentSplittingTable, sourceConfig.getChunkKeyColumn());
splitType = getSplitType(splitColumn);
minMaxOfSplitColumn = queryMinMax(jdbcConnection, tableId, splitColumn);
approximateRowCnt = queryApproximateRowCnt(jdbcConnection, tableId);
} catch (Exception e) {
throw new RuntimeException("Fail to analyze table in chunk splitter.", e);
}
}
if (dataIsEvenlyDistributed) {
// the minimum dynamic chunk size is at least 1
final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
/** Generates one snapshot split (chunk) for the given table. */
private SnapshotSplit splitOneUnevenlySizedChunk(TableId tableId) throws SQLException {
final int chunkSize = sourceConfig.getSplitSize();
final Object chunkStartVal = nextChunkStart.getValue();
LOG.info(
"Use unevenly-sized chunks for table {}, the chunk size is {} from {}",
tableId,
chunkSize,
nextChunkStart == ChunkSplitterState.ChunkBound.START_BOUND
? "null"
: chunkStartVal.toString());
// we start from [null, min + chunk_size) and avoid [null, min)
Object chunkEnd =
nextChunkEnd(
jdbcConnection,
nextChunkStart == ChunkSplitterState.ChunkBound.START_BOUND
? minMaxOfSplitColumn[0]
: chunkStartVal,
tableId,
splitColumn,
minMaxOfSplitColumn[1],
chunkSize);
// may sleep a while to avoid putting too much query pressure on the database server
maySleep(nextChunkId, tableId);
if (chunkEnd != null && isChunkEndLeMax(chunkEnd, minMaxOfSplitColumn[1], splitColumn)) {
nextChunkStart = ChunkSplitterState.ChunkBound.middleOf(chunkEnd);
return createSnapshotSplit(tableId, nextChunkId++, splitType, chunkStartVal, chunkEnd);
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
currentSplittingTableId = null;
nextChunkStart = ChunkSplitterState.ChunkBound.END_BOUND;
return createSnapshotSplit(tableId, nextChunkId++, splitType, chunkStartVal, null);
}
}
/**
* Checks whether the split column is evenly distributed across its range and returns the
* dynamic chunk size. Returns -1 if the split column is not evenly distributed.
*/
private int getDynamicChunkSize(
TableId tableId,
Column splitColumn,
Object min,
Object max,
int chunkSize,
long approximateRowCnt) {
if (!isEvenlySplitColumn(splitColumn)) {
return -1;
}
final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
double distributionFactor =
calculateDistributionFactor(tableId, min, max, approximateRowCnt);
boolean dataIsEvenlyDistributed =
ObjectUtils.doubleCompare(distributionFactor, distributionFactorLower) >= 0
&& ObjectUtils.doubleCompare(distributionFactor, distributionFactorUpper)
<= 0;
LOG.info(
"The actual distribution factor for table {} is {}, the lower bound of evenly distribution factor is {}, the upper bound of evenly distribution factor is {}",
tableId,
distributionFactor,
distributionFactorLower,
distributionFactorUpper);
if (dataIsEvenlyDistributed) {
// the minimum dynamic chunk size is at least 1
return Math.max((int) (distributionFactor * chunkSize), 1);
}
return -1;
}
/**
@ -404,8 +525,21 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
}
}
/** Generates all snapshot splits (chunks) from chunk ranges. */
private List<SnapshotSplit> createSnapshotSplit(TableId tableId, List<ChunkRange> chunks) {
// convert chunks into splits
List<SnapshotSplit> splits = new ArrayList<>();
for (int i = 0; i < chunks.size(); i++) {
ChunkRange chunk = chunks.get(i);
SnapshotSplit split =
createSnapshotSplit(
tableId, i, splitType, chunk.getChunkStart(), chunk.getChunkEnd());
splits.add(split);
}
return splits;
}
private SnapshotSplit createSnapshotSplit(
JdbcConnection jdbc,
TableId tableId,
int chunkId,
RowType splitKeyType,
@ -415,7 +549,7 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart};
Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
Map<TableId, TableChanges.TableChange> schema = new HashMap<>();
schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
schema.put(tableId, currentSchema);
return new SnapshotSplit(
tableId, chunkId, splitKeyType, splitStart, splitEnd, null, schema);
}
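The even/uneven decision above hinges on the distribution factor. The formula of calculateDistributionFactor() is not part of this diff, so the sketch below assumes the customary (max - min + 1) / approximateRowCnt definition and the default bounds of 0.05 and 1000.0; the numbers are only meant to illustrate when getDynamicChunkSize() returns a usable chunk size versus -1.

/** Back-of-the-envelope model of getDynamicChunkSize(); formula and defaults are assumptions. */
class DynamicChunkSizeSketch {

    static int dynamicChunkSize(
            long min, long max, long approximateRowCnt, int chunkSize, double lower, double upper) {
        // Assumed definition: how many split-key values exist per actual row.
        double distributionFactor =
                approximateRowCnt == 0
                        ? Double.MAX_VALUE
                        : (max - min + 1) / (double) approximateRowCnt;
        boolean evenlyDistributed = distributionFactor >= lower && distributionFactor <= upper;
        // Evenly distributed: scale the configured chunk size by the factor (at least 1).
        // Otherwise return -1 so the caller falls back to unevenly-sized chunks.
        return evenlyDistributed ? Math.max((int) (distributionFactor * chunkSize), 1) : -1;
    }

    public static void main(String[] args) {
        // Dense auto-increment key, ids 1..1_000_000 over ~1_000_000 rows: factor ~= 1.0,
        // so evenly-sized chunks of roughly the configured size are produced in one pass.
        System.out.println(dynamicChunkSize(1, 1_000_000, 1_000_000, 8096, 0.05d, 1000.0d)); // 8096
        // Sparse key, ids spread over 2_000_000_000 but only 1_000_000 rows: factor ~= 2000,
        // above the upper bound, so -1 is returned and unevenly-sized splitting takes over.
        System.out.println(dynamicChunkSize(1, 2_000_000_000L, 1_000_000, 8096, 0.05d, 1000.0d)); // -1
    }
}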

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.base.source.assigner.state;
import io.debezium.relational.TableId;
import javax.annotation.Nullable;
import java.util.Objects;
/**
* The state of the {@link
* org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter}.
*/
public class ChunkSplitterState {
public static final ChunkSplitterState NO_SPLITTING_TABLE_STATE =
new ChunkSplitterState(null, null, null);
/** Record current splitting table id in the chunk splitter. */
@Nullable private final TableId currentSplittingTableId;
/** Record next chunk start. */
@Nullable private final ChunkBound nextChunkStart;
/** Record next chunk id. */
@Nullable private final Integer nextChunkId;
public ChunkSplitterState(
@Nullable TableId currentSplittingTableId,
@Nullable ChunkBound nextChunkStart,
@Nullable Integer nextChunkId) {
this.currentSplittingTableId = currentSplittingTableId;
this.nextChunkStart = nextChunkStart;
this.nextChunkId = nextChunkId;
}
@Nullable
public TableId getCurrentSplittingTableId() {
return currentSplittingTableId;
}
@Nullable
public ChunkBound getNextChunkStart() {
return nextChunkStart;
}
@Nullable
public Integer getNextChunkId() {
return nextChunkId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ChunkSplitterState)) {
return false;
}
ChunkSplitterState that = (ChunkSplitterState) o;
return Objects.equals(currentSplittingTableId, that.currentSplittingTableId)
&& Objects.equals(nextChunkStart, that.nextChunkStart)
&& Objects.equals(nextChunkId, that.nextChunkId);
}
@Override
public int hashCode() {
return Objects.hash(currentSplittingTableId, nextChunkStart, nextChunkId);
}
@Override
public String toString() {
return "ChunkSplitterState{"
+ "currentSplittingTableId="
+ (currentSplittingTableId == null ? "null" : currentSplittingTableId)
+ ", nextChunkStart="
+ (nextChunkStart == null ? "null" : nextChunkStart)
+ ", nextChunkId="
+ (nextChunkId == null ? "null" : String.valueOf(nextChunkId))
+ '}';
}
/** The definition of the chunk bound. */
public static final class ChunkBound {
public static final ChunkBound START_BOUND = new ChunkBound(ChunkBoundType.START, null);
public static final ChunkBound END_BOUND = new ChunkBound(ChunkBoundType.END, null);
private final ChunkBoundType boundType;
@Nullable private final Object value;
public ChunkBound(ChunkBoundType boundType, @Nullable Object value) {
this.boundType = boundType;
this.value = value;
}
public ChunkBoundType getBoundType() {
return boundType;
}
@Nullable
public Object getValue() {
return value;
}
public static ChunkBound middleOf(Object obj) {
return new ChunkBound(ChunkBoundType.MIDDLE, obj);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ChunkBound)) {
return false;
}
ChunkBound that = (ChunkBound) o;
return boundType == that.boundType && Objects.equals(value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(boundType, value);
}
@Override
public String toString() {
return "ChunkBound{"
+ "boundType="
+ boundType
+ ", value="
+ (value == null ? "null" : value.toString())
+ '}';
}
}
/** The type of the chunk bound. */
public enum ChunkBoundType {
START,
MIDDLE,
END
}
}
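For intuition, here is what the two interesting shapes of this state look like in practice; the table name and values are made up. On restore, the assigner feeds the in-progress state back into createChunkSplitter so splitting resumes from the recorded chunk bound instead of starting the table over.

import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;

import io.debezium.relational.TableId;

/** Illustrative construction of the two typical ChunkSplitterState shapes. */
class ChunkSplitterStateExample {
    public static void main(String[] args) {
        // No table was mid-split when the checkpoint was taken.
        ChunkSplitterState idle = ChunkSplitterState.NO_SPLITTING_TABLE_STATE;

        // Checkpoint taken while "inventory.products" was being split unevenly:
        // the next chunk starts after split-key value 10_240 and will get chunk id 5.
        ChunkSplitterState inProgress =
                new ChunkSplitterState(
                        TableId.parse("inventory.products"),
                        ChunkSplitterState.ChunkBound.middleOf(10_240),
                        5);

        System.out.println(idle);
        System.out.println(inProgress);
    }
}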

@ -50,10 +50,13 @@ import static org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplit
*
* <p>2. Add streamSplitTaskId(int) to HybridPendingSplitsState, which represents the task ID
* assigned to the stream split.
*
* <p>The 8th version adds ChunkSplitterState to SnapshotPendingSplitsState, which records the
* chunk information of the table that is being split asynchronously.
*/
public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<PendingSplitsState> {
private static final int VERSION = 7;
private static final int VERSION = 8;
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
@ -119,6 +122,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
case 5:
case 6:
case 7:
case 8:
return deserializePendingSplitsState(version, serialized);
default:
throw new IOException("Unknown version: " + version);
@ -175,6 +179,20 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
writeTableSchemas(state.getTableSchemas(), out);
writeSplitFinishedCheckpointIds(state.getSplitFinishedCheckpointIds(), out);
// The 8th version adds ChunkSplitterState to SnapshotPendingSplitsState, which records the
// chunk information of the table that is being split asynchronously.
boolean hasTableIsSplitting =
state.getChunkSplitterState().getCurrentSplittingTableId() != null;
out.writeBoolean(hasTableIsSplitting);
if (hasTableIsSplitting) {
ChunkSplitterState chunkSplitterState = state.getChunkSplitterState();
out.writeUTF(chunkSplitterState.getCurrentSplittingTableId().toString());
out.writeUTF(
SerializerUtils.rowToSerializedString(
new Object[] {chunkSplitterState.getNextChunkStart().getValue()}));
out.writeInt(chunkSplitterState.getNextChunkId());
}
}
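To make the wire change easier to review, the fields appended since version 8 are summarized below. The field names are descriptive labels rather than constants from the code, and older (version <= 7) checkpoints simply lack these bytes and deserialize to NO_SPLITTING_TABLE_STATE.

// Bytes appended to the serialized SnapshotPendingSplitsState since VERSION 8,
// in the order written by the code above:
//
//   boolean hasSplittingTable        -> true iff a table was mid-split at checkpoint time
//   UTF     currentSplittingTableId  -> e.g. "inventory.products" (only when the flag is true)
//   UTF     nextChunkStart           -> single-value row encoded via SerializerUtils.rowToSerializedString
//   int     nextChunkId              -> id the next generated chunk will receive
//
// Version <= 7 checkpoints contain none of these fields; the deserializer then falls back to
// ChunkSplitterState.NO_SPLITTING_TABLE_STATE.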
private void serializeHybridPendingSplitsState(
@ -234,7 +252,8 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
new ArrayList<>(),
false,
false,
new HashMap<>());
new HashMap<>(),
ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
}
private HybridPendingSplitsState deserializeLegacyHybridPendingSplitsState(
@ -289,6 +308,23 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
if (version >= 7) {
splitFinishedCheckpointIds = readSplitFinishedCheckpointIds(in);
}
// The 8th version adds ChunkSplitterState to SnapshotPendingSplitsState, which records the
// chunk information of the table that is being split asynchronously.
ChunkSplitterState chunkSplitterState = ChunkSplitterState.NO_SPLITTING_TABLE_STATE;
if (version >= 8) {
boolean hasTableIsSplitting = in.readBoolean();
if (hasTableIsSplitting) {
TableId splittingTableId = TableId.parse(in.readUTF());
Object nextChunkStart = SerializerUtils.serializedStringToRow(in.readUTF())[0];
Integer nextChunkId = in.readInt();
chunkSplitterState =
new ChunkSplitterState(
splittingTableId,
ChunkSplitterState.ChunkBound.middleOf(nextChunkStart),
nextChunkId);
}
}
return new SnapshotPendingSplitsState(
alreadyProcessedTables,
remainingSchemalessSplits,
@ -299,7 +335,8 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
remainingTableIds,
isTableIdCaseSensitive,
true,
splitFinishedCheckpointIds);
splitFinishedCheckpointIds,
chunkSplitterState);
}
private HybridPendingSplitsState deserializeHybridPendingSplitsState(

@ -71,6 +71,12 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
/** Map to record splitId and the checkpointId mark the split is finished. */
private final Map<String, Long> splitFinishedCheckpointIds;
/**
* The data structure to record the state of a {@link
* org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter}.
*/
private final ChunkSplitterState chunkSplitterState;
public SnapshotPendingSplitsState(
List<TableId> alreadyProcessedTables,
List<SchemalessSnapshotSplit> remainingSplits,
@ -81,7 +87,8 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
boolean isRemainingTablesCheckpointed,
Map<String, Long> splitFinishedCheckpointIds) {
Map<String, Long> splitFinishedCheckpointIds,
ChunkSplitterState chunkSplitterState) {
this.alreadyProcessedTables = alreadyProcessedTables;
this.remainingSplits = remainingSplits;
this.assignedSplits = assignedSplits;
@ -91,6 +98,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
this.tableSchemas = tableSchemas;
this.chunkSplitterState = chunkSplitterState;
this.splitFinishedCheckpointIds = splitFinishedCheckpointIds;
}
@ -134,6 +142,10 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
return isRemainingTablesCheckpointed;
}
public ChunkSplitterState getChunkSplitterState() {
return chunkSplitterState;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -151,7 +163,8 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
&& Objects.equals(remainingSplits, that.remainingSplits)
&& Objects.equals(assignedSplits, that.assignedSplits)
&& Objects.equals(splitFinishedOffsets, that.splitFinishedOffsets)
&& Objects.equals(splitFinishedCheckpointIds, that.splitFinishedCheckpointIds);
&& Objects.equals(splitFinishedCheckpointIds, that.splitFinishedCheckpointIds)
&& Objects.equals(chunkSplitterState, that.chunkSplitterState);
}
@Override
@ -165,7 +178,8 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
assignerStatus,
isTableIdCaseSensitive,
isRemainingTablesCheckpointed,
splitFinishedCheckpointIds);
splitFinishedCheckpointIds,
chunkSplitterState);
}
@Override
@ -189,6 +203,8 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
+ isRemainingTablesCheckpointed
+ ", splitFinishedCheckpointIds="
+ splitFinishedCheckpointIds
+ ", chunkSplitterState="
+ chunkSplitterState
+ '}';
}
}

@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.db2.source.utils.Db2TypeUtils;
import org.apache.flink.cdc.connectors.db2.source.utils.Db2Utils;
import org.apache.flink.table.types.DataType;
@ -37,8 +38,11 @@ import java.sql.SQLException;
@Internal
public class Db2ChunkSplitter extends JdbcSourceChunkSplitter {
public Db2ChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
super(sourceConfig, dialect);
public Db2ChunkSplitter(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dialect,
ChunkSplitterState chunkSplitterState) {
super(sourceConfig, dialect, chunkSplitterState);
}
@Override

@ -22,6 +22,7 @@ import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
@ -86,7 +87,14 @@ public class Db2Dialect implements JdbcDataSourceDialect {
@Override
public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
return new Db2ChunkSplitter(sourceConfig, this);
return new Db2ChunkSplitter(
sourceConfig, this, ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
}
@Override
public ChunkSplitter createChunkSplitter(
JdbcSourceConfig sourceConfig, ChunkSplitterState chunkSplitterState) {
return new Db2ChunkSplitter(sourceConfig, this, chunkSplitterState);
}
@Override

@ -19,6 +19,7 @@ package org.apache.flink.cdc.connectors.mongodb.source.assigners.splitters;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
@ -44,4 +45,26 @@ public class MongoDBChunkSplitter implements ChunkSplitter {
}
return SplitVectorSplitStrategy.INSTANCE.split(splitContext);
}
@Override
public void open() {}
@Override
public boolean hasNextChunk() {
// MongoDB CDC does not support asynchronous chunk splitting yet.
return false;
}
@Override
public ChunkSplitterState snapshotState(long checkpointId) {
return ChunkSplitterState.NO_SPLITTING_TABLE_STATE;
}
@Override
public TableId getCurrentSplittingTableId() {
return null;
}
@Override
public void close() throws Exception {}
}

@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.mongodb.source.dialect;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.mongodb.source.assigners.splitters.MongoDBChunkSplitter;
@ -169,11 +170,18 @@ public class MongoDBDialect implements DataSourceDialect<MongoDBSourceConfig> {
return true;
}
@Deprecated
@Override
public ChunkSplitter createChunkSplitter(MongoDBSourceConfig sourceConfig) {
return new MongoDBChunkSplitter(sourceConfig);
}
@Override
public ChunkSplitter createChunkSplitter(
MongoDBSourceConfig sourceConfig, ChunkSplitterState chunkSplitterState) {
return createChunkSplitter(sourceConfig);
}
@Override
public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase) {
if (sourceSplitBase.isSnapshotSplit()) {

@ -22,6 +22,7 @@ import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
@ -89,7 +90,14 @@ public class OracleDialect implements JdbcDataSourceDialect {
@Override
public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
return new OracleChunkSplitter(sourceConfig, this);
return new OracleChunkSplitter(
sourceConfig, this, ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
}
@Override
public ChunkSplitter createChunkSplitter(
JdbcSourceConfig sourceConfig, ChunkSplitterState chunkSplitterState) {
return new OracleChunkSplitter(sourceConfig, this, chunkSplitterState);
}
@Override

@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.cdc.connectors.oracle.source.utils.OracleTypeUtils;
@ -44,8 +45,11 @@ import java.sql.SQLException;
@Internal
public class OracleChunkSplitter extends JdbcSourceChunkSplitter {
public OracleChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
super(sourceConfig, dialect);
public OracleChunkSplitter(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dialect,
ChunkSplitterState chunkSplitterState) {
super(sourceConfig, dialect, chunkSplitterState);
}
@Override

@ -891,8 +891,10 @@ public class NewlyAddedTableITCase extends OracleSourceTestBase {
+ ")",
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
ORACLE_CONTAINER.getUsername(),
ORACLE_CONTAINER.getPassword(),
// To analyze table for approximate rowCnt computation, use admin user before chunk
// splitting.
TOP_USER,
TOP_SECRET,
ORACLE_DATABASE,
ORACLE_SCHEMA,
getTableNameRegex(captureTableNames),

@ -434,8 +434,10 @@ public class OracleSourceITCase extends OracleSourceTestBase {
OracleSourceBuilder.OracleIncrementalSource.<RowData>builder()
.hostname(ORACLE_CONTAINER.getHost())
.port(ORACLE_CONTAINER.getOraclePort())
.username(CONNECTOR_USER)
.password(CONNECTOR_PWD)
// To analyze table for approximate rowCnt computation, use admin user
// before chunk splitting.
.username(TOP_USER)
.password(TOP_SECRET)
.databaseList(ORACLE_DATABASE)
.schemaList(ORACLE_SCHEMA)
.tableList("DEBEZIUM.CUSTOMERS")
@ -559,8 +561,10 @@ public class OracleSourceITCase extends OracleSourceTestBase {
+ ")",
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
ORACLE_CONTAINER.getUsername(),
ORACLE_CONTAINER.getPassword(),
// To analyze table for approximate rowCnt computation, use admin user
// before chunk splitting.
TOP_USER,
TOP_SECRET,
ORACLE_DATABASE,
ORACLE_SCHEMA,
getTableNameRegex(captureCustomerTables), // (customer|customer_1)

@ -70,6 +70,7 @@ public class OracleSourceTestBase extends TestLogger {
public static final String CONNECTOR_PWD = "dbz";
public static final String TEST_USER = "debezium";
public static final String TEST_PWD = "dbz";
public static final String TOP_USER = "sys as sysdba";
public static final String TOP_SECRET = "top_secret";
public static final OracleContainer ORACLE_CONTAINER =
@ -134,8 +135,7 @@ public class OracleSourceTestBase extends TestLogger {
}
public static Connection getJdbcConnectionAsDBA() throws SQLException {
return DriverManager.getConnection(
ORACLE_CONTAINER.getJdbcUrl(), "sys as sysdba", TOP_SECRET);
return DriverManager.getConnection(ORACLE_CONTAINER.getJdbcUrl(), TOP_USER, TOP_SECRET);
}
public static void createAndInitialize(String sqlFile) throws Exception {

@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.oracle.table;
import org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
@ -139,8 +140,10 @@ public class OracleConnectorITCase {
+ ")",
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
CONNECTOR_USER,
CONNECTOR_PWD,
// To analyze table for approximate rowCnt computation, use admin user
// before chunk splitting.
parallelismSnapshot ? OracleSourceTestBase.TOP_USER : CONNECTOR_USER,
parallelismSnapshot ? OracleSourceTestBase.TOP_SECRET : CONNECTOR_PWD,
parallelismSnapshot,
"debezium",
"products");
@ -274,8 +277,10 @@ public class OracleConnectorITCase {
+ ")",
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
CONNECTOR_USER,
CONNECTOR_PWD,
// To analyze table for approximate rowCnt computation, use admin user
// before chunk splitting.
parallelismSnapshot ? OracleSourceTestBase.TOP_USER : CONNECTOR_USER,
parallelismSnapshot ? OracleSourceTestBase.TOP_SECRET : CONNECTOR_PWD,
parallelismSnapshot,
"debezium",
"(products|products_nested_table)");
@ -484,8 +489,10 @@ public class OracleConnectorITCase {
+ ")",
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
"dbzuser",
"dbz",
// To analyze table for approximate rowCnt computation, use admin user
// before chunk splitting.
parallelismSnapshot ? OracleSourceTestBase.TOP_USER : "dbzuser",
parallelismSnapshot ? OracleSourceTestBase.TOP_SECRET : "dbz",
parallelismSnapshot,
"debezium",
"products");
@ -679,8 +686,10 @@ public class OracleConnectorITCase {
+ ")",
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
"dbzuser",
"dbz",
// To analyze table for approximate rowCnt computation, use admin user
// before chunk splitting.
parallelismSnapshot ? OracleSourceTestBase.TOP_USER : "dbzuser",
parallelismSnapshot ? OracleSourceTestBase.TOP_SECRET : "dbz",
parallelismSnapshot,
"debezium",
"test_numeric_table");
@ -791,8 +800,10 @@ public class OracleConnectorITCase {
+ ")",
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
CONNECTOR_USER,
CONNECTOR_PWD,
// To analyze table for approximate rowCnt computation, use admin user
// before chunk splitting.
parallelismSnapshot ? OracleSourceTestBase.TOP_USER : CONNECTOR_USER,
parallelismSnapshot ? OracleSourceTestBase.TOP_SECRET : CONNECTOR_PWD,
parallelismSnapshot,
"debezium",
"full_types");

@ -19,7 +19,9 @@ package org.apache.flink.cdc.connectors.postgres.source;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
import org.apache.flink.table.types.DataType;
@ -36,8 +38,11 @@ import java.sql.SQLException;
@Internal
public class PostgresChunkSplitter extends JdbcSourceChunkSplitter {
public PostgresChunkSplitter(JdbcSourceConfig sourceConfig, PostgresDialect postgresDialect) {
super(sourceConfig, postgresDialect);
public PostgresChunkSplitter(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dialect,
ChunkSplitterState chunkSplitterState) {
super(sourceConfig, dialect, chunkSplitterState);
}
@Override

@ -22,6 +22,7 @@ import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionFactory;
import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
@ -150,7 +151,14 @@ public class PostgresDialect implements JdbcDataSourceDialect {
@Override
public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
return new PostgresChunkSplitter(sourceConfig, this);
return new PostgresChunkSplitter(
sourceConfig, this, ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
}
@Override
public ChunkSplitter createChunkSplitter(
JdbcSourceConfig sourceConfig, ChunkSplitterState chunkSplitterState) {
return new PostgresChunkSplitter(sourceConfig, this, chunkSplitterState);
}
@Override

@ -17,11 +17,14 @@
package org.apache.flink.cdc.connectors.postgres.source.fetch;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics;
import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
@ -31,8 +34,10 @@ import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
import org.apache.flink.cdc.connectors.postgres.testutils.RecordsFormatter;
import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
@ -43,16 +48,16 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/** Tests for {@link PostgresScanFetchTask}. */
public class PostgresScanFetchTaskTest extends PostgresTestBase {
protected static final int DEFAULT_PARALLELISM = 4;
private static final int USE_POST_LOWWATERMARK_HOOK = 1;
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
@ -237,7 +242,7 @@ public class PostgresScanFetchTaskTest extends PostgresTestBase {
PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
PostgresDialect postgresDialect = new PostgresDialect(sourceConfigFactory.create(0));
SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, postgresDialect);
try (PostgresConnection postgresConnection = postgresDialect.openJdbcConnection()) {
SnapshotPhaseHook snapshotPhaseHook =
(postgresSourceConfig, split) -> {
@ -262,7 +267,6 @@ public class PostgresScanFetchTaskTest extends PostgresTestBase {
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, postgresDialect);
PostgresSourceFetchTaskContext postgresSourceFetchTaskContext =
new PostgresSourceFetchTaskContext(sourceConfig, postgresDialect);
@ -314,15 +318,30 @@ public class PostgresScanFetchTaskTest extends PostgresTestBase {
}
private List<SnapshotSplit> getSnapshotSplits(
PostgresSourceConfig sourceConfig, JdbcDataSourceDialect sourceDialect) {
PostgresSourceConfig sourceConfig, JdbcDataSourceDialect sourceDialect)
throws Exception {
List<TableId> discoverTables = sourceDialect.discoverDataCollections(sourceConfig);
final ChunkSplitter chunkSplitter = sourceDialect.createChunkSplitter(sourceConfig);
OffsetFactory offsetFactory = new PostgresOffsetFactory();
final SnapshotSplitAssigner snapshotSplitAssigner =
new SnapshotSplitAssigner<JdbcSourceConfig>(
sourceConfig,
DEFAULT_PARALLELISM,
discoverTables,
sourceDialect.isDataCollectionIdCaseSensitive(sourceConfig),
sourceDialect,
offsetFactory);
snapshotSplitAssigner.initEnumeratorMetrics(
new SourceEnumeratorMetrics(
UnregisteredMetricsGroup.createSplitEnumeratorMetricGroup()));
snapshotSplitAssigner.open();
List<SnapshotSplit> snapshotSplitList = new ArrayList<>();
for (TableId table : discoverTables) {
Collection<SnapshotSplit> snapshotSplits = chunkSplitter.generateSplits(table);
snapshotSplitList.addAll(snapshotSplits);
Optional<SourceSplitBase> split = snapshotSplitAssigner.getNext();
while (split.isPresent()) {
snapshotSplitList.add(split.get().asSnapshotSplit());
split = snapshotSplitAssigner.getNext();
}
snapshotSplitAssigner.close();
return snapshotSplitList;
}
}

@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerTypeUtils;
import org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerUtils;
import org.apache.flink.table.types.DataType;
@ -38,8 +39,11 @@ import java.sql.SQLException;
@Internal
public class SqlServerChunkSplitter extends JdbcSourceChunkSplitter {
public SqlServerChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
super(sourceConfig, dialect);
public SqlServerChunkSplitter(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dialect,
ChunkSplitterState chunkSplitterState) {
super(sourceConfig, dialect, chunkSplitterState);
}
@Override

@ -22,6 +22,7 @@ import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
@ -85,7 +86,14 @@ public class SqlServerDialect implements JdbcDataSourceDialect {
@Override
public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
return new SqlServerChunkSplitter(sourceConfig, this);
return new SqlServerChunkSplitter(
sourceConfig, this, ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
}
@Override
public ChunkSplitter createChunkSplitter(
JdbcSourceConfig sourceConfig, ChunkSplitterState chunkSplitterState) {
return new SqlServerChunkSplitter(sourceConfig, this, chunkSplitterState);
}
@Override

@ -17,11 +17,14 @@
package org.apache.flink.cdc.connectors.sqlserver.source.read.fetch;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics;
import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
@ -30,9 +33,11 @@ import org.apache.flink.cdc.connectors.sqlserver.source.SqlServerSourceTestBase;
import org.apache.flink.cdc.connectors.sqlserver.source.config.SqlServerSourceConfig;
import org.apache.flink.cdc.connectors.sqlserver.source.config.SqlServerSourceConfigFactory;
import org.apache.flink.cdc.connectors.sqlserver.source.dialect.SqlServerDialect;
import org.apache.flink.cdc.connectors.sqlserver.source.offset.LsnFactory;
import org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerScanFetchTask;
import org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerSourceFetchTaskContext;
import org.apache.flink.cdc.connectors.sqlserver.testutils.RecordsFormatter;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
@ -45,10 +50,9 @@ import org.junit.Test;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.Optional;
import static org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
import static org.junit.Assert.assertEquals;
@ -318,19 +322,29 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
}
private List<SnapshotSplit> getSnapshotSplits(
SqlServerSourceConfig sourceConfig, JdbcDataSourceDialect sourceDialect) {
String databaseName = sourceConfig.getDatabaseList().get(0);
List<TableId> tableIdList =
sourceConfig.getTableList().stream()
.map(tableId -> TableId.parse(databaseName + "." + tableId))
.collect(Collectors.toList());
final ChunkSplitter chunkSplitter = sourceDialect.createChunkSplitter(sourceConfig);
SqlServerSourceConfig sourceConfig, JdbcDataSourceDialect sourceDialect)
throws Exception {
List<TableId> discoverTables = sourceDialect.discoverDataCollections(sourceConfig);
OffsetFactory offsetFactory = new LsnFactory();
final SnapshotSplitAssigner snapshotSplitAssigner =
new SnapshotSplitAssigner<JdbcSourceConfig>(
sourceConfig,
DEFAULT_PARALLELISM,
discoverTables,
sourceDialect.isDataCollectionIdCaseSensitive(sourceConfig),
sourceDialect,
offsetFactory);
snapshotSplitAssigner.initEnumeratorMetrics(
new SourceEnumeratorMetrics(
UnregisteredMetricsGroup.createSplitEnumeratorMetricGroup()));
snapshotSplitAssigner.open();
List<SnapshotSplit> snapshotSplitList = new ArrayList<>();
for (TableId table : tableIdList) {
Collection<SnapshotSplit> snapshotSplits = chunkSplitter.generateSplits(table);
snapshotSplitList.addAll(snapshotSplits);
Optional<SourceSplitBase> split = snapshotSplitAssigner.getNext();
while (split.isPresent()) {
snapshotSplitList.add(split.get().asSnapshotSplit());
split = snapshotSplitAssigner.getNext();
}
snapshotSplitAssigner.close();
return snapshotSplitList;
}

@ -61,6 +61,8 @@ import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase
import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase.ORACLE_DATABASE;
import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase.TEST_PWD;
import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase.TEST_USER;
import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase.TOP_SECRET;
import static org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase.TOP_USER;
import static org.junit.Assert.assertNotNull;
/** End-to-end tests for oracle-cdc connector uber jar. */
@ -130,8 +132,10 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment {
" 'connector' = 'oracle-cdc',",
" 'hostname' = '" + oracle.getNetworkAliases().get(0) + "',",
" 'port' = '" + oracle.getExposedPorts().get(0) + "',",
" 'username' = '" + CONNECTOR_USER + "',",
" 'password' = '" + CONNECTOR_PWD + "',",
// To analyze table for approximate rowCnt computation, use admin user
// before chunk splitting.
" 'username' = '" + TOP_USER + "',",
" 'password' = '" + TOP_SECRET + "',",
" 'database-name' = 'ORCLCDB',",
" 'schema-name' = 'DEBEZIUM',",
" 'scan.incremental.snapshot.enabled' = 'true',",
