diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java
index ea69c8617..4cd66d5ab 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java
@@ -139,9 +139,21 @@ public class MySqlSourceBuilder<T> {
         return this;
     }
 
-    /** The factor is used to determine whether the table is evenly distribution or not. */
-    public MySqlSourceBuilder<T> evenlyDistributionFactor(Double evenlyDistributionFactor) {
-        this.configFactory.evenlyDistributionFactor(evenlyDistributionFactor);
+    /**
+     * The upper bound of split key evenly distribution factor, the factor is used to determine
+     * whether the table is evenly distribution or not.
+     */
+    public MySqlSourceBuilder<T> distributionFactorUpper(double distributionFactorUpper) {
+        this.configFactory.distributionFactorUpper(distributionFactorUpper);
+        return this;
+    }
+
+    /**
+     * The lower bound of split key evenly distribution factor, the factor is used to determine
+     * whether the table is evenly distribution or not.
+     */
+    public MySqlSourceBuilder<T> distributionFactorLower(double distributionFactorLower) {
+        this.configFactory.distributionFactorLower(distributionFactorLower);
         return this;
     }
 
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java
index 8c58fcfb8..e2735c238 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java
@@ -49,6 +49,7 @@ import java.util.Map;
 import java.util.Objects;
 
 import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection;
+import static com.ververica.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
 import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.queryApproximateRowCnt;
 import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.queryMin;
 import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.queryMinMax;
@@ -60,6 +61,7 @@ import static java.math.BigDecimal.ROUND_CEILING;
  * {@link MySqlSnapshotSplit}).
  */
 class ChunkSplitter {
+
     private static final Logger LOG = LoggerFactory.getLogger(ChunkSplitter.class);
 
     private final MySqlSourceConfig sourceConfig;
@@ -119,6 +121,11 @@ class ChunkSplitter {
     // Utilities
     // --------------------------------------------------------------------------------------------
 
+    /**
+     * We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using
+     * evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request
+     * many queries and is not efficient.
+     */
     private List<ChunkRange> splitTableIntoChunks(
             JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
         final String splitColumnName = splitColumn.name();
@@ -130,50 +137,45 @@ class ChunkSplitter {
             return Collections.singletonList(ChunkRange.all());
         }
 
-        final List<ChunkRange> chunks;
         final int chunkSize = sourceConfig.getSplitSize();
-        final double evenlyDistributionFactor = sourceConfig.getEvenlyDistributionFactor();
+        final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
+        final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
 
-        boolean isSplitColumnEvenlyDistributed = false;
-        double distributionFactor = 0.0d;
         if (isEvenlySplitColumn(splitColumn)) {
-            // optimization: table with single chunk, avoid querying from db
-            if (ObjectUtils.minus(max, min).compareTo(BigDecimal.valueOf(chunkSize)) <= 0) {
-                isSplitColumnEvenlyDistributed = true;
+            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
+            double distributionFactor =
+                    calculateDistributionFactor(tableId, min, max, approximateRowCnt);
+
+            boolean dataIsEvenlyDistributed =
+                    doubleCompare(distributionFactor, distributionFactorLower) >= 0
+                            && doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
+
+            if (dataIsEvenlyDistributed) {
+                // the minimum dynamic chunk size is at least 1
+                final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
+                return splitEvenlySizedChunks(
+                        tableId, min, max, approximateRowCnt, dynamicChunkSize);
             } else {
-                distributionFactor = calculateDistributionFactor(jdbc, tableId, min, max);
-                isSplitColumnEvenlyDistributed = distributionFactor <= evenlyDistributionFactor;
+                return splitUnevenlySizedChunks(
+                        jdbc, tableId, splitColumnName, min, max, chunkSize);
             }
-        }
-
-        if (isSplitColumnEvenlyDistributed) {
-            // use evenly-sized chunks which is much efficient
-            final int dynamicChunkSize =
-                    getDynamicChunkSize(chunkSize, distributionFactor, evenlyDistributionFactor);
-            LOG.info(
-                    "Use evenly-sized chunk optimization for table {}, the distribution factor is {}, the chunk size is {}",
-                    tableId,
-                    distributionFactor,
-                    dynamicChunkSize);
-            chunks = splitEvenlySizedChunks(min, max, dynamicChunkSize);
         } else {
-            // use unevenly-sized chunks which will request many queries and is not efficient.
-            LOG.info(
-                    "Use unevenly-sized chunks for table{}, the chunk size is {}",
-                    tableId,
-                    chunkSize);
-            chunks = splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
+            return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
         }
-
-        return chunks;
     }
 
     /**
      * Split table into evenly sized chunks based on the numeric min and max value of split column,
      * and tumble chunks in step size.
      */
-    private List<ChunkRange> splitEvenlySizedChunks(Object min, Object max, int chunkSize) {
-        if (ObjectUtils.compare(ObjectUtils.plus(min, chunkSize), max) > 0) {
+    private List<ChunkRange> splitEvenlySizedChunks(
+            TableId tableId, Object min, Object max, long approximateRowCnt, int chunkSize) {
+        LOG.info(
+                "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}",
+                tableId,
+                approximateRowCnt,
+                chunkSize);
+        if (approximateRowCnt <= chunkSize) {
             // there is no more than one chunk, return full table as a chunk
             return Collections.singletonList(ChunkRange.all());
         }
@@ -200,6 +202,8 @@ class ChunkSplitter {
             Object max,
             int chunkSize)
             throws SQLException {
+        LOG.info(
+                "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
         final List<ChunkRange> splits = new ArrayList<>();
         Object chunkStart = null;
         Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize);
@@ -276,20 +280,10 @@ class ChunkSplitter {
                 || typeRoot == LogicalTypeRoot.DECIMAL;
     }
 
-    /** Returns the dynamic chunk size of evenly distributed table. */
-    private static int getDynamicChunkSize(
-            int chunkSize, double distributionFactor, double evenlyDistributionFactor) {
-        int dynamicChunkSize = chunkSize;
-        if (distributionFactor <= evenlyDistributionFactor) {
-            dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), dynamicChunkSize);
-        }
-        return dynamicChunkSize;
-    }
-
     /** Returns the distribution factor of the given table. */
-    private static double calculateDistributionFactor(
-            JdbcConnection jdbc, TableId tableId, Object min, Object max) throws SQLException {
-        final long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
+    private double calculateDistributionFactor(
+            TableId tableId, Object min, Object max, long approximateRowCnt) {
+
         if (!min.getClass().equals(max.getClass())) {
             throw new IllegalStateException(
                     String.format(
@@ -300,9 +294,18 @@ class ChunkSplitter {
             return Double.MAX_VALUE;
         }
         BigDecimal difference = ObjectUtils.minus(max, min);
-        // factor = max - min + 1 / rowCount
+        // factor = (max - min + 1) / rowCount
         final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
-        return subRowCnt.divide(new BigDecimal(approximateRowCnt), 2, ROUND_CEILING).doubleValue();
+        double distributionFactor =
+                subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue();
+        LOG.info(
+                "The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}",
+                tableId,
+                distributionFactor,
+                min,
+                max,
+                approximateRowCnt);
+        return distributionFactor;
     }
 
     private static String splitId(TableId tableId, int chunkId) {
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
index 7530442f2..0fa8c4917 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
@@ -52,7 +52,8 @@ public class MySqlSourceConfig implements Serializable {
     private final Duration connectTimeout;
     private final int connectMaxRetries;
     private final int connectionPoolSize;
-    private final double evenlyDistributionFactor;
+    private final double distributionFactorUpper;
+    private final double distributionFactorLower;
     private final boolean includeSchemaChanges;
 
     // --------------------------------------------------------------------------------------------
@@ -78,7 +79,8 @@ public class MySqlSourceConfig implements Serializable {
             Duration connectTimeout,
             int connectMaxRetries,
             int connectionPoolSize,
-            double evenlyDistributionFactor,
+            double distributionFactorUpper,
+            double distributionFactorLower,
             boolean includeSchemaChanges,
             Properties dbzProperties) {
         this.hostname = checkNotNull(hostname);
@@ -96,7 +98,8 @@ public class MySqlSourceConfig implements Serializable {
         this.connectTimeout = checkNotNull(connectTimeout);
         this.connectMaxRetries = connectMaxRetries;
         this.connectionPoolSize = connectionPoolSize;
-        this.evenlyDistributionFactor = evenlyDistributionFactor;
+        this.distributionFactorUpper = distributionFactorUpper;
+        this.distributionFactorLower = distributionFactorLower;
         this.includeSchemaChanges = includeSchemaChanges;
         this.dbzProperties = checkNotNull(dbzProperties);
         this.dbzConfiguration = Configuration.from(dbzProperties);
@@ -144,8 +147,12 @@ public class MySqlSourceConfig implements Serializable {
         return splitMetaGroupSize;
     }
 
-    public double getEvenlyDistributionFactor() {
-        return evenlyDistributionFactor;
+    public double getDistributionFactorUpper() {
+        return distributionFactorUpper;
+    }
+
+    public double getDistributionFactorLower() {
+        return distributionFactorLower;
     }
 
     public int getFetchSize() {
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
index adeda5919..51ceb78b8 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
@@ -35,10 +35,11 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT;
-import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.EVENLY_DISTRIBUTION_FACTOR;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SERVER_TIME_ZONE;
+import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** A factory to construct {@link MySqlSourceConfig}. */
@@ -62,7 +63,10 @@ public class MySqlSourceConfigFactory implements Serializable {
     private Duration connectTimeout = CONNECT_TIMEOUT.defaultValue();
     private int connectMaxRetries = CONNECT_MAX_RETRIES.defaultValue();
     private int connectionPoolSize = CONNECTION_POOL_SIZE.defaultValue();
-    private double evenlyDistributionFactor = EVENLY_DISTRIBUTION_FACTOR.defaultValue();
+    private double distributionFactorUpper =
+            SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue();
+    private double distributionFactorLower =
+            SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
     private boolean includeSchemaChanges = false;
     private Properties dbzProperties;
 
@@ -152,9 +156,21 @@ public class MySqlSourceConfigFactory implements Serializable {
         return this;
     }
 
-    /** The factor is used to determine whether the table is evenly distribution or not. */
-    public MySqlSourceConfigFactory evenlyDistributionFactor(Double evenlyDistributionFactor) {
-        this.evenlyDistributionFactor = evenlyDistributionFactor;
+    /**
+     * The upper bound of split key evenly distribution factor, the factor is used to determine
+     * whether the table is evenly distribution or not.
+     */
+    public MySqlSourceConfigFactory distributionFactorUpper(double distributionFactorUpper) {
+        this.distributionFactorUpper = distributionFactorUpper;
+        return this;
+    }
+
+    /**
+     * The lower bound of split key evenly distribution factor, the factor is used to determine
+     * whether the table is evenly distribution or not.
+     */
+    public MySqlSourceConfigFactory distributionFactorLower(double distributionFactorLower) {
+        this.distributionFactorLower = distributionFactorLower;
         return this;
     }
 
@@ -284,7 +300,8 @@ public class MySqlSourceConfigFactory implements Serializable {
                 connectTimeout,
                 connectMaxRetries,
                 connectionPoolSize,
-                evenlyDistributionFactor,
+                distributionFactorUpper,
+                distributionFactorLower,
                 includeSchemaChanges,
                 props);
     }
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
index 27e92bc47..5b2b59bc5 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
@@ -176,13 +176,26 @@ public class MySqlSourceOptions {
                             "The group size of chunk meta, if the meta size exceeds the group size, the meta will be will be divided into multiple groups.");
 
     @Experimental
-    public static final ConfigOption<Double> EVENLY_DISTRIBUTION_FACTOR =
-            ConfigOptions.key("evenly-distribution.factor")
+    public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
+            ConfigOptions.key("split-key.even-distribution.factor.upper-bound")
                     .doubleType()
                     .defaultValue(1000.0d)
                     .withDescription(
-                            "The factor is used to determine whether the table is evenly distribution or not."
-                                    + " the table chunks would use evenly calculation optimization when the data distribution is even,"
-                                    + " and the sql query would be used when it is uneven."
-                                    + " The distribution factor could be calculated by MAX(id) - MIN(id) + 1 / rowCount.");
+                            "The upper bound of split key distribution factor. The distribution factor is used to determine whether the"
+                                    + " table is evenly distribution or not."
+                                    + " The table chunks would use evenly calculation optimization when the data distribution is even,"
+                                    + " and the query MySQL for splitting would happen when it is uneven."
+                                    + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
+
+    @Experimental
+    public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
+            ConfigOptions.key("split-key.even-distribution.factor.lower-bound")
+                    .doubleType()
+                    .defaultValue(0.05d)
+                    .withDescription(
+                            "The lower bound of split key distribution factor. The distribution factor is used to determine whether the"
+                                    + " table is evenly distribution or not."
+                                    + " The table chunks would use evenly calculation optimization when the data distribution is even,"
+                                    + " and the query MySQL for splitting would happen when it is uneven."
+                                    + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
 }
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ObjectUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ObjectUtils.java
index ecd8f6540..67c718bb5 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ObjectUtils.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ObjectUtils.java
@@ -87,4 +87,16 @@ public class ObjectUtils {
         Comparable<Object> c2 = (Comparable<Object>) obj2;
         return c1.compareTo(c2);
     }
+
+    /**
+     * Compares two Double numeric object.
+     *
+     * @return -1, 0, or 1 as this {@code arg1} is numerically less than, equal to, or greater than
+     *     {@code arg2}.
+     */
+    public static int doubleCompare(double arg1, double arg2) {
+        BigDecimal bigDecimal1 = BigDecimal.valueOf(arg1);
+        BigDecimal bigDecimal2 = BigDecimal.valueOf(arg2);
+        return bigDecimal1.compareTo(bigDecimal2);
+    }
 }
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java
index f471a79fa..94815c8c6 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java
@@ -75,7 +75,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
     private final Duration connectTimeout;
     private final int connectionPoolSize;
     private final int connectMaxRetries;
-    private final double evenlyDistributionFactor;
+    private final double distributionFactorUpper;
+    private final double distributionFactorLower;
     private final StartupOptions startupOptions;
 
     // --------------------------------------------------------------------------------------------
@@ -106,7 +107,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
             Duration connectTimeout,
             int connectMaxRetries,
             int connectionPoolSize,
-            double evenlyDistributionFactor,
+            double distributionFactorUpper,
+            double distributionFactorLower,
             StartupOptions startupOptions) {
         this.physicalSchema = physicalSchema;
         this.port = port;
@@ -125,7 +127,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
         this.connectTimeout = connectTimeout;
         this.connectMaxRetries = connectMaxRetries;
         this.connectionPoolSize = connectionPoolSize;
-        this.evenlyDistributionFactor = evenlyDistributionFactor;
+        this.distributionFactorUpper = distributionFactorUpper;
+        this.distributionFactorLower = distributionFactorLower;
         this.startupOptions = startupOptions;
         // Mutable attributes
         this.producedDataType = physicalSchema.toPhysicalRowDataType();
@@ -172,7 +175,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                             .serverId(serverId)
                             .splitSize(splitSize)
                             .splitMetaGroupSize(splitMetaGroupSize)
-                            .evenlyDistributionFactor(evenlyDistributionFactor)
+                            .distributionFactorUpper(distributionFactorUpper)
+                            .distributionFactorLower(distributionFactorLower)
                             .fetchSize(fetchSize)
                             .connectTimeout(connectTimeout)
                             .connectMaxRetries(connectMaxRetries)
@@ -253,7 +257,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                         connectTimeout,
                         connectMaxRetries,
                         connectionPoolSize,
-                        evenlyDistributionFactor,
+                        distributionFactorUpper,
+                        distributionFactorLower,
                         startupOptions);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
@@ -274,7 +279,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                 && splitSize == that.splitSize
                 && splitMetaGroupSize == that.splitMetaGroupSize
                 && fetchSize == that.fetchSize
-                && evenlyDistributionFactor == that.evenlyDistributionFactor
+                && distributionFactorUpper == that.distributionFactorUpper
+                && distributionFactorLower == that.distributionFactorLower
                 && Objects.equals(physicalSchema, that.physicalSchema)
                 && Objects.equals(hostname, that.hostname)
                 && Objects.equals(database, that.database)
@@ -312,7 +318,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                 connectTimeout,
                 connectMaxRetries,
                 connectionPoolSize,
-                evenlyDistributionFactor,
+                distributionFactorUpper,
+                distributionFactorLower,
                 startupOptions,
                 producedDataType,
                 metadataKeys);
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
index 2d86d3265..ee9009aad 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
@@ -43,7 +43,6 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.DATABASE_NAME;
-import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.EVENLY_DISTRIBUTION_FACTOR;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.HOSTNAME;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PASSWORD;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PORT;
@@ -56,8 +55,11 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SERVER_ID;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SERVER_TIME_ZONE;
+import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.TABLE_NAME;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.USERNAME;
+import static com.ververica.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
 import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -93,7 +95,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
         Duration connectTimeout = config.get(CONNECT_TIMEOUT);
         int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
         int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
-        double evenlyDistributionFactor = config.get(EVENLY_DISTRIBUTION_FACTOR);
+        double distributionFactorUpper = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+        double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
 
         boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
         if (enableParallelRead) {
@@ -104,7 +107,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
             validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
             validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1);
             validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0);
-            validateEvenlyDistributionFactor(evenlyDistributionFactor);
+            validateDistributionFactorUpper(distributionFactorUpper);
+            validateDistributionFactorLower(distributionFactorLower);
         }
 
         return new MySqlTableSource(
@@ -125,7 +129,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
                 connectTimeout,
                 connectMaxRetries,
                 connectionPoolSize,
-                evenlyDistributionFactor,
+                distributionFactorUpper,
+                distributionFactorLower,
                 startupOptions);
     }
 
@@ -161,7 +166,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
         options.add(SCAN_SNAPSHOT_FETCH_SIZE);
         options.add(CONNECT_TIMEOUT);
         options.add(CONNECTION_POOL_SIZE);
-        options.add(EVENLY_DISTRIBUTION_FACTOR);
+        options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+        options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
         options.add(CONNECT_MAX_RETRIES);
         return options;
     }
@@ -268,12 +274,27 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
         }
     }
 
-    /** Checks the value of given evenly distribution factor is valid. */
-    private void validateEvenlyDistributionFactor(double evenlyDistributionFactor) {
+    /** Checks the value of given evenly distribution factor upper bound is valid. */
+    private void validateDistributionFactorUpper(double distributionFactorUpper) {
         checkState(
-                evenlyDistributionFactor >= 1.0d,
+                doubleCompare(distributionFactorUpper, 1.0d) >= 0,
                 String.format(
                         "The value of option '%s' must larger than or equals %s, but is %s",
-                        EVENLY_DISTRIBUTION_FACTOR.key(), 1.0d, evenlyDistributionFactor));
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(),
+                        1.0d,
+                        distributionFactorUpper));
+    }
+
+    /** Checks the value of given evenly distribution factor lower bound is valid. */
+    private void validateDistributionFactorLower(double distributionFactorLower) {
+        checkState(
+                doubleCompare(distributionFactorLower, 0.0d) >= 0
+                        && doubleCompare(distributionFactorLower, 1.0d) <= 0,
+                String.format(
+                        "The value of option '%s' must between %s and %s inclusively, but is %s",
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(),
+                        0.0d,
+                        1.0d,
+                        distributionFactorLower));
     }
 }
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
index 5659c6a21..b560c468e 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
@@ -74,7 +74,7 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
     @Test
     public void testReadSingleBinlogSplit() throws Exception {
         customerDatabase.createAndInitialize();
-        MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"});
+        MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"});
         binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
         mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
         final DataType dataType =
@@ -83,7 +83,8 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
                         DataTypes.FIELD("name", DataTypes.STRING()),
                         DataTypes.FIELD("address", DataTypes.STRING()),
                         DataTypes.FIELD("phone_number", DataTypes.STRING()));
-        List<MySqlSnapshotSplit> splits = getMySqlSplits(new String[] {"customers"}, sourceConfig);
+        List<MySqlSnapshotSplit> splits =
+                getMySqlSplits(new String[] {"customers_even_dist"}, sourceConfig);
         String[] expected =
                 new String[] {
                     "+I[101, user_1, Shanghai, 123567891234]",
@@ -94,13 +95,8 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
                     "+U[103, user_3, Hangzhou, 123567891234]",
                     "-U[103, user_3, Hangzhou, 123567891234]",
                     "+U[103, user_3, Shanghai, 123567891234]",
-                    "+I[103, user_3, Shanghai, 123567891234]",
-                    "+I[109, user_4, Shanghai, 123567891234]",
-                    "+I[110, user_5, Shanghai, 123567891234]",
-                    "+I[111, user_6, Shanghai, 123567891234]",
-                    "+I[118, user_7, Shanghai, 123567891234]",
-                    "+I[121, user_8, Shanghai, 123567891234]",
-                    "+I[123, user_9, Shanghai, 123567891234]"
+                    "+I[104, user_4, Shanghai, 123567891234]",
+                    "+I[103, user_3, Shanghai, 123567891234]"
                 };
 
         List<String> actual =
@@ -117,7 +113,7 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
     @Test
     public void testReadAllBinlogSplitsForOneTable() throws Exception {
         customerDatabase.createAndInitialize();
-        MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"});
+        MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"});
         binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
         mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
         final DataType dataType =
@@ -126,39 +122,27 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
                         DataTypes.FIELD("name", DataTypes.STRING()),
                         DataTypes.FIELD("address", DataTypes.STRING()),
                         DataTypes.FIELD("phone_number", DataTypes.STRING()));
-        List<MySqlSnapshotSplit> splits = getMySqlSplits(new String[] {"customers"}, sourceConfig);
+        List<MySqlSnapshotSplit> splits =
+                getMySqlSplits(new String[] {"customers_even_dist"}, sourceConfig);
 
         String[] expected =
                 new String[] {
                     "+I[101, user_1, Shanghai, 123567891234]",
                     "+I[102, user_2, Shanghai, 123567891234]",
-                    "-D[102, user_2, Shanghai, 123567891234]",
-                    "+I[102, user_2, Shanghai, 123567891234]",
+                    "+I[103, user_3, Shanghai, 123567891234]",
+                    "+I[104, user_4, Shanghai, 123567891234]",
+                    "+I[105, user_5, Shanghai, 123567891234]",
+                    "+I[106, user_6, Shanghai, 123567891234]",
+                    "+I[107, user_7, Shanghai, 123567891234]",
+                    "+I[108, user_8, Shanghai, 123567891234]",
+                    "+I[109, user_9, Shanghai, 123567891234]",
+                    "+I[110, user_10, Shanghai, 123567891234]",
                     "-U[103, user_3, Shanghai, 123567891234]",
                     "+U[103, user_3, Hangzhou, 123567891234]",
+                    "-D[102, user_2, Shanghai, 123567891234]",
+                    "+I[102, user_2, Shanghai, 123567891234]",
                     "-U[103, user_3, Hangzhou, 123567891234]",
                     "+U[103, user_3, Shanghai, 123567891234]",
-                    "+I[103, user_3, Shanghai, 123567891234]",
-                    "+I[109, user_4, Shanghai, 123567891234]",
-                    "+I[110, user_5, Shanghai, 123567891234]",
-                    "+I[111, user_6, Shanghai, 123567891234]",
-                    "+I[118, user_7, Shanghai, 123567891234]",
-                    "+I[121, user_8, Shanghai, 123567891234]",
-                    "+I[123, user_9, Shanghai, 123567891234]",
-                    "+I[1009, user_10, Shanghai, 123567891234]",
-                    "+I[1010, user_11, Shanghai, 123567891234]",
-                    "-U[1010, user_11, Shanghai, 123567891234]",
-                    "+U[1010, Hangzhou, Shanghai, 123567891234]",
-                    "+I[1011, user_12, Shanghai, 123567891234]",
-                    "+I[1012, user_13, Shanghai, 123567891234]",
-                    "+I[1013, user_14, Shanghai, 123567891234]",
-                    "+I[1014, user_15, Shanghai, 123567891234]",
-                    "+I[1015, user_16, Shanghai, 123567891234]",
-                    "+I[1016, user_17, Shanghai, 123567891234]",
-                    "+I[1017, user_18, Shanghai, 123567891234]",
-                    "+I[1018, user_19, Shanghai, 123567891234]",
-                    "+I[1019, user_20, Shanghai, 123567891234]",
-                    "+I[2000, user_21, Shanghai, 123567891234]",
                     "+I[2001, user_22, Shanghai, 123567891234]",
                     "+I[2002, user_23, Shanghai, 123567891234]",
                     "+I[2003, user_24, Shanghai, 123567891234]"
@@ -603,7 +587,7 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
                 .hostname(MYSQL_CONTAINER.getHost())
                 .port(MYSQL_CONTAINER.getDatabasePort())
                 .username(customerDatabase.getUsername())
-                .splitSize(10)
+                .splitSize(4)
                 .fetchSize(2)
                 .password(customerDatabase.getPassword())
                 .createConfig(0);
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java
index 358f292d2..1e2066d59 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java
@@ -56,14 +56,14 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
     @BeforeClass
     public static void init() {
         customerDatabase.createAndInitialize();
-        MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"});
+        MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"}, 10);
         binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
         mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
     }
 
     @Test
     public void testReadSingleSnapshotSplit() throws Exception {
-        MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"});
+        MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"}, 4);
         final DataType dataType =
                 DataTypes.ROW(
                         DataTypes.FIELD("id", DataTypes.BIGINT()),
@@ -77,12 +77,7 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
                     "+I[101, user_1, Shanghai, 123567891234]",
                     "+I[102, user_2, Shanghai, 123567891234]",
                     "+I[103, user_3, Shanghai, 123567891234]",
-                    "+I[109, user_4, Shanghai, 123567891234]",
-                    "+I[110, user_5, Shanghai, 123567891234]",
-                    "+I[111, user_6, Shanghai, 123567891234]",
-                    "+I[118, user_7, Shanghai, 123567891234]",
-                    "+I[121, user_8, Shanghai, 123567891234]",
-                    "+I[123, user_9, Shanghai, 123567891234]"
+                    "+I[104, user_4, Shanghai, 123567891234]"
                 };
         List<String> actual = readTableSnapshotSplits(mySqlSplits, sourceConfig, 1, dataType);
         assertEqualsInAnyOrder(Arrays.asList(expected), actual);
@@ -90,7 +85,7 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
 
     @Test
     public void testReadAllSnapshotSplitsForOneTable() throws Exception {
-        MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"});
+        MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"}, 4);
 
         final DataType dataType =
                 DataTypes.ROW(
@@ -105,24 +100,13 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
                     "+I[101, user_1, Shanghai, 123567891234]",
                     "+I[102, user_2, Shanghai, 123567891234]",
                     "+I[103, user_3, Shanghai, 123567891234]",
-                    "+I[109, user_4, Shanghai, 123567891234]",
-                    "+I[110, user_5, Shanghai, 123567891234]",
-                    "+I[111, user_6, Shanghai, 123567891234]",
-                    "+I[118, user_7, Shanghai, 123567891234]",
-                    "+I[121, user_8, Shanghai, 123567891234]",
-                    "+I[123, user_9, Shanghai, 123567891234]",
-                    "+I[1009, user_10, Shanghai, 123567891234]",
-                    "+I[1010, user_11, Shanghai, 123567891234]",
-                    "+I[1011, user_12, Shanghai, 123567891234]",
-                    "+I[1012, user_13, Shanghai, 123567891234]",
-                    "+I[1013, user_14, Shanghai, 123567891234]",
-                    "+I[1014, user_15, Shanghai, 123567891234]",
-                    "+I[1015, user_16, Shanghai, 123567891234]",
-                    "+I[1016, user_17, Shanghai, 123567891234]",
-                    "+I[1017, user_18, Shanghai, 123567891234]",
-                    "+I[1018, user_19, Shanghai, 123567891234]",
-                    "+I[1019, user_20, Shanghai, 123567891234]",
-                    "+I[2000, user_21, Shanghai, 123567891234]"
+                    "+I[104, user_4, Shanghai, 123567891234]",
+                    "+I[105, user_5, Shanghai, 123567891234]",
+                    "+I[106, user_6, Shanghai, 123567891234]",
+                    "+I[107, user_7, Shanghai, 123567891234]",
+                    "+I[108, user_8, Shanghai, 123567891234]",
+                    "+I[109, user_9, Shanghai, 123567891234]",
+                    "+I[110, user_10, Shanghai, 123567891234]"
                 };
         List<String> actual =
                 readTableSnapshotSplits(mySqlSplits, sourceConfig, mySqlSplits.size(), dataType);
@@ -131,7 +115,7 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
 
     @Test
     public void testReadAllSplitForTableWithSingleLine() throws Exception {
-        MySqlSourceConfig sourceConfig = getConfig(new String[] {"customer_card_single_line"});
+        MySqlSourceConfig sourceConfig = getConfig(new String[] {"customer_card_single_line"}, 10);
 
         final DataType dataType =
                 DataTypes.ROW(
@@ -149,7 +133,7 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
     @Test
     public void testReadAllSnapshotSplitsForTables() throws Exception {
         MySqlSourceConfig sourceConfig =
-                getConfig(new String[] {"customer_card", "customer_card_single_line"});
+                getConfig(new String[] {"customer_card", "customer_card_single_line"}, 10);
 
         DataType dataType =
                 DataTypes.ROW(
@@ -249,7 +233,7 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
         return mySqlSplitList;
     }
 
-    public static MySqlSourceConfig getConfig(String[] captureTables) {
+    public static MySqlSourceConfig getConfig(String[] captureTables, int splitSize) {
         String[] captureTableIds =
                 Arrays.stream(captureTables)
                         .map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
@@ -262,7 +246,7 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
                 .hostname(MYSQL_CONTAINER.getHost())
                 .port(MYSQL_CONTAINER.getDatabasePort())
                 .username(customerDatabase.getUsername())
-                .splitSize(10)
+                .splitSize(splitSize)
                 .fetchSize(2)
                 .password(customerDatabase.getPassword())
                 .createConfig(0);
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java
index b4346ee34..a6f55843a 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java
@@ -32,6 +32,7 @@ import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
 import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
 import io.debezium.connector.mysql.MySqlConnection;
 import io.debezium.jdbc.JdbcConnection;
+import org.apache.commons.lang3.StringUtils;
 import org.junit.Test;
 
 import java.sql.SQLException;
@@ -44,6 +45,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState;
+
 /** IT tests for {@link MySqlSource}. */
 public class MySqlSourceITCase extends MySqlSourceTestBase {
 
@@ -254,11 +257,12 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
     }
 
     private String getTableName(String[] captureCustomerTables) {
+        checkState(captureCustomerTables.length > 0);
         if (captureCustomerTables.length == 1) {
-            return "customers";
+            return captureCustomerTables[0];
         } else {
-            // pattern that matches test table: customers and customers_1
-            return "customers.*";
+            // pattern that matches multiple tables
+            return String.format("(%s)", StringUtils.join(captureCustomerTables, "|"));
         }
     }
 
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
index 823468172..02e58e00e 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@@ -38,7 +38,8 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.EVENLY_DISTRIBUTION_FACTOR;
+import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -57,17 +58,15 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
     public void testAssignSingleTableSplits() {
         List<String> expected =
                 Arrays.asList(
-                        "customers null [462]",
-                        "customers [462] [823]",
-                        "customers [823] [1184]",
-                        "customers [1184] [1545]",
-                        "customers [1545] [1906]",
-                        "customers [1906] null");
+                        "customers_even_dist null [105]",
+                        "customers_even_dist [105] [109]",
+                        "customers_even_dist [109] null");
         List<String> splits =
                 getTestAssignSnapshotSplits(
                         4,
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
-                        new String[] {customerDatabase.getDatabaseName() + ".customers"});
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                        new String[] {customerDatabase.getDatabaseName() + ".customers_even_dist"});
         assertEquals(expected, splits);
     }
 
@@ -77,7 +76,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
         List<String> splits =
                 getTestAssignSnapshotSplits(
                         2000,
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {customerDatabase.getDatabaseName() + ".customers"});
         assertEquals(expected, splits);
     }
@@ -86,25 +86,20 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
     public void testAssignMultipleTableSplits() {
         List<String> expected =
                 Arrays.asList(
-                        "customers null [462]",
-                        "customers [462] [823]",
-                        "customers [823] [1184]",
-                        "customers [1184] [1545]",
-                        "customers [1545] [1906]",
-                        "customers [1906] null",
-                        "customers_1 null [462]",
-                        "customers_1 [462] [823]",
-                        "customers_1 [823] [1184]",
-                        "customers_1 [1184] [1545]",
-                        "customers_1 [1545] [1906]",
-                        "customers_1 [1906] null");
+                        "customers_even_dist null [105]",
+                        "customers_even_dist [105] [109]",
+                        "customers_even_dist [109] null",
+                        "customers_sparse_dist null [10]",
+                        "customers_sparse_dist [10] [18]",
+                        "customers_sparse_dist [18] null");
         List<String> splits =
                 getTestAssignSnapshotSplits(
                         4,
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {
-                            customerDatabase.getDatabaseName() + ".customers",
-                            customerDatabase.getDatabaseName() + ".customers_1"
+                            customerDatabase.getDatabaseName() + ".customers_even_dist",
+                            customerDatabase.getDatabaseName() + ".customers_sparse_dist"
                         });
         assertEquals(expected, splits);
     }
@@ -116,7 +111,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
         List<String> splits =
                 getTestAssignSnapshotSplits(
                         2,
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {customerDatabase.getDatabaseName() + ".shopping_cart_big"});
         assertEquals(expected, splits);
     }
@@ -131,7 +127,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
         List<String> splits =
                 getTestAssignSnapshotSplits(
                         4,
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {customerDatabase.getDatabaseName() + ".address"});
         assertEquals(expected, splits);
     }
@@ -140,53 +137,17 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
     public void testAssignSnapshotSplitsWithDecimalKey() {
         List<String> expected =
                 Arrays.asList(
-                        "shopping_cart_dec null [124812.1230]",
-                        "shopping_cart_dec [124812.1230] null");
+                        "shopping_cart_dec null [123458.1230]",
+                        "shopping_cart_dec [123458.1230] null");
         List<String> splits =
                 getTestAssignSnapshotSplits(
                         2,
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {customerDatabase.getDatabaseName() + ".shopping_cart_dec"});
         assertEquals(expected, splits);
     }
 
-    private List<String> getTestAssignSnapshotSplits(
-            int splitSize, double evenlyDistributionFactor, String[] captureTables) {
-        MySqlSourceConfig configuration =
-                getConfig(splitSize, evenlyDistributionFactor, captureTables);
-        List<TableId> remainingTables =
-                Arrays.stream(captureTables).map(TableId::parse).collect(Collectors.toList());
-        final MySqlSnapshotSplitAssigner assigner =
-                new MySqlSnapshotSplitAssigner(
-                        configuration, DEFAULT_PARALLELISM, remainingTables, false);
-
-        assigner.open();
-        List<MySqlSplit> sqlSplits = new ArrayList<>();
-        while (true) {
-            Optional<MySqlSplit> split = assigner.getNext();
-            if (split.isPresent()) {
-                sqlSplits.add(split.get());
-            } else {
-                break;
-            }
-        }
-
-        return sqlSplits.stream()
-                .map(
-                        split -> {
-                            if (split.isSnapshotSplit()) {
-                                return split.asSnapshotSplit().getTableId().table()
-                                        + " "
-                                        + Arrays.toString(split.asSnapshotSplit().getSplitStart())
-                                        + " "
-                                        + Arrays.toString(split.asSnapshotSplit().getSplitEnd());
-                            } else {
-                                return split.toString();
-                            }
-                        })
-                .collect(Collectors.toList());
-    }
-
     @Test
     public void testAssignTableWithMultipleKey() {
         List<String> expected =
@@ -200,35 +161,77 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
         List<String> splits =
                 getTestAssignSnapshotSplits(
                         4,
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {customerDatabase.getDatabaseName() + ".customer_card"});
         assertEquals(expected, splits);
     }
 
     @Test
     public void testAssignTableWithSparseDistributionSplitKey() {
-        // test table with sparse split key order like 0,10000,20000,3000 instead of 0,1,2,3
+        // test table with sparse split key order like 0,2,4,8 instead of 0,1,2,3
+        // test sparse table with bigger distribution factor upper
         List<String> expected =
                 Arrays.asList(
-                        "customer_card null [26317]",
-                        "customer_card [26317] [32633]",
-                        "customer_card [32633] [38949]",
-                        "customer_card [38949] [45265]",
-                        "customer_card [45265] null");
+                        "customers_sparse_dist null [10]",
+                        "customers_sparse_dist [10] [18]",
+                        "customers_sparse_dist [18] null");
         List<String> splits =
                 getTestAssignSnapshotSplits(
                         4,
                         2000.0d,
-                        new String[] {customerDatabase.getDatabaseName() + ".customer_card"});
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                        new String[] {
+                            customerDatabase.getDatabaseName() + ".customers_sparse_dist"
+                        });
         assertEquals(expected, splits);
 
-        // test table with sparse split key and big chunk size
-        List<String> expected1 = Arrays.asList("customer_card null null");
+        // test sparse table with smaller distribution factor upper
+        List<String> expected1 =
+                Arrays.asList(
+                        "customers_sparse_dist null [8]",
+                        "customers_sparse_dist [8] [17]",
+                        "customers_sparse_dist [17] null");
         List<String> splits1 =
                 getTestAssignSnapshotSplits(
-                        8096,
-                        10000.0d,
-                        new String[] {customerDatabase.getDatabaseName() + ".customer_card"});
+                        4,
+                        2.0d,
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                        new String[] {
+                            customerDatabase.getDatabaseName() + ".customers_sparse_dist"
+                        });
+        assertEquals(expected1, splits1);
+    }
+
+    @Test
+    public void testAssignTableWithDenseDistributionSplitKey() {
+        // test dense table with smaller dense distribution factor lower
+        List<String> expected =
+                Arrays.asList(
+                        "customers_dense_dist null [2]",
+                        "customers_dense_dist [2] [3]",
+                        "customers_dense_dist [3] null");
+        List<String> splits =
+                getTestAssignSnapshotSplits(
+                        2,
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                        new String[] {
+                            customerDatabase.getDatabaseName() + ".customers_dense_dist"
+                        });
+        assertEquals(expected, splits);
+
+        // test dense table with bigger dense distribution factor lower
+        List<String> expected1 =
+                Arrays.asList("customers_dense_dist null [2]", "customers_dense_dist [2] null");
+        List<String> splits1 =
+                getTestAssignSnapshotSplits(
+                        2,
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        0.9d,
+                        new String[] {
+                            customerDatabase.getDatabaseName() + ".customers_dense_dist"
+                        });
         assertEquals(expected1, splits1);
     }
 
@@ -238,7 +241,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
         List<String> splits =
                 getTestAssignSnapshotSplits(
                         4,
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {
                             customerDatabase.getDatabaseName() + ".customer_card_single_line"
                         });
@@ -256,7 +260,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
         List<String> splits =
                 getTestAssignSnapshotSplits(
                         4,
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {customerDatabase.getDatabaseName() + ".shopping_cart"});
         assertEquals(expected, splits);
     }
@@ -272,7 +277,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
         List<String> splits =
                 getTestAssignSnapshotSplits(
                         4,
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {customerDatabase.getDatabaseName() + ".shopping_cart"});
         assertEquals(expected, splits);
     }
@@ -281,33 +287,29 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
     public void testAssignMinSplitSize() {
         List<String> expected =
                 Arrays.asList(
-                        "customers null [281]",
-                        "customers [281] [461]",
-                        "customers [461] [641]",
-                        "customers [641] [821]",
-                        "customers [821] [1001]",
-                        "customers [1001] [1181]",
-                        "customers [1181] [1361]",
-                        "customers [1361] [1541]",
-                        "customers [1541] [1721]",
-                        "customers [1721] [1901]",
-                        "customers [1901] null");
+                        "customers_even_dist null [103]",
+                        "customers_even_dist [103] [105]",
+                        "customers_even_dist [105] [107]",
+                        "customers_even_dist [107] [109]",
+                        "customers_even_dist [109] null");
         List<String> splits =
                 getTestAssignSnapshotSplits(
                         2,
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
-                        new String[] {customerDatabase.getDatabaseName() + ".customers"});
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                        new String[] {customerDatabase.getDatabaseName() + ".customers_even_dist"});
         assertEquals(expected, splits);
     }
 
     @Test
     public void testAssignMaxSplitSize() {
-        List<String> expected = Collections.singletonList("customers null null");
+        List<String> expected = Collections.singletonList("customers_even_dist null null");
         List<String> splits =
                 getTestAssignSnapshotSplits(
-                        2000,
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
-                        new String[] {customerDatabase.getDatabaseName() + ".customers"});
+                        8096,
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                        new String[] {customerDatabase.getDatabaseName() + ".customers_even_dist"});
         assertEquals(expected, splits);
     }
 
@@ -316,7 +318,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
         try {
             getTestAssignSnapshotSplits(
                     4,
-                    EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
+                    SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                    SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                     new String[] {customerDatabase.getDatabaseName() + ".customer_card"});
         } catch (Throwable t) {
             assertTrue(
@@ -327,8 +330,52 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
         }
     }
 
+    private List<String> getTestAssignSnapshotSplits(
+            int splitSize,
+            double distributionFactorUpper,
+            double distributionFactorLower,
+            String[] captureTables) {
+        MySqlSourceConfig configuration =
+                getConfig(
+                        splitSize, distributionFactorUpper, distributionFactorLower, captureTables);
+        List<TableId> remainingTables =
+                Arrays.stream(captureTables).map(TableId::parse).collect(Collectors.toList());
+        final MySqlSnapshotSplitAssigner assigner =
+                new MySqlSnapshotSplitAssigner(
+                        configuration, DEFAULT_PARALLELISM, remainingTables, false);
+
+        assigner.open();
+        List<MySqlSplit> sqlSplits = new ArrayList<>();
+        while (true) {
+            Optional<MySqlSplit> split = assigner.getNext();
+            if (split.isPresent()) {
+                sqlSplits.add(split.get());
+            } else {
+                break;
+            }
+        }
+
+        return sqlSplits.stream()
+                .map(
+                        split -> {
+                            if (split.isSnapshotSplit()) {
+                                return split.asSnapshotSplit().getTableId().table()
+                                        + " "
+                                        + Arrays.toString(split.asSnapshotSplit().getSplitStart())
+                                        + " "
+                                        + Arrays.toString(split.asSnapshotSplit().getSplitEnd());
+                            } else {
+                                return split.toString();
+                            }
+                        })
+                .collect(Collectors.toList());
+    }
+
     private MySqlSourceConfig getConfig(
-            int splitSize, double evenlyDistributionFactor, String[] captureTables) {
+            int splitSize,
+            double distributionFactorUpper,
+            double distributionLower,
+            String[] captureTables) {
         return new MySqlSourceConfigFactory()
                 .startupOptions(StartupOptions.initial())
                 .databaseList(customerDatabase.getDatabaseName())
@@ -337,7 +384,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
                 .port(MYSQL_CONTAINER.getDatabasePort())
                 .splitSize(splitSize)
                 .fetchSize(2)
-                .evenlyDistributionFactor(evenlyDistributionFactor)
+                .distributionFactorUpper(distributionFactorUpper)
+                .distributionFactorLower(distributionLower)
                 .username(customerDatabase.getUsername())
                 .password(customerDatabase.getPassword())
                 .serverTimeZone(ZoneId.of("UTC").toString())
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
index c33aa44c0..6834a93ae 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
@@ -48,10 +48,11 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT;
-import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.EVENLY_DISTRIBUTION_FACTOR;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
 import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
 import static org.apache.flink.table.api.TableSchema.fromResolvedSchema;
 import static org.junit.Assert.assertEquals;
@@ -117,7 +118,8 @@ public class MySqlTableSourceFactoryTest {
                         CONNECT_TIMEOUT.defaultValue(),
                         CONNECT_MAX_RETRIES.defaultValue(),
                         CONNECTION_POOL_SIZE.defaultValue(),
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         StartupOptions.initial());
         assertEquals(expectedSource, actualSource);
     }
@@ -129,7 +131,8 @@ public class MySqlTableSourceFactoryTest {
         properties.put("server-id", "123-126");
         properties.put("scan.incremental.snapshot.chunk.size", "8000");
         properties.put("chunk-meta.group.size", "3000");
-        properties.put("evenly-distribution.factor", "40.5");
+        properties.put("split-key.even-distribution.factor.upper-bound", "40.5");
+        properties.put("split-key.even-distribution.factor.lower-bound", "0.01");
         properties.put("scan.snapshot.fetch.size", "100");
         properties.put("connect.timeout", "45s");
 
@@ -155,6 +158,7 @@ public class MySqlTableSourceFactoryTest {
                         CONNECT_MAX_RETRIES.defaultValue(),
                         CONNECTION_POOL_SIZE.defaultValue(),
                         40.5d,
+                        0.01d,
                         StartupOptions.initial());
         assertEquals(expectedSource, actualSource);
     }
@@ -189,7 +193,8 @@ public class MySqlTableSourceFactoryTest {
                         Duration.ofSeconds(45),
                         CONNECT_MAX_RETRIES.defaultValue(),
                         CONNECTION_POOL_SIZE.defaultValue(),
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         StartupOptions.initial());
         assertEquals(expectedSource, actualSource);
     }
@@ -222,7 +227,8 @@ public class MySqlTableSourceFactoryTest {
                         CONNECT_TIMEOUT.defaultValue(),
                         CONNECT_MAX_RETRIES.defaultValue(),
                         CONNECTION_POOL_SIZE.defaultValue(),
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         StartupOptions.latest());
         assertEquals(expectedSource, actualSource);
     }
@@ -257,7 +263,8 @@ public class MySqlTableSourceFactoryTest {
                         CONNECT_TIMEOUT.defaultValue(),
                         CONNECT_MAX_RETRIES.defaultValue(),
                         CONNECTION_POOL_SIZE.defaultValue(),
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         StartupOptions.initial());
         assertEquals(expectedSource, actualSource);
     }
@@ -312,7 +319,8 @@ public class MySqlTableSourceFactoryTest {
                         CONNECT_TIMEOUT.defaultValue(),
                         CONNECT_MAX_RETRIES.defaultValue(),
                         CONNECTION_POOL_SIZE.defaultValue(),
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         StartupOptions.initial());
         assertEquals(expectedSource, actualSource);
     }
@@ -376,7 +384,8 @@ public class MySqlTableSourceFactoryTest {
                         CONNECT_TIMEOUT.defaultValue(),
                         CONNECT_MAX_RETRIES.defaultValue(),
                         CONNECTION_POOL_SIZE.defaultValue(),
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         StartupOptions.latest());
         assertEquals(expectedSource, actualSource);
     }
@@ -413,7 +422,8 @@ public class MySqlTableSourceFactoryTest {
                         CONNECT_TIMEOUT.defaultValue(),
                         CONNECT_MAX_RETRIES.defaultValue(),
                         CONNECTION_POOL_SIZE.defaultValue(),
-                        EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         StartupOptions.initial());
         expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
         expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
@@ -504,7 +514,7 @@ public class MySqlTableSourceFactoryTest {
         try {
             Map<String, String> properties = getAllOptions();
             properties.put("scan.incremental.snapshot.enabled", "true");
-            properties.put("evenly-distribution.factor", "0.8");
+            properties.put("split-key.even-distribution.factor.upper-bound", "0.8");
 
             createTableSource(properties);
             fail("exception expected");
@@ -512,7 +522,7 @@ public class MySqlTableSourceFactoryTest {
             assertThat(
                     t,
                     containsMessage(
-                            "The value of option 'evenly-distribution.factor' must larger than or equals 1.0, but is 0.8"));
+                            "The value of option 'split-key.even-distribution.factor.upper-bound' must larger than or equals 1.0, but is 0.8"));
         }
 
         // validate illegal connection pool size
diff --git a/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql b/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql
index 3234b6c01..f627d6c70 100644
--- a/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql
+++ b/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql
@@ -79,6 +79,64 @@ VALUES (101,"user_1","Shanghai","123567891234"),
        (1019,"user_20","Shanghai","123567891234"),
        (2000,"user_21","Shanghai","123567891234");
 
+-- create table whose split key is evenly distributed
+CREATE TABLE customers_even_dist (
+  id INTEGER NOT NULL PRIMARY KEY,
+  name VARCHAR(255) NOT NULL ,
+  address VARCHAR(1024),
+  phone_number VARCHAR(512)
+);
+INSERT INTO customers_even_dist
+VALUES (101,'user_1','Shanghai','123567891234'),
+       (102,'user_2','Shanghai','123567891234'),
+       (103,'user_3','Shanghai','123567891234'),
+       (104,'user_4','Shanghai','123567891234'),
+       (105,'user_5','Shanghai','123567891234'),
+       (106,'user_6','Shanghai','123567891234'),
+       (107,'user_7','Shanghai','123567891234'),
+       (108,'user_8','Shanghai','123567891234'),
+       (109,'user_9','Shanghai','123567891234'),
+       (110,'user_10','Shanghai','123567891234');
+
+-- create table whose split key is evenly distributed and sparse
+CREATE TABLE customers_sparse_dist (
+  id INTEGER NOT NULL PRIMARY KEY,
+  name VARCHAR(255) NOT NULL ,
+  address VARCHAR(1024),
+  phone_number VARCHAR(512)
+);
+INSERT INTO customers_sparse_dist
+VALUES (2,'user_1','Shanghai','123567891234'),
+       (4,'user_2','Shanghai','123567891234'),
+       (6,'user_3','Shanghai','123567891234'),
+       (8,'user_4','Shanghai','123567891234'),
+       (10,'user_5','Shanghai','123567891234'),
+       (16,'user_6','Shanghai','123567891234'),
+       (17,'user_7','Shanghai','123567891234'),
+       (18,'user_8','Shanghai','123567891234'),
+       (20,'user_9','Shanghai','123567891234'),
+       (22,'user_10','Shanghai','123567891234');
+
+-- create table whose split key is evenly distributed and dense
+CREATE TABLE customers_dense_dist (
+ id1 INTEGER NOT NULL,
+ id2 VARCHAR(255) NOT NULL ,
+ address VARCHAR(1024),
+ phone_number VARCHAR(512),
+ PRIMARY KEY(id1, id2)
+);
+INSERT INTO customers_dense_dist
+VALUES (1,'user_1','Shanghai','123567891234'),
+       (1,'user_2','Shanghai','123567891234'),
+       (1,'user_3','Shanghai','123567891234'),
+       (1,'user_4','Shanghai','123567891234'),
+       (2,'user_5','Shanghai','123567891234'),
+       (2,'user_6','Shanghai','123567891234'),
+       (2,'user_7','Shanghai','123567891234'),
+       (3,'user_8','Shanghai','123567891234'),
+       (3,'user_9','Shanghai','123567891234'),
+       (3,'user_10','Shanghai','123567891234');
+
 -- table has combined primary key
 CREATE TABLE customer_card (
   card_no BIGINT NOT NULL,
@@ -170,8 +228,8 @@ CREATE TABLE shopping_cart_dec (
 
 insert into shopping_cart_dec
 VALUES (123456.123, 'KIND_001', 'user_1', 'my shopping cart'),
-       (124456.456, 'KIND_002', 'user_1', 'my shopping cart'),
-       (125489.6789, 'KIND_003', 'user_1', 'my shopping cart');
+       (123457.456, 'KIND_002', 'user_2', 'my shopping cart'),
+       (123458.6789, 'KIND_003', 'user_3', 'my shopping cart');
 
 -- create table whose primary key are produced by snowflake algorithm
 CREATE TABLE address (