diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlChunkSplitter.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlChunkSplitter.java
index b07462382..1a281fbc8 100644
--- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlChunkSplitter.java
+++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlChunkSplitter.java
@@ -186,7 +186,7 @@ public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
             // the minimum dynamic chunk size is at least 1
             final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
             return splitEvenlySizedChunks(
-                    tableId, min, max, approximateRowCnt, dynamicChunkSize);
+                    tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
         } else {
             return splitUnevenlySizedChunks(
                     jdbc, tableId, splitColumnName, min, max, chunkSize);
@@ -201,12 +201,18 @@ public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
      * and tumble chunks in step size.
      */
     private List<ChunkRange> splitEvenlySizedChunks(
-            TableId tableId, Object min, Object max, long approximateRowCnt, int chunkSize) {
+            TableId tableId,
+            Object min,
+            Object max,
+            long approximateRowCnt,
+            int chunkSize,
+            int dynamicChunkSize) {
         LOG.info(
-                "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}",
+                "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}",
                 tableId,
                 approximateRowCnt,
-                chunkSize);
+                chunkSize,
+                dynamicChunkSize);
         if (approximateRowCnt <= chunkSize) {
             // there is no more than one chunk, return full table as a chunk
             return Collections.singletonList(ChunkRange.all());
@@ -214,12 +220,12 @@ public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
         final List<ChunkRange> splits = new ArrayList<>();
         Object chunkStart = null;
-        Object chunkEnd = ObjectUtils.plus(min, chunkSize);
+        Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
         while (ObjectUtils.compare(chunkEnd, max) <= 0) {
             splits.add(ChunkRange.of(chunkStart, chunkEnd));
             chunkStart = chunkEnd;
             try {
-                chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize);
+                chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
             } catch (ArithmeticException e) {
                 // Stop chunk split to avoid dead loop when number overflows.
                 break;
             }
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java
index b9440dd2c..3e855105c 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java
@@ -153,7 +153,7 @@ class ChunkSplitter {
             // the minimum dynamic chunk size is at least 1
             final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
             return splitEvenlySizedChunks(
-                    tableId, min, max, approximateRowCnt, dynamicChunkSize);
+                    tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
         } else {
             return splitUnevenlySizedChunks(
                     jdbc, tableId, splitColumnName, min, max, chunkSize);
@@ -169,12 +169,18 @@ class ChunkSplitter {
      */
     @VisibleForTesting
     public List<ChunkRange> splitEvenlySizedChunks(
-            TableId tableId, Object min, Object max, long approximateRowCnt, int chunkSize) {
+            TableId tableId,
+            Object min,
+            Object max,
+            long approximateRowCnt,
+            int chunkSize,
+            int dynamicChunkSize) {
         LOG.info(
-                "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}",
+                "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}",
                 tableId,
                 approximateRowCnt,
-                chunkSize);
+                chunkSize,
+                dynamicChunkSize);
         if (approximateRowCnt <= chunkSize) {
             // there is no more than one chunk, return full table as a chunk
             return Collections.singletonList(ChunkRange.all());
@@ -182,12 +188,12 @@ class ChunkSplitter {
         final List<ChunkRange> splits = new ArrayList<>();
         Object chunkStart = null;
-        Object chunkEnd = ObjectUtils.plus(min, chunkSize);
+        Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
         while (ObjectUtils.compare(chunkEnd, max) <= 0) {
             splits.add(ChunkRange.of(chunkStart, chunkEnd));
             chunkStart = chunkEnd;
             try {
-                chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize);
+                chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
             } catch (ArithmeticException e) {
                 // Stop chunk split to avoid dead loop when number overflows.
                 break;
             }
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
index 7839fba28..f3681cbf3 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
@@ -35,6 +35,7 @@ public class MySqlChunkSplitterTest {
                         Integer.MAX_VALUE - 19,
                         Integer.MAX_VALUE,
                         20,
+                        10,
                         10);
         assertEquals(2, res.size());
         assertEquals(ChunkRange.of(null, 2147483638), res.get(0));
@@ -50,6 +51,7 @@ public class MySqlChunkSplitterTest {
                         Integer.MAX_VALUE - 20,
                         Integer.MAX_VALUE,
                         20,
+                        10,
                         10);
         assertEquals(3, res.size());
         assertEquals(ChunkRange.of(null, 2147483637), res.get(0));
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
index 0bf4520fd..ddef2a7ea 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@@ -258,6 +258,19 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
                             customerDatabase.getDatabaseName() + ".customers_sparse_dist"
                         });
         assertEquals(expected1, splits1);
+
+        // test sparse table that the approximate row count is bigger than chunk size
+        List<String> expected2 =
+                Arrays.asList("customers_sparse_dist null [18]", "customers_sparse_dist [18] null");
+        List<String> splits2 =
+                getTestAssignSnapshotSplits(
+                        8,
+                        10d,
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                        new String[] {
+                            customerDatabase.getDatabaseName() + ".customers_sparse_dist"
+                        });
+        assertEquals(expected2, splits2);
     }
 
     @Test
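
Illustrative note (not part of the patch): after this change, splitEvenlySizedChunks still compares approximateRowCnt against the user-configured chunkSize to decide whether the table fits in a single chunk, but advances the chunk boundaries by the distribution-aware dynamicChunkSize. Below is a minimal, self-contained sketch of that loop, assuming plain long split keys, a hypothetical Range record (Java 17) standing in for ChunkRange, and Math.addExact in place of ObjectUtils.plus; the real code operates on generic Object keys.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class EvenSplitSketch {

    // Hypothetical stand-in for ChunkRange; a null bound means "unbounded".
    record Range(Long start, Long end) {}

    static List<Range> splitEvenlySizedChunks(
            long min, long max, long approximateRowCnt, int chunkSize, int dynamicChunkSize) {
        if (approximateRowCnt <= chunkSize) {
            // No more than one chunk: return the full table as a single chunk.
            return Collections.singletonList(new Range(null, null));
        }
        List<Range> splits = new ArrayList<>();
        Long chunkStart = null;
        long chunkEnd = min + dynamicChunkSize;
        while (chunkEnd <= max) {
            splits.add(new Range(chunkStart, chunkEnd));
            chunkStart = chunkEnd;
            try {
                // Step by the dynamic chunk size, as the patched loop does.
                chunkEnd = Math.addExact(chunkEnd, dynamicChunkSize);
            } catch (ArithmeticException e) {
                // Stop splitting when the key overflows to avoid a dead loop.
                break;
            }
        }
        // The last chunk is open-ended.
        splits.add(new Range(chunkStart, null));
        return splits;
    }

    public static void main(String[] args) {
        // Sparse key space: ~20 rows spread over keys 0..2000 with chunkSize 8;
        // dynamicChunkSize (800 here) has already been scaled by the distribution factor,
        // so only a few chunks are produced instead of one per 8 key values.
        splitEvenlySizedChunks(0, 2_000, 20, 8, 800).forEach(System.out::println);
    }
}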