[mysql] Support dynamic chunk size for evenly distribution table

pull/569/head
Leonard Xu 3 years ago committed by Leonard Xu
parent e0095458a1
commit 4b15614c45

@ -139,6 +139,12 @@ public class MySqlSourceBuilder<T> {
return this;
}
/**
 * Sets the factor used to decide whether a table counts as evenly distributed.
 *
 * <p>The value is forwarded unchanged to the underlying config factory.
 *
 * @param evenlyDistributionFactor the evenly distribution factor to apply
 * @return this builder, to allow call chaining
 */
public MySqlSourceBuilder<T> evenlyDistributionFactor(Double evenlyDistributionFactor) {
    configFactory.evenlyDistributionFactor(evenlyDistributionFactor);
    return this;
}
/** The maximum fetch size for per poll when read table snapshot. */
public MySqlSourceBuilder<T> fetchSize(int fetchSize) {
this.configFactory.fetchSize(fetchSize);

@ -62,25 +62,19 @@ import static java.math.BigDecimal.ROUND_CEILING;
class ChunkSplitter {
private static final Logger LOG = LoggerFactory.getLogger(ChunkSplitter.class);
/**
* The maximum evenly distribution factor used to judge the data in table is evenly distributed
* or not, the factor could be calculated by MAX(id) - MIN(id) + 1 / rowCount.
*/
public static final Double MAX_EVENLY_DISTRIBUTION_FACTOR = 2.0d;
private final MySqlSourceConfig sourceConfig;
private final MySqlSchema mySqlSchema;
private final int chunkSize;
public ChunkSplitter(MySqlSchema mySqlSchema, MySqlSourceConfig sourceConfig, int chunkSize) {
public ChunkSplitter(MySqlSchema mySqlSchema, MySqlSourceConfig sourceConfig) {
this.mySqlSchema = mySqlSchema;
this.chunkSize = chunkSize;
this.sourceConfig = sourceConfig;
}
/** Generates all snapshot splits (chunks) for the given table path. */
public Collection<MySqlSnapshotSplit> generateSplits(TableId tableId) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
LOG.info("Start splitting table {} into chunks...", tableId);
long start = System.currentTimeMillis();
Table table = mySqlSchema.getTableSchema(jdbc, tableId).getTable();
@ -137,12 +131,38 @@ class ChunkSplitter {
}
final List<ChunkRange> chunks;
if (isSplitColumnEvenlyDistributed(jdbc, tableId, splitColumn, min, max, chunkSize)) {
final int chunkSize = sourceConfig.getSplitSize();
final double evenlyDistributionFactor = sourceConfig.getEvenlyDistributionFactor();
boolean isSplitColumnEvenlyDistributed = false;
double distributionFactor = 0.0d;
if (isEvenlySplitColumn(splitColumn)) {
// optimization: table with single chunk, avoid querying from db
if (ObjectUtils.minus(max, min).compareTo(BigDecimal.valueOf(chunkSize)) <= 0) {
isSplitColumnEvenlyDistributed = true;
} else {
distributionFactor = calculateDistributionFactor(jdbc, tableId, min, max);
isSplitColumnEvenlyDistributed = distributionFactor <= evenlyDistributionFactor;
}
}
if (isSplitColumnEvenlyDistributed) {
// use evenly-sized chunks which is much efficient
chunks = splitEvenlySizedChunks(min, max);
final int dynamicChunkSize =
getDynamicChunkSize(chunkSize, distributionFactor, evenlyDistributionFactor);
LOG.info(
"Use evenly-sized chunk optimization for table {}, the distribution factor is {}, the chunk size is {}",
tableId,
distributionFactor,
dynamicChunkSize);
chunks = splitEvenlySizedChunks(min, max, dynamicChunkSize);
} else {
// use unevenly-sized chunks which will request many queries and is not efficient.
chunks = splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max);
LOG.info(
"Use unevenly-sized chunks for table{}, the chunk size is {}",
tableId,
chunkSize);
chunks = splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
}
return chunks;
@ -150,9 +170,9 @@ class ChunkSplitter {
/**
* Split table into evenly sized chunks based on the numeric min and max value of split column,
* and tumble chunks in {@link #chunkSize} step size.
* and tumble chunks in step size.
*/
private List<ChunkRange> splitEvenlySizedChunks(Object min, Object max) {
private List<ChunkRange> splitEvenlySizedChunks(Object min, Object max, int chunkSize) {
if (ObjectUtils.compare(ObjectUtils.plus(min, chunkSize), max) > 0) {
// there is no more than one chunk, return full table as a chunk
return Collections.singletonList(ChunkRange.all());
@ -173,19 +193,24 @@ class ChunkSplitter {
/** Split table into unevenly sized chunks by continuously calculating next chunk max value. */
private List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc, TableId tableId, String splitColumnName, Object min, Object max)
JdbcConnection jdbc,
TableId tableId,
String splitColumnName,
Object min,
Object max,
int chunkSize)
throws SQLException {
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max);
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize);
int count = 0;
while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDOS on MySQL server
maySleep(count++);
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max);
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
@ -197,7 +222,8 @@ class ChunkSplitter {
Object previousChunkEnd,
TableId tableId,
String splitColumnName,
Object max)
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
@ -239,50 +265,31 @@ class ChunkSplitter {
// ------------------------------------------------------------------------------------------
/** Checks whether split column is evenly distributed across its range. */
private static boolean isSplitColumnEvenlyDistributed(
JdbcConnection jdbc,
TableId tableId,
Column splitColumn,
Object min,
Object max,
int chunkSize)
throws SQLException {
private static boolean isEvenlySplitColumn(Column splitColumn) {
DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn);
LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot();
// currently, we only support the optimization that split column with type BIGINT, INT,
// DECIMAL
if (!(typeRoot == LogicalTypeRoot.BIGINT
return typeRoot == LogicalTypeRoot.BIGINT
|| typeRoot == LogicalTypeRoot.INTEGER
|| typeRoot == LogicalTypeRoot.DECIMAL)) {
return false;
}
|| typeRoot == LogicalTypeRoot.DECIMAL;
}
if (ObjectUtils.minus(max, min).compareTo(BigDecimal.valueOf(chunkSize)) <= 0) {
return true;
/** Returns the dynamic chunk size of evenly distributed table. */
private static int getDynamicChunkSize(
int chunkSize, double distributionFactor, double evenlyDistributionFactor) {
int dynamicChunkSize = chunkSize;
if (distributionFactor <= evenlyDistributionFactor) {
dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), dynamicChunkSize);
}
// only column is numeric and evenly distribution factor is less than
// MAX_EVENLY_DISTRIBUTION_FACTOR will be treated as evenly distributed.
final long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
final double evenlyDistributionFactor =
calculateEvenlyDistributionFactor(min, max, approximateRowCnt);
LOG.info(
"The evenly distribution factor for table {} is {}",
tableId,
evenlyDistributionFactor);
return evenlyDistributionFactor <= MAX_EVENLY_DISTRIBUTION_FACTOR;
return dynamicChunkSize;
}
/**
* Returns the evenly distribution factor of the table data.
*
* @param min the min value of the split column
* @param max the max value of the split column
* @param approximateRowCnt the approximate row count of the table.
*/
private static double calculateEvenlyDistributionFactor(
Object min, Object max, long approximateRowCnt) {
/** Returns the distribution factor of the given table. */
private static double calculateDistributionFactor(
JdbcConnection jdbc, TableId tableId, Object min, Object max) throws SQLException {
final long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
if (!min.getClass().equals(max.getClass())) {
throw new IllegalStateException(
String.format(
@ -302,7 +309,7 @@ class ChunkSplitter {
return tableId.toString() + ":" + chunkId;
}
private static void maySleep(int count) {
private static void maySleep(int count, TableId tableId) {
// pause briefly after every 10 queries
if (count % 10 == 0) {
try {
@ -310,6 +317,7 @@ class ChunkSplitter {
} catch (InterruptedException e) {
// nothing to do
}
LOG.info("ChunkSplitter has split {} chunks for table {}", count, tableId);
}
}
}

@ -88,7 +88,6 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
@Override
public void open() {
LOG.info("Open assigner");
snapshotSplitAssigner.open();
}

@ -69,7 +69,6 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
private final int currentParallelism;
private final LinkedList<TableId> remainingTables;
private final boolean isRemainingTablesCheckpointed;
private final int chunkSize;
private ChunkSplitter chunkSplitter;
private boolean isTableIdCaseSensitive;
@ -130,15 +129,13 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
this.splitFinishedOffsets = splitFinishedOffsets;
this.assignerFinished = assignerFinished;
this.remainingTables = new LinkedList<>(remainingTables);
this.chunkSize = sourceConfig.getSplitSize();
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
}
@Override
public void open() {
LOG.info("Open assigner");
chunkSplitter = createChunkSplitter(sourceConfig, isTableIdCaseSensitive, chunkSize);
chunkSplitter = createChunkSplitter(sourceConfig, isTableIdCaseSensitive);
// the legacy state didn't snapshot remaining tables, discovery remaining table here
if (!isRemainingTablesCheckpointed && !assignerFinished) {
@ -303,8 +300,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
}
private static ChunkSplitter createChunkSplitter(
MySqlSourceConfig sourceConfig, boolean isTableIdCaseSensitive, int chunkSize) {
MySqlSourceConfig sourceConfig, boolean isTableIdCaseSensitive) {
MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig, isTableIdCaseSensitive);
return new ChunkSplitter(mySqlSchema, sourceConfig, chunkSize);
return new ChunkSplitter(mySqlSchema, sourceConfig);
}
}

@ -52,6 +52,7 @@ public class MySqlSourceConfig implements Serializable {
private final Duration connectTimeout;
private final int connectMaxRetries;
private final int connectionPoolSize;
private final double evenlyDistributionFactor;
private final boolean includeSchemaChanges;
// --------------------------------------------------------------------------------------------
@ -77,6 +78,7 @@ public class MySqlSourceConfig implements Serializable {
Duration connectTimeout,
int connectMaxRetries,
int connectionPoolSize,
double evenlyDistributionFactor,
boolean includeSchemaChanges,
Properties dbzProperties) {
this.hostname = checkNotNull(hostname);
@ -94,6 +96,7 @@ public class MySqlSourceConfig implements Serializable {
this.connectTimeout = checkNotNull(connectTimeout);
this.connectMaxRetries = connectMaxRetries;
this.connectionPoolSize = connectionPoolSize;
this.evenlyDistributionFactor = evenlyDistributionFactor;
this.includeSchemaChanges = includeSchemaChanges;
this.dbzProperties = checkNotNull(dbzProperties);
this.dbzConfiguration = Configuration.from(dbzProperties);
@ -141,6 +144,10 @@ public class MySqlSourceConfig implements Serializable {
return splitMetaGroupSize;
}
/** Returns the evenly distribution factor this config was created with. */
public double getEvenlyDistributionFactor() {
    return this.evenlyDistributionFactor;
}
public int getFetchSize() {
return fetchSize;
}

@ -32,6 +32,10 @@ import java.util.Properties;
import java.util.UUID;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.EVENLY_DISTRIBUTION_FACTOR;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SERVER_TIME_ZONE;
@ -55,9 +59,10 @@ public class MySqlSourceConfigFactory implements Serializable {
private int splitSize = SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue();
private int splitMetaGroupSize = CHUNK_META_GROUP_SIZE.defaultValue();
private int fetchSize = SCAN_SNAPSHOT_FETCH_SIZE.defaultValue();
private Duration connectTimeout = MySqlSourceOptions.CONNECT_TIMEOUT.defaultValue();
private int connectMaxRetries = MySqlSourceOptions.CONNECT_MAX_RETRIES.defaultValue();
private int connectionPoolSize = MySqlSourceOptions.CONNECTION_POOL_SIZE.defaultValue();
private Duration connectTimeout = CONNECT_TIMEOUT.defaultValue();
private int connectMaxRetries = CONNECT_MAX_RETRIES.defaultValue();
private int connectionPoolSize = CONNECTION_POOL_SIZE.defaultValue();
private double evenlyDistributionFactor = EVENLY_DISTRIBUTION_FACTOR.defaultValue();
private boolean includeSchemaChanges = false;
private Properties dbzProperties;
@ -147,6 +152,12 @@ public class MySqlSourceConfigFactory implements Serializable {
return this;
}
/**
 * Sets the factor used to determine whether a table is evenly distributed or not.
 *
 * @param evenlyDistributionFactor the evenly distribution factor; must not be {@code null}
 * @return this factory, to allow call chaining
 * @throws NullPointerException if {@code evenlyDistributionFactor} is {@code null}
 */
public MySqlSourceConfigFactory evenlyDistributionFactor(Double evenlyDistributionFactor) {
    // The field is a primitive double, so a null argument would otherwise fail during
    // auto-unboxing with a message-less NullPointerException; fail with a clear message
    // while keeping the same exception type.
    if (evenlyDistributionFactor == null) {
        throw new NullPointerException("evenlyDistributionFactor must not be null");
    }
    this.evenlyDistributionFactor = evenlyDistributionFactor;
    return this;
}
/** The maximum fetch size for per poll when read table snapshot. */
public MySqlSourceConfigFactory fetchSize(int fetchSize) {
this.fetchSize = fetchSize;
@ -273,6 +284,7 @@ public class MySqlSourceConfigFactory implements Serializable {
connectTimeout,
connectMaxRetries,
connectionPoolSize,
evenlyDistributionFactor,
includeSchemaChanges,
props);
}

@ -18,6 +18,7 @@
package com.ververica.cdc.connectors.mysql.source.config;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
@ -105,13 +106,6 @@ public class MySqlSourceOptions {
.defaultValue(8096)
.withDescription(
"The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table.");
// internal option, won't add to documentaion
public static final ConfigOption<Integer> CHUNK_META_GROUP_SIZE =
ConfigOptions.key("chunk-meta.group.size")
.intType()
.defaultValue(1000)
.withDescription(
"The group size of chunk meta, if the meta size exceeds the group size, the meta will be will be divided into multiple groups.");
public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
ConfigOptions.key("scan.snapshot.fetch.size")
@ -169,4 +163,26 @@ public class MySqlSourceOptions {
.noDefaultValue()
.withDescription(
"Optional timestamp used in case of \"timestamp\" startup mode");
// ----------------------------------------------------------------------------
// experimental options, won't add them to documentation
// ----------------------------------------------------------------------------
/** Experimental option: the group size of chunk meta, defaulting to 1000. */
@Experimental
public static final ConfigOption<Integer> CHUNK_META_GROUP_SIZE =
        ConfigOptions.key("chunk-meta.group.size")
                .intType()
                .defaultValue(1000)
                .withDescription(
                        "The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups.");
/**
 * Experimental option: the factor used to decide whether a table is evenly distributed,
 * defaulting to 1000.0.
 */
@Experimental
public static final ConfigOption<Double> EVENLY_DISTRIBUTION_FACTOR =
        ConfigOptions.key("evenly-distribution.factor")
                .doubleType()
                .defaultValue(1000.0d)
                .withDescription(
                        "The factor is used to determine whether the table is evenly distributed or not."
                                + " The table chunks would use evenly calculation optimization when the data distribution is even,"
                                + " and the sql query would be used when it is uneven."
                                + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
}

@ -75,6 +75,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final Duration connectTimeout;
private final int connectionPoolSize;
private final int connectMaxRetries;
private final double evenlyDistributionFactor;
private final StartupOptions startupOptions;
// --------------------------------------------------------------------------------------------
@ -105,6 +106,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
Duration connectTimeout,
int connectMaxRetries,
int connectionPoolSize,
double evenlyDistributionFactor,
StartupOptions startupOptions) {
this.physicalSchema = physicalSchema;
this.port = port;
@ -123,6 +125,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
this.connectTimeout = connectTimeout;
this.connectMaxRetries = connectMaxRetries;
this.connectionPoolSize = connectionPoolSize;
this.evenlyDistributionFactor = evenlyDistributionFactor;
this.startupOptions = startupOptions;
// Mutable attributes
this.producedDataType = physicalSchema.toPhysicalRowDataType();
@ -169,6 +172,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
.serverId(serverId)
.splitSize(splitSize)
.splitMetaGroupSize(splitMetaGroupSize)
.evenlyDistributionFactor(evenlyDistributionFactor)
.fetchSize(fetchSize)
.connectTimeout(connectTimeout)
.connectMaxRetries(connectMaxRetries)
@ -249,6 +253,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
connectTimeout,
connectMaxRetries,
connectionPoolSize,
evenlyDistributionFactor,
startupOptions);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
@ -269,6 +274,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
&& splitSize == that.splitSize
&& splitMetaGroupSize == that.splitMetaGroupSize
&& fetchSize == that.fetchSize
&& evenlyDistributionFactor == that.evenlyDistributionFactor
&& Objects.equals(physicalSchema, that.physicalSchema)
&& Objects.equals(hostname, that.hostname)
&& Objects.equals(database, that.database)
@ -306,6 +312,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
connectTimeout,
connectMaxRetries,
connectionPoolSize,
evenlyDistributionFactor,
startupOptions,
producedDataType,
metadataKeys);

@ -43,6 +43,7 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.DATABASE_NAME;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.EVENLY_DISTRIBUTION_FACTOR;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.HOSTNAME;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PASSWORD;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PORT;
@ -92,6 +93,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
Duration connectTimeout = config.get(CONNECT_TIMEOUT);
int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
double evenlyDistributionFactor = config.get(EVENLY_DISTRIBUTION_FACTOR);
boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
if (enableParallelRead) {
@ -102,6 +104,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1);
validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0);
validateEvenlyDistributionFactor(evenlyDistributionFactor);
}
return new MySqlTableSource(
@ -122,6 +125,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
connectTimeout,
connectMaxRetries,
connectionPoolSize,
evenlyDistributionFactor,
startupOptions);
}
@ -157,6 +161,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
options.add(SCAN_SNAPSHOT_FETCH_SIZE);
options.add(CONNECT_TIMEOUT);
options.add(CONNECTION_POOL_SIZE);
options.add(EVENLY_DISTRIBUTION_FACTOR);
options.add(CONNECT_MAX_RETRIES);
return options;
}
@ -262,4 +267,13 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
e);
}
}
/**
 * Verifies that the given evenly distribution factor is not below the minimum of 1.0.
 *
 * <p>The check is delegated to {@code checkState}, which receives an error message naming
 * the option key, the minimum and the rejected value.
 *
 * @param evenlyDistributionFactor the configured factor to validate
 */
private void validateEvenlyDistributionFactor(double evenlyDistributionFactor) {
    final double minimumFactor = 1.0d;
    final String errorMessage =
            String.format(
                    "The value of option '%s' must larger than or equals %s, but is %s",
                    EVENLY_DISTRIBUTION_FACTOR.key(), minimumFactor, evenlyDistributionFactor);
    checkState(evenlyDistributionFactor >= minimumFactor, errorMessage);
}
}

@ -38,6 +38,7 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.EVENLY_DISTRIBUTION_FACTOR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -56,16 +57,17 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
public void testAssignSingleTableSplits() {
List<String> expected =
Arrays.asList(
"customers null [109]",
"customers [109] [118]",
"customers [118] [1009]",
"customers [1009] [1012]",
"customers [1012] [1015]",
"customers [1015] [1018]",
"customers [1018] null");
"customers null [462]",
"customers [462] [823]",
"customers [823] [1184]",
"customers [1184] [1545]",
"customers [1545] [1906]",
"customers [1906] null");
List<String> splits =
getTestAssignSnapshotSplits(
4, new String[] {customerDatabase.getDatabaseName() + ".customers"});
4,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".customers"});
assertEquals(expected, splits);
}
@ -74,7 +76,9 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
List<String> expected = Arrays.asList("customers null null");
List<String> splits =
getTestAssignSnapshotSplits(
2000, new String[] {customerDatabase.getDatabaseName() + ".customers"});
2000,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".customers"});
assertEquals(expected, splits);
}
@ -82,23 +86,22 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
public void testAssignMultipleTableSplits() {
List<String> expected =
Arrays.asList(
"customers null [109]",
"customers [109] [118]",
"customers [118] [1009]",
"customers [1009] [1012]",
"customers [1012] [1015]",
"customers [1015] [1018]",
"customers [1018] null",
"customers_1 null [109]",
"customers_1 [109] [118]",
"customers_1 [118] [1009]",
"customers_1 [1009] [1012]",
"customers_1 [1012] [1015]",
"customers_1 [1015] [1018]",
"customers_1 [1018] null");
"customers null [462]",
"customers [462] [823]",
"customers [823] [1184]",
"customers [1184] [1545]",
"customers [1545] [1906]",
"customers [1906] null",
"customers_1 null [462]",
"customers_1 [462] [823]",
"customers_1 [823] [1184]",
"customers_1 [1184] [1545]",
"customers_1 [1545] [1906]",
"customers_1 [1906] null");
List<String> splits =
getTestAssignSnapshotSplits(
4,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
new String[] {
customerDatabase.getDatabaseName() + ".customers",
customerDatabase.getDatabaseName() + ".customers_1"
@ -113,6 +116,7 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
List<String> splits =
getTestAssignSnapshotSplits(
2,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".shopping_cart_big"});
assertEquals(expected, splits);
}
@ -126,7 +130,9 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
"address [417420106184475563] null");
List<String> splits =
getTestAssignSnapshotSplits(
4, new String[] {customerDatabase.getDatabaseName() + ".address"});
4,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".address"});
assertEquals(expected, splits);
}
@ -134,17 +140,20 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
public void testAssignSnapshotSplitsWithDecimalKey() {
List<String> expected =
Arrays.asList(
"shopping_cart_dec null [124456.4560]",
"shopping_cart_dec [124456.4560] null");
"shopping_cart_dec null [124812.1230]",
"shopping_cart_dec [124812.1230] null");
List<String> splits =
getTestAssignSnapshotSplits(
2,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".shopping_cart_dec"});
assertEquals(expected, splits);
}
private List<String> getTestAssignSnapshotSplits(int splitSize, String[] captureTables) {
MySqlSourceConfig configuration = getConfig(splitSize, captureTables);
private List<String> getTestAssignSnapshotSplits(
int splitSize, double evenlyDistributionFactor, String[] captureTables) {
MySqlSourceConfig configuration =
getConfig(splitSize, evenlyDistributionFactor, captureTables);
List<TableId> remainingTables =
Arrays.stream(captureTables).map(TableId::parse).collect(Collectors.toList());
final MySqlSnapshotSplitAssigner assigner =
@ -190,16 +199,46 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
"customer_card [50001] null");
List<String> splits =
getTestAssignSnapshotSplits(
4, new String[] {customerDatabase.getDatabaseName() + ".customer_card"});
4,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".customer_card"});
assertEquals(expected, splits);
}
/**
 * Checks chunk assignment for a table whose split key is sparse, first with a small chunk
 * size that yields several chunks and then with a big chunk size that yields a single chunk.
 */
@Test
public void testAssignTableWithSparseDistributionSplitKey() {
    // sparse split key values look like 0,10000,20000,30000 rather than 0,1,2,3
    String[] smallChunkTables =
            new String[] {customerDatabase.getDatabaseName() + ".customer_card"};
    List<String> smallChunkActual = getTestAssignSnapshotSplits(4, 2000.0d, smallChunkTables);
    List<String> smallChunkExpected =
            Arrays.asList(
                    "customer_card null [26317]",
                    "customer_card [26317] [32633]",
                    "customer_card [32633] [38949]",
                    "customer_card [38949] [45265]",
                    "customer_card [45265] null");
    assertEquals(smallChunkExpected, smallChunkActual);

    // same sparse table, but the chunk size is big enough to hold it in one chunk
    String[] bigChunkTables =
            new String[] {customerDatabase.getDatabaseName() + ".customer_card"};
    List<String> bigChunkActual = getTestAssignSnapshotSplits(8096, 10000.0d, bigChunkTables);
    List<String> bigChunkExpected = Arrays.asList("customer_card null null");
    assertEquals(bigChunkExpected, bigChunkActual);
}
@Test
public void testAssignTableWithSingleLine() {
List<String> expected = Collections.singletonList("customer_card_single_line null null");
List<String> splits =
getTestAssignSnapshotSplits(
4,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
new String[] {
customerDatabase.getDatabaseName() + ".customer_card_single_line"
});
@ -216,7 +255,9 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
"shopping_cart [user_5] null");
List<String> splits =
getTestAssignSnapshotSplits(
4, new String[] {customerDatabase.getDatabaseName() + ".shopping_cart"});
4,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".shopping_cart"});
assertEquals(expected, splits);
}
@ -230,7 +271,9 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
"shopping_cart [user_5] null");
List<String> splits =
getTestAssignSnapshotSplits(
4, new String[] {customerDatabase.getDatabaseName() + ".shopping_cart"});
4,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".shopping_cart"});
assertEquals(expected, splits);
}
@ -238,29 +281,22 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
public void testAssignMinSplitSize() {
List<String> expected =
Arrays.asList(
"customers null [102]",
"customers [102] [103]",
"customers [103] [109]",
"customers [109] [110]",
"customers [110] [111]",
"customers [111] [118]",
"customers [118] [121]",
"customers [121] [123]",
"customers [123] [1009]",
"customers [1009] [1010]",
"customers [1010] [1011]",
"customers [1011] [1012]",
"customers [1012] [1013]",
"customers [1013] [1014]",
"customers [1014] [1015]",
"customers [1015] [1016]",
"customers [1016] [1017]",
"customers [1017] [1018]",
"customers [1018] [1019]",
"customers [1019] null");
"customers null [281]",
"customers [281] [461]",
"customers [461] [641]",
"customers [641] [821]",
"customers [821] [1001]",
"customers [1001] [1181]",
"customers [1181] [1361]",
"customers [1361] [1541]",
"customers [1541] [1721]",
"customers [1721] [1901]",
"customers [1901] null");
List<String> splits =
getTestAssignSnapshotSplits(
2, new String[] {customerDatabase.getDatabaseName() + ".customers"});
2,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".customers"});
assertEquals(expected, splits);
}
@ -269,7 +305,9 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
List<String> expected = Collections.singletonList("customers null null");
List<String> splits =
getTestAssignSnapshotSplits(
2000, new String[] {customerDatabase.getDatabaseName() + ".customers"});
2000,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".customers"});
assertEquals(expected, splits);
}
@ -277,7 +315,9 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
public void testUnMatchedPrimaryKey() {
try {
getTestAssignSnapshotSplits(
4, new String[] {customerDatabase.getDatabaseName() + ".customer_card"});
4,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".customer_card"});
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
@ -287,7 +327,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
}
}
private MySqlSourceConfig getConfig(int splitSize, String[] captureTables) {
private MySqlSourceConfig getConfig(
int splitSize, double evenlyDistributionFactor, String[] captureTables) {
return new MySqlSourceConfigFactory()
.startupOptions(StartupOptions.initial())
.databaseList(customerDatabase.getDatabaseName())
@ -296,6 +337,7 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
.port(MYSQL_CONTAINER.getDatabasePort())
.splitSize(splitSize)
.fetchSize(2)
.evenlyDistributionFactor(evenlyDistributionFactor)
.username(customerDatabase.getUsername())
.password(customerDatabase.getPassword())
.serverTimeZone(ZoneId.of("UTC").toString())

@ -48,6 +48,7 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.EVENLY_DISTRIBUTION_FACTOR;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
@ -116,6 +117,7 @@ public class MySqlTableSourceFactoryTest {
CONNECT_TIMEOUT.defaultValue(),
CONNECT_MAX_RETRIES.defaultValue(),
CONNECTION_POOL_SIZE.defaultValue(),
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@ -127,6 +129,7 @@ public class MySqlTableSourceFactoryTest {
properties.put("server-id", "123-126");
properties.put("scan.incremental.snapshot.chunk.size", "8000");
properties.put("chunk-meta.group.size", "3000");
properties.put("evenly-distribution.factor", "40.5");
properties.put("scan.snapshot.fetch.size", "100");
properties.put("connect.timeout", "45s");
@ -151,6 +154,7 @@ public class MySqlTableSourceFactoryTest {
Duration.ofSeconds(45),
CONNECT_MAX_RETRIES.defaultValue(),
CONNECTION_POOL_SIZE.defaultValue(),
40.5d,
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@ -185,6 +189,7 @@ public class MySqlTableSourceFactoryTest {
Duration.ofSeconds(45),
CONNECT_MAX_RETRIES.defaultValue(),
CONNECTION_POOL_SIZE.defaultValue(),
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@ -217,6 +222,7 @@ public class MySqlTableSourceFactoryTest {
CONNECT_TIMEOUT.defaultValue(),
CONNECT_MAX_RETRIES.defaultValue(),
CONNECTION_POOL_SIZE.defaultValue(),
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
StartupOptions.latest());
assertEquals(expectedSource, actualSource);
}
@ -251,6 +257,7 @@ public class MySqlTableSourceFactoryTest {
CONNECT_TIMEOUT.defaultValue(),
CONNECT_MAX_RETRIES.defaultValue(),
CONNECTION_POOL_SIZE.defaultValue(),
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@ -305,6 +312,7 @@ public class MySqlTableSourceFactoryTest {
CONNECT_TIMEOUT.defaultValue(),
CONNECT_MAX_RETRIES.defaultValue(),
CONNECTION_POOL_SIZE.defaultValue(),
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@ -368,6 +376,7 @@ public class MySqlTableSourceFactoryTest {
CONNECT_TIMEOUT.defaultValue(),
CONNECT_MAX_RETRIES.defaultValue(),
CONNECTION_POOL_SIZE.defaultValue(),
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
StartupOptions.latest());
assertEquals(expectedSource, actualSource);
}
@ -404,6 +413,7 @@ public class MySqlTableSourceFactoryTest {
CONNECT_TIMEOUT.defaultValue(),
CONNECT_MAX_RETRIES.defaultValue(),
CONNECTION_POOL_SIZE.defaultValue(),
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
StartupOptions.initial());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
@ -490,6 +500,21 @@ public class MySqlTableSourceFactoryTest {
"The value of option 'chunk-meta.group.size' must larger than 1, but is 1"));
}
// validate illegal evenly distribution factor
try {
Map<String, String> properties = getAllOptions();
properties.put("scan.incremental.snapshot.enabled", "true");
properties.put("evenly-distribution.factor", "0.8");
createTableSource(properties);
fail("exception expected");
} catch (Throwable t) {
assertThat(
t,
containsMessage(
"The value of option 'evenly-distribution.factor' must larger than or equals 1.0, but is 0.8"));
}
// validate illegal connection pool size
try {
Map<String, String> properties = getAllOptions();

Loading…
Cancel
Save