From 4b15614c458b0b50831b026d1680869654cf7e4e Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Fri, 5 Nov 2021 15:03:43 +0800 Subject: [PATCH] [mysql] Support dynamic chunk size for evenly distributed tables --- .../mysql/source/MySqlSourceBuilder.java | 6 + .../mysql/source/assigners/ChunkSplitter.java | 116 ++++++------- .../assigners/MySqlHybridSplitAssigner.java | 1 - .../assigners/MySqlSnapshotSplitAssigner.java | 9 +- .../source/config/MySqlSourceConfig.java | 7 + .../config/MySqlSourceConfigFactory.java | 18 ++- .../source/config/MySqlSourceOptions.java | 30 +++- .../mysql/table/MySqlTableSource.java | 7 + .../mysql/table/MySqlTableSourceFactory.java | 14 ++ .../MySqlSnapshotSplitAssignerTest.java | 152 +++++++++++------- .../table/MySqlTableSourceFactoryTest.java | 25 +++ 11 files changed, 259 insertions(+), 126 deletions(-) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java index 57986141d..ea69c8617 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -139,6 +139,12 @@ public class MySqlSourceBuilder { return this; } + /** The factor is used to determine whether the table is evenly distributed or not. */ + public MySqlSourceBuilder evenlyDistributionFactor(Double evenlyDistributionFactor) { + this.configFactory.evenlyDistributionFactor(evenlyDistributionFactor); + return this; + } + /** The maximum fetch size per poll when reading the table snapshot. */ public MySqlSourceBuilder fetchSize(int fetchSize) { this.configFactory.fetchSize(fetchSize); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java index 7b22c5078..8c58fcfb8 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java @@ -62,25 +62,19 @@ import static java.math.BigDecimal.ROUND_CEILING; class ChunkSplitter { private static final Logger LOG = LoggerFactory.getLogger(ChunkSplitter.class); - /** - * The maximum evenly distribution factor used to judge the data in table is evenly distributed - * or not, the factor could be calculated by MAX(id) - MIN(id) + 1 / rowCount. - */ - public static final Double MAX_EVENLY_DISTRIBUTION_FACTOR = 2.0d; - private final MySqlSourceConfig sourceConfig; private final MySqlSchema mySqlSchema; - private final int chunkSize; - public ChunkSplitter(MySqlSchema mySqlSchema, MySqlSourceConfig sourceConfig, int chunkSize) { + public ChunkSplitter(MySqlSchema mySqlSchema, MySqlSourceConfig sourceConfig) { this.mySqlSchema = mySqlSchema; - this.chunkSize = chunkSize; this.sourceConfig = sourceConfig; } /** Generates all snapshot splits (chunks) for the given table path.
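+ * <p>For example (hypothetical numbers): a numeric split column with MIN(id) = 1, MAX(id) = 20000 and an approximate row count of 10000 gives a distribution factor of (20000 - 1 + 1) / 10000 = 2.0. When that factor does not exceed the configured evenly-distribution.factor, evenly-sized chunks are generated with a dynamic chunk size of max(distributionFactor * chunkSize, chunkSize), so each chunk still covers roughly chunkSize rows despite gaps in the key space; otherwise chunk boundaries are computed one by one with per-chunk MAX queries.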
*/ public Collection generateSplits(TableId tableId) { try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { + + LOG.info("Start splitting table {} into chunks...", tableId); long start = System.currentTimeMillis(); Table table = mySqlSchema.getTableSchema(jdbc, tableId).getTable(); @@ -137,12 +131,38 @@ class ChunkSplitter { } final List chunks; - if (isSplitColumnEvenlyDistributed(jdbc, tableId, splitColumn, min, max, chunkSize)) { + final int chunkSize = sourceConfig.getSplitSize(); + final double evenlyDistributionFactor = sourceConfig.getEvenlyDistributionFactor(); + + boolean isSplitColumnEvenlyDistributed = false; + double distributionFactor = 0.0d; + if (isEvenlySplitColumn(splitColumn)) { + // optimization: table with single chunk, avoid querying from db + if (ObjectUtils.minus(max, min).compareTo(BigDecimal.valueOf(chunkSize)) <= 0) { + isSplitColumnEvenlyDistributed = true; + } else { + distributionFactor = calculateDistributionFactor(jdbc, tableId, min, max); + isSplitColumnEvenlyDistributed = distributionFactor <= evenlyDistributionFactor; + } + } + + if (isSplitColumnEvenlyDistributed) { // use evenly-sized chunks which is more efficient - chunks = splitEvenlySizedChunks(min, max); + final int dynamicChunkSize = + getDynamicChunkSize(chunkSize, distributionFactor, evenlyDistributionFactor); + LOG.info( + "Use evenly-sized chunk optimization for table {}, the distribution factor is {}, the chunk size is {}", + tableId, + distributionFactor, + dynamicChunkSize); + chunks = splitEvenlySizedChunks(min, max, dynamicChunkSize); } else { // use unevenly-sized chunks which will request many queries and is not efficient. - chunks = splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max); + LOG.info( + "Use unevenly-sized chunks for table {}, the chunk size is {}", + tableId, + chunkSize); + chunks = splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize); } return chunks; @@ -150,9 +170,9 @@ class ChunkSplitter { /** * Split table into evenly sized chunks based on the numeric min and max value of split column, - * and tumble chunks in {@link #chunkSize} step size. + * and tumble chunks in {@code chunkSize} step size. */ - private List splitEvenlySizedChunks(Object min, Object max) { + private List splitEvenlySizedChunks(Object min, Object max, int chunkSize) { if (ObjectUtils.compare(ObjectUtils.plus(min, chunkSize), max) > 0) { // there is no more than one chunk, return full table as a chunk return Collections.singletonList(ChunkRange.all()); @@ -173,19 +193,24 @@ class ChunkSplitter { /** Split table into unevenly sized chunks by continuously calculating next chunk max value.
*/ private List splitUnevenlySizedChunks( - JdbcConnection jdbc, TableId tableId, String splitColumnName, Object min, Object max) + JdbcConnection jdbc, + TableId tableId, + String splitColumnName, + Object min, + Object max, + int chunkSize) throws SQLException { final List splits = new ArrayList<>(); Object chunkStart = null; - Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max); + Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize); int count = 0; while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) { // we start from [null, min + chunk_size) and avoid [null, min) splits.add(ChunkRange.of(chunkStart, chunkEnd)); // may sleep a while to avoid DDOS on MySQL server - maySleep(count++); + maySleep(count++, tableId); chunkStart = chunkEnd; - chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max); + chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize); } // add the ending split splits.add(ChunkRange.of(chunkStart, null)); @@ -197,7 +222,8 @@ class ChunkSplitter { Object previousChunkEnd, TableId tableId, String splitColumnName, - Object max) + Object max, + int chunkSize) throws SQLException { // chunk end might be null when max values are removed Object chunkEnd = @@ -239,50 +265,31 @@ class ChunkSplitter { // ------------------------------------------------------------------------------------------ /** Checks whether split column is evenly distributed across its range. */ - private static boolean isSplitColumnEvenlyDistributed( - JdbcConnection jdbc, - TableId tableId, - Column splitColumn, - Object min, - Object max, - int chunkSize) - throws SQLException { + private static boolean isEvenlySplitColumn(Column splitColumn) { DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn); LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot(); // currently, we only support the optimization that split column with type BIGINT, INT, // DECIMAL - if (!(typeRoot == LogicalTypeRoot.BIGINT + return typeRoot == LogicalTypeRoot.BIGINT || typeRoot == LogicalTypeRoot.INTEGER - || typeRoot == LogicalTypeRoot.DECIMAL)) { - return false; - } + || typeRoot == LogicalTypeRoot.DECIMAL; + } - if (ObjectUtils.minus(max, min).compareTo(BigDecimal.valueOf(chunkSize)) <= 0) { - return true; + /** Returns the dynamic chunk size of evenly distributed table. */ + private static int getDynamicChunkSize( + int chunkSize, double distributionFactor, double evenlyDistributionFactor) { + int dynamicChunkSize = chunkSize; + if (distributionFactor <= evenlyDistributionFactor) { + dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), dynamicChunkSize); } - - // only column is numeric and evenly distribution factor is less than - // MAX_EVENLY_DISTRIBUTION_FACTOR will be treated as evenly distributed. - final long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId); - final double evenlyDistributionFactor = - calculateEvenlyDistributionFactor(min, max, approximateRowCnt); - LOG.info( - "The evenly distribution factor for table {} is {}", - tableId, - evenlyDistributionFactor); - return evenlyDistributionFactor <= MAX_EVENLY_DISTRIBUTION_FACTOR; + return dynamicChunkSize; } - /** - * Returns the evenly distribution factor of the table data. - * - * @param min the min value of the split column - * @param max the max value of the split column - * @param approximateRowCnt the approximate row count of the table. 
- */ - private static double calculateEvenlyDistributionFactor( - Object min, Object max, long approximateRowCnt) { + /** Returns the distribution factor of the given table. */ + private static double calculateDistributionFactor( + JdbcConnection jdbc, TableId tableId, Object min, Object max) throws SQLException { + final long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId); if (!min.getClass().equals(max.getClass())) { throw new IllegalStateException( String.format( @@ -302,7 +309,7 @@ class ChunkSplitter { return tableId.toString() + ":" + chunkId; } - private static void maySleep(int count) { + private static void maySleep(int count, TableId tableId) { // every 100 queries to sleep 1s if (count % 10 == 0) { try { @@ -310,6 +317,7 @@ class ChunkSplitter { } catch (InterruptedException e) { // nothing to do } + LOG.info("ChunkSplitter has split {} chunks for table {}", count, tableId); } } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java index 862f64024..e5a71a35d 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java @@ -88,7 +88,6 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner { @Override public void open() { - LOG.info("Open assigner"); snapshotSplitAssigner.open(); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index 14c7f5a43..e3d7b93fb 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -69,7 +69,6 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { private final int currentParallelism; private final LinkedList remainingTables; private final boolean isRemainingTablesCheckpointed; - private final int chunkSize; private ChunkSplitter chunkSplitter; private boolean isTableIdCaseSensitive; @@ -130,15 +129,13 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { this.splitFinishedOffsets = splitFinishedOffsets; this.assignerFinished = assignerFinished; this.remainingTables = new LinkedList<>(remainingTables); - this.chunkSize = sourceConfig.getSplitSize(); this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed; this.isTableIdCaseSensitive = isTableIdCaseSensitive; } @Override public void open() { - LOG.info("Open assigner"); - chunkSplitter = createChunkSplitter(sourceConfig, isTableIdCaseSensitive, chunkSize); + chunkSplitter = createChunkSplitter(sourceConfig, isTableIdCaseSensitive); // the legacy state didn't snapshot remaining tables, discovery remaining table here if (!isRemainingTablesCheckpointed && !assignerFinished) { @@ -303,8 +300,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { } private static ChunkSplitter createChunkSplitter( - MySqlSourceConfig sourceConfig, boolean isTableIdCaseSensitive, int chunkSize) { + 
MySqlSourceConfig sourceConfig, boolean isTableIdCaseSensitive) { MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig, isTableIdCaseSensitive); - return new ChunkSplitter(mySqlSchema, sourceConfig, chunkSize); + return new ChunkSplitter(mySqlSchema, sourceConfig); } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 126ed8a0f..7530442f2 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -52,6 +52,7 @@ public class MySqlSourceConfig implements Serializable { private final Duration connectTimeout; private final int connectMaxRetries; private final int connectionPoolSize; + private final double evenlyDistributionFactor; private final boolean includeSchemaChanges; // -------------------------------------------------------------------------------------------- @@ -77,6 +78,7 @@ public class MySqlSourceConfig implements Serializable { Duration connectTimeout, int connectMaxRetries, int connectionPoolSize, + double evenlyDistributionFactor, boolean includeSchemaChanges, Properties dbzProperties) { this.hostname = checkNotNull(hostname); @@ -94,6 +96,7 @@ public class MySqlSourceConfig implements Serializable { this.connectTimeout = checkNotNull(connectTimeout); this.connectMaxRetries = connectMaxRetries; this.connectionPoolSize = connectionPoolSize; + this.evenlyDistributionFactor = evenlyDistributionFactor; this.includeSchemaChanges = includeSchemaChanges; this.dbzProperties = checkNotNull(dbzProperties); this.dbzConfiguration = Configuration.from(dbzProperties); @@ -141,6 +144,10 @@ public class MySqlSourceConfig implements Serializable { return splitMetaGroupSize; } + public double getEvenlyDistributionFactor() { + return evenlyDistributionFactor; + } + public int getFetchSize() { return fetchSize; } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 0d6619cd1..adeda5919 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -32,6 +32,10 @@ import java.util.Properties; import java.util.UUID; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.EVENLY_DISTRIBUTION_FACTOR; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; import static 
com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SERVER_TIME_ZONE; @@ -55,9 +59,10 @@ public class MySqlSourceConfigFactory implements Serializable { private int splitSize = SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(); private int splitMetaGroupSize = CHUNK_META_GROUP_SIZE.defaultValue(); private int fetchSize = SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(); - private Duration connectTimeout = MySqlSourceOptions.CONNECT_TIMEOUT.defaultValue(); - private int connectMaxRetries = MySqlSourceOptions.CONNECT_MAX_RETRIES.defaultValue(); - private int connectionPoolSize = MySqlSourceOptions.CONNECTION_POOL_SIZE.defaultValue(); + private Duration connectTimeout = CONNECT_TIMEOUT.defaultValue(); + private int connectMaxRetries = CONNECT_MAX_RETRIES.defaultValue(); + private int connectionPoolSize = CONNECTION_POOL_SIZE.defaultValue(); + private double evenlyDistributionFactor = EVENLY_DISTRIBUTION_FACTOR.defaultValue(); private boolean includeSchemaChanges = false; private Properties dbzProperties; @@ -147,6 +152,12 @@ public class MySqlSourceConfigFactory implements Serializable { return this; } + /** The factor is used to determine whether the table is evenly distributed or not. */ + public MySqlSourceConfigFactory evenlyDistributionFactor(Double evenlyDistributionFactor) { + this.evenlyDistributionFactor = evenlyDistributionFactor; + return this; + } + /** The maximum fetch size per poll when reading the table snapshot. */ public MySqlSourceConfigFactory fetchSize(int fetchSize) { this.fetchSize = fetchSize; return this; } @@ -273,6 +284,7 @@ public class MySqlSourceConfigFactory implements Serializable { connectTimeout, connectMaxRetries, connectionPoolSize, + evenlyDistributionFactor, includeSchemaChanges, props); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index 7cce9a10c..27e92bc47 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -18,6 +18,7 @@ package com.ververica.cdc.connectors.mysql.source.config; +import org.apache.flink.annotation.Experimental; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; @@ -105,13 +106,6 @@ public class MySqlSourceOptions { .defaultValue(8096) .withDescription( "The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table."); - // internal option, won't add to documentaion - public static final ConfigOption CHUNK_META_GROUP_SIZE = ConfigOptions.key("chunk-meta.group.size") .intType() .defaultValue(1000) .withDescription( "The group size of chunk meta, if the meta size exceeds the group size, the meta will be will be divided into multiple groups."); public static final ConfigOption SCAN_SNAPSHOT_FETCH_SIZE = ConfigOptions.key("scan.snapshot.fetch.size") @@ -169,4 +163,26 @@ public class MySqlSourceOptions { .noDefaultValue() .withDescription( "Optional timestamp used in case of \"timestamp\" startup mode"); + + // ---------------------------------------------------------------------------- + // experimental options, won't add them to documentation + // ---------------------------------------------------------------------------- +
@Experimental + public static final ConfigOption CHUNK_META_GROUP_SIZE = + ConfigOptions.key("chunk-meta.group.size") + .intType() + .defaultValue(1000) + .withDescription( + "The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups."); + + @Experimental + public static final ConfigOption EVENLY_DISTRIBUTION_FACTOR = + ConfigOptions.key("evenly-distribution.factor") + .doubleType() + .defaultValue(1000.0d) + .withDescription( + "The factor is used to determine whether the table is evenly distributed or not." + + " The table chunks would use the evenly-sized chunk optimization when the data distribution is even," + + " and SQL queries would be used to determine chunk boundaries when it is uneven." + + " The distribution factor is calculated by (MAX(id) - MIN(id) + 1) / rowCount."); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java index 1786dede7..f471a79fa 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java @@ -75,6 +75,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat private final Duration connectTimeout; private final int connectionPoolSize; private final int connectMaxRetries; + private final double evenlyDistributionFactor; private final StartupOptions startupOptions; // -------------------------------------------------------------------------------------------- @@ -105,6 +106,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat Duration connectTimeout, int connectMaxRetries, int connectionPoolSize, + double evenlyDistributionFactor, StartupOptions startupOptions) { this.physicalSchema = physicalSchema; this.port = port; @@ -123,6 +125,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat this.connectTimeout = connectTimeout; this.connectMaxRetries = connectMaxRetries; this.connectionPoolSize = connectionPoolSize; + this.evenlyDistributionFactor = evenlyDistributionFactor; this.startupOptions = startupOptions; // Mutable attributes this.producedDataType = physicalSchema.toPhysicalRowDataType(); @@ -169,6 +172,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat .serverId(serverId) .splitSize(splitSize) .splitMetaGroupSize(splitMetaGroupSize) + .evenlyDistributionFactor(evenlyDistributionFactor) .fetchSize(fetchSize) .connectTimeout(connectTimeout) .connectMaxRetries(connectMaxRetries) @@ -249,6 +253,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat connectTimeout, connectMaxRetries, connectionPoolSize, + evenlyDistributionFactor, startupOptions); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; @@ -269,6 +274,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat && splitSize == that.splitSize && splitMetaGroupSize == that.splitMetaGroupSize && fetchSize == that.fetchSize + && evenlyDistributionFactor == that.evenlyDistributionFactor && Objects.equals(physicalSchema, that.physicalSchema) && Objects.equals(hostname, that.hostname) && Objects.equals(database, that.database) @@ -306,6 +312,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
connectTimeout, connectMaxRetries, connectionPoolSize, + evenlyDistributionFactor, startupOptions, producedDataType, metadataKeys); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index c60ae8d34..2d86d3265 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -43,6 +43,7 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.DATABASE_NAME; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.EVENLY_DISTRIBUTION_FACTOR; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.HOSTNAME; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PASSWORD; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PORT; @@ -92,6 +93,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory { Duration connectTimeout = config.get(CONNECT_TIMEOUT); int connectMaxRetries = config.get(CONNECT_MAX_RETRIES); int connectionPoolSize = config.get(CONNECTION_POOL_SIZE); + double evenlyDistributionFactor = config.get(EVENLY_DISTRIBUTION_FACTOR); boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED); if (enableParallelRead) { @@ -102,6 +104,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory { validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1); validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1); validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0); + validateEvenlyDistributionFactor(evenlyDistributionFactor); } return new MySqlTableSource( @@ -122,6 +125,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory { connectTimeout, connectMaxRetries, connectionPoolSize, + evenlyDistributionFactor, startupOptions); } @@ -157,6 +161,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory { options.add(SCAN_SNAPSHOT_FETCH_SIZE); options.add(CONNECT_TIMEOUT); options.add(CONNECTION_POOL_SIZE); + options.add(EVENLY_DISTRIBUTION_FACTOR); options.add(CONNECT_MAX_RETRIES); return options; } @@ -262,4 +267,13 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory { e); } } + + /** Checks the value of given evenly distribution factor is valid. 
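+ * <p>The distribution factor of a table is (MAX(id) - MIN(id) + 1) / rowCount, so for a unique split column it is normally at least 1.0 (exactly 1.0 when the key space is fully dense, e.g. ids 1..10000 over roughly 10000 rows). A threshold below 1.0 would therefore effectively disable the evenly-sized chunk optimization, which is why such values are rejected here.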
*/ + private void validateEvenlyDistributionFactor(double evenlyDistributionFactor) { + checkState( + evenlyDistributionFactor >= 1.0d, + String.format( + "The value of option '%s' must larger than or equals %s, but is %s", + EVENLY_DISTRIBUTION_FACTOR.key(), 1.0d, evenlyDistributionFactor)); + } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index 7af8a7705..823468172 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.EVENLY_DISTRIBUTION_FACTOR; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -56,16 +57,17 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { public void testAssignSingleTableSplits() { List expected = Arrays.asList( - "customers null [109]", - "customers [109] [118]", - "customers [118] [1009]", - "customers [1009] [1012]", - "customers [1012] [1015]", - "customers [1015] [1018]", - "customers [1018] null"); + "customers null [462]", + "customers [462] [823]", + "customers [823] [1184]", + "customers [1184] [1545]", + "customers [1545] [1906]", + "customers [1906] null"); List splits = getTestAssignSnapshotSplits( - 4, new String[] {customerDatabase.getDatabaseName() + ".customers"}); + 4, + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), + new String[] {customerDatabase.getDatabaseName() + ".customers"}); assertEquals(expected, splits); } @@ -74,7 +76,9 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { List expected = Arrays.asList("customers null null"); List splits = getTestAssignSnapshotSplits( - 2000, new String[] {customerDatabase.getDatabaseName() + ".customers"}); + 2000, + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), + new String[] {customerDatabase.getDatabaseName() + ".customers"}); assertEquals(expected, splits); } @@ -82,23 +86,22 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { public void testAssignMultipleTableSplits() { List expected = Arrays.asList( - "customers null [109]", - "customers [109] [118]", - "customers [118] [1009]", - "customers [1009] [1012]", - "customers [1012] [1015]", - "customers [1015] [1018]", - "customers [1018] null", - "customers_1 null [109]", - "customers_1 [109] [118]", - "customers_1 [118] [1009]", - "customers_1 [1009] [1012]", - "customers_1 [1012] [1015]", - "customers_1 [1015] [1018]", - "customers_1 [1018] null"); + "customers null [462]", + "customers [462] [823]", + "customers [823] [1184]", + "customers [1184] [1545]", + "customers [1545] [1906]", + "customers [1906] null", + "customers_1 null [462]", + "customers_1 [462] [823]", + "customers_1 [823] [1184]", + "customers_1 [1184] [1545]", + "customers_1 [1545] [1906]", + "customers_1 [1906] null"); List splits = getTestAssignSnapshotSplits( 4, + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), new String[] { customerDatabase.getDatabaseName() + ".customers", customerDatabase.getDatabaseName() + ".customers_1" @@ -113,6 +116,7 @@ public 
class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { List splits = getTestAssignSnapshotSplits( 2, + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".shopping_cart_big"}); assertEquals(expected, splits); } @@ -126,7 +130,9 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { "address [417420106184475563] null"); List splits = getTestAssignSnapshotSplits( - 4, new String[] {customerDatabase.getDatabaseName() + ".address"}); + 4, + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), + new String[] {customerDatabase.getDatabaseName() + ".address"}); assertEquals(expected, splits); } @@ -134,17 +140,20 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { public void testAssignSnapshotSplitsWithDecimalKey() { List expected = Arrays.asList( - "shopping_cart_dec null [124456.4560]", - "shopping_cart_dec [124456.4560] null"); + "shopping_cart_dec null [124812.1230]", + "shopping_cart_dec [124812.1230] null"); List splits = getTestAssignSnapshotSplits( 2, + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".shopping_cart_dec"}); assertEquals(expected, splits); } - private List getTestAssignSnapshotSplits(int splitSize, String[] captureTables) { - MySqlSourceConfig configuration = getConfig(splitSize, captureTables); + private List getTestAssignSnapshotSplits( + int splitSize, double evenlyDistributionFactor, String[] captureTables) { + MySqlSourceConfig configuration = + getConfig(splitSize, evenlyDistributionFactor, captureTables); List remainingTables = Arrays.stream(captureTables).map(TableId::parse).collect(Collectors.toList()); final MySqlSnapshotSplitAssigner assigner = @@ -190,16 +199,46 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { "customer_card [50001] null"); List splits = getTestAssignSnapshotSplits( - 4, new String[] {customerDatabase.getDatabaseName() + ".customer_card"}); + 4, + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), + new String[] {customerDatabase.getDatabaseName() + ".customer_card"}); assertEquals(expected, splits); } + @Test + public void testAssignTableWithSparseDistributionSplitKey() { + // test table with sparse split key order like 0,10000,20000,3000 instead of 0,1,2,3 + List expected = + Arrays.asList( + "customer_card null [26317]", + "customer_card [26317] [32633]", + "customer_card [32633] [38949]", + "customer_card [38949] [45265]", + "customer_card [45265] null"); + List splits = + getTestAssignSnapshotSplits( + 4, + 2000.0d, + new String[] {customerDatabase.getDatabaseName() + ".customer_card"}); + assertEquals(expected, splits); + + // test table with sparse split key and big chunk size + List expected1 = Arrays.asList("customer_card null null"); + List splits1 = + getTestAssignSnapshotSplits( + 8096, + 10000.0d, + new String[] {customerDatabase.getDatabaseName() + ".customer_card"}); + assertEquals(expected1, splits1); + } + @Test public void testAssignTableWithSingleLine() { List expected = Collections.singletonList("customer_card_single_line null null"); List splits = getTestAssignSnapshotSplits( 4, + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), new String[] { customerDatabase.getDatabaseName() + ".customer_card_single_line" }); @@ -216,7 +255,9 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { "shopping_cart [user_5] null"); List splits = getTestAssignSnapshotSplits( - 4, new String[] {customerDatabase.getDatabaseName() + ".shopping_cart"}); + 4, + 
EVENLY_DISTRIBUTION_FACTOR.defaultValue(), + new String[] {customerDatabase.getDatabaseName() + ".shopping_cart"}); assertEquals(expected, splits); } @@ -230,7 +271,9 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { "shopping_cart [user_5] null"); List splits = getTestAssignSnapshotSplits( - 4, new String[] {customerDatabase.getDatabaseName() + ".shopping_cart"}); + 4, + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), + new String[] {customerDatabase.getDatabaseName() + ".shopping_cart"}); assertEquals(expected, splits); } @@ -238,29 +281,22 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { public void testAssignMinSplitSize() { List expected = Arrays.asList( - "customers null [102]", - "customers [102] [103]", - "customers [103] [109]", - "customers [109] [110]", - "customers [110] [111]", - "customers [111] [118]", - "customers [118] [121]", - "customers [121] [123]", - "customers [123] [1009]", - "customers [1009] [1010]", - "customers [1010] [1011]", - "customers [1011] [1012]", - "customers [1012] [1013]", - "customers [1013] [1014]", - "customers [1014] [1015]", - "customers [1015] [1016]", - "customers [1016] [1017]", - "customers [1017] [1018]", - "customers [1018] [1019]", - "customers [1019] null"); + "customers null [281]", + "customers [281] [461]", + "customers [461] [641]", + "customers [641] [821]", + "customers [821] [1001]", + "customers [1001] [1181]", + "customers [1181] [1361]", + "customers [1361] [1541]", + "customers [1541] [1721]", + "customers [1721] [1901]", + "customers [1901] null"); List splits = getTestAssignSnapshotSplits( - 2, new String[] {customerDatabase.getDatabaseName() + ".customers"}); + 2, + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), + new String[] {customerDatabase.getDatabaseName() + ".customers"}); assertEquals(expected, splits); } @@ -269,7 +305,9 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { List expected = Collections.singletonList("customers null null"); List splits = getTestAssignSnapshotSplits( - 2000, new String[] {customerDatabase.getDatabaseName() + ".customers"}); + 2000, + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), + new String[] {customerDatabase.getDatabaseName() + ".customers"}); assertEquals(expected, splits); } @@ -277,7 +315,9 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { public void testUnMatchedPrimaryKey() { try { getTestAssignSnapshotSplits( - 4, new String[] {customerDatabase.getDatabaseName() + ".customer_card"}); + 4, + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), + new String[] {customerDatabase.getDatabaseName() + ".customer_card"}); } catch (Throwable t) { assertTrue( ExceptionUtils.findThrowableWithMessage( @@ -287,7 +327,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { } } - private MySqlSourceConfig getConfig(int splitSize, String[] captureTables) { + private MySqlSourceConfig getConfig( + int splitSize, double evenlyDistributionFactor, String[] captureTables) { return new MySqlSourceConfigFactory() .startupOptions(StartupOptions.initial()) .databaseList(customerDatabase.getDatabaseName()) @@ -296,6 +337,7 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { .port(MYSQL_CONTAINER.getDatabasePort()) .splitSize(splitSize) .fetchSize(2) + .evenlyDistributionFactor(evenlyDistributionFactor) .username(customerDatabase.getUsername()) .password(customerDatabase.getPassword()) .serverTimeZone(ZoneId.of("UTC").toString()) diff --git 
a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index 1ac82d3f9..fa16dbd40 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -48,6 +48,7 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.EVENLY_DISTRIBUTION_FACTOR; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; @@ -116,6 +117,7 @@ public class MySqlTableSourceFactoryTest { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), StartupOptions.initial()); assertEquals(expectedSource, actualSource); } @@ -127,6 +129,7 @@ public class MySqlTableSourceFactoryTest { properties.put("server-id", "123-126"); properties.put("scan.incremental.snapshot.chunk.size", "8000"); properties.put("chunk-meta.group.size", "3000"); + properties.put("evenly-distribution.factor", "40.5"); properties.put("scan.snapshot.fetch.size", "100"); properties.put("connect.timeout", "45s"); @@ -151,6 +154,7 @@ public class MySqlTableSourceFactoryTest { Duration.ofSeconds(45), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), + 40.5d, StartupOptions.initial()); assertEquals(expectedSource, actualSource); } @@ -185,6 +189,7 @@ public class MySqlTableSourceFactoryTest { Duration.ofSeconds(45), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), StartupOptions.initial()); assertEquals(expectedSource, actualSource); } @@ -217,6 +222,7 @@ public class MySqlTableSourceFactoryTest { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), StartupOptions.latest()); assertEquals(expectedSource, actualSource); } @@ -251,6 +257,7 @@ public class MySqlTableSourceFactoryTest { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), StartupOptions.initial()); assertEquals(expectedSource, actualSource); } @@ -305,6 +312,7 @@ public class MySqlTableSourceFactoryTest { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), StartupOptions.initial()); assertEquals(expectedSource, actualSource); } @@ -368,6 +376,7 @@ public class MySqlTableSourceFactoryTest { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), 
CONNECTION_POOL_SIZE.defaultValue(), + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), StartupOptions.latest()); assertEquals(expectedSource, actualSource); } @@ -404,6 +413,7 @@ public class MySqlTableSourceFactoryTest { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), + EVENLY_DISTRIBUTION_FACTOR.defaultValue(), StartupOptions.initial()); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name"); @@ -490,6 +500,21 @@ public class MySqlTableSourceFactoryTest { "The value of option 'chunk-meta.group.size' must larger than 1, but is 1")); } + // validate illegal evenly distribution factor + try { + Map properties = getAllOptions(); + properties.put("scan.incremental.snapshot.enabled", "true"); + properties.put("evenly-distribution.factor", "0.8"); + + createTableSource(properties); + fail("exception expected"); + } catch (Throwable t) { + assertThat( + t, + containsMessage( + "The value of option 'evenly-distribution.factor' must larger than or equals 1.0, but is 0.8")); + } + // validate illegal connection pool size try { Map properties = getAllOptions();