[mysql] Generates multiple chunks when approximate row count is bigger than chunk size (#1193)

This closes #1191.
pull/1369/head
Hang Ruan 3 years ago committed by GitHub
parent 0a097a9e53
commit 96c84d9c65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -186,7 +186,7 @@ public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
// the minimum dynamic chunk size is at least 1
final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, dynamicChunkSize);
tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
} else {
return splitUnevenlySizedChunks(
jdbc, tableId, splitColumnName, min, max, chunkSize);
@ -201,12 +201,18 @@ public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
* and tumble chunks in step size.
*/
private List<ChunkRange> splitEvenlySizedChunks(
TableId tableId, Object min, Object max, long approximateRowCnt, int chunkSize) {
TableId tableId,
Object min,
Object max,
long approximateRowCnt,
int chunkSize,
int dynamicChunkSize) {
LOG.info(
"Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}",
"Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}",
tableId,
approximateRowCnt,
chunkSize);
chunkSize,
dynamicChunkSize);
if (approximateRowCnt <= chunkSize) {
// there is no more than one chunk, return full table as a chunk
return Collections.singletonList(ChunkRange.all());
@ -214,12 +220,12 @@ public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = ObjectUtils.plus(min, chunkSize);
Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
while (ObjectUtils.compare(chunkEnd, max) <= 0) {
splits.add(ChunkRange.of(chunkStart, chunkEnd));
chunkStart = chunkEnd;
try {
chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize);
chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
} catch (ArithmeticException e) {
// Stop splitting chunks to avoid an infinite loop when the number overflows.
break;

@ -153,7 +153,7 @@ class ChunkSplitter {
// the minimum dynamic chunk size is at least 1
final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, dynamicChunkSize);
tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
} else {
return splitUnevenlySizedChunks(
jdbc, tableId, splitColumnName, min, max, chunkSize);
@ -169,12 +169,18 @@ class ChunkSplitter {
*/
@VisibleForTesting
public List<ChunkRange> splitEvenlySizedChunks(
TableId tableId, Object min, Object max, long approximateRowCnt, int chunkSize) {
TableId tableId,
Object min,
Object max,
long approximateRowCnt,
int chunkSize,
int dynamicChunkSize) {
LOG.info(
"Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}",
"Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}",
tableId,
approximateRowCnt,
chunkSize);
chunkSize,
dynamicChunkSize);
if (approximateRowCnt <= chunkSize) {
// there is no more than one chunk, return full table as a chunk
return Collections.singletonList(ChunkRange.all());
@ -182,12 +188,12 @@ class ChunkSplitter {
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = ObjectUtils.plus(min, chunkSize);
Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
while (ObjectUtils.compare(chunkEnd, max) <= 0) {
splits.add(ChunkRange.of(chunkStart, chunkEnd));
chunkStart = chunkEnd;
try {
chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize);
chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
} catch (ArithmeticException e) {
// Stop splitting chunks to avoid an infinite loop when the number overflows.
break;

@ -35,6 +35,7 @@ public class MySqlChunkSplitterTest {
Integer.MAX_VALUE - 19,
Integer.MAX_VALUE,
20,
10,
10);
assertEquals(2, res.size());
assertEquals(ChunkRange.of(null, 2147483638), res.get(0));
@ -50,6 +51,7 @@ public class MySqlChunkSplitterTest {
Integer.MAX_VALUE - 20,
Integer.MAX_VALUE,
20,
10,
10);
assertEquals(3, res.size());
assertEquals(ChunkRange.of(null, 2147483637), res.get(0));

@ -258,6 +258,19 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
customerDatabase.getDatabaseName() + ".customers_sparse_dist"
});
assertEquals(expected1, splits1);
// test a sparse table whose approximate row count is bigger than the chunk size
List<String> expected2 =
Arrays.asList("customers_sparse_dist null [18]", "customers_sparse_dist [18] null");
List<String> splits2 =
getTestAssignSnapshotSplits(
8,
10d,
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {
customerDatabase.getDatabaseName() + ".customers_sparse_dist"
});
assertEquals(expected2, splits2);
}
@Test

Loading…
Cancel
Save