[FLINK-35344][cdc-base] Move same code from multiple subclasses to JdbcSourceChunkSplitter (#3319)

Hongshun Wang 6 months ago committed by GitHub
parent 8f2939e913
commit 47f5660055

@@ -18,70 +18,119 @@
package org.apache.flink.cdc.connectors.base.source.assigner.splitter;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static java.math.BigDecimal.ROUND_CEILING;
import static org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;
/** The {@code ChunkSplitter} used to split table into a set of chunks for JDBC data source. */
@Experimental
public interface JdbcSourceChunkSplitter extends ChunkSplitter {
public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceChunkSplitter.class);
protected final JdbcSourceConfig sourceConfig;
protected final JdbcDataSourceDialect dialect;
public JdbcSourceChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
this.sourceConfig = sourceConfig;
this.dialect = dialect;
}
/** Generates all snapshot splits (chunks) for the given table path. */
@Override
Collection<SnapshotSplit> generateSplits(TableId tableId);
public Collection<SnapshotSplit> generateSplits(TableId tableId) {
try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
/**
* Query the maximum and minimum value of the column in the table. e.g. query string <code>
* SELECT MIN(%s), MAX(%s) FROM %s</code>
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @param column column.
* @return maximum and minimum value.
*/
Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column) throws SQLException;
LOG.info("Start splitting table {} into chunks...", tableId);
long start = System.currentTimeMillis();
/**
* Query the minimum value of the column in the table, and the minimum value must be greater than
* the excludedLowerBound value. e.g. prepare query string <code>
* SELECT MIN(%s) FROM %s WHERE %s > ?</code>
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @param column column.
* @param excludedLowerBound the minimum value should be greater than this value.
* @return minimum value.
*/
Object queryMin(JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException;
Table table =
Objects.requireNonNull(dialect.queryTableSchema(jdbc, tableId)).getTable();
Column splitColumn = getSplitColumn(table, sourceConfig.getChunkKeyColumn());
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
}
// convert chunks into splits
List<SnapshotSplit> splits = new ArrayList<>();
RowType splitType = getSplitType(splitColumn);
for (int i = 0; i < chunks.size(); i++) {
ChunkRange chunk = chunks.get(i);
SnapshotSplit split =
createSnapshotSplit(
jdbc,
tableId,
i,
splitType,
chunk.getChunkStart(),
chunk.getChunkEnd());
splits.add(split);
}
long end = System.currentTimeMillis();
LOG.info(
"Split table {} into {} chunks, time cost: {}ms.",
tableId,
splits.size(),
end - start);
return splits;
} catch (Exception e) {
throw new FlinkRuntimeException(
String.format("Generate Splits for table %s error", tableId), e);
}
}
/**
* Query the maximum value of the next chunk, where the next chunk must be greater than or
* equal to the <code>includedLowerBound</code> value. Given chunks [min_1, max_1), [min_2,
* max_2), ... [min_n, null), each call of this method returns max_1, max_2, ...
*
* <p>Each database has its own syntax for limiting the number of returned rows, for example
* `LIMIT N` in MySQL or PostgreSQL, `TOP(N)` in SQL Server, and `FETCH FIRST N ROWS ONLY` in DB2.
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @param column column.
* @param splitColumn column.
* @param chunkSize chunk size.
* @param includedLowerBound the previous chunk end value.
* @return next chunk end value.
*/
Object queryNextChunkMax(
protected abstract Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
Column column,
Column splitColumn,
int chunkSize,
Object includedLowerBound)
throws SQLException;
@@ -89,23 +138,16 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter {
/**
* Approximate total number of entries in the lookup table.
*
* <p>Each database has its own system table to look up the approximate total row count. For
* example, `pg_class` in postgres, `sys.dm_db_partition_stats` in sqlserver, `SYSCAT.TABLE` in
* db2.
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @return approximate row count.
*/
Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException;
/**
* Build the scan query sql of the {@link SnapshotSplit}.
*
* @param tableId table identity.
* @param splitKeyType primary key type.
* @param isFirstSplit whether it is the first split.
* @param isLastSplit whether it is the last split.
* @return query sql.
*/
String buildSplitScanQuery(
TableId tableId, RowType splitKeyType, boolean isFirstSplit, boolean isLastSplit);
protected abstract Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException;
/**
* Checks whether split column is evenly distributed across its range.
@@ -113,7 +155,7 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter {
* @param splitColumn split column.
* @return true if the split column is of type BIGINT, INT, or DECIMAL.
*/
default boolean isEvenlySplitColumn(Column splitColumn) {
protected boolean isEvenlySplitColumn(Column splitColumn) {
DataType flinkType = fromDbzColumn(splitColumn);
LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot();
@@ -130,7 +172,94 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter {
* @param splitColumn dbz split column.
* @return flink data type
*/
DataType fromDbzColumn(Column splitColumn);
protected abstract DataType fromDbzColumn(Column splitColumn);
/** Returns the distribution factor of the given table. */
protected double calculateDistributionFactor(
TableId tableId, Object min, Object max, long approximateRowCnt) {
if (!min.getClass().equals(max.getClass())) {
throw new IllegalStateException(
String.format(
"Unsupported operation type, the MIN value type %s is different with MAX value type %s.",
min.getClass().getSimpleName(), max.getClass().getSimpleName()));
}
if (approximateRowCnt == 0) {
return Double.MAX_VALUE;
}
BigDecimal difference = ObjectUtils.minus(max, min);
// factor = (max - min + 1) / rowCount
final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
double distributionFactor =
subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue();
LOG.info(
"The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}",
tableId,
distributionFactor,
min,
max,
approximateRowCnt);
return distributionFactor;
}
/**
* Get the column which is seen as chunk key.
*
* @param table table identity.
* @param chunkKeyColumn column name which is seen as chunk key; if chunkKeyColumn is null, use
*     the primary key instead.
* @return the column which is seen as chunk key.
*/
protected Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn);
}
/** ChunkEnd less than or equal to max. */
protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
return ObjectUtils.compare(chunkEnd, max) <= 0;
}
/** ChunkEnd greater than or equal to max. */
protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
return ObjectUtils.compare(chunkEnd, max) >= 0;
}
/**
* Query the maximum and minimum value of the column in the table. e.g. query string <code>
* SELECT MIN(%s), MAX(%s) FROM %s</code>
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @param splitColumn column.
* @return maximum and minimum value.
*/
protected Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column splitColumn)
throws SQLException {
return JdbcChunkUtils.queryMinMax(
jdbc,
jdbc.quotedTableIdString(tableId),
jdbc.quotedColumnIdString(splitColumn.name()));
}
/**
* Query the minimum value of the column in the table, and the minimum value must be greater than
* the excludedLowerBound value. e.g. prepare query string <code>
* SELECT MIN(%s) FROM %s WHERE %s > ?</code>
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @param splitColumn column.
* @param excludedLowerBound the minimum value should be greater than this value.
* @return minimum value.
*/
protected Object queryMin(
JdbcConnection jdbc, TableId tableId, Column splitColumn, Object excludedLowerBound)
throws SQLException {
return JdbcChunkUtils.queryMin(
jdbc,
jdbc.quotedColumnIdString(splitColumn.name()),
jdbc.quotedTableIdString(tableId),
excludedLowerBound);
}
/**
* convert dbz column to Flink row type.
@@ -138,8 +267,178 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter {
* @param splitColumn split column.
* @return flink row type.
*/
default RowType getSplitType(Column splitColumn) {
private RowType getSplitType(Column splitColumn) {
return (RowType)
ROW(FIELD(splitColumn.name(), fromDbzColumn(splitColumn))).getLogicalType();
}
/**
* We can use evenly-sized chunks or unevenly-sized chunks when splitting the table into chunks.
* Evenly-sized chunks are much more efficient, while unevenly-sized chunks require many queries
* and are less efficient.
*/
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
// empty table, or only one row, return full table scan as a chunk
return Collections.singletonList(ChunkRange.all());
}
final int chunkSize = sourceConfig.getSplitSize();
final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
if (isEvenlySplitColumn(splitColumn)) {
long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
double distributionFactor =
calculateDistributionFactor(tableId, min, max, approximateRowCnt);
boolean dataIsEvenlyDistributed =
doubleCompare(distributionFactor, distributionFactorLower) >= 0
&& doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
if (dataIsEvenlyDistributed) {
// the minimum dynamic chunk size is at least 1
final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}
/**
* Split table into evenly sized chunks based on the numeric min and max value of split column,
* and tumble chunks in step size.
*/
private List<ChunkRange> splitEvenlySizedChunks(
TableId tableId,
Object min,
Object max,
long approximateRowCnt,
int chunkSize,
int dynamicChunkSize) {
LOG.info(
"Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}",
tableId,
approximateRowCnt,
chunkSize,
dynamicChunkSize);
if (approximateRowCnt <= chunkSize) {
// there is no more than one chunk, return full table as a chunk
return Collections.singletonList(ChunkRange.all());
}
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
while (ObjectUtils.compare(chunkEnd, max) <= 0) {
splits.add(ChunkRange.of(chunkStart, chunkEnd));
chunkStart = chunkEnd;
try {
chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
} catch (ArithmeticException e) {
// Stop chunk split to avoid dead loop when number overflows.
break;
}
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
return splits;
}
/** Split table into unevenly sized chunks by continuously calculating next chunk max value. */
private List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc,
TableId tableId,
Column splitColumn,
Object min,
Object max,
int chunkSize)
throws SQLException {
LOG.info(
"Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDoS on the source database server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
return splits;
}
private Object nextChunkEnd(
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
Column splitColumn,
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (isChunkEndGeMax(chunkEnd, max)) {
return null;
} else {
return chunkEnd;
}
}
private SnapshotSplit createSnapshotSplit(
JdbcConnection jdbc,
TableId tableId,
int chunkId,
RowType splitKeyType,
Object chunkStart,
Object chunkEnd) {
// currently, we only support single split column
Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart};
Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
Map<TableId, TableChanges.TableChange> schema = new HashMap<>();
schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
return new SnapshotSplit(
tableId,
splitId(tableId, chunkId),
splitKeyType,
splitStart,
splitEnd,
null,
schema);
}
private String splitId(TableId tableId, int chunkId) {
return tableId.toString() + ":" + chunkId;
}
private void maySleep(int count, TableId tableId) {
// sleep 0.1s after every 10 queries
if (count % 10 == 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// nothing to do
}
LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId);
}
}
}

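For orientation, the sketch below shows what a dialect-specific splitter has to supply after this refactoring: only the dialect-dependent hooks, since the split loop, distribution-factor logic and split creation now live in the abstract base class. The class name ExampleChunkSplitter, its package, and its SQL are hypothetical illustrations, not part of this change.

package org.apache.flink.cdc.connectors.base.example;

import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.TableId;

import java.sql.SQLException;

/** Hypothetical dialect splitter: only the abstract hooks remain to be implemented. */
public class ExampleChunkSplitter extends JdbcSourceChunkSplitter {

    public ExampleChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
        super(sourceConfig, dialect);
    }

    @Override
    protected DataType fromDbzColumn(Column splitColumn) {
        // A real dialect maps splitColumn.jdbcType() to the matching Flink type.
        return DataTypes.BIGINT();
    }

    @Override
    protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
            throws SQLException {
        // A real dialect reads its statistics catalog; an exact COUNT(*) is used here only to
        // keep the sketch self-contained.
        return jdbc.queryAndMap(
                "SELECT COUNT(*) FROM " + jdbc.quotedTableIdString(tableId),
                rs -> {
                    rs.next();
                    return rs.getLong(1);
                });
    }

    @Override
    protected Object queryNextChunkMax(
            JdbcConnection jdbc,
            TableId tableId,
            Column splitColumn,
            int chunkSize,
            Object includedLowerBound)
            throws SQLException {
        // Dialect-specific row limiting goes here; this sketch uses the generic LIMIT form.
        String column = jdbc.quotedColumnIdString(splitColumn.name());
        String query =
                String.format(
                        "SELECT MAX(%s) FROM (SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %d) AS T",
                        column, column, jdbc.quotedTableIdString(tableId), column, column, chunkSize);
        return jdbc.prepareQueryAndMap(
                query,
                ps -> ps.setObject(1, includedLowerBound),
                rs -> rs.next() ? rs.getObject(1) : null);
    }
}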
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.base.source.utils;
import org.apache.flink.table.api.ValidationException;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import javax.annotation.Nullable;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
/** Utilities for splitting a table into chunks. */
public class JdbcChunkUtils {
/**
* Query the maximum and minimum value of the column in the table. e.g. query string <code>
* SELECT MIN(%s), MAX(%s) FROM %s</code>
*
* @param jdbc JDBC connection.
* @param quotedTableName quoted table name.
* @param quotedColumnName quoted column name.
* @return maximum and minimum value.
*/
public static Object[] queryMinMax(
JdbcConnection jdbc, String quotedTableName, String quotedColumnName)
throws SQLException {
final String minMaxQuery =
String.format(
"SELECT MIN(%s), MAX(%s) FROM %s",
quotedColumnName, quotedColumnName, quotedTableName);
return jdbc.queryAndMap(
minMaxQuery,
rs -> {
if (!rs.next()) {
// this should never happen
throw new SQLException(
String.format(
"No result returned after running query [%s]",
minMaxQuery));
}
return rowToArray(rs, 2);
});
}
/**
* Query the minimum value of the column in the table, and the minimum value must be greater
* than the excludedLowerBound value. e.g. prepare query string <code>
* SELECT MIN(%s) FROM %s WHERE %s > ?</code>
*
* @param jdbc JDBC connection.
* @param quotedTableName quoted table name.
* @param quotedColumnName quoted column name.
* @param excludedLowerBound the minimum value should be greater than this value.
* @return minimum value.
*/
public static Object queryMin(
JdbcConnection jdbc,
String quotedTableName,
String quotedColumnName,
Object excludedLowerBound)
throws SQLException {
final String minQuery =
String.format(
"SELECT MIN(%s) FROM %s WHERE %s > ?",
quotedColumnName, quotedTableName, quotedColumnName);
return jdbc.prepareQueryAndMap(
minQuery,
ps -> ps.setObject(1, excludedLowerBound),
rs -> {
if (!rs.next()) {
// this should never happen
throw new SQLException(
String.format(
"No result returned after running query [%s]", minQuery));
}
return rs.getObject(1);
});
}
/**
* Get the column which is seen as chunk key.
*
* @param table table identity.
* @param chunkKeyColumn column name which is seen as chunk key; if chunkKeyColumn is null, use
*     the primary key instead.
* @return the column which is seen as chunk key.
*/
public static Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
List<Column> primaryKeys = table.primaryKeyColumns();
if (primaryKeys.isEmpty()) {
throw new ValidationException(
String.format(
"Incremental snapshot for tables requires primary key,"
+ " but table %s doesn't have primary key.",
table.id()));
}
if (chunkKeyColumn != null) {
Optional<Column> targetPkColumn =
primaryKeys.stream()
.filter(col -> chunkKeyColumn.equals(col.name()))
.findFirst();
if (targetPkColumn.isPresent()) {
return targetPkColumn.get();
}
throw new ValidationException(
String.format(
"Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.",
chunkKeyColumn,
primaryKeys.stream().map(Column::name).collect(Collectors.joining(",")),
table.id()));
}
// use first field in primary key as the split key
return primaryKeys.get(0);
}
}

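A small usage sketch of the new shared helpers follows; the wrapper class and method names are illustrative, not part of this change. Callers pass identifiers that are already quoted through the JdbcConnection, so JdbcChunkUtils itself stays dialect-agnostic.

import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils;

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;

import java.sql.SQLException;

/** Illustrative helper showing how a splitter is expected to call JdbcChunkUtils. */
public class JdbcChunkUtilsUsageSketch {

    /** Returns {min, max} of the chunk key column of the given table. */
    static Object[] minMaxOfChunkKey(JdbcConnection jdbc, Table table, TableId tableId)
            throws SQLException {
        // Passing null falls back to the first primary-key column as the split key.
        Column splitColumn = JdbcChunkUtils.getSplitColumn(table, null);
        return JdbcChunkUtils.queryMinMax(
                jdbc,
                jdbc.quotedTableIdString(tableId),
                jdbc.quotedColumnIdString(splitColumn.name()));
    }
}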
@@ -17,353 +17,48 @@
package org.apache.flink.cdc.connectors.base.experimental;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.experimental.utils.MySqlTypeUtils;
import org.apache.flink.cdc.connectors.base.experimental.utils.MySqlUtils;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static java.math.BigDecimal.ROUND_CEILING;
import static org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
/** The {@code ChunkSplitter} used to split table into a set of chunks for JDBC data source. */
public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
private static final Logger LOG = LoggerFactory.getLogger(MySqlChunkSplitter.class);
private final JdbcSourceConfig sourceConfig;
private final JdbcDataSourceDialect dialect;
@Internal
public class MySqlChunkSplitter extends JdbcSourceChunkSplitter {
public MySqlChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
this.sourceConfig = sourceConfig;
this.dialect = dialect;
}
@Override
public Collection<SnapshotSplit> generateSplits(TableId tableId) {
try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
LOG.info("Start splitting table {} into chunks...", tableId);
long start = System.currentTimeMillis();
Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
Column splitColumn = getSplitColumn(table);
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
}
// convert chunks into splits
List<SnapshotSplit> splits = new ArrayList<>();
RowType splitType = getSplitType(splitColumn);
for (int i = 0; i < chunks.size(); i++) {
ChunkRange chunk = chunks.get(i);
SnapshotSplit split =
createSnapshotSplit(
jdbc,
tableId,
i,
splitType,
chunk.getChunkStart(),
chunk.getChunkEnd());
splits.add(split);
}
long end = System.currentTimeMillis();
LOG.info(
"Split table {} into {} chunks, time cost: {}ms.",
tableId,
splits.size(),
end - start);
return splits;
} catch (Exception e) {
throw new FlinkRuntimeException(
String.format("Generate Splits for table %s error", tableId), e);
}
}
@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column)
throws SQLException {
return MySqlUtils.queryMinMax(jdbc, tableId, column.name());
}
@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException {
return MySqlUtils.queryMin(jdbc, tableId, column.name(), excludedLowerBound);
super(sourceConfig, dialect);
}
@Override
public Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
Column column,
Column splitColumn,
int chunkSize,
Object includedLowerBound)
throws SQLException {
return MySqlUtils.queryNextChunkMax(
jdbc, tableId, column.name(), chunkSize, includedLowerBound);
jdbc, tableId, splitColumn.name(), chunkSize, includedLowerBound);
}
@Override
public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException {
protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
return MySqlUtils.queryApproximateRowCnt(jdbc, tableId);
}
@Override
public String buildSplitScanQuery(
TableId tableId, RowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) {
return MySqlUtils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit);
}
@Override
public DataType fromDbzColumn(Column splitColumn) {
protected DataType fromDbzColumn(Column splitColumn) {
return MySqlTypeUtils.fromDbzColumn(splitColumn);
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
/**
* We can use evenly-sized chunks or unevenly-sized chunks when splitting the table into chunks.
* Evenly-sized chunks are much more efficient, while unevenly-sized chunks require many queries
* and are less efficient.
*/
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
// empty table, or only one row, return full table scan as a chunk
return Collections.singletonList(ChunkRange.all());
}
final int chunkSize = sourceConfig.getSplitSize();
final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
if (isEvenlySplitColumn(splitColumn)) {
long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
double distributionFactor =
calculateDistributionFactor(tableId, min, max, approximateRowCnt);
boolean dataIsEvenlyDistributed =
doubleCompare(distributionFactor, distributionFactorLower) >= 0
&& doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
if (dataIsEvenlyDistributed) {
// the minimum dynamic chunk size is at least 1
final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}
/**
* Split table into evenly sized chunks based on the numeric min and max value of split column,
* and tumble chunks in step size.
*/
private List<ChunkRange> splitEvenlySizedChunks(
TableId tableId,
Object min,
Object max,
long approximateRowCnt,
int chunkSize,
int dynamicChunkSize) {
LOG.info(
"Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}",
tableId,
approximateRowCnt,
chunkSize,
dynamicChunkSize);
if (approximateRowCnt <= chunkSize) {
// there is no more than one chunk, return full table as a chunk
return Collections.singletonList(ChunkRange.all());
}
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
while (ObjectUtils.compare(chunkEnd, max) <= 0) {
splits.add(ChunkRange.of(chunkStart, chunkEnd));
chunkStart = chunkEnd;
try {
chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
} catch (ArithmeticException e) {
// Stop chunk split to avoid dead loop when number overflows.
break;
}
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
return splits;
}
/** Split table into unevenly sized chunks by continuously calculating next chunk max value. */
private List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc,
TableId tableId,
Column splitColumn,
Object min,
Object max,
int chunkSize)
throws SQLException {
LOG.info(
"Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDOS on MySQL server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
return splits;
}
private Object nextChunkEnd(
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
Column splitColumn,
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (ObjectUtils.compare(chunkEnd, max) >= 0) {
return null;
} else {
return chunkEnd;
}
}
private SnapshotSplit createSnapshotSplit(
JdbcConnection jdbc,
TableId tableId,
int chunkId,
RowType splitKeyType,
Object chunkStart,
Object chunkEnd) {
// currently, we only support single split column
Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart};
Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
Map<TableId, TableChange> schema = new HashMap<>();
schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
return new SnapshotSplit(
tableId,
splitId(tableId, chunkId),
splitKeyType,
splitStart,
splitEnd,
null,
schema);
}
// ------------------------------------------------------------------------------------------
/** Returns the distribution factor of the given table. */
private double calculateDistributionFactor(
TableId tableId, Object min, Object max, long approximateRowCnt) {
if (!min.getClass().equals(max.getClass())) {
throw new IllegalStateException(
String.format(
"Unsupported operation type, the MIN value type %s is different with MAX value type %s.",
min.getClass().getSimpleName(), max.getClass().getSimpleName()));
}
if (approximateRowCnt == 0) {
return Double.MAX_VALUE;
}
BigDecimal difference = ObjectUtils.minus(max, min);
// factor = (max - min + 1) / rowCount
final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
double distributionFactor =
subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue();
LOG.info(
"The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}",
tableId,
distributionFactor,
min,
max,
approximateRowCnt);
return distributionFactor;
}
private static String splitId(TableId tableId, int chunkId) {
return tableId.toString() + ":" + chunkId;
}
private static void maySleep(int count, TableId tableId) {
// every 10 queries to sleep 100ms
if (count % 10 == 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// nothing to do
}
LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId);
}
}
public static Column getSplitColumn(Table table) {
List<Column> primaryKeys = table.primaryKeyColumns();
if (primaryKeys.isEmpty()) {
throw new ValidationException(
String.format(
"Incremental snapshot for tables requires primary key,"
+ " but table %s doesn't have primary key.",
table.id()));
}
// use first field in primary key as the split key
return primaryKeys.get(0);
}
}

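The distribution-factor and evenly-sized chunk logic removed from MySqlChunkSplitter above (and kept in the base class) can be illustrated with a small, self-contained arithmetic sketch; all values below are made up for illustration and are not taken from the connectors.

import java.math.BigDecimal;
import java.math.RoundingMode;

/** Illustrative arithmetic: with min = 1, max = 10_000, rowCnt = 10_000 and chunkSize = 1_000,
 *  the distribution factor is (10_000 - 1 + 1) / 10_000 = 1.0, so the dynamic chunk size stays
 *  1_000 and the ranges become [null, 1001), [1001, 2001), ..., [9001, null). */
public class ChunkMathSketch {
    public static void main(String[] args) {
        long min = 1L;
        long max = 10_000L;
        long approximateRowCnt = 10_000L;
        int chunkSize = 1_000;

        // factor = (max - min + 1) / rowCount, rounded up to 4 decimal places
        double distributionFactor =
                BigDecimal.valueOf(max - min + 1)
                        .divide(BigDecimal.valueOf(approximateRowCnt), 4, RoundingMode.CEILING)
                        .doubleValue();
        // the minimum dynamic chunk size is at least 1
        int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);

        Object chunkStart = null;
        long chunkEnd = min + dynamicChunkSize;
        while (chunkEnd <= max) {
            System.out.println("[" + chunkStart + ", " + chunkEnd + ")");
            chunkStart = chunkEnd;
            chunkEnd += dynamicChunkSize;
        }
        System.out.println("[" + chunkStart + ", null)"); // the ending split
    }
}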
@@ -45,7 +45,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;
@@ -54,26 +53,6 @@ public class MySqlUtils {
private MySqlUtils() {}
public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
throws SQLException {
final String minMaxQuery =
String.format(
"SELECT MIN(%s), MAX(%s) FROM %s",
quote(columnName), quote(columnName), quote(tableId));
return jdbc.queryAndMap(
minMaxQuery,
rs -> {
if (!rs.next()) {
// this should never happen
throw new SQLException(
String.format(
"No result returned after running query [%s]",
minMaxQuery));
}
return rowToArray(rs, 2);
});
}
public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
// The statement used to get approximate row count which is less
@@ -94,27 +73,6 @@ public class MySqlUtils {
});
}
public static Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
throws SQLException {
final String minQuery =
String.format(
"SELECT MIN(%s) FROM %s WHERE %s > ?",
quote(columnName), quote(tableId), quote(columnName));
return jdbc.prepareQueryAndMap(
minQuery,
ps -> ps.setObject(1, excludedLowerBound),
rs -> {
if (!rs.next()) {
// this should never happen
throw new SQLException(
String.format(
"No result returned after running query [%s]", minQuery));
}
return rs.getObject(1);
});
}
public static Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
@@ -122,7 +80,7 @@ public class MySqlUtils {
int chunkSize,
Object includedLowerBound)
throws SQLException {
String quotedColumn = quote(splitColumnName);
String quotedColumn = jdbc.quotedColumnIdString(splitColumnName);
String query =
String.format(
"SELECT MAX(%s) FROM ("
@@ -130,7 +88,7 @@ public class MySqlUtils {
+ ") AS T",
quotedColumn,
quotedColumn,
quote(tableId),
jdbc.quotedTableIdString(tableId),
quotedColumn,
quotedColumn,
chunkSize);
@@ -313,20 +271,6 @@ public class MySqlUtils {
.getLogicalType();
}
public static Column getSplitColumn(Table table) {
List<Column> primaryKeys = table.primaryKeyColumns();
if (primaryKeys.isEmpty()) {
throw new ValidationException(
String.format(
"Incremental snapshot for tables requires primary key,"
+ " but table %s doesn't have primary key.",
table.id()));
}
// use first field in primary key as the split key
return primaryKeys.get(0);
}
public static String quote(String dbOrTableName) {
return "`" + dbOrTableName + "`";
}

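The change in MySqlUtils above swaps the hand-rolled back-tick quote() helpers for Debezium's dialect-aware quoting on JdbcConnection; a minimal sketch of the difference follows, with class and method names chosen purely for illustration.

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;

/** Illustrative comparison of manual back-tick quoting versus the JdbcConnection helpers. */
public class QuotingSketch {

    /** The old approach: MySQL-specific back-tick quoting. */
    static String quoteManually(String dbOrTableName) {
        return "`" + dbOrTableName + "`";
    }

    /** The new approach: let the connection quote the column for the active JDBC dialect. */
    static String quotedColumn(JdbcConnection jdbc, String columnName) {
        return jdbc.quotedColumnIdString(columnName);
    }

    /** The new approach for table identifiers. */
    static String quotedTable(JdbcConnection jdbc, TableId tableId) {
        return jdbc.quotedTableIdString(tableId);
    }
}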
@@ -17,139 +17,37 @@
package org.apache.flink.cdc.connectors.db2.source.dialect;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.cdc.connectors.db2.source.utils.Db2TypeUtils;
import org.apache.flink.cdc.connectors.db2.source.utils.Db2Utils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
/**
* The splitter to split the table into chunks using primary-key (by default) or a given split key.
*/
public class Db2ChunkSplitter implements JdbcSourceChunkSplitter {
private static final Logger LOG = LoggerFactory.getLogger(Db2ChunkSplitter.class);
private final JdbcSourceConfig sourceConfig;
private final JdbcDataSourceDialect dialect;
@Internal
public class Db2ChunkSplitter extends JdbcSourceChunkSplitter {
public Db2ChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
this.sourceConfig = sourceConfig;
this.dialect = dialect;
}
private static String splitId(TableId tableId, int chunkId) {
return tableId.toString() + ":" + chunkId;
}
private static void maySleep(int count, TableId tableId) {
// every 10 queries to sleep 0.1s
if (count % 10 == 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// nothing to do
}
LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId);
}
}
@Override
public Collection<SnapshotSplit> generateSplits(TableId tableId) {
try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
LOG.info("Start splitting table {} into chunks...", tableId);
long start = System.currentTimeMillis();
Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
Column splitColumn = Db2Utils.getSplitColumn(table, sourceConfig.getChunkKeyColumn());
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
}
// convert chunks into splits
List<SnapshotSplit> splits = new ArrayList<>();
RowType splitType = getSplitType(splitColumn);
for (int i = 0; i < chunks.size(); i++) {
ChunkRange chunk = chunks.get(i);
SnapshotSplit split =
createSnapshotSplit(
jdbc,
tableId,
i,
splitType,
chunk.getChunkStart(),
chunk.getChunkEnd());
splits.add(split);
}
long end = System.currentTimeMillis();
LOG.info(
"Split table {} into {} chunks, time cost: {}ms.",
tableId,
splits.size(),
end - start);
return splits;
} catch (Exception e) {
throw new FlinkRuntimeException(
String.format("Generate Splits for table %s error", tableId), e);
}
super(sourceConfig, dialect);
}
@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column)
throws SQLException {
return Db2Utils.queryMinMax(jdbc, tableId, column.name());
}
@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException {
return Db2Utils.queryMin(jdbc, tableId, column.name(), excludedLowerBound);
}
@Override
public DataType fromDbzColumn(Column splitColumn) {
protected DataType fromDbzColumn(Column splitColumn) {
return Db2TypeUtils.fromDbzColumn(splitColumn);
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
@Override
public Object queryNextChunkMax(
protected Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
Column column,
@@ -161,199 +59,8 @@ public class Db2ChunkSplitter implements JdbcSourceChunkSplitter {
}
@Override
public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException {
return Db2Utils.queryApproximateRowCnt(jdbc, tableId);
}
@Override
public String buildSplitScanQuery(
TableId tableId, RowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) {
return Db2Utils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit);
}
/**
* We can use evenly-sized chunks or unevenly-sized chunks when splitting the table into chunks.
* Evenly-sized chunks are much more efficient, while unevenly-sized chunks require many queries
* and are less efficient.
*/
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
// empty table, or only one row, return full table scan as a chunk
return Collections.singletonList(ChunkRange.all());
}
final int chunkSize = sourceConfig.getSplitSize();
final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
if (isEvenlySplitColumn(splitColumn)) {
long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
double distributionFactor =
calculateDistributionFactor(tableId, min, max, approximateRowCnt);
boolean dataIsEvenlyDistributed =
doubleCompare(distributionFactor, distributionFactorLower) >= 0
&& doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
if (dataIsEvenlyDistributed) {
// the minimum dynamic chunk size is at least 1
final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}
/**
* Split table into evenly sized chunks based on the numeric min and max value of split column,
* and tumble chunks in step size.
*/
private List<ChunkRange> splitEvenlySizedChunks(
TableId tableId,
Object min,
Object max,
long approximateRowCnt,
int chunkSize,
int dynamicChunkSize) {
LOG.info(
"Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}",
tableId,
approximateRowCnt,
chunkSize,
dynamicChunkSize);
if (approximateRowCnt <= chunkSize) {
// there is no more than one chunk, return full table as a chunk
return Collections.singletonList(ChunkRange.all());
}
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
while (ObjectUtils.compare(chunkEnd, max) <= 0) {
splits.add(ChunkRange.of(chunkStart, chunkEnd));
chunkStart = chunkEnd;
try {
chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
} catch (ArithmeticException e) {
// Stop chunk split to avoid dead loop when number overflows.
break;
}
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
return splits;
}
// ------------------------------------------------------------------------------------------
/** Split table into unevenly sized chunks by continuously calculating next chunk max value. */
private List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc,
TableId tableId,
Column splitColumn,
Object min,
Object max,
int chunkSize)
protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
LOG.info(
"Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep awhile to avoid DDOS on Db2 server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
return splits;
}
private Object nextChunkEnd(
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
Column splitColumn,
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (ObjectUtils.compare(chunkEnd, max) >= 0) {
return null;
} else {
return chunkEnd;
}
}
private SnapshotSplit createSnapshotSplit(
JdbcConnection jdbc,
TableId tableId,
int chunkId,
RowType splitKeyType,
Object chunkStart,
Object chunkEnd) {
// currently, we only support single split column
Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart};
Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
Map<TableId, TableChange> schema = new HashMap<>();
schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
return new SnapshotSplit(
tableId,
splitId(tableId, chunkId),
splitKeyType,
splitStart,
splitEnd,
null,
schema);
}
/** Returns the distribution factor of the given table. */
private double calculateDistributionFactor(
TableId tableId, Object min, Object max, long approximateRowCnt) {
if (!min.getClass().equals(max.getClass())) {
throw new IllegalStateException(
String.format(
"Unsupported operation type, the MIN value type %s is different with MAX value type %s.",
min.getClass().getSimpleName(), max.getClass().getSimpleName()));
}
if (approximateRowCnt == 0) {
return Double.MAX_VALUE;
}
BigDecimal difference = ObjectUtils.minus(max, min);
// factor = (max - min + 1) / rowCount
final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
double distributionFactor =
subRowCnt
.divide(new BigDecimal(approximateRowCnt), 4, RoundingMode.CEILING)
.doubleValue();
LOG.info(
"The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}",
tableId,
distributionFactor,
min,
max,
approximateRowCnt);
return distributionFactor;
return Db2Utils.queryApproximateRowCnt(jdbc, tableId);
}
}

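The base-class javadoc notes that each database needs its own row-limiting syntax inside queryNextChunkMax; the constants below sketch the query shapes the dialect utilities build. The column k, table t and the size 1000 are placeholders, not the connectors' exact SQL.

/** Illustrative per-dialect shapes of the "max of the next chunk" query. */
public class NextChunkMaxQueryShapes {

    // MySQL / PostgreSQL: LIMIT N
    static final String MYSQL_STYLE =
            "SELECT MAX(k) FROM (SELECT k FROM t WHERE k >= ? ORDER BY k ASC LIMIT 1000) AS T";

    // SQL Server: TOP (N)
    static final String SQLSERVER_STYLE =
            "SELECT MAX(k) FROM (SELECT TOP (1000) k FROM t WHERE k >= ? ORDER BY k ASC) AS T";

    // Db2: FETCH FIRST N ROWS ONLY
    static final String DB2_STYLE =
            "SELECT MAX(k) FROM (SELECT k FROM t WHERE k >= ? ORDER BY k ASC FETCH FIRST 1000 ROWS ONLY) AS T";
}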
@@ -49,7 +49,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;
@@ -60,26 +59,6 @@ public class Db2Utils {
public Db2Utils() {}
public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
throws SQLException {
final String minMaxQuery =
String.format(
"SELECT MIN(%s), MAX(%s) FROM %s",
quote(columnName), quote(columnName), quoteSchemaAndTable(tableId));
return jdbc.queryAndMap(
minMaxQuery,
rs -> {
if (!rs.next()) {
// this should never happen
throw new SQLException(
String.format(
"No result returned after running query [%s]",
minMaxQuery));
}
return rowToArray(rs, 2);
});
}
public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
// The statement used to get approximate row count which is less
@@ -103,27 +82,6 @@ public class Db2Utils {
});
}
public static Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
throws SQLException {
final String minQuery =
String.format(
"SELECT MIN(%s) FROM %s WHERE %s > ?",
columnName, quote(tableId), columnName);
return jdbc.prepareQueryAndMap(
minQuery,
ps -> ps.setObject(1, excludedLowerBound),
rs -> {
if (!rs.next()) {
// this should never happen
throw new SQLException(
String.format(
"No result returned after running query [%s]", minQuery));
}
return rs.getObject(1);
});
}
/**
* Returns the next LSN to be read from the database. This is the LSN of the last record that
* was read from the database.

@@ -17,253 +17,92 @@
package org.apache.flink.cdc.connectors.oracle.source.assigner.splitter;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.cdc.connectors.oracle.source.utils.OracleTypeUtils;
import org.apache.flink.cdc.connectors.oracle.source.utils.OracleUtils;
import org.apache.flink.cdc.connectors.oracle.util.ChunkUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
import oracle.sql.ROWID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import static org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
import java.sql.SQLException;
/**
* The {@code ChunkSplitter} used to split Oracle table into a set of chunks for JDBC data source.
*/
public class OracleChunkSplitter implements JdbcSourceChunkSplitter {
private static final Logger LOG = LoggerFactory.getLogger(OracleChunkSplitter.class);
private final JdbcSourceConfig sourceConfig;
private final JdbcDataSourceDialect dialect;
@Internal
public class OracleChunkSplitter extends JdbcSourceChunkSplitter {
public OracleChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
this.sourceConfig = sourceConfig;
this.dialect = dialect;
}
@Override
public Collection<SnapshotSplit> generateSplits(TableId tableId) {
try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
LOG.info("Start splitting table {} into chunks...", tableId);
long start = System.currentTimeMillis();
Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
Column splitColumn =
ChunkUtils.getChunkKeyColumn(table, sourceConfig.getChunkKeyColumn());
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
}
// convert chunks into splits
List<SnapshotSplit> splits = new ArrayList<>();
RowType splitType = getSplitType(splitColumn);
for (int i = 0; i < chunks.size(); i++) {
ChunkRange chunk = chunks.get(i);
SnapshotSplit split =
createSnapshotSplit(
jdbc,
tableId,
i,
splitType,
chunk.getChunkStart(),
chunk.getChunkEnd());
splits.add(split);
}
long end = System.currentTimeMillis();
LOG.info(
"Split table {} into {} chunks, time cost: {}ms.",
tableId,
splits.size(),
end - start);
return splits;
} catch (Exception e) {
throw new FlinkRuntimeException(
String.format("Generate Splits for table %s error", tableId), e);
}
super(sourceConfig, dialect);
}
@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column)
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column splitColumn)
throws SQLException {
return OracleUtils.queryMinMax(jdbc, tableId, column.name());
// Oracle queries only use the schema and table name.
String quoteSchemaAndTable = OracleUtils.quoteSchemaAndTable(tableId);
return JdbcChunkUtils.queryMinMax(
jdbc, quoteSchemaAndTable, jdbc.quotedColumnIdString(splitColumn.name()));
}
@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException {
return OracleUtils.queryMin(jdbc, tableId, column.name(), excludedLowerBound);
// Oracle queries only use the schema and table name.
String quoteSchemaAndTable = OracleUtils.quoteSchemaAndTable(tableId);
return JdbcChunkUtils.queryMin(
jdbc, quoteSchemaAndTable, jdbc.quotedTableIdString(tableId), excludedLowerBound);
}
@Override
public Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
Column column,
Column splitColumn,
int chunkSize,
Object includedLowerBound)
throws SQLException {
return OracleUtils.queryNextChunkMax(
jdbc, tableId, column.name(), chunkSize, includedLowerBound);
jdbc, tableId, splitColumn.name(), chunkSize, includedLowerBound);
}
@Override
public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException {
protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
return OracleUtils.queryApproximateRowCnt(jdbc, tableId);
}
@Override
public String buildSplitScanQuery(
TableId tableId, RowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) {
return OracleUtils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit);
}
@Override
public DataType fromDbzColumn(Column splitColumn) {
return OracleTypeUtils.fromDbzColumn(splitColumn);
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
/**
* We can use evenly-sized chunks or unevenly-sized chunks when splitting the table into chunks.
* Evenly-sized chunks are much more efficient, while unevenly-sized chunks require many queries
* and are less efficient.
*/
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
// empty table, or only one row, return full table scan as a chunk
return Collections.singletonList(ChunkRange.all());
}
final int chunkSize = sourceConfig.getSplitSize();
final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
@Override
protected boolean isEvenlySplitColumn(Column splitColumn) {
// a ROWID split column is split into unevenly sized chunks by default
if (splitColumn.name().equals(ROWID.class.getSimpleName())) {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
if (isEvenlySplitColumn(splitColumn)) {
long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
double distributionFactor =
calculateDistributionFactor(tableId, min, max, approximateRowCnt);
boolean dataIsEvenlyDistributed =
doubleCompare(distributionFactor, distributionFactorLower) >= 0
&& doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
if (dataIsEvenlyDistributed) {
// the minimum dynamic chunk size is at least 1
final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, dynamicChunkSize);
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}
/**
* Split table into evenly sized chunks based on the numeric min and max value of split column,
* and tumble chunks in step size.
*/
private List<ChunkRange> splitEvenlySizedChunks(
TableId tableId, Object min, Object max, long approximateRowCnt, int chunkSize) {
LOG.info(
"Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}",
tableId,
approximateRowCnt,
chunkSize);
if (approximateRowCnt <= chunkSize) {
// there is no more than one chunk, return full table as a chunk
return Collections.singletonList(ChunkRange.all());
return false;
}
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = ObjectUtils.plus(min, chunkSize);
while (ObjectUtils.compare(chunkEnd, max) <= 0) {
splits.add(ChunkRange.of(chunkStart, chunkEnd));
chunkStart = chunkEnd;
chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
return splits;
}
/** Split table into unevenly sized chunks by continuously calculating next chunk max value. */
private List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc,
TableId tableId,
Column splitColumn,
Object min,
Object max,
int chunkSize)
throws SQLException {
LOG.info(
"Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
            // may sleep a while to avoid DDoS on the Oracle server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
return splits;
return super.isEvenlySplitColumn(splitColumn);
}
/** ChunkEnd less than or equal to max. */
private boolean isChunkEndLeMax(Object chunkEnd, Object max) {
@Override
protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
boolean chunkEndMaxCompare;
if (chunkEnd instanceof ROWID && max instanceof ROWID) {
chunkEndMaxCompare =
@ -276,7 +115,8 @@ public class OracleChunkSplitter implements JdbcSourceChunkSplitter {
}
/** ChunkEnd greater than or equal to max. */
private boolean isChunkEndGeMax(Object chunkEnd, Object max) {
@Override
protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
boolean chunkEndMaxCompare;
if (chunkEnd instanceof ROWID && max instanceof ROWID) {
chunkEndMaxCompare =
@ -288,95 +128,9 @@ public class OracleChunkSplitter implements JdbcSourceChunkSplitter {
return chunkEndMaxCompare;
}
private Object nextChunkEnd(
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
Column splitColumn,
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (isChunkEndGeMax(chunkEnd, max)) {
return null;
} else {
return chunkEnd;
}
}
private SnapshotSplit createSnapshotSplit(
JdbcConnection jdbc,
TableId tableId,
int chunkId,
RowType splitKeyType,
Object chunkStart,
Object chunkEnd) {
// currently, we only support single split column
Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart};
Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
Map<TableId, TableChange> schema = new HashMap<>();
schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
return new SnapshotSplit(
tableId,
splitId(tableId, chunkId),
splitKeyType,
splitStart,
splitEnd,
null,
schema);
}
// ------------------------------------------------------------------------------------------
/** Returns the distribution factor of the given table. */
private double calculateDistributionFactor(
TableId tableId, Object min, Object max, long approximateRowCnt) {
if (!min.getClass().equals(max.getClass())) {
throw new IllegalStateException(
String.format(
"Unsupported operation type, the MIN value type %s is different with MAX value type %s.",
min.getClass().getSimpleName(), max.getClass().getSimpleName()));
}
if (approximateRowCnt == 0) {
return Double.MAX_VALUE;
}
BigDecimal difference = ObjectUtils.minus(max, min);
// factor = (max - min + 1) / rowCount
final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
double distributionFactor =
subRowCnt
.divide(new BigDecimal(approximateRowCnt), 4, RoundingMode.CEILING)
.doubleValue();
LOG.info(
"The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}",
tableId,
distributionFactor,
min,
max,
approximateRowCnt);
return distributionFactor;
}
private static String splitId(TableId tableId, int chunkId) {
return tableId.toString() + ":" + chunkId;
}
private static void maySleep(int count, TableId tableId) {
// every 10 queries to sleep 0.1s
if (count % 10 == 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// nothing to do
}
LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId);
}
@Override
protected Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
// Use the ROWID column as the chunk key column by default for oracle cdc connector
return ChunkUtils.getChunkKeyColumn(table, chunkKeyColumn);
}
}
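
As a rough, self-contained illustration of the distribution-factor heuristic used in splitTableIntoChunks above: the bounds, row count, the chunk size of 8096 and the factor bounds 0.05/1000.0 are assumptions made for this example, not values read from any configuration.

import java.math.BigDecimal;
import java.math.RoundingMode;

public class DistributionFactorExample {
    public static void main(String[] args) {
        long min = 1L;
        long max = 1_000_000L;                  // hypothetical MIN/MAX of the split column
        long approximateRowCnt = 1_000_000L;    // hypothetical approximate row count
        int chunkSize = 8096;                   // assumed snapshot chunk size
        double lower = 0.05d;                   // assumed distribution-factor lower bound
        double upper = 1000.0d;                 // assumed distribution-factor upper bound

        // factor = (max - min + 1) / rowCount, rounded up to 4 decimal places
        double factor =
                BigDecimal.valueOf(max - min + 1)
                        .divide(BigDecimal.valueOf(approximateRowCnt), 4, RoundingMode.CEILING)
                        .doubleValue();
        boolean dataIsEvenlyDistributed = factor >= lower && factor <= upper;

        // the minimum dynamic chunk size is at least 1
        int dynamicChunkSize = Math.max((int) (factor * chunkSize), 1);

        System.out.printf(
                "factor=%s, evenlyDistributed=%s, dynamicChunkSize=%d%n",
                factor, dataIsEvenlyDistributed, dynamicChunkSize);
        // factor = 1.0 here, so evenly-sized chunks of 8096 rows are used; a sparse key space
        // (max - min much larger than the row count) would push the factor over the upper
        // bound and fall back to unevenly-sized chunks.
    }
}

With a dense key space the factor stays near 1.0 and evenly-sized chunks of roughly chunkSize rows are used; a sparse or skewed key space pushes the factor outside the bounds and the splitter falls back to query-driven, unevenly-sized chunks.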

@ -42,33 +42,11 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
/** Utils to prepare Oracle SQL statement. */
public class OracleUtils {
private OracleUtils() {}
public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
throws SQLException {
final String minMaxQuery =
String.format(
"SELECT MIN(%s), MAX(%s) FROM %s",
quote(columnName), quote(columnName), quoteSchemaAndTable(tableId));
return jdbc.queryAndMap(
minMaxQuery,
rs -> {
if (!rs.next()) {
// this should never happen
throw new SQLException(
String.format(
"No result returned after running query [%s]",
minMaxQuery));
}
return rowToArray(rs, 2);
});
}
public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
final String analyzeTable =
@ -92,27 +70,6 @@ public class OracleUtils {
});
}
public static Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
throws SQLException {
final String minQuery =
String.format(
"SELECT MIN(%s) FROM %s WHERE %s > ?",
quote(columnName), quoteSchemaAndTable(tableId), quote(columnName));
return jdbc.prepareQueryAndMap(
minQuery,
ps -> ps.setObject(1, excludedLowerBound),
rs -> {
if (!rs.next()) {
// this should never happen
throw new SQLException(
String.format(
"No result returned after running query [%s]", minQuery));
}
return rs.getObject(1);
});
}
public static Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,

@ -17,138 +17,54 @@
package org.apache.flink.cdc.connectors.postgres.source;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils;
import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static java.math.BigDecimal.ROUND_CEILING;
import static org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
/**
* The splitter to split the table into chunks using primary-key (by default) or a given split key.
*/
public class PostgresChunkSplitter implements JdbcSourceChunkSplitter {
private static final Logger LOG = LoggerFactory.getLogger(PostgresChunkSplitter.class);
private final JdbcSourceConfig sourceConfig;
private final PostgresDialect dialect;
@Internal
public class PostgresChunkSplitter extends JdbcSourceChunkSplitter {
public PostgresChunkSplitter(JdbcSourceConfig sourceConfig, PostgresDialect postgresDialect) {
this.sourceConfig = sourceConfig;
this.dialect = postgresDialect;
}
private static String splitId(TableId tableId, int chunkId) {
return tableId.toString() + ":" + chunkId;
}
private static void maySleep(int count, TableId tableId) {
// every 10 queries to sleep 100ms
if (count % 10 == 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// nothing to do
}
LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId);
}
super(sourceConfig, postgresDialect);
}
@Override
public Collection<SnapshotSplit> generateSplits(TableId tableId) {
try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
LOG.info("Start splitting table {} into chunks...", tableId);
long start = System.currentTimeMillis();
Table table =
Objects.requireNonNull(dialect.queryTableSchema(jdbc, tableId)).getTable();
Column splitColumn = ChunkUtils.getSplitColumn(table, sourceConfig.getChunkKeyColumn());
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
}
// convert chunks into splits
List<SnapshotSplit> splits = new ArrayList<>();
RowType splitType = getSplitType(splitColumn);
for (int i = 0; i < chunks.size(); i++) {
ChunkRange chunk = chunks.get(i);
SnapshotSplit split =
createSnapshotSplit(
jdbc,
tableId,
i,
splitType,
chunk.getChunkStart(),
chunk.getChunkEnd());
splits.add(split);
}
long end = System.currentTimeMillis();
LOG.info(
"Split table {} into {} chunks, time cost: {}ms.",
tableId,
splits.size(),
end - start);
return splits;
} catch (Exception e) {
throw new FlinkRuntimeException(
String.format("Generate Splits for table %s error", tableId), e);
}
public Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
Column splitColumn,
int chunkSize,
Object includedLowerBound)
throws SQLException {
return PostgresQueryUtils.queryNextChunkMax(
jdbc, tableId, splitColumn, chunkSize, includedLowerBound);
}
    /** Postgres chunk split overrides the queryMinMax method to query based on uuid. */
@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column splitColumn)
throws SQLException {
return PostgresQueryUtils.queryMinMax(jdbc, tableId, splitColumn);
}
/** Postgres chunk split overrides queryMin method to query based on uuid. */
@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException {
return PostgresQueryUtils.queryMin(jdbc, tableId, column, excludedLowerBound);
}
@Override
public Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
Column column,
int chunkSize,
Object includedLowerBound)
JdbcConnection jdbc, TableId tableId, Column splitColumn, Object excludedLowerBound)
throws SQLException {
return PostgresQueryUtils.queryNextChunkMax(
jdbc, tableId, column, chunkSize, includedLowerBound);
return PostgresQueryUtils.queryMin(jdbc, tableId, splitColumn, excludedLowerBound);
}
// --------------------------------------------------------------------------------------------
@ -156,203 +72,13 @@ public class PostgresChunkSplitter implements JdbcSourceChunkSplitter {
// --------------------------------------------------------------------------------------------
@Override
public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException {
protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
return PostgresQueryUtils.queryApproximateRowCnt(jdbc, tableId);
}
@Override
public String buildSplitScanQuery(
TableId tableId, RowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) {
return PostgresQueryUtils.buildSplitScanQuery(
tableId, splitKeyType, isFirstSplit, isLastSplit);
}
@Override
public DataType fromDbzColumn(Column splitColumn) {
protected DataType fromDbzColumn(Column splitColumn) {
return PostgresTypeUtils.fromDbzColumn(splitColumn);
}
    /**
     * When splitting a table into chunks, we can use either evenly-sized or unevenly-sized
     * chunks: evenly-sized chunks are much more efficient, while unevenly-sized chunks require
     * many queries and are less efficient.
     */
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
// empty table, or only one row, return full table scan as a chunk
return Collections.singletonList(ChunkRange.all());
}
final int chunkSize = sourceConfig.getSplitSize();
final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
if (isEvenlySplitColumn(splitColumn)) {
long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
double distributionFactor =
calculateDistributionFactor(tableId, min, max, approximateRowCnt);
boolean dataIsEvenlyDistributed =
doubleCompare(distributionFactor, distributionFactorLower) >= 0
&& doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
if (dataIsEvenlyDistributed) {
// the minimum dynamic chunk size is at least 1
final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}
/**
* Split table into evenly sized chunks based on the numeric min and max value of split column,
* and tumble chunks in step size.
*/
private List<ChunkRange> splitEvenlySizedChunks(
TableId tableId,
Object min,
Object max,
long approximateRowCnt,
int chunkSize,
int dynamicChunkSize) {
LOG.info(
"Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}",
tableId,
approximateRowCnt,
chunkSize,
dynamicChunkSize);
if (approximateRowCnt <= chunkSize) {
// there is no more than one chunk, return full table as a chunk
return Collections.singletonList(ChunkRange.all());
}
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
while (ObjectUtils.compare(chunkEnd, max) <= 0) {
splits.add(ChunkRange.of(chunkStart, chunkEnd));
chunkStart = chunkEnd;
try {
chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
} catch (ArithmeticException e) {
// Stop chunk split to avoid dead loop when number overflows.
break;
}
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
return splits;
}
// ------------------------------------------------------------------------------------------
/** Split table into unevenly sized chunks by continuously calculating next chunk max value. */
private List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc,
TableId tableId,
Column splitColumn,
Object min,
Object max,
int chunkSize)
throws SQLException {
LOG.info(
"Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
            // may sleep a while to avoid DDoS on the PostgreSQL server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
return splits;
}
private Object nextChunkEnd(
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
Column splitColumn,
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (ObjectUtils.compare(chunkEnd, max) >= 0) {
return null;
} else {
return chunkEnd;
}
}
private SnapshotSplit createSnapshotSplit(
JdbcConnection jdbc,
TableId tableId,
int chunkId,
RowType splitKeyType,
Object chunkStart,
Object chunkEnd) {
// currently, we only support single split column
Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart};
Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
Map<TableId, TableChange> schema = new HashMap<>();
schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
return new SnapshotSplit(
tableId,
splitId(tableId, chunkId),
splitKeyType,
splitStart,
splitEnd,
null,
schema);
}
/** Returns the distribution factor of the given table. */
private double calculateDistributionFactor(
TableId tableId, Object min, Object max, long approximateRowCnt) {
if (!min.getClass().equals(max.getClass())) {
throw new IllegalStateException(
String.format(
"Unsupported operation type, the MIN value type %s is different with MAX value type %s.",
min.getClass().getSimpleName(), max.getClass().getSimpleName()));
}
if (approximateRowCnt == 0) {
return Double.MAX_VALUE;
}
BigDecimal difference = ObjectUtils.minus(max, min);
// factor = (max - min + 1) / rowCount
final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
double distributionFactor =
subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue();
LOG.info(
"The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}",
tableId,
distributionFactor,
min,
max,
approximateRowCnt);
return distributionFactor;
}
}
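
To make splitEvenlySizedChunks above concrete, here is a minimal sketch that uses plain longs in place of ObjectUtils arithmetic; the min/max/chunk-size values are made up. It reproduces the tumbling boundaries, including the open-ended first and last chunk range.

import java.util.ArrayList;
import java.util.List;

public class EvenChunksExample {
    /** A chunk range [start, end); null means unbounded, mirroring ChunkRange. */
    static final class Range {
        final Long start;
        final Long end;
        Range(Long start, Long end) { this.start = start; this.end = end; }
        @Override public String toString() { return "[" + start + ", " + end + ")"; }
    }

    static List<Range> splitEvenlySizedChunks(long min, long max, int dynamicChunkSize) {
        List<Range> splits = new ArrayList<>();
        Long chunkStart = null;
        long chunkEnd = min + dynamicChunkSize;
        while (chunkEnd <= max) {
            splits.add(new Range(chunkStart, chunkEnd));
            chunkStart = chunkEnd;
            chunkEnd = chunkEnd + dynamicChunkSize;
        }
        // the ending split is open on the right so rows beyond max are still covered
        splits.add(new Range(chunkStart, null));
        return splits;
    }

    public static void main(String[] args) {
        // min = 1, max = 25, dynamic chunk size = 10
        // produces [null, 11), [11, 21), [21, null)
        splitEvenlySizedChunks(1, 25, 10).forEach(System.out::println);
    }
}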

@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@ -146,12 +145,6 @@ public class PostgresQueryUtils {
});
}
public static String buildSplitScanQuery(
TableId tableId, RowType pkRowType, boolean isFirstSplit, boolean isLastSplit) {
return buildSplitScanQuery(
tableId, pkRowType, isFirstSplit, isLastSplit, new ArrayList<>());
}
public static String buildSplitScanQuery(
TableId tableId,
RowType pkRowType,
@ -276,7 +269,7 @@ public class PostgresQueryUtils {
return String.format("(%s)::text", value);
}
private static boolean isUUID(Column column) {
public static boolean isUUID(Column column) {
return column.typeName().equals("uuid");
}

@ -17,345 +17,51 @@
package org.apache.flink.cdc.connectors.sqlserver.source.dialect;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerTypeUtils;
import org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
/**
* The {@code ChunkSplitter} used to split SqlServer table into a set of chunks for JDBC data
* source.
*/
public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
private static final Logger LOG = LoggerFactory.getLogger(SqlServerChunkSplitter.class);
private final JdbcSourceConfig sourceConfig;
private final JdbcDataSourceDialect dialect;
@Internal
public class SqlServerChunkSplitter extends JdbcSourceChunkSplitter {
public SqlServerChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
this.sourceConfig = sourceConfig;
this.dialect = dialect;
}
private static String splitId(TableId tableId, int chunkId) {
return tableId.toString() + ":" + chunkId;
}
private static void maySleep(int count, TableId tableId) {
        // sleep 100ms every 10 queries
if (count % 10 == 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// nothing to do
}
LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId);
}
}
@Override
public Collection<SnapshotSplit> generateSplits(TableId tableId) {
try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
LOG.info("Start splitting table {} into chunks...", tableId);
long start = System.currentTimeMillis();
Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
Column splitColumn =
SqlServerUtils.getSplitColumn(table, sourceConfig.getChunkKeyColumn());
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
}
// convert chunks into splits
List<SnapshotSplit> splits = new ArrayList<>();
RowType splitType = getSplitType(splitColumn);
for (int i = 0; i < chunks.size(); i++) {
ChunkRange chunk = chunks.get(i);
SnapshotSplit split =
createSnapshotSplit(
jdbc,
tableId,
i,
splitType,
chunk.getChunkStart(),
chunk.getChunkEnd());
splits.add(split);
}
long end = System.currentTimeMillis();
LOG.info(
"Split table {} into {} chunks, time cost: {}ms.",
tableId,
splits.size(),
end - start);
return splits;
} catch (Exception e) {
throw new FlinkRuntimeException(
String.format("Generate Splits for table %s error", tableId), e);
}
}
@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column)
throws SQLException {
return SqlServerUtils.queryMinMax(jdbc, tableId, column.name());
}
@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException {
return SqlServerUtils.queryMin(jdbc, tableId, column.name(), excludedLowerBound);
super(sourceConfig, dialect);
}
@Override
public DataType fromDbzColumn(Column splitColumn) {
protected DataType fromDbzColumn(Column splitColumn) {
return SqlServerTypeUtils.fromDbzColumn(splitColumn);
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
@Override
public Object queryNextChunkMax(
protected Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
Column column,
Column splitColumn,
int chunkSize,
Object includedLowerBound)
throws SQLException {
return SqlServerUtils.queryNextChunkMax(
jdbc, tableId, column.name(), chunkSize, includedLowerBound);
jdbc, tableId, splitColumn.name(), chunkSize, includedLowerBound);
}
@Override
public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException {
return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
}
@Override
public String buildSplitScanQuery(
TableId tableId, RowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) {
return SqlServerUtils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit);
}
    /**
     * When splitting a table into chunks, we can use either evenly-sized or unevenly-sized
     * chunks: evenly-sized chunks are much more efficient, while unevenly-sized chunks require
     * many queries and are less efficient.
     */
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
// empty table, or only one row, return full table scan as a chunk
return Collections.singletonList(ChunkRange.all());
}
final int chunkSize = sourceConfig.getSplitSize();
final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
if (isEvenlySplitColumn(splitColumn)) {
long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
double distributionFactor =
calculateDistributionFactor(tableId, min, max, approximateRowCnt);
boolean dataIsEvenlyDistributed =
doubleCompare(distributionFactor, distributionFactorLower) >= 0
&& doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
if (dataIsEvenlyDistributed) {
// the minimum dynamic chunk size is at least 1
final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}
/**
* Split table into evenly sized chunks based on the numeric min and max value of split column,
* and tumble chunks in step size.
*/
private List<ChunkRange> splitEvenlySizedChunks(
TableId tableId,
Object min,
Object max,
long approximateRowCnt,
int chunkSize,
int dynamicChunkSize) {
LOG.info(
"Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}",
tableId,
approximateRowCnt,
chunkSize,
dynamicChunkSize);
if (approximateRowCnt <= chunkSize) {
// there is no more than one chunk, return full table as a chunk
return Collections.singletonList(ChunkRange.all());
}
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
while (ObjectUtils.compare(chunkEnd, max) <= 0) {
splits.add(ChunkRange.of(chunkStart, chunkEnd));
chunkStart = chunkEnd;
try {
chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
} catch (ArithmeticException e) {
// Stop chunk split to avoid dead loop when number overflows.
break;
}
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
return splits;
}
// ------------------------------------------------------------------------------------------
/** Split table into unevenly sized chunks by continuously calculating next chunk max value. */
private List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc,
TableId tableId,
Column splitColumn,
Object min,
Object max,
int chunkSize)
protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
LOG.info(
"Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
            // may sleep a while to avoid DDoS on the SQL Server instance
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
return splits;
}
private Object nextChunkEnd(
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
Column splitColumn,
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (ObjectUtils.compare(chunkEnd, max) >= 0) {
return null;
} else {
return chunkEnd;
}
}
private SnapshotSplit createSnapshotSplit(
JdbcConnection jdbc,
TableId tableId,
int chunkId,
RowType splitKeyType,
Object chunkStart,
Object chunkEnd) {
// currently, we only support single split column
Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart};
Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
Map<TableId, TableChange> schema = new HashMap<>();
schema.put(tableId, dialect.queryTableSchema(jdbc, tableId));
return new SnapshotSplit(
tableId,
splitId(tableId, chunkId),
splitKeyType,
splitStart,
splitEnd,
null,
schema);
}
/** Returns the distribution factor of the given table. */
private double calculateDistributionFactor(
TableId tableId, Object min, Object max, long approximateRowCnt) {
if (!min.getClass().equals(max.getClass())) {
throw new IllegalStateException(
String.format(
"Unsupported operation type, the MIN value type %s is different with MAX value type %s.",
min.getClass().getSimpleName(), max.getClass().getSimpleName()));
}
if (approximateRowCnt == 0) {
return Double.MAX_VALUE;
}
BigDecimal difference = ObjectUtils.minus(max, min);
// factor = (max - min + 1) / rowCount
final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
double distributionFactor =
subRowCnt
.divide(new BigDecimal(approximateRowCnt), 4, RoundingMode.CEILING)
.doubleValue();
LOG.info(
"The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}",
tableId,
distributionFactor,
min,
max,
approximateRowCnt);
return distributionFactor;
return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
}
}
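
The unevenly-sized path is driven entirely by queries, so it is harder to picture from the code alone. The following self-contained simulation replaces queryNextChunkMax and queryMin with lookups over an in-memory sorted key list (the key values are invented, no JDBC is involved); it shows how nextChunkEnd walks the key space and how an unchanged boundary is advanced to the next strictly larger key.

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class UnevenChunksExample {
    /** Max key among the first chunkSize keys with key >= includedLowerBound. */
    static Long queryNextChunkMax(List<Long> keys, int chunkSize, long includedLowerBound) {
        return keys.stream()
                .filter(k -> k >= includedLowerBound)
                .limit(chunkSize)
                .reduce((a, b) -> b)   // keys are sorted, so the last one is the max
                .orElse(null);
    }

    /** Smallest key strictly greater than excludedLowerBound. */
    static Long queryMin(List<Long> keys, long excludedLowerBound) {
        return keys.stream().filter(k -> k > excludedLowerBound).findFirst().orElse(null);
    }

    static Long nextChunkEnd(List<Long> keys, long previous, long max, int chunkSize) {
        Long chunkEnd = queryNextChunkMax(keys, chunkSize, previous);
        if (Objects.equals(Long.valueOf(previous), chunkEnd)) {
            // equal chunk start and end are not allowed; advance to the next larger key
            chunkEnd = queryMin(keys, previous);
        }
        return (chunkEnd == null || chunkEnd >= max) ? null : chunkEnd;
    }

    public static void main(String[] args) {
        // a skewed key space: duplicates around 5, a gap, then a tail
        List<Long> keys = Arrays.asList(1L, 2L, 5L, 5L, 5L, 5L, 90L, 91L, 100L);
        long min = 1L;
        long max = 100L;
        int chunkSize = 3;

        Long chunkStart = null;
        Long chunkEnd = nextChunkEnd(keys, min, max, chunkSize);
        while (chunkEnd != null) {
            System.out.println("[" + chunkStart + ", " + chunkEnd + ")");
            chunkStart = chunkEnd;
            chunkEnd = nextChunkEnd(keys, chunkEnd, max, chunkSize);
        }
        // the ending split; overall output: [null, 5), [5, 90), [90, null)
        System.out.println("[" + chunkStart + ", null)");
    }
}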

@ -50,7 +50,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;
@ -59,26 +58,6 @@ public class SqlServerUtils {
public SqlServerUtils() {}
public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
throws SQLException {
final String minMaxQuery =
String.format(
"SELECT MIN(%s), MAX(%s) FROM %s",
quote(columnName), quote(columnName), quote(tableId));
return jdbc.queryAndMap(
minMaxQuery,
rs -> {
if (!rs.next()) {
// this should never happen
throw new SQLException(
String.format(
"No result returned after running query [%s]",
minMaxQuery));
}
return rowToArray(rs, 2);
});
}
public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
// The statement used to get approximate row count which is less
@ -103,27 +82,6 @@ public class SqlServerUtils {
});
}
public static Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
throws SQLException {
final String minQuery =
String.format(
"SELECT MIN(%s) FROM %s WHERE %s > ?",
quote(columnName), quote(tableId), quote(columnName));
return jdbc.prepareQueryAndMap(
minQuery,
ps -> ps.setObject(1, excludedLowerBound),
rs -> {
if (!rs.next()) {
// this should never happen
throw new SQLException(
String.format(
"No result returned after running query [%s]", minQuery));
}
return rs.getObject(1);
});
}
/**
* Returns the next LSN to be read from the database. This is the LSN of the last record that
* was read from the database.

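Stepping back, the net effect of this change is a template-method refactoring: the generic splitting flow now lives once in JdbcSourceChunkSplitter, while each connector supplies only its dialect-specific queries through protected hooks. The toy sketch below mirrors that shape; its class and method names are invented for illustration and are not the connector API.

import java.util.ArrayList;
import java.util.List;

public class TemplateMethodSketch {

    /** Shared splitting flow, analogous to the new abstract JdbcSourceChunkSplitter. */
    abstract static class ToyChunkSplitter {
        final List<String> generateSplits(String tableId) {
            long[] minMax = queryMinMax(tableId);            // dialect-specific lookup
            List<String> splits = new ArrayList<>();
            int chunkId = 0;
            for (long start = minMax[0]; start <= minMax[1]; start += 8096) {
                splits.add(tableId + ":" + chunkId++);       // like splitId(tableId, chunkId)
            }
            return splits;
        }

        /** Dialect hook; the real base class keeps such hooks protected. */
        protected abstract long[] queryMinMax(String tableId);
    }

    /** A dialect only supplies its queries, analogous to the Oracle/Postgres/SqlServer splitters. */
    static class ToyDialectChunkSplitter extends ToyChunkSplitter {
        @Override
        protected long[] queryMinMax(String tableId) {
            // a real dialect would run "SELECT MIN(col), MAX(col) FROM table" over JDBC here
            return new long[] {1L, 20_000L};
        }
    }

    public static void main(String[] args) {
        // prints [APP.ORDERS:0, APP.ORDERS:1, APP.ORDERS:2]
        System.out.println(new ToyDialectChunkSplitter().generateSplits("APP.ORDERS"));
    }
}

Keeping the hooks protected, as the diff does for queryApproximateRowCnt, fromDbzColumn and the other overrides, lets a dialect change how boundaries are queried without being able to bypass the shared splitting policy.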