diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java index 4a24aed00..1d50164cc 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java @@ -18,70 +18,119 @@ package org.apache.flink.cdc.connectors.base.source.assigner.splitter; import org.apache.flink.cdc.common.annotation.Experimental; +import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; +import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect; import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit; +import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils; +import org.apache.flink.cdc.connectors.base.utils.ObjectUtils; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.FlinkRuntimeException; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; +import io.debezium.relational.Table; import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + +import java.math.BigDecimal; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import static java.math.BigDecimal.ROUND_CEILING; +import static org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare; import static org.apache.flink.table.api.DataTypes.FIELD; import static org.apache.flink.table.api.DataTypes.ROW; /** The {@code ChunkSplitter} used to split table into a set of chunks for JDBC data source. */ @Experimental -public interface JdbcSourceChunkSplitter extends ChunkSplitter { +public abstract class JdbcSourceChunkSplitter implements ChunkSplitter { + private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceChunkSplitter.class); + protected final JdbcSourceConfig sourceConfig; + protected final JdbcDataSourceDialect dialect; + + public JdbcSourceChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) { + this.sourceConfig = sourceConfig; + this.dialect = dialect; + } /** Generates all snapshot splits (chunks) for the give table path. */ @Override - Collection generateSplits(TableId tableId); + public Collection generateSplits(TableId tableId) { + try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) { - /** - * Query the maximum and minimum value of the column in the table. e.g. query string - * SELECT MIN(%s) FROM %s WHERE %s > ? - * - * @param jdbc JDBC connection. - * @param tableId table identity. - * @param column column. - * @return maximum and minimum value. 
- */ - Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column) throws SQLException; + LOG.info("Start splitting table {} into chunks...", tableId); + long start = System.currentTimeMillis(); - /** - * Query the minimum value of the column in the table, and the minimum value must greater than - * the excludedLowerBound value. e.g. prepare query string - * SELECT MIN(%s) FROM %s WHERE %s > ? - * - * @param jdbc JDBC connection. - * @param tableId table identity. - * @param column column. - * @param excludedLowerBound the minimum value should be greater than this value. - * @return minimum value. - */ - Object queryMin(JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound) - throws SQLException; + Table table = + Objects.requireNonNull(dialect.queryTableSchema(jdbc, tableId)).getTable(); + Column splitColumn = getSplitColumn(table, sourceConfig.getChunkKeyColumn()); + final List chunks; + try { + chunks = splitTableIntoChunks(jdbc, tableId, splitColumn); + } catch (SQLException e) { + throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e); + } + + // convert chunks into splits + List splits = new ArrayList<>(); + RowType splitType = getSplitType(splitColumn); + for (int i = 0; i < chunks.size(); i++) { + ChunkRange chunk = chunks.get(i); + SnapshotSplit split = + createSnapshotSplit( + jdbc, + tableId, + i, + splitType, + chunk.getChunkStart(), + chunk.getChunkEnd()); + splits.add(split); + } + + long end = System.currentTimeMillis(); + LOG.info( + "Split table {} into {} chunks, time cost: {}ms.", + tableId, + splits.size(), + end - start); + return splits; + } catch (Exception e) { + throw new FlinkRuntimeException( + String.format("Generate Splits for table %s error", tableId), e); + } + } /** * Query the maximum value of the next chunk, and the next chunk must be greater than or equal * to includedLowerBound value [min_1, max_1), [min_2, max_2),... [min_n, null). * Each time this method is called it will return max1, max2... * + *

Each database uses different syntax to limit the number of returned rows, for example, `LIMIT N` in + * MySQL or PostgreSQL, `TOP(N)` in SQL Server, and `FETCH FIRST N ROWS ONLY` in DB2. + * * @param jdbc JDBC connection. * @param tableId table identity. - * @param column column. + * @param splitColumn column. * @param chunkSize chunk size. * @param includedLowerBound the previous chunk end value. * @return next chunk end value. */ - Object queryNextChunkMax( + protected abstract Object queryNextChunkMax( JdbcConnection jdbc, TableId tableId, - Column column, + Column splitColumn, int chunkSize, Object includedLowerBound) throws SQLException; @@ -89,23 +138,16 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter { /** * Approximate total number of entries in the lookup table. * + *

Each database has different system table to lookup up approximate total number. For + * example, `pg_class` in postgres, `sys.dm_db_partition_stats` in sqlserver, `SYSCAT.TABLE` in + * db2. + * * @param jdbc JDBC connection. * @param tableId table identity. * @return approximate row count. */ - Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException; - - /** - * Build the scan query sql of the {@link SnapshotSplit}. - * - * @param tableId table identity. - * @param splitKeyType primary key type. - * @param isFirstSplit whether the first split. - * @param isLastSplit whether the last split. - * @return query sql. - */ - String buildSplitScanQuery( - TableId tableId, RowType splitKeyType, boolean isFirstSplit, boolean isLastSplit); + protected abstract Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) + throws SQLException; /** * Checks whether split column is evenly distributed across its range. @@ -113,7 +155,7 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter { * @param splitColumn split column. * @return true that means split column with type BIGINT, INT, DECIMAL. */ - default boolean isEvenlySplitColumn(Column splitColumn) { + protected boolean isEvenlySplitColumn(Column splitColumn) { DataType flinkType = fromDbzColumn(splitColumn); LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot(); @@ -130,7 +172,94 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter { * @param splitColumn dbz split column. * @return flink data type */ - DataType fromDbzColumn(Column splitColumn); + protected abstract DataType fromDbzColumn(Column splitColumn); + + /** Returns the distribution factor of the given table. */ + protected double calculateDistributionFactor( + TableId tableId, Object min, Object max, long approximateRowCnt) { + + if (!min.getClass().equals(max.getClass())) { + throw new IllegalStateException( + String.format( + "Unsupported operation type, the MIN value type %s is different with MAX value type %s.", + min.getClass().getSimpleName(), max.getClass().getSimpleName())); + } + if (approximateRowCnt == 0) { + return Double.MAX_VALUE; + } + BigDecimal difference = ObjectUtils.minus(max, min); + // factor = (max - min + 1) / rowCount + final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1)); + double distributionFactor = + subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue(); + LOG.info( + "The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}", + tableId, + distributionFactor, + min, + max, + approximateRowCnt); + return distributionFactor; + } + + /** + * Get the column which is seen as chunk key. + * + * @param table table identity. + * @param chunkKeyColumn column name which is seen as chunk key, if chunkKeyColumn is null, use + * primary key instead. @Column the column which is seen as chunk key. + */ + protected Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) { + return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn); + } + + /** ChunkEnd less than or equal to max. */ + protected boolean isChunkEndLeMax(Object chunkEnd, Object max) { + return ObjectUtils.compare(chunkEnd, max) <= 0; + } + + /** ChunkEnd greater than or equal to max. */ + protected boolean isChunkEndGeMax(Object chunkEnd, Object max) { + return ObjectUtils.compare(chunkEnd, max) >= 0; + } + + /** + * Query the maximum and minimum value of the column in the table. e.g. 
query string + * SELECT MIN(%s), MAX(%s) FROM %s + * + * @param jdbc JDBC connection. + * @param tableId table identity. + * @param splitColumn column. + * @return maximum and minimum value. + */ + protected Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column splitColumn) + throws SQLException { + return JdbcChunkUtils.queryMinMax( + jdbc, + jdbc.quotedTableIdString(tableId), + jdbc.quotedColumnIdString(splitColumn.name())); + } + + /** + * Query the minimum value of the column in the table, and the minimum value must be greater than + * the excludedLowerBound value. e.g. prepare query string + * SELECT MIN(%s) FROM %s WHERE %s > ? + * + * @param jdbc JDBC connection. + * @param tableId table identity. + * @param splitColumn column. + * @param excludedLowerBound the minimum value should be greater than this value. + * @return minimum value. + */ + protected Object queryMin( + JdbcConnection jdbc, TableId tableId, Column splitColumn, Object excludedLowerBound) + throws SQLException { + return JdbcChunkUtils.queryMin( + jdbc, + jdbc.quotedTableIdString(tableId), + jdbc.quotedColumnIdString(splitColumn.name()), + excludedLowerBound); + } /** * convert dbz column to Flink row type. @@ -138,8 +267,178 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter { * @param splitColumn split column. * @return flink row type. */ - default RowType getSplitType(Column splitColumn) { + private RowType getSplitType(Column splitColumn) { return (RowType) ROW(FIELD(splitColumn.name(), fromDbzColumn(splitColumn))).getLogicalType(); } + + /** + * Split the table into chunks. Evenly-sized chunks are much more efficient and are used when the + * split column is evenly distributed; otherwise unevenly-sized chunks are used, which issue one + * query per chunk and are less efficient. + */ + private List splitTableIntoChunks( + JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { + final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn); + final Object min = minMax[0]; + final Object max = minMax[1]; + if (min == null || max == null || min.equals(max)) { + // empty table, or only one row, return full table scan as a chunk + return Collections.singletonList(ChunkRange.all()); + } + + final int chunkSize = sourceConfig.getSplitSize(); + final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); + final double distributionFactorLower = sourceConfig.getDistributionFactorLower(); + + if (isEvenlySplitColumn(splitColumn)) { + long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId); + double distributionFactor = + calculateDistributionFactor(tableId, min, max, approximateRowCnt); + + boolean dataIsEvenlyDistributed = + doubleCompare(distributionFactor, distributionFactorLower) >= 0 + && doubleCompare(distributionFactor, distributionFactorUpper) <= 0; + + if (dataIsEvenlyDistributed) { + // the minimum dynamic chunk size is at least 1 + final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1); + return splitEvenlySizedChunks( + tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); + } else { + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); + } + } else { + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); + } + } + + /** + * Split table into evenly sized chunks based on the numeric min and max value of split column, + * and tumble chunks in step size. 
+ */ + private List splitEvenlySizedChunks( + TableId tableId, + Object min, + Object max, + long approximateRowCnt, + int chunkSize, + int dynamicChunkSize) { + LOG.info( + "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}", + tableId, + approximateRowCnt, + chunkSize, + dynamicChunkSize); + if (approximateRowCnt <= chunkSize) { + // there is no more than one chunk, return full table as a chunk + return Collections.singletonList(ChunkRange.all()); + } + + final List splits = new ArrayList<>(); + Object chunkStart = null; + Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize); + while (ObjectUtils.compare(chunkEnd, max) <= 0) { + splits.add(ChunkRange.of(chunkStart, chunkEnd)); + chunkStart = chunkEnd; + try { + chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize); + } catch (ArithmeticException e) { + // Stop chunk split to avoid dead loop when number overflows. + break; + } + } + // add the ending split + splits.add(ChunkRange.of(chunkStart, null)); + return splits; + } + + /** Split table into unevenly sized chunks by continuously calculating next chunk max value. */ + private List splitUnevenlySizedChunks( + JdbcConnection jdbc, + TableId tableId, + Column splitColumn, + Object min, + Object max, + int chunkSize) + throws SQLException { + LOG.info( + "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize); + final List splits = new ArrayList<>(); + Object chunkStart = null; + Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize); + int count = 0; + while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) { + // we start from [null, min + chunk_size) and avoid [null, min) + splits.add(ChunkRange.of(chunkStart, chunkEnd)); + // may sleep a while to avoid DDOS on PostgreSQL server + maySleep(count++, tableId); + chunkStart = chunkEnd; + chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize); + } + // add the ending split + splits.add(ChunkRange.of(chunkStart, null)); + return splits; + } + + private Object nextChunkEnd( + JdbcConnection jdbc, + Object previousChunkEnd, + TableId tableId, + Column splitColumn, + Object max, + int chunkSize) + throws SQLException { + // chunk end might be null when max values are removed + Object chunkEnd = + queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd); + if (Objects.equals(previousChunkEnd, chunkEnd)) { + // we don't allow equal chunk start and end, + // should query the next one larger than chunkEnd + chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd); + } + if (isChunkEndGeMax(chunkEnd, max)) { + return null; + } else { + return chunkEnd; + } + } + + private SnapshotSplit createSnapshotSplit( + JdbcConnection jdbc, + TableId tableId, + int chunkId, + RowType splitKeyType, + Object chunkStart, + Object chunkEnd) { + // currently, we only support single split column + Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart}; + Object[] splitEnd = chunkEnd == null ? 
null : new Object[] {chunkEnd}; + Map schema = new HashMap<>(); + schema.put(tableId, dialect.queryTableSchema(jdbc, tableId)); + return new SnapshotSplit( + tableId, + splitId(tableId, chunkId), + splitKeyType, + splitStart, + splitEnd, + null, + schema); + } + + private String splitId(TableId tableId, int chunkId) { + return tableId.toString() + ":" + chunkId; + } + + private void maySleep(int count, TableId tableId) { + // every 10 queries to sleep 0.1s + if (count % 10 == 0) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // nothing to do + } + LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId); + } + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java new file mode 100644 index 000000000..46b30310c --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.base.source.utils; + +import org.apache.flink.table.api.ValidationException; + +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.Column; +import io.debezium.relational.Table; + +import javax.annotation.Nullable; + +import java.sql.SQLException; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray; + +/** Utilities to split chunks of table. */ +public class JdbcChunkUtils { + + /** + * Query the maximum and minimum value of the column in the table. e.g. query string + * SELECT MIN(%s) FROM %s WHERE %s > ? + * + * @param jdbc JDBC connection. + * @param quotedTableName table identity. + * @param quotedColumnName column name. + * @return maximum and minimum value. + */ + public static Object[] queryMinMax( + JdbcConnection jdbc, String quotedTableName, String quotedColumnName) + throws SQLException { + final String minMaxQuery = + String.format( + "SELECT MIN(%s), MAX(%s) FROM %s", + quotedColumnName, quotedColumnName, quotedTableName); + return jdbc.queryAndMap( + minMaxQuery, + rs -> { + if (!rs.next()) { + // this should never happen + throw new SQLException( + String.format( + "No result returned after running query [%s]", + minMaxQuery)); + } + return rowToArray(rs, 2); + }); + } + + /** + * Query the minimum value of the column in the table, and the minimum value must greater than + * the excludedLowerBound value. e.g. 
prepare query string + * SELECT MIN(%s) FROM %s WHERE %s > ? + * + * @param jdbc JDBC connection. + * @param quotedTableName table identity. + * @param quotedColumnName column name. + * @param excludedLowerBound the minimum value should be greater than this value. + * @return minimum value. + */ + public static Object queryMin( + JdbcConnection jdbc, + String quotedTableName, + String quotedColumnName, + Object excludedLowerBound) + throws SQLException { + final String minQuery = + String.format( + "SELECT MIN(%s) FROM %s WHERE %s > ?", + quotedColumnName, quotedTableName, quotedColumnName); + return jdbc.prepareQueryAndMap( + minQuery, + ps -> ps.setObject(1, excludedLowerBound), + rs -> { + if (!rs.next()) { + // this should never happen + throw new SQLException( + String.format( + "No result returned after running query [%s]", minQuery)); + } + return rs.getObject(1); + }); + } + + /** + * Get the column which is seen as chunk key. + * + * @param table table identity. + * @param chunkKeyColumn column name which is seen as chunk key, if chunkKeyColumn is null, use + * primary key instead. @Column the column which is seen as chunk key. + */ + public static Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) { + List primaryKeys = table.primaryKeyColumns(); + if (primaryKeys.isEmpty()) { + throw new ValidationException( + String.format( + "Incremental snapshot for tables requires primary key," + + " but table %s doesn't have primary key.", + table.id())); + } + + if (chunkKeyColumn != null) { + Optional targetPkColumn = + primaryKeys.stream() + .filter(col -> chunkKeyColumn.equals(col.name())) + .findFirst(); + if (targetPkColumn.isPresent()) { + return targetPkColumn.get(); + } + throw new ValidationException( + String.format( + "Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.", + chunkKeyColumn, + primaryKeys.stream().map(Column::name).collect(Collectors.joining(",")), + table.id())); + } + + // use first field in primary key as the split key + return primaryKeys.get(0); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/MySqlChunkSplitter.java index 29db714eb..497e8e657 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/MySqlChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/MySqlChunkSplitter.java @@ -17,353 +17,48 @@ package org.apache.flink.cdc.connectors.base.experimental; +import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect; import org.apache.flink.cdc.connectors.base.experimental.utils.MySqlTypeUtils; import org.apache.flink.cdc.connectors.base.experimental.utils.MySqlUtils; -import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange; import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter; -import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit; -import org.apache.flink.cdc.connectors.base.utils.ObjectUtils; -import org.apache.flink.table.api.ValidationException; import 
org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.util.FlinkRuntimeException; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; -import io.debezium.relational.Table; import io.debezium.relational.TableId; -import io.debezium.relational.history.TableChanges.TableChange; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.math.BigDecimal; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -import static java.math.BigDecimal.ROUND_CEILING; -import static org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare; /** The {@code ChunkSplitter} used to split table into a set of chunks for JDBC data source. */ -public class MySqlChunkSplitter implements JdbcSourceChunkSplitter { - - private static final Logger LOG = LoggerFactory.getLogger(MySqlChunkSplitter.class); - - private final JdbcSourceConfig sourceConfig; - private final JdbcDataSourceDialect dialect; +@Internal +public class MySqlChunkSplitter extends JdbcSourceChunkSplitter { public MySqlChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) { - this.sourceConfig = sourceConfig; - this.dialect = dialect; - } - - @Override - public Collection generateSplits(TableId tableId) { - try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) { - - LOG.info("Start splitting table {} into chunks...", tableId); - long start = System.currentTimeMillis(); - - Table table = dialect.queryTableSchema(jdbc, tableId).getTable(); - Column splitColumn = getSplitColumn(table); - final List chunks; - try { - chunks = splitTableIntoChunks(jdbc, tableId, splitColumn); - } catch (SQLException e) { - throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e); - } - - // convert chunks into splits - List splits = new ArrayList<>(); - RowType splitType = getSplitType(splitColumn); - for (int i = 0; i < chunks.size(); i++) { - ChunkRange chunk = chunks.get(i); - SnapshotSplit split = - createSnapshotSplit( - jdbc, - tableId, - i, - splitType, - chunk.getChunkStart(), - chunk.getChunkEnd()); - splits.add(split); - } - - long end = System.currentTimeMillis(); - LOG.info( - "Split table {} into {} chunks, time cost: {}ms.", - tableId, - splits.size(), - end - start); - return splits; - } catch (Exception e) { - throw new FlinkRuntimeException( - String.format("Generate Splits for table %s error", tableId), e); - } - } - - @Override - public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column) - throws SQLException { - return MySqlUtils.queryMinMax(jdbc, tableId, column.name()); - } - - @Override - public Object queryMin( - JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound) - throws SQLException { - return MySqlUtils.queryMin(jdbc, tableId, column.name(), excludedLowerBound); + super(sourceConfig, dialect); } @Override public Object queryNextChunkMax( JdbcConnection jdbc, TableId tableId, - Column column, + Column splitColumn, int chunkSize, Object includedLowerBound) throws SQLException { return MySqlUtils.queryNextChunkMax( - jdbc, tableId, column.name(), chunkSize, includedLowerBound); + jdbc, tableId, splitColumn.name(), chunkSize, includedLowerBound); } @Override - public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException { + 
protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) + throws SQLException { return MySqlUtils.queryApproximateRowCnt(jdbc, tableId); } @Override - public String buildSplitScanQuery( - TableId tableId, RowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) { - return MySqlUtils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit); - } - - @Override - public DataType fromDbzColumn(Column splitColumn) { + protected DataType fromDbzColumn(Column splitColumn) { return MySqlTypeUtils.fromDbzColumn(splitColumn); } - - // -------------------------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------------------------- - - /** - * We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using - * evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request - * many queries and is not efficient. - */ - private List splitTableIntoChunks( - JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { - final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn); - final Object min = minMax[0]; - final Object max = minMax[1]; - if (min == null || max == null || min.equals(max)) { - // empty table, or only one row, return full table scan as a chunk - return Collections.singletonList(ChunkRange.all()); - } - - final int chunkSize = sourceConfig.getSplitSize(); - final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); - final double distributionFactorLower = sourceConfig.getDistributionFactorLower(); - - if (isEvenlySplitColumn(splitColumn)) { - long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId); - double distributionFactor = - calculateDistributionFactor(tableId, min, max, approximateRowCnt); - - boolean dataIsEvenlyDistributed = - doubleCompare(distributionFactor, distributionFactorLower) >= 0 - && doubleCompare(distributionFactor, distributionFactorUpper) <= 0; - - if (dataIsEvenlyDistributed) { - // the minimum dynamic chunk size is at least 1 - final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1); - return splitEvenlySizedChunks( - tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); - } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); - } - } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); - } - } - - /** - * Split table into evenly sized chunks based on the numeric min and max value of split column, - * and tumble chunks in step size. 
- */ - private List splitEvenlySizedChunks( - TableId tableId, - Object min, - Object max, - long approximateRowCnt, - int chunkSize, - int dynamicChunkSize) { - LOG.info( - "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}", - tableId, - approximateRowCnt, - chunkSize, - dynamicChunkSize); - if (approximateRowCnt <= chunkSize) { - // there is no more than one chunk, return full table as a chunk - return Collections.singletonList(ChunkRange.all()); - } - - final List splits = new ArrayList<>(); - Object chunkStart = null; - Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize); - while (ObjectUtils.compare(chunkEnd, max) <= 0) { - splits.add(ChunkRange.of(chunkStart, chunkEnd)); - chunkStart = chunkEnd; - try { - chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize); - } catch (ArithmeticException e) { - // Stop chunk split to avoid dead loop when number overflows. - break; - } - } - // add the ending split - splits.add(ChunkRange.of(chunkStart, null)); - return splits; - } - - /** Split table into unevenly sized chunks by continuously calculating next chunk max value. */ - private List splitUnevenlySizedChunks( - JdbcConnection jdbc, - TableId tableId, - Column splitColumn, - Object min, - Object max, - int chunkSize) - throws SQLException { - LOG.info( - "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize); - final List splits = new ArrayList<>(); - Object chunkStart = null; - Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize); - int count = 0; - while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) { - // we start from [null, min + chunk_size) and avoid [null, min) - splits.add(ChunkRange.of(chunkStart, chunkEnd)); - // may sleep a while to avoid DDOS on MySQL server - maySleep(count++, tableId); - chunkStart = chunkEnd; - chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize); - } - // add the ending split - splits.add(ChunkRange.of(chunkStart, null)); - return splits; - } - - private Object nextChunkEnd( - JdbcConnection jdbc, - Object previousChunkEnd, - TableId tableId, - Column splitColumn, - Object max, - int chunkSize) - throws SQLException { - // chunk end might be null when max values are removed - Object chunkEnd = - queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd); - if (Objects.equals(previousChunkEnd, chunkEnd)) { - // we don't allow equal chunk start and end, - // should query the next one larger than chunkEnd - chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd); - } - if (ObjectUtils.compare(chunkEnd, max) >= 0) { - return null; - } else { - return chunkEnd; - } - } - - private SnapshotSplit createSnapshotSplit( - JdbcConnection jdbc, - TableId tableId, - int chunkId, - RowType splitKeyType, - Object chunkStart, - Object chunkEnd) { - // currently, we only support single split column - Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart}; - Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd}; - Map schema = new HashMap<>(); - schema.put(tableId, dialect.queryTableSchema(jdbc, tableId)); - return new SnapshotSplit( - tableId, - splitId(tableId, chunkId), - splitKeyType, - splitStart, - splitEnd, - null, - schema); - } - - // ------------------------------------------------------------------------------------------ - /** Returns the distribution factor of the given table. 
*/ - private double calculateDistributionFactor( - TableId tableId, Object min, Object max, long approximateRowCnt) { - - if (!min.getClass().equals(max.getClass())) { - throw new IllegalStateException( - String.format( - "Unsupported operation type, the MIN value type %s is different with MAX value type %s.", - min.getClass().getSimpleName(), max.getClass().getSimpleName())); - } - if (approximateRowCnt == 0) { - return Double.MAX_VALUE; - } - BigDecimal difference = ObjectUtils.minus(max, min); - // factor = (max - min + 1) / rowCount - final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1)); - double distributionFactor = - subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue(); - LOG.info( - "The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}", - tableId, - distributionFactor, - min, - max, - approximateRowCnt); - return distributionFactor; - } - - private static String splitId(TableId tableId, int chunkId) { - return tableId.toString() + ":" + chunkId; - } - - private static void maySleep(int count, TableId tableId) { - // every 10 queries to sleep 100ms - if (count % 10 == 0) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - // nothing to do - } - LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId); - } - } - - public static Column getSplitColumn(Table table) { - List primaryKeys = table.primaryKeyColumns(); - if (primaryKeys.isEmpty()) { - throw new ValidationException( - String.format( - "Incremental snapshot for tables requires primary key," - + " but table %s doesn't have primary key.", - table.id())); - } - - // use first field in primary key as the split key - return primaryKeys.get(0); - } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/utils/MySqlUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/utils/MySqlUtils.java index 0d6ef1f88..f85a283a7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/utils/MySqlUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/utils/MySqlUtils.java @@ -45,7 +45,6 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray; import static org.apache.flink.table.api.DataTypes.FIELD; import static org.apache.flink.table.api.DataTypes.ROW; @@ -54,26 +53,6 @@ public class MySqlUtils { private MySqlUtils() {} - public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) - throws SQLException { - final String minMaxQuery = - String.format( - "SELECT MIN(%s), MAX(%s) FROM %s", - quote(columnName), quote(columnName), quote(tableId)); - return jdbc.queryAndMap( - minMaxQuery, - rs -> { - if (!rs.next()) { - // this should never happen - throw new SQLException( - String.format( - "No result returned after running query [%s]", - minMaxQuery)); - } - return rowToArray(rs, 2); - }); - } - public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException { // The statement used to get approximate row count which is less @@ -94,27 +73,6 @@ public class MySqlUtils { }); } - 
public static Object queryMin( - JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) - throws SQLException { - final String minQuery = - String.format( - "SELECT MIN(%s) FROM %s WHERE %s > ?", - quote(columnName), quote(tableId), quote(columnName)); - return jdbc.prepareQueryAndMap( - minQuery, - ps -> ps.setObject(1, excludedLowerBound), - rs -> { - if (!rs.next()) { - // this should never happen - throw new SQLException( - String.format( - "No result returned after running query [%s]", minQuery)); - } - return rs.getObject(1); - }); - } - public static Object queryNextChunkMax( JdbcConnection jdbc, TableId tableId, @@ -122,7 +80,7 @@ public class MySqlUtils { int chunkSize, Object includedLowerBound) throws SQLException { - String quotedColumn = quote(splitColumnName); + String quotedColumn = jdbc.quotedColumnIdString(splitColumnName); String query = String.format( "SELECT MAX(%s) FROM (" @@ -130,7 +88,7 @@ public class MySqlUtils { + ") AS T", quotedColumn, quotedColumn, - quote(tableId), + jdbc.quotedTableIdString(tableId), quotedColumn, quotedColumn, chunkSize); @@ -313,20 +271,6 @@ public class MySqlUtils { .getLogicalType(); } - public static Column getSplitColumn(Table table) { - List primaryKeys = table.primaryKeyColumns(); - if (primaryKeys.isEmpty()) { - throw new ValidationException( - String.format( - "Incremental snapshot for tables requires primary key," - + " but table %s doesn't have primary key.", - table.id())); - } - - // use first field in primary key as the split key - return primaryKeys.get(0); - } - public static String quote(String dbOrTableName) { return "`" + dbOrTableName + "`"; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2ChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2ChunkSplitter.java index 6381cd74a..6e77de798 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2ChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/dialect/Db2ChunkSplitter.java @@ -17,139 +17,37 @@ package org.apache.flink.cdc.connectors.db2.source.dialect; +import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect; -import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange; import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter; -import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit; -import org.apache.flink.cdc.connectors.base.utils.ObjectUtils; import org.apache.flink.cdc.connectors.db2.source.utils.Db2TypeUtils; import org.apache.flink.cdc.connectors.db2.source.utils.Db2Utils; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.util.FlinkRuntimeException; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; -import io.debezium.relational.Table; import io.debezium.relational.TableId; -import io.debezium.relational.history.TableChanges.TableChange; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.math.BigDecimal; 
-import java.math.RoundingMode; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -import static org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare; /** * The splitter to split the table into chunks using primary-key (by default) or a given split key. */ -public class Db2ChunkSplitter implements JdbcSourceChunkSplitter { - - private static final Logger LOG = LoggerFactory.getLogger(Db2ChunkSplitter.class); - - private final JdbcSourceConfig sourceConfig; - private final JdbcDataSourceDialect dialect; +@Internal +public class Db2ChunkSplitter extends JdbcSourceChunkSplitter { public Db2ChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) { - this.sourceConfig = sourceConfig; - this.dialect = dialect; - } - - private static String splitId(TableId tableId, int chunkId) { - return tableId.toString() + ":" + chunkId; - } - - private static void maySleep(int count, TableId tableId) { - // every 10 queries to sleep 0.1s - if (count % 10 == 0) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - // nothing to do - } - LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId); - } - } - - @Override - public Collection generateSplits(TableId tableId) { - try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) { - - LOG.info("Start splitting table {} into chunks...", tableId); - long start = System.currentTimeMillis(); - - Table table = dialect.queryTableSchema(jdbc, tableId).getTable(); - Column splitColumn = Db2Utils.getSplitColumn(table, sourceConfig.getChunkKeyColumn()); - final List chunks; - try { - chunks = splitTableIntoChunks(jdbc, tableId, splitColumn); - } catch (SQLException e) { - throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e); - } - - // convert chunks into splits - List splits = new ArrayList<>(); - RowType splitType = getSplitType(splitColumn); - for (int i = 0; i < chunks.size(); i++) { - ChunkRange chunk = chunks.get(i); - SnapshotSplit split = - createSnapshotSplit( - jdbc, - tableId, - i, - splitType, - chunk.getChunkStart(), - chunk.getChunkEnd()); - splits.add(split); - } - - long end = System.currentTimeMillis(); - LOG.info( - "Split table {} into {} chunks, time cost: {}ms.", - tableId, - splits.size(), - end - start); - return splits; - } catch (Exception e) { - throw new FlinkRuntimeException( - String.format("Generate Splits for table %s error", tableId), e); - } + super(sourceConfig, dialect); } @Override - public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column) - throws SQLException { - return Db2Utils.queryMinMax(jdbc, tableId, column.name()); - } - - @Override - public Object queryMin( - JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound) - throws SQLException { - return Db2Utils.queryMin(jdbc, tableId, column.name(), excludedLowerBound); - } - - @Override - public DataType fromDbzColumn(Column splitColumn) { + protected DataType fromDbzColumn(Column splitColumn) { return Db2TypeUtils.fromDbzColumn(splitColumn); } - // -------------------------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------------------------- - @Override - public Object queryNextChunkMax( + protected Object queryNextChunkMax( 
JdbcConnection jdbc, TableId tableId, Column column, @@ -161,199 +59,8 @@ public class Db2ChunkSplitter implements JdbcSourceChunkSplitter { } @Override - public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException { - return Db2Utils.queryApproximateRowCnt(jdbc, tableId); - } - - @Override - public String buildSplitScanQuery( - TableId tableId, RowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) { - return Db2Utils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit); - } - - /** - * We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using - * evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request - * many queries and is not efficient. - */ - private List splitTableIntoChunks( - JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { - final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn); - final Object min = minMax[0]; - final Object max = minMax[1]; - if (min == null || max == null || min.equals(max)) { - // empty table, or only one row, return full table scan as a chunk - return Collections.singletonList(ChunkRange.all()); - } - - final int chunkSize = sourceConfig.getSplitSize(); - final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); - final double distributionFactorLower = sourceConfig.getDistributionFactorLower(); - - if (isEvenlySplitColumn(splitColumn)) { - long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId); - double distributionFactor = - calculateDistributionFactor(tableId, min, max, approximateRowCnt); - - boolean dataIsEvenlyDistributed = - doubleCompare(distributionFactor, distributionFactorLower) >= 0 - && doubleCompare(distributionFactor, distributionFactorUpper) <= 0; - - if (dataIsEvenlyDistributed) { - // the minimum dynamic chunk size is at least 1 - final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1); - return splitEvenlySizedChunks( - tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); - } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); - } - } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); - } - } - - /** - * Split table into evenly sized chunks based on the numeric min and max value of split column, - * and tumble chunks in step size. - */ - private List splitEvenlySizedChunks( - TableId tableId, - Object min, - Object max, - long approximateRowCnt, - int chunkSize, - int dynamicChunkSize) { - LOG.info( - "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}", - tableId, - approximateRowCnt, - chunkSize, - dynamicChunkSize); - if (approximateRowCnt <= chunkSize) { - // there is no more than one chunk, return full table as a chunk - return Collections.singletonList(ChunkRange.all()); - } - - final List splits = new ArrayList<>(); - Object chunkStart = null; - Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize); - while (ObjectUtils.compare(chunkEnd, max) <= 0) { - splits.add(ChunkRange.of(chunkStart, chunkEnd)); - chunkStart = chunkEnd; - try { - chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize); - } catch (ArithmeticException e) { - // Stop chunk split to avoid dead loop when number overflows. 
- break; - } - } - // add the ending split - splits.add(ChunkRange.of(chunkStart, null)); - return splits; - } - - // ------------------------------------------------------------------------------------------ - - /** Split table into unevenly sized chunks by continuously calculating next chunk max value. */ - private List splitUnevenlySizedChunks( - JdbcConnection jdbc, - TableId tableId, - Column splitColumn, - Object min, - Object max, - int chunkSize) + protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException { - LOG.info( - "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize); - final List splits = new ArrayList<>(); - Object chunkStart = null; - Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize); - int count = 0; - while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) { - // we start from [null, min + chunk_size) and avoid [null, min) - splits.add(ChunkRange.of(chunkStart, chunkEnd)); - // may sleep awhile to avoid DDOS on Db2 server - maySleep(count++, tableId); - chunkStart = chunkEnd; - chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize); - } - // add the ending split - splits.add(ChunkRange.of(chunkStart, null)); - return splits; - } - - private Object nextChunkEnd( - JdbcConnection jdbc, - Object previousChunkEnd, - TableId tableId, - Column splitColumn, - Object max, - int chunkSize) - throws SQLException { - // chunk end might be null when max values are removed - Object chunkEnd = - queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd); - if (Objects.equals(previousChunkEnd, chunkEnd)) { - // we don't allow equal chunk start and end, - // should query the next one larger than chunkEnd - chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd); - } - if (ObjectUtils.compare(chunkEnd, max) >= 0) { - return null; - } else { - return chunkEnd; - } - } - - private SnapshotSplit createSnapshotSplit( - JdbcConnection jdbc, - TableId tableId, - int chunkId, - RowType splitKeyType, - Object chunkStart, - Object chunkEnd) { - // currently, we only support single split column - Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart}; - Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd}; - Map schema = new HashMap<>(); - schema.put(tableId, dialect.queryTableSchema(jdbc, tableId)); - return new SnapshotSplit( - tableId, - splitId(tableId, chunkId), - splitKeyType, - splitStart, - splitEnd, - null, - schema); - } - - /** Returns the distribution factor of the given table. 
*/ - private double calculateDistributionFactor( - TableId tableId, Object min, Object max, long approximateRowCnt) { - - if (!min.getClass().equals(max.getClass())) { - throw new IllegalStateException( - String.format( - "Unsupported operation type, the MIN value type %s is different with MAX value type %s.", - min.getClass().getSimpleName(), max.getClass().getSimpleName())); - } - if (approximateRowCnt == 0) { - return Double.MAX_VALUE; - } - BigDecimal difference = ObjectUtils.minus(max, min); - // factor = (max - min + 1) / rowCount - final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1)); - double distributionFactor = - subRowCnt - .divide(new BigDecimal(approximateRowCnt), 4, RoundingMode.CEILING) - .doubleValue(); - LOG.info( - "The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}", - tableId, - distributionFactor, - min, - max, - approximateRowCnt); - return distributionFactor; + return Db2Utils.queryApproximateRowCnt(jdbc, tableId); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2Utils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2Utils.java index 06c4e741f..2d08d7bff 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2Utils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2Utils.java @@ -49,7 +49,6 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray; import static org.apache.flink.table.api.DataTypes.FIELD; import static org.apache.flink.table.api.DataTypes.ROW; @@ -60,26 +59,6 @@ public class Db2Utils { public Db2Utils() {} - public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) - throws SQLException { - final String minMaxQuery = - String.format( - "SELECT MIN(%s), MAX(%s) FROM %s", - quote(columnName), quote(columnName), quoteSchemaAndTable(tableId)); - return jdbc.queryAndMap( - minMaxQuery, - rs -> { - if (!rs.next()) { - // this should never happen - throw new SQLException( - String.format( - "No result returned after running query [%s]", - minMaxQuery)); - } - return rowToArray(rs, 2); - }); - } - public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException { // The statement used to get approximate row count which is less @@ -103,27 +82,6 @@ public class Db2Utils { }); } - public static Object queryMin( - JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) - throws SQLException { - final String minQuery = - String.format( - "SELECT MIN(%s) FROM %s WHERE %s > ?", - columnName, quote(tableId), columnName); - return jdbc.prepareQueryAndMap( - minQuery, - ps -> ps.setObject(1, excludedLowerBound), - rs -> { - if (!rs.next()) { - // this should never happen - throw new SQLException( - String.format( - "No result returned after running query [%s]", minQuery)); - } - return rs.getObject(1); - }); - } - /** * Returns the next LSN to be read from the database. This is the LSN of the last record that * was read from the database. 
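The refactor above moves the chunking math that every dialect previously duplicated into JdbcSourceChunkSplitter, so the MySQL, Db2 and Oracle splitters now share one distribution-factor decision. The standalone sketch below is not part of the patch; it replays that decision with hypothetical inputs (the split size 8096 and the distribution-factor bounds 0.05 / 1000.0 are assumed values, and ChunkMathSketch is an illustrative class name). It computes factor = (max - min + 1) / approximateRowCnt, checks the factor against the bounds, and tumbles the key range into evenly sized chunks when the check passes.

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;

/** Standalone illustration of the evenly-sized chunking math shared by the splitters above. */
public class ChunkMathSketch {

    public static void main(String[] args) {
        long min = 1L; // MIN(split column)
        long max = 1_000_000L; // MAX(split column)
        long approximateRowCnt = 800_000L; // result of the dialect's queryApproximateRowCnt
        int chunkSize = 8096; // hypothetical configured split size
        double factorLower = 0.05; // hypothetical distribution-factor lower bound
        double factorUpper = 1000.0; // hypothetical distribution-factor upper bound

        // factor = (max - min + 1) / rowCount, rounded up to 4 decimal places
        double distributionFactor =
                BigDecimal.valueOf(max - min)
                        .add(BigDecimal.ONE)
                        .divide(BigDecimal.valueOf(approximateRowCnt), 4, RoundingMode.CEILING)
                        .doubleValue();
        boolean evenlyDistributed =
                distributionFactor >= factorLower && distributionFactor <= factorUpper;
        System.out.printf(
                "distribution factor = %s, evenly distributed = %s%n",
                distributionFactor, evenlyDistributed);

        if (evenlyDistributed) {
            // scale the configured chunk size by the factor; the dynamic chunk size is at least 1
            int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
            List<String> chunks = splitEvenlySizedChunks(min, max, dynamicChunkSize);
            System.out.printf(
                    "dynamic chunk size = %d, chunks = %d, first = %s, last = %s%n",
                    dynamicChunkSize, chunks.size(), chunks.get(0), chunks.get(chunks.size() - 1));
        } else {
            // outside the bounds the splitter falls back to unevenly sized chunks,
            // issuing one queryNextChunkMax per chunk (not shown here)
            System.out.println("fall back to unevenly sized chunks");
        }
    }

    /** Tumble [min, max] into ranges of the given step; a null bound means an open end. */
    private static List<String> splitEvenlySizedChunks(long min, long max, int step) {
        List<String> chunks = new ArrayList<>();
        Long chunkStart = null;
        long chunkEnd = min + step;
        while (chunkEnd <= max) {
            chunks.add("[" + chunkStart + ", " + chunkEnd + ")");
            chunkStart = chunkEnd;
            chunkEnd += step;
        }
        chunks.add("[" + chunkStart + ", null)"); // the ending split is unbounded above
        return chunks;
    }
}

With these numbers the factor is 1.25, so the range is tumbled in steps of 10120 keys; a factor outside the bounds would instead trigger the uneven path, which is why that path throttles itself with maySleep between the per-chunk MAX queries.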
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java index d68f1cd99..adf4c98a9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java @@ -17,253 +17,92 @@ package org.apache.flink.cdc.connectors.oracle.source.assigner.splitter; +import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect; -import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange; import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter; -import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit; +import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils; import org.apache.flink.cdc.connectors.base.utils.ObjectUtils; import org.apache.flink.cdc.connectors.oracle.source.utils.OracleTypeUtils; import org.apache.flink.cdc.connectors.oracle.source.utils.OracleUtils; import org.apache.flink.cdc.connectors.oracle.util.ChunkUtils; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.util.FlinkRuntimeException; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; import io.debezium.relational.Table; import io.debezium.relational.TableId; -import io.debezium.relational.history.TableChanges.TableChange; import oracle.sql.ROWID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.math.BigDecimal; -import java.math.RoundingMode; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import javax.annotation.Nullable; -import static org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare; +import java.sql.SQLException; /** * The {@code ChunkSplitter} used to split Oracle table into a set of chunks for JDBC data source. 
*/ -public class OracleChunkSplitter implements JdbcSourceChunkSplitter { - - private static final Logger LOG = LoggerFactory.getLogger(OracleChunkSplitter.class); - - private final JdbcSourceConfig sourceConfig; - private final JdbcDataSourceDialect dialect; +@Internal +public class OracleChunkSplitter extends JdbcSourceChunkSplitter { public OracleChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) { - this.sourceConfig = sourceConfig; - this.dialect = dialect; - } - - @Override - public Collection generateSplits(TableId tableId) { - try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) { - - LOG.info("Start splitting table {} into chunks...", tableId); - long start = System.currentTimeMillis(); - - Table table = dialect.queryTableSchema(jdbc, tableId).getTable(); - Column splitColumn = - ChunkUtils.getChunkKeyColumn(table, sourceConfig.getChunkKeyColumn()); - final List chunks; - try { - chunks = splitTableIntoChunks(jdbc, tableId, splitColumn); - } catch (SQLException e) { - throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e); - } - - // convert chunks into splits - List splits = new ArrayList<>(); - RowType splitType = getSplitType(splitColumn); - for (int i = 0; i < chunks.size(); i++) { - ChunkRange chunk = chunks.get(i); - SnapshotSplit split = - createSnapshotSplit( - jdbc, - tableId, - i, - splitType, - chunk.getChunkStart(), - chunk.getChunkEnd()); - splits.add(split); - } - - long end = System.currentTimeMillis(); - LOG.info( - "Split table {} into {} chunks, time cost: {}ms.", - tableId, - splits.size(), - end - start); - return splits; - } catch (Exception e) { - throw new FlinkRuntimeException( - String.format("Generate Splits for table %s error", tableId), e); - } + super(sourceConfig, dialect); } @Override - public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column) + public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { - return OracleUtils.queryMinMax(jdbc, tableId, column.name()); + // oracle query only use schema and table. + String quoteSchemaAndTable = OracleUtils.quoteSchemaAndTable(tableId); + return JdbcChunkUtils.queryMinMax( + jdbc, quoteSchemaAndTable, jdbc.quotedColumnIdString(splitColumn.name())); } @Override public Object queryMin( JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound) throws SQLException { - return OracleUtils.queryMin(jdbc, tableId, column.name(), excludedLowerBound); + // oracle query only use schema and table. 
+        String quoteSchemaAndTable = OracleUtils.quoteSchemaAndTable(tableId);
+        return JdbcChunkUtils.queryMin(
+                jdbc,
+                quoteSchemaAndTable,
+                jdbc.quotedColumnIdString(column.name()),
+                excludedLowerBound);
     }
 
     @Override
     public Object queryNextChunkMax(
             JdbcConnection jdbc,
             TableId tableId,
-            Column column,
+            Column splitColumn,
             int chunkSize,
             Object includedLowerBound)
             throws SQLException {
         return OracleUtils.queryNextChunkMax(
-                jdbc, tableId, column.name(), chunkSize, includedLowerBound);
+                jdbc, tableId, splitColumn.name(), chunkSize, includedLowerBound);
     }
 
     @Override
-    public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException {
+    protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
+            throws SQLException {
         return OracleUtils.queryApproximateRowCnt(jdbc, tableId);
     }
 
-    @Override
-    public String buildSplitScanQuery(
-            TableId tableId, RowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) {
-        return OracleUtils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit);
-    }
-
     @Override
     public DataType fromDbzColumn(Column splitColumn) {
         return OracleTypeUtils.fromDbzColumn(splitColumn);
     }
 
-    // --------------------------------------------------------------------------------------------
-    // Utilities
-    // --------------------------------------------------------------------------------------------
-
-    /**
-     * We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using
-     * evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request
-     * many queries and is not efficient.
-     */
-    private List<ChunkRange> splitTableIntoChunks(
-            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
-        final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
-        final Object min = minMax[0];
-        final Object max = minMax[1];
-        if (min == null || max == null || min.equals(max)) {
-            // empty table, or only one row, return full table scan as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final int chunkSize = sourceConfig.getSplitSize();
-        final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
-        final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
-
+    @Override
+    protected boolean isEvenlySplitColumn(Column splitColumn) {
         // use ROWID get splitUnevenlySizedChunks by default
         if (splitColumn.name().equals(ROWID.class.getSimpleName())) {
-            return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
-        }
-
-        if (isEvenlySplitColumn(splitColumn)) {
-            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
-            double distributionFactor =
-                    calculateDistributionFactor(tableId, min, max, approximateRowCnt);
-
-            boolean dataIsEvenlyDistributed =
-                    doubleCompare(distributionFactor, distributionFactorLower) >= 0
-                            && doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
-
-            if (dataIsEvenlyDistributed) {
-                // the minimum dynamic chunk size is at least 1
-                final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
-                return splitEvenlySizedChunks(
-                        tableId, min, max, approximateRowCnt, dynamicChunkSize);
-            } else {
-                return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
-            }
-        } else {
-            return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
-        }
-    }
-
-    /**
-     * Split table into evenly sized chunks based on the numeric min and max value of split column,
-     * and tumble chunks in step size.
- */ - private List splitEvenlySizedChunks( - TableId tableId, Object min, Object max, long approximateRowCnt, int chunkSize) { - LOG.info( - "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}", - tableId, - approximateRowCnt, - chunkSize); - if (approximateRowCnt <= chunkSize) { - // there is no more than one chunk, return full table as a chunk - return Collections.singletonList(ChunkRange.all()); + return false; } - final List splits = new ArrayList<>(); - Object chunkStart = null; - Object chunkEnd = ObjectUtils.plus(min, chunkSize); - while (ObjectUtils.compare(chunkEnd, max) <= 0) { - splits.add(ChunkRange.of(chunkStart, chunkEnd)); - chunkStart = chunkEnd; - chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize); - } - // add the ending split - splits.add(ChunkRange.of(chunkStart, null)); - return splits; - } - - /** Split table into unevenly sized chunks by continuously calculating next chunk max value. */ - private List splitUnevenlySizedChunks( - JdbcConnection jdbc, - TableId tableId, - Column splitColumn, - Object min, - Object max, - int chunkSize) - throws SQLException { - LOG.info( - "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize); - final List splits = new ArrayList<>(); - Object chunkStart = null; - Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize); - int count = 0; - - while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) { - // we start from [null, min + chunk_size) and avoid [null, min) - splits.add(ChunkRange.of(chunkStart, chunkEnd)); - // may sleep a while to avoid DDOS on MySQL server - maySleep(count++, tableId); - chunkStart = chunkEnd; - chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize); - } - // add the ending split - splits.add(ChunkRange.of(chunkStart, null)); - return splits; + return super.isEvenlySplitColumn(splitColumn); } /** ChunkEnd less than or equal to max. */ - private boolean isChunkEndLeMax(Object chunkEnd, Object max) { + @Override + protected boolean isChunkEndLeMax(Object chunkEnd, Object max) { boolean chunkEndMaxCompare; if (chunkEnd instanceof ROWID && max instanceof ROWID) { chunkEndMaxCompare = @@ -276,7 +115,8 @@ public class OracleChunkSplitter implements JdbcSourceChunkSplitter { } /** ChunkEnd greater than or equal to max. 
*/ - private boolean isChunkEndGeMax(Object chunkEnd, Object max) { + @Override + protected boolean isChunkEndGeMax(Object chunkEnd, Object max) { boolean chunkEndMaxCompare; if (chunkEnd instanceof ROWID && max instanceof ROWID) { chunkEndMaxCompare = @@ -288,95 +128,9 @@ public class OracleChunkSplitter implements JdbcSourceChunkSplitter { return chunkEndMaxCompare; } - private Object nextChunkEnd( - JdbcConnection jdbc, - Object previousChunkEnd, - TableId tableId, - Column splitColumn, - Object max, - int chunkSize) - throws SQLException { - // chunk end might be null when max values are removed - Object chunkEnd = - queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd); - if (Objects.equals(previousChunkEnd, chunkEnd)) { - // we don't allow equal chunk start and end, - // should query the next one larger than chunkEnd - chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd); - } - if (isChunkEndGeMax(chunkEnd, max)) { - return null; - } else { - return chunkEnd; - } - } - - private SnapshotSplit createSnapshotSplit( - JdbcConnection jdbc, - TableId tableId, - int chunkId, - RowType splitKeyType, - Object chunkStart, - Object chunkEnd) { - // currently, we only support single split column - Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart}; - Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd}; - Map schema = new HashMap<>(); - schema.put(tableId, dialect.queryTableSchema(jdbc, tableId)); - return new SnapshotSplit( - tableId, - splitId(tableId, chunkId), - splitKeyType, - splitStart, - splitEnd, - null, - schema); - } - - // ------------------------------------------------------------------------------------------ - /** Returns the distribution factor of the given table. */ - private double calculateDistributionFactor( - TableId tableId, Object min, Object max, long approximateRowCnt) { - - if (!min.getClass().equals(max.getClass())) { - throw new IllegalStateException( - String.format( - "Unsupported operation type, the MIN value type %s is different with MAX value type %s.", - min.getClass().getSimpleName(), max.getClass().getSimpleName())); - } - if (approximateRowCnt == 0) { - return Double.MAX_VALUE; - } - BigDecimal difference = ObjectUtils.minus(max, min); - // factor = (max - min + 1) / rowCount - final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1)); - double distributionFactor = - subRowCnt - .divide(new BigDecimal(approximateRowCnt), 4, RoundingMode.CEILING) - .doubleValue(); - LOG.info( - "The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}", - tableId, - distributionFactor, - min, - max, - approximateRowCnt); - return distributionFactor; - } - - private static String splitId(TableId tableId, int chunkId) { - return tableId.toString() + ":" + chunkId; - } - - private static void maySleep(int count, TableId tableId) { - // every 10 queries to sleep 0.1s - if (count % 10 == 0) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - // nothing to do - } - LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId); - } + @Override + protected Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) { + // Use the ROWID column as the chunk key column by default for oracle cdc connector + return ChunkUtils.getChunkKeyColumn(table, chunkKeyColumn); } } diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleUtils.java index b762a974b..4f3508744 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleUtils.java @@ -42,33 +42,11 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray; - /** Utils to prepare Oracle SQL statement. */ public class OracleUtils { private OracleUtils() {} - public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) - throws SQLException { - final String minMaxQuery = - String.format( - "SELECT MIN(%s), MAX(%s) FROM %s", - quote(columnName), quote(columnName), quoteSchemaAndTable(tableId)); - return jdbc.queryAndMap( - minMaxQuery, - rs -> { - if (!rs.next()) { - // this should never happen - throw new SQLException( - String.format( - "No result returned after running query [%s]", - minMaxQuery)); - } - return rowToArray(rs, 2); - }); - } - public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException { final String analyzeTable = @@ -92,27 +70,6 @@ public class OracleUtils { }); } - public static Object queryMin( - JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) - throws SQLException { - final String minQuery = - String.format( - "SELECT MIN(%s) FROM %s WHERE %s > ?", - quote(columnName), quoteSchemaAndTable(tableId), quote(columnName)); - return jdbc.prepareQueryAndMap( - minQuery, - ps -> ps.setObject(1, excludedLowerBound), - rs -> { - if (!rs.next()) { - // this should never happen - throw new SQLException( - String.format( - "No result returned after running query [%s]", minQuery)); - } - return rs.getObject(1); - }); - } - public static Object queryNextChunkMax( JdbcConnection jdbc, TableId tableId, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkSplitter.java index 96a4cc18d..9639b28f8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresChunkSplitter.java @@ -17,138 +17,54 @@ package org.apache.flink.cdc.connectors.postgres.source; +import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; -import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange; import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter; -import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit; -import 
org.apache.flink.cdc.connectors.base.utils.ObjectUtils; -import org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils; import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils; import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.util.FlinkRuntimeException; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; -import io.debezium.relational.Table; import io.debezium.relational.TableId; -import io.debezium.relational.history.TableChanges.TableChange; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.math.BigDecimal; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -import static java.math.BigDecimal.ROUND_CEILING; -import static org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare; /** * The splitter to split the table into chunks using primary-key (by default) or a given split key. */ -public class PostgresChunkSplitter implements JdbcSourceChunkSplitter { - private static final Logger LOG = LoggerFactory.getLogger(PostgresChunkSplitter.class); - - private final JdbcSourceConfig sourceConfig; - private final PostgresDialect dialect; +@Internal +public class PostgresChunkSplitter extends JdbcSourceChunkSplitter { public PostgresChunkSplitter(JdbcSourceConfig sourceConfig, PostgresDialect postgresDialect) { - this.sourceConfig = sourceConfig; - this.dialect = postgresDialect; - } - - private static String splitId(TableId tableId, int chunkId) { - return tableId.toString() + ":" + chunkId; - } - - private static void maySleep(int count, TableId tableId) { - // every 10 queries to sleep 100ms - if (count % 10 == 0) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - // nothing to do - } - LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId); - } + super(sourceConfig, postgresDialect); } @Override - public Collection generateSplits(TableId tableId) { - try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) { - - LOG.info("Start splitting table {} into chunks...", tableId); - long start = System.currentTimeMillis(); - - Table table = - Objects.requireNonNull(dialect.queryTableSchema(jdbc, tableId)).getTable(); - Column splitColumn = ChunkUtils.getSplitColumn(table, sourceConfig.getChunkKeyColumn()); - final List chunks; - try { - chunks = splitTableIntoChunks(jdbc, tableId, splitColumn); - } catch (SQLException e) { - throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e); - } - - // convert chunks into splits - List splits = new ArrayList<>(); - RowType splitType = getSplitType(splitColumn); - for (int i = 0; i < chunks.size(); i++) { - ChunkRange chunk = chunks.get(i); - SnapshotSplit split = - createSnapshotSplit( - jdbc, - tableId, - i, - splitType, - chunk.getChunkStart(), - chunk.getChunkEnd()); - splits.add(split); - } - - long end = System.currentTimeMillis(); - LOG.info( - "Split table {} into {} chunks, time cost: {}ms.", - tableId, - splits.size(), - end - start); - return splits; - } catch (Exception e) { - throw new FlinkRuntimeException( - String.format("Generate Splits for table %s error", tableId), e); - } + public Object queryNextChunkMax( + JdbcConnection jdbc, + TableId 
tableId,
+            Column splitColumn,
+            int chunkSize,
+            Object includedLowerBound)
+            throws SQLException {
+        return PostgresQueryUtils.queryNextChunkMax(
+                jdbc, tableId, splitColumn, chunkSize, includedLowerBound);
     }
 
+    /** Overrides queryMinMax to handle UUID split columns. */
     @Override
     public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column splitColumn)
             throws SQLException {
         return PostgresQueryUtils.queryMinMax(jdbc, tableId, splitColumn);
     }
 
+    /** Overrides queryMin to handle UUID split columns. */
     @Override
     public Object queryMin(
-            JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
-            throws SQLException {
-        return PostgresQueryUtils.queryMin(jdbc, tableId, column, excludedLowerBound);
-    }
-
-    @Override
-    public Object queryNextChunkMax(
-            JdbcConnection jdbc,
-            TableId tableId,
-            Column column,
-            int chunkSize,
-            Object includedLowerBound)
+            JdbcConnection jdbc, TableId tableId, Column splitColumn, Object excludedLowerBound)
             throws SQLException {
-        return PostgresQueryUtils.queryNextChunkMax(
-                jdbc, tableId, column, chunkSize, includedLowerBound);
+        return PostgresQueryUtils.queryMin(jdbc, tableId, splitColumn, excludedLowerBound);
     }
 
     // --------------------------------------------------------------------------------------------
@@ -156,203 +72,13 @@ public class PostgresChunkSplitter implements JdbcSourceChunkSplitter {
     // --------------------------------------------------------------------------------------------
 
     @Override
-    public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException {
+    protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
+            throws SQLException {
         return PostgresQueryUtils.queryApproximateRowCnt(jdbc, tableId);
     }
 
     @Override
-    public String buildSplitScanQuery(
-            TableId tableId, RowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) {
-        return PostgresQueryUtils.buildSplitScanQuery(
-                tableId, splitKeyType, isFirstSplit, isLastSplit);
-    }
-
-    @Override
-    public DataType fromDbzColumn(Column splitColumn) {
+    protected DataType fromDbzColumn(Column splitColumn) {
         return PostgresTypeUtils.fromDbzColumn(splitColumn);
     }
-
-    /**
-     * We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using
-     * evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request
-     * many queries and is not efficient.
- */ - private List splitTableIntoChunks( - JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { - final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn); - final Object min = minMax[0]; - final Object max = minMax[1]; - if (min == null || max == null || min.equals(max)) { - // empty table, or only one row, return full table scan as a chunk - return Collections.singletonList(ChunkRange.all()); - } - - final int chunkSize = sourceConfig.getSplitSize(); - final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); - final double distributionFactorLower = sourceConfig.getDistributionFactorLower(); - - if (isEvenlySplitColumn(splitColumn)) { - long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId); - double distributionFactor = - calculateDistributionFactor(tableId, min, max, approximateRowCnt); - - boolean dataIsEvenlyDistributed = - doubleCompare(distributionFactor, distributionFactorLower) >= 0 - && doubleCompare(distributionFactor, distributionFactorUpper) <= 0; - - if (dataIsEvenlyDistributed) { - // the minimum dynamic chunk size is at least 1 - final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1); - return splitEvenlySizedChunks( - tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); - } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); - } - } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); - } - } - - /** - * Split table into evenly sized chunks based on the numeric min and max value of split column, - * and tumble chunks in step size. - */ - private List splitEvenlySizedChunks( - TableId tableId, - Object min, - Object max, - long approximateRowCnt, - int chunkSize, - int dynamicChunkSize) { - LOG.info( - "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}", - tableId, - approximateRowCnt, - chunkSize, - dynamicChunkSize); - if (approximateRowCnt <= chunkSize) { - // there is no more than one chunk, return full table as a chunk - return Collections.singletonList(ChunkRange.all()); - } - - final List splits = new ArrayList<>(); - Object chunkStart = null; - Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize); - while (ObjectUtils.compare(chunkEnd, max) <= 0) { - splits.add(ChunkRange.of(chunkStart, chunkEnd)); - chunkStart = chunkEnd; - try { - chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize); - } catch (ArithmeticException e) { - // Stop chunk split to avoid dead loop when number overflows. - break; - } - } - // add the ending split - splits.add(ChunkRange.of(chunkStart, null)); - return splits; - } - - // ------------------------------------------------------------------------------------------ - - /** Split table into unevenly sized chunks by continuously calculating next chunk max value. 
*/ - private List splitUnevenlySizedChunks( - JdbcConnection jdbc, - TableId tableId, - Column splitColumn, - Object min, - Object max, - int chunkSize) - throws SQLException { - LOG.info( - "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize); - final List splits = new ArrayList<>(); - Object chunkStart = null; - Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize); - int count = 0; - while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) { - // we start from [null, min + chunk_size) and avoid [null, min) - splits.add(ChunkRange.of(chunkStart, chunkEnd)); - // may sleep a while to avoid DDOS on PostgreSQL server - maySleep(count++, tableId); - chunkStart = chunkEnd; - chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize); - } - // add the ending split - splits.add(ChunkRange.of(chunkStart, null)); - return splits; - } - - private Object nextChunkEnd( - JdbcConnection jdbc, - Object previousChunkEnd, - TableId tableId, - Column splitColumn, - Object max, - int chunkSize) - throws SQLException { - // chunk end might be null when max values are removed - Object chunkEnd = - queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd); - if (Objects.equals(previousChunkEnd, chunkEnd)) { - // we don't allow equal chunk start and end, - // should query the next one larger than chunkEnd - chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd); - } - if (ObjectUtils.compare(chunkEnd, max) >= 0) { - return null; - } else { - return chunkEnd; - } - } - - private SnapshotSplit createSnapshotSplit( - JdbcConnection jdbc, - TableId tableId, - int chunkId, - RowType splitKeyType, - Object chunkStart, - Object chunkEnd) { - // currently, we only support single split column - Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart}; - Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd}; - Map schema = new HashMap<>(); - schema.put(tableId, dialect.queryTableSchema(jdbc, tableId)); - return new SnapshotSplit( - tableId, - splitId(tableId, chunkId), - splitKeyType, - splitStart, - splitEnd, - null, - schema); - } - - /** Returns the distribution factor of the given table. 
*/ - private double calculateDistributionFactor( - TableId tableId, Object min, Object max, long approximateRowCnt) { - - if (!min.getClass().equals(max.getClass())) { - throw new IllegalStateException( - String.format( - "Unsupported operation type, the MIN value type %s is different with MAX value type %s.", - min.getClass().getSimpleName(), max.getClass().getSimpleName())); - } - if (approximateRowCnt == 0) { - return Double.MAX_VALUE; - } - BigDecimal difference = ObjectUtils.minus(max, min); - // factor = (max - min + 1) / rowCount - final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1)); - double distributionFactor = - subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue(); - LOG.info( - "The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}", - tableId, - distributionFactor, - min, - max, - approximateRowCnt); - return distributionFactor; - } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java index 3e8a1feb6..e7408be34 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java @@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Optional; @@ -146,12 +145,6 @@ public class PostgresQueryUtils { }); } - public static String buildSplitScanQuery( - TableId tableId, RowType pkRowType, boolean isFirstSplit, boolean isLastSplit) { - return buildSplitScanQuery( - tableId, pkRowType, isFirstSplit, isLastSplit, new ArrayList<>()); - } - public static String buildSplitScanQuery( TableId tableId, RowType pkRowType, @@ -276,7 +269,7 @@ public class PostgresQueryUtils { return String.format("(%s)::text", value); } - private static boolean isUUID(Column column) { + public static boolean isUUID(Column column) { return column.typeName().equals("uuid"); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java index 395467175..fb338a0ea 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java @@ -17,345 +17,51 @@ package org.apache.flink.cdc.connectors.sqlserver.source.dialect; +import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; import 
org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect; -import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange; import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter; -import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit; -import org.apache.flink.cdc.connectors.base.utils.ObjectUtils; import org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerTypeUtils; import org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerUtils; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.util.FlinkRuntimeException; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; -import io.debezium.relational.Table; import io.debezium.relational.TableId; -import io.debezium.relational.history.TableChanges.TableChange; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.math.BigDecimal; -import java.math.RoundingMode; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -import static org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare; /** * The {@code ChunkSplitter} used to split SqlServer table into a set of chunks for JDBC data * source. */ -public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter { - - private static final Logger LOG = LoggerFactory.getLogger(SqlServerChunkSplitter.class); - - private final JdbcSourceConfig sourceConfig; - private final JdbcDataSourceDialect dialect; +@Internal +public class SqlServerChunkSplitter extends JdbcSourceChunkSplitter { public SqlServerChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) { - this.sourceConfig = sourceConfig; - this.dialect = dialect; - } - - private static String splitId(TableId tableId, int chunkId) { - return tableId.toString() + ":" + chunkId; - } - - private static void maySleep(int count, TableId tableId) { - // every 100 queries to sleep 1s - if (count % 10 == 0) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - // nothing to do - } - LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId); - } - } - - @Override - public Collection generateSplits(TableId tableId) { - try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) { - - LOG.info("Start splitting table {} into chunks...", tableId); - long start = System.currentTimeMillis(); - - Table table = dialect.queryTableSchema(jdbc, tableId).getTable(); - Column splitColumn = - SqlServerUtils.getSplitColumn(table, sourceConfig.getChunkKeyColumn()); - final List chunks; - try { - chunks = splitTableIntoChunks(jdbc, tableId, splitColumn); - } catch (SQLException e) { - throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e); - } - - // convert chunks into splits - List splits = new ArrayList<>(); - RowType splitType = getSplitType(splitColumn); - for (int i = 0; i < chunks.size(); i++) { - ChunkRange chunk = chunks.get(i); - SnapshotSplit split = - createSnapshotSplit( - jdbc, - tableId, - i, - splitType, - chunk.getChunkStart(), - chunk.getChunkEnd()); - splits.add(split); - } - - long end = System.currentTimeMillis(); - LOG.info( - "Split table {} into {} chunks, time cost: {}ms.", - tableId, - splits.size(), - end - start); - return splits; - } catch 
(Exception e) { - throw new FlinkRuntimeException( - String.format("Generate Splits for table %s error", tableId), e); - } - } - - @Override - public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column) - throws SQLException { - return SqlServerUtils.queryMinMax(jdbc, tableId, column.name()); - } - - @Override - public Object queryMin( - JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound) - throws SQLException { - return SqlServerUtils.queryMin(jdbc, tableId, column.name(), excludedLowerBound); + super(sourceConfig, dialect); } @Override - public DataType fromDbzColumn(Column splitColumn) { + protected DataType fromDbzColumn(Column splitColumn) { return SqlServerTypeUtils.fromDbzColumn(splitColumn); } - // -------------------------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------------------------- - @Override - public Object queryNextChunkMax( + protected Object queryNextChunkMax( JdbcConnection jdbc, TableId tableId, - Column column, + Column splitColumn, int chunkSize, Object includedLowerBound) throws SQLException { return SqlServerUtils.queryNextChunkMax( - jdbc, tableId, column.name(), chunkSize, includedLowerBound); + jdbc, tableId, splitColumn.name(), chunkSize, includedLowerBound); } @Override - public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException { - return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId); - } - - @Override - public String buildSplitScanQuery( - TableId tableId, RowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) { - return SqlServerUtils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit); - } - - /** - * We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using - * evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request - * many queries and is not efficient. 
- */ - private List splitTableIntoChunks( - JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { - final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn); - final Object min = minMax[0]; - final Object max = minMax[1]; - if (min == null || max == null || min.equals(max)) { - // empty table, or only one row, return full table scan as a chunk - return Collections.singletonList(ChunkRange.all()); - } - - final int chunkSize = sourceConfig.getSplitSize(); - final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); - final double distributionFactorLower = sourceConfig.getDistributionFactorLower(); - - if (isEvenlySplitColumn(splitColumn)) { - long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId); - double distributionFactor = - calculateDistributionFactor(tableId, min, max, approximateRowCnt); - - boolean dataIsEvenlyDistributed = - doubleCompare(distributionFactor, distributionFactorLower) >= 0 - && doubleCompare(distributionFactor, distributionFactorUpper) <= 0; - - if (dataIsEvenlyDistributed) { - // the minimum dynamic chunk size is at least 1 - final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1); - return splitEvenlySizedChunks( - tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); - } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); - } - } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); - } - } - - /** - * Split table into evenly sized chunks based on the numeric min and max value of split column, - * and tumble chunks in step size. - */ - private List splitEvenlySizedChunks( - TableId tableId, - Object min, - Object max, - long approximateRowCnt, - int chunkSize, - int dynamicChunkSize) { - LOG.info( - "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}", - tableId, - approximateRowCnt, - chunkSize, - dynamicChunkSize); - if (approximateRowCnt <= chunkSize) { - // there is no more than one chunk, return full table as a chunk - return Collections.singletonList(ChunkRange.all()); - } - - final List splits = new ArrayList<>(); - Object chunkStart = null; - Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize); - while (ObjectUtils.compare(chunkEnd, max) <= 0) { - splits.add(ChunkRange.of(chunkStart, chunkEnd)); - chunkStart = chunkEnd; - try { - chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize); - } catch (ArithmeticException e) { - // Stop chunk split to avoid dead loop when number overflows. - break; - } - } - // add the ending split - splits.add(ChunkRange.of(chunkStart, null)); - return splits; - } - - // ------------------------------------------------------------------------------------------ - - /** Split table into unevenly sized chunks by continuously calculating next chunk max value. 
*/ - private List splitUnevenlySizedChunks( - JdbcConnection jdbc, - TableId tableId, - Column splitColumn, - Object min, - Object max, - int chunkSize) + protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException { - LOG.info( - "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize); - final List splits = new ArrayList<>(); - Object chunkStart = null; - Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize); - int count = 0; - while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) { - // we start from [null, min + chunk_size) and avoid [null, min) - splits.add(ChunkRange.of(chunkStart, chunkEnd)); - // may sleep awhile to avoid DDOS on SqlServer server - maySleep(count++, tableId); - chunkStart = chunkEnd; - chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize); - } - // add the ending split - splits.add(ChunkRange.of(chunkStart, null)); - return splits; - } - - private Object nextChunkEnd( - JdbcConnection jdbc, - Object previousChunkEnd, - TableId tableId, - Column splitColumn, - Object max, - int chunkSize) - throws SQLException { - // chunk end might be null when max values are removed - Object chunkEnd = - queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd); - if (Objects.equals(previousChunkEnd, chunkEnd)) { - // we don't allow equal chunk start and end, - // should query the next one larger than chunkEnd - chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd); - } - if (ObjectUtils.compare(chunkEnd, max) >= 0) { - return null; - } else { - return chunkEnd; - } - } - - private SnapshotSplit createSnapshotSplit( - JdbcConnection jdbc, - TableId tableId, - int chunkId, - RowType splitKeyType, - Object chunkStart, - Object chunkEnd) { - // currently, we only support single split column - Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart}; - Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd}; - Map schema = new HashMap<>(); - schema.put(tableId, dialect.queryTableSchema(jdbc, tableId)); - return new SnapshotSplit( - tableId, - splitId(tableId, chunkId), - splitKeyType, - splitStart, - splitEnd, - null, - schema); - } - - /** Returns the distribution factor of the given table. 
*/ - private double calculateDistributionFactor( - TableId tableId, Object min, Object max, long approximateRowCnt) { - - if (!min.getClass().equals(max.getClass())) { - throw new IllegalStateException( - String.format( - "Unsupported operation type, the MIN value type %s is different with MAX value type %s.", - min.getClass().getSimpleName(), max.getClass().getSimpleName())); - } - if (approximateRowCnt == 0) { - return Double.MAX_VALUE; - } - BigDecimal difference = ObjectUtils.minus(max, min); - // factor = (max - min + 1) / rowCount - final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1)); - double distributionFactor = - subRowCnt - .divide(new BigDecimal(approximateRowCnt), 4, RoundingMode.CEILING) - .doubleValue(); - LOG.info( - "The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}", - tableId, - distributionFactor, - min, - max, - approximateRowCnt); - return distributionFactor; + return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java index 711f42f8a..b2292a825 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java @@ -50,7 +50,6 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray; import static org.apache.flink.table.api.DataTypes.FIELD; import static org.apache.flink.table.api.DataTypes.ROW; @@ -59,26 +58,6 @@ public class SqlServerUtils { public SqlServerUtils() {} - public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) - throws SQLException { - final String minMaxQuery = - String.format( - "SELECT MIN(%s), MAX(%s) FROM %s", - quote(columnName), quote(columnName), quote(tableId)); - return jdbc.queryAndMap( - minMaxQuery, - rs -> { - if (!rs.next()) { - // this should never happen - throw new SQLException( - String.format( - "No result returned after running query [%s]", - minMaxQuery)); - } - return rowToArray(rs, 2); - }); - } - public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException { // The statement used to get approximate row count which is less @@ -103,27 +82,6 @@ public class SqlServerUtils { }); } - public static Object queryMin( - JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) - throws SQLException { - final String minQuery = - String.format( - "SELECT MIN(%s) FROM %s WHERE %s > ?", - quote(columnName), quote(tableId), quote(columnName)); - return jdbc.prepareQueryAndMap( - minQuery, - ps -> ps.setObject(1, excludedLowerBound), - rs -> { - if (!rs.next()) { - // this should never happen - throw new SQLException( - String.format( - "No result returned after running query [%s]", minQuery)); - } - return rs.getObject(1); - }); - } - /** * Returns the next LSN to be read from the database. 
This is the LSN of the last record that * was read from the database.