[mysql] Optimize the dynamic chunk size calculation for table with dense distribution (#599)

pull/555/head
Leonard Xu 3 years ago committed by GitHub
parent 823c14cde4
commit 7ae20a1277
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -139,9 +139,21 @@ public class MySqlSourceBuilder<T> {
return this;
}
/** The factor is used to determine whether the table is evenly distribution or not. */
public MySqlSourceBuilder<T> evenlyDistributionFactor(Double evenlyDistributionFactor) {
this.configFactory.evenlyDistributionFactor(evenlyDistributionFactor);
/**
 * Sets the upper bound of the split key distribution factor. The distribution factor is used to
 * decide whether a table counts as evenly distributed; the value is handed over to the
 * underlying config factory.
 *
 * @param distributionFactorUpper the upper bound of the distribution factor
 * @return this builder, to allow call chaining
 */
public MySqlSourceBuilder<T> distributionFactorUpper(double distributionFactorUpper) {
    configFactory.distributionFactorUpper(distributionFactorUpper);
    return this;
}
/**
 * Sets the lower bound of the split key distribution factor. The distribution factor is used to
 * decide whether a table counts as evenly distributed; the value is handed over to the
 * underlying config factory.
 *
 * @param distributionFactorLower the lower bound of the distribution factor
 * @return this builder, to allow call chaining
 */
public MySqlSourceBuilder<T> distributionFactorLower(double distributionFactorLower) {
    configFactory.distributionFactorLower(distributionFactorLower);
    return this;
}

@ -49,6 +49,7 @@ import java.util.Map;
import java.util.Objects;
import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection;
import static com.ververica.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.queryApproximateRowCnt;
import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.queryMin;
import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.queryMinMax;
@ -60,6 +61,7 @@ import static java.math.BigDecimal.ROUND_CEILING;
* {@link MySqlSnapshotSplit}).
*/
class ChunkSplitter {
private static final Logger LOG = LoggerFactory.getLogger(ChunkSplitter.class);
private final MySqlSourceConfig sourceConfig;
@ -119,6 +121,11 @@ class ChunkSplitter {
// Utilities
// --------------------------------------------------------------------------------------------
/**
* Splits the table into chunks using either evenly-sized or unevenly-sized chunks. Evenly-sized
* chunks are much more efficient; unevenly-sized chunks require many queries and are
* therefore less efficient.
*/
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final String splitColumnName = splitColumn.name();
@ -130,50 +137,45 @@ class ChunkSplitter {
return Collections.singletonList(ChunkRange.all());
}
final List<ChunkRange> chunks;
final int chunkSize = sourceConfig.getSplitSize();
final double evenlyDistributionFactor = sourceConfig.getEvenlyDistributionFactor();
final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
boolean isSplitColumnEvenlyDistributed = false;
double distributionFactor = 0.0d;
if (isEvenlySplitColumn(splitColumn)) {
// optimization: table with single chunk, avoid querying from db
if (ObjectUtils.minus(max, min).compareTo(BigDecimal.valueOf(chunkSize)) <= 0) {
isSplitColumnEvenlyDistributed = true;
long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
double distributionFactor =
calculateDistributionFactor(tableId, min, max, approximateRowCnt);
boolean dataIsEvenlyDistributed =
doubleCompare(distributionFactor, distributionFactorLower) >= 0
&& doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
if (dataIsEvenlyDistributed) {
// the minimum dynamic chunk size is at least 1
final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, dynamicChunkSize);
} else {
distributionFactor = calculateDistributionFactor(jdbc, tableId, min, max);
isSplitColumnEvenlyDistributed = distributionFactor <= evenlyDistributionFactor;
return splitUnevenlySizedChunks(
jdbc, tableId, splitColumnName, min, max, chunkSize);
}
}
if (isSplitColumnEvenlyDistributed) {
// use evenly-sized chunks which is much efficient
final int dynamicChunkSize =
getDynamicChunkSize(chunkSize, distributionFactor, evenlyDistributionFactor);
LOG.info(
"Use evenly-sized chunk optimization for table {}, the distribution factor is {}, the chunk size is {}",
tableId,
distributionFactor,
dynamicChunkSize);
chunks = splitEvenlySizedChunks(min, max, dynamicChunkSize);
} else {
// use unevenly-sized chunks which will request many queries and is not efficient.
LOG.info(
"Use unevenly-sized chunks for table{}, the chunk size is {}",
tableId,
chunkSize);
chunks = splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
}
return chunks;
}
/**
* Split table into evenly sized chunks based on the numeric min and max value of split column,
* and tumble chunks in step size.
*/
private List<ChunkRange> splitEvenlySizedChunks(Object min, Object max, int chunkSize) {
if (ObjectUtils.compare(ObjectUtils.plus(min, chunkSize), max) > 0) {
private List<ChunkRange> splitEvenlySizedChunks(
TableId tableId, Object min, Object max, long approximateRowCnt, int chunkSize) {
LOG.info(
"Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}",
tableId,
approximateRowCnt,
chunkSize);
if (approximateRowCnt <= chunkSize) {
// there is no more than one chunk, return full table as a chunk
return Collections.singletonList(ChunkRange.all());
}
@ -200,6 +202,8 @@ class ChunkSplitter {
Object max,
int chunkSize)
throws SQLException {
LOG.info(
"Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize);
@ -276,20 +280,10 @@ class ChunkSplitter {
|| typeRoot == LogicalTypeRoot.DECIMAL;
}
/** Returns the dynamic chunk size of evenly distributed table. */
private static int getDynamicChunkSize(
int chunkSize, double distributionFactor, double evenlyDistributionFactor) {
int dynamicChunkSize = chunkSize;
if (distributionFactor <= evenlyDistributionFactor) {
dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), dynamicChunkSize);
}
return dynamicChunkSize;
}
/** Returns the distribution factor of the given table. */
private static double calculateDistributionFactor(
JdbcConnection jdbc, TableId tableId, Object min, Object max) throws SQLException {
final long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
private double calculateDistributionFactor(
TableId tableId, Object min, Object max, long approximateRowCnt) {
if (!min.getClass().equals(max.getClass())) {
throw new IllegalStateException(
String.format(
@ -300,9 +294,18 @@ class ChunkSplitter {
return Double.MAX_VALUE;
}
BigDecimal difference = ObjectUtils.minus(max, min);
// factor = max - min + 1 / rowCount
// factor = (max - min + 1) / rowCount
final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
return subRowCnt.divide(new BigDecimal(approximateRowCnt), 2, ROUND_CEILING).doubleValue();
double distributionFactor =
subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue();
LOG.info(
"The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}",
tableId,
distributionFactor,
min,
max,
approximateRowCnt);
return distributionFactor;
}
private static String splitId(TableId tableId, int chunkId) {

@ -52,7 +52,8 @@ public class MySqlSourceConfig implements Serializable {
private final Duration connectTimeout;
private final int connectMaxRetries;
private final int connectionPoolSize;
private final double evenlyDistributionFactor;
private final double distributionFactorUpper;
private final double distributionFactorLower;
private final boolean includeSchemaChanges;
// --------------------------------------------------------------------------------------------
@ -78,7 +79,8 @@ public class MySqlSourceConfig implements Serializable {
Duration connectTimeout,
int connectMaxRetries,
int connectionPoolSize,
double evenlyDistributionFactor,
double distributionFactorUpper,
double distributionFactorLower,
boolean includeSchemaChanges,
Properties dbzProperties) {
this.hostname = checkNotNull(hostname);
@ -96,7 +98,8 @@ public class MySqlSourceConfig implements Serializable {
this.connectTimeout = checkNotNull(connectTimeout);
this.connectMaxRetries = connectMaxRetries;
this.connectionPoolSize = connectionPoolSize;
this.evenlyDistributionFactor = evenlyDistributionFactor;
this.distributionFactorUpper = distributionFactorUpper;
this.distributionFactorLower = distributionFactorLower;
this.includeSchemaChanges = includeSchemaChanges;
this.dbzProperties = checkNotNull(dbzProperties);
this.dbzConfiguration = Configuration.from(dbzProperties);
@ -144,8 +147,12 @@ public class MySqlSourceConfig implements Serializable {
return splitMetaGroupSize;
}
public double getEvenlyDistributionFactor() {
return evenlyDistributionFactor;
/** Returns the configured upper bound of the split key distribution factor. */
public double getDistributionFactorUpper() {
    return this.distributionFactorUpper;
}
/** Returns the configured lower bound of the split key distribution factor. */
public double getDistributionFactorLower() {
    return this.distributionFactorLower;
}
public int getFetchSize() {

@ -35,10 +35,11 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.EVENLY_DISTRIBUTION_FACTOR;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SERVER_TIME_ZONE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** A factory to construct {@link MySqlSourceConfig}. */
@ -62,7 +63,10 @@ public class MySqlSourceConfigFactory implements Serializable {
private Duration connectTimeout = CONNECT_TIMEOUT.defaultValue();
private int connectMaxRetries = CONNECT_MAX_RETRIES.defaultValue();
private int connectionPoolSize = CONNECTION_POOL_SIZE.defaultValue();
private double evenlyDistributionFactor = EVENLY_DISTRIBUTION_FACTOR.defaultValue();
private double distributionFactorUpper =
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue();
private double distributionFactorLower =
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
private boolean includeSchemaChanges = false;
private Properties dbzProperties;
@ -152,9 +156,21 @@ public class MySqlSourceConfigFactory implements Serializable {
return this;
}
/** The factor is used to determine whether the table is evenly distribution or not. */
public MySqlSourceConfigFactory evenlyDistributionFactor(Double evenlyDistributionFactor) {
this.evenlyDistributionFactor = evenlyDistributionFactor;
/**
* Sets the upper bound of the split key distribution factor. The distribution factor is used to
* determine whether the table is evenly distributed or not.
*
* @param distributionFactorUpper the upper bound of the distribution factor
* @return this factory, to allow call chaining
*/
public MySqlSourceConfigFactory distributionFactorUpper(double distributionFactorUpper) {
this.distributionFactorUpper = distributionFactorUpper;
return this;
}
/**
* Sets the lower bound of the split key distribution factor. The distribution factor is used to
* determine whether the table is evenly distributed or not.
*
* @param distributionFactorLower the lower bound of the distribution factor
* @return this factory, to allow call chaining
*/
public MySqlSourceConfigFactory distributionFactorLower(double distributionFactorLower) {
this.distributionFactorLower = distributionFactorLower;
return this;
}
@ -284,7 +300,8 @@ public class MySqlSourceConfigFactory implements Serializable {
connectTimeout,
connectMaxRetries,
connectionPoolSize,
evenlyDistributionFactor,
distributionFactorUpper,
distributionFactorLower,
includeSchemaChanges,
props);
}

@ -176,13 +176,26 @@ public class MySqlSourceOptions {
"The group size of chunk meta, if the meta size exceeds the group size, the meta will be will be divided into multiple groups.");
@Experimental
public static final ConfigOption<Double> EVENLY_DISTRIBUTION_FACTOR =
ConfigOptions.key("evenly-distribution.factor")
public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
ConfigOptions.key("split-key.even-distribution.factor.upper-bound")
.doubleType()
.defaultValue(1000.0d)
.withDescription(
"The factor is used to determine whether the table is evenly distribution or not."
+ " the table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the sql query would be used when it is uneven."
+ " The distribution factor could be calculated by MAX(id) - MIN(id) + 1 / rowCount.");
"The upper bound of split key distribution factor. The distribution factor is used to determine whether the"
+ " table is evenly distribution or not."
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the query MySQL for splitting would happen when it is uneven."
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
/**
* Option {@code split-key.even-distribution.factor.lower-bound}: the lower bound of the split
* key distribution factor, a double that defaults to {@code 0.05}. Per the option description,
* the factor is computed as {@code (MAX(id) - MIN(id) + 1) / rowCount} and decides whether the
* evenly-sized chunk calculation is used or MySQL is queried for the split points.
*/
@Experimental
public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
ConfigOptions.key("split-key.even-distribution.factor.lower-bound")
.doubleType()
.defaultValue(0.05d)
.withDescription(
"The lower bound of split key distribution factor. The distribution factor is used to determine whether the"
+ " table is evenly distribution or not."
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the query MySQL for splitting would happen when it is uneven."
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
}

@ -87,4 +87,16 @@ public class ObjectUtils {
Comparable<Object> c2 = (Comparable<Object>) obj2;
return c1.compareTo(c2);
}
/**
 * Compares two {@code double} values numerically.
 *
 * <p>Finite values are compared through {@link BigDecimal}, so {@code -0.0} and {@code 0.0} are
 * treated as equal. Infinite values cannot be represented as {@link BigDecimal} and are
 * therefore ordered with {@link Double#compare(double, double)} instead of failing.
 *
 * @param arg1 the first value to compare
 * @param arg2 the second value to compare
 * @return -1, 0, or 1 as {@code arg1} is numerically less than, equal to, or greater than
 *     {@code arg2}.
 * @throws NumberFormatException if either argument is NaN, because NaN has no numeric order
 */
public static int doubleCompare(double arg1, double arg2) {
    if (Double.isNaN(arg1) || Double.isNaN(arg2)) {
        // Same exception type that BigDecimal.valueOf raises for NaN, with a clearer message.
        throw new NumberFormatException(
                "Cannot compare NaN values: arg1=" + arg1 + ", arg2=" + arg2);
    }
    if (Double.isInfinite(arg1) || Double.isInfinite(arg2)) {
        // BigDecimal.valueOf rejects infinities although they are perfectly orderable.
        return Integer.signum(Double.compare(arg1, arg2));
    }
    return BigDecimal.valueOf(arg1).compareTo(BigDecimal.valueOf(arg2));
}
}

@ -75,7 +75,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final Duration connectTimeout;
private final int connectionPoolSize;
private final int connectMaxRetries;
private final double evenlyDistributionFactor;
private final double distributionFactorUpper;
private final double distributionFactorLower;
private final StartupOptions startupOptions;
// --------------------------------------------------------------------------------------------
@ -106,7 +107,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
Duration connectTimeout,
int connectMaxRetries,
int connectionPoolSize,
double evenlyDistributionFactor,
double distributionFactorUpper,
double distributionFactorLower,
StartupOptions startupOptions) {
this.physicalSchema = physicalSchema;
this.port = port;
@ -125,7 +127,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
this.connectTimeout = connectTimeout;
this.connectMaxRetries = connectMaxRetries;
this.connectionPoolSize = connectionPoolSize;
this.evenlyDistributionFactor = evenlyDistributionFactor;
this.distributionFactorUpper = distributionFactorUpper;
this.distributionFactorLower = distributionFactorLower;
this.startupOptions = startupOptions;
// Mutable attributes
this.producedDataType = physicalSchema.toPhysicalRowDataType();
@ -172,7 +175,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
.serverId(serverId)
.splitSize(splitSize)
.splitMetaGroupSize(splitMetaGroupSize)
.evenlyDistributionFactor(evenlyDistributionFactor)
.distributionFactorUpper(distributionFactorUpper)
.distributionFactorLower(distributionFactorLower)
.fetchSize(fetchSize)
.connectTimeout(connectTimeout)
.connectMaxRetries(connectMaxRetries)
@ -253,7 +257,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
connectTimeout,
connectMaxRetries,
connectionPoolSize,
evenlyDistributionFactor,
distributionFactorUpper,
distributionFactorLower,
startupOptions);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
@ -274,7 +279,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
&& splitSize == that.splitSize
&& splitMetaGroupSize == that.splitMetaGroupSize
&& fetchSize == that.fetchSize
&& evenlyDistributionFactor == that.evenlyDistributionFactor
&& distributionFactorUpper == that.distributionFactorUpper
&& distributionFactorLower == that.distributionFactorLower
&& Objects.equals(physicalSchema, that.physicalSchema)
&& Objects.equals(hostname, that.hostname)
&& Objects.equals(database, that.database)
@ -312,7 +318,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
connectTimeout,
connectMaxRetries,
connectionPoolSize,
evenlyDistributionFactor,
distributionFactorUpper,
distributionFactorLower,
startupOptions,
producedDataType,
metadataKeys);

@ -43,7 +43,6 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.DATABASE_NAME;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.EVENLY_DISTRIBUTION_FACTOR;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.HOSTNAME;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PASSWORD;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PORT;
@ -56,8 +55,11 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SERVER_ID;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SERVER_TIME_ZONE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.TABLE_NAME;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.USERNAME;
import static com.ververica.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
import static org.apache.flink.util.Preconditions.checkState;
@ -93,7 +95,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
Duration connectTimeout = config.get(CONNECT_TIMEOUT);
int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
double evenlyDistributionFactor = config.get(EVENLY_DISTRIBUTION_FACTOR);
double distributionFactorUpper = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
if (enableParallelRead) {
@ -104,7 +107,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1);
validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0);
validateEvenlyDistributionFactor(evenlyDistributionFactor);
validateDistributionFactorUpper(distributionFactorUpper);
validateDistributionFactorLower(distributionFactorLower);
}
return new MySqlTableSource(
@ -125,7 +129,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
connectTimeout,
connectMaxRetries,
connectionPoolSize,
evenlyDistributionFactor,
distributionFactorUpper,
distributionFactorLower,
startupOptions);
}
@ -161,7 +166,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
options.add(SCAN_SNAPSHOT_FETCH_SIZE);
options.add(CONNECT_TIMEOUT);
options.add(CONNECTION_POOL_SIZE);
options.add(EVENLY_DISTRIBUTION_FACTOR);
options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(CONNECT_MAX_RETRIES);
return options;
}
@ -268,12 +274,27 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
}
}
/** Checks the value of given evenly distribution factor is valid. */
private void validateEvenlyDistributionFactor(double evenlyDistributionFactor) {
/** Checks the value of given evenly distribution factor upper bound is valid. */
private void validateDistributionFactorUpper(double distributionFactorUpper) {
checkState(
evenlyDistributionFactor >= 1.0d,
doubleCompare(distributionFactorUpper, 1.0d) >= 0,
String.format(
"The value of option '%s' must larger than or equals %s, but is %s",
EVENLY_DISTRIBUTION_FACTOR.key(), 1.0d, evenlyDistributionFactor));
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(),
1.0d,
distributionFactorUpper));
}
/**
 * Verifies that the lower bound of the split key distribution factor lies between 0.0 and 1.0
 * (both inclusive); a violation is reported through {@code checkState}.
 */
private void validateDistributionFactorLower(double distributionFactorLower) {
    final boolean withinRange =
            doubleCompare(distributionFactorLower, 0.0d) >= 0
                    && doubleCompare(distributionFactorLower, 1.0d) <= 0;
    final String errorMessage =
            String.format(
                    "The value of option '%s' must between %s and %s inclusively, but is %s",
                    SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(),
                    0.0d,
                    1.0d,
                    distributionFactorLower);
    checkState(withinRange, errorMessage);
}
}

@ -74,7 +74,7 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
@Test
public void testReadSingleBinlogSplit() throws Exception {
customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"});
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
final DataType dataType =
@ -83,7 +83,8 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
List<MySqlSnapshotSplit> splits = getMySqlSplits(new String[] {"customers"}, sourceConfig);
List<MySqlSnapshotSplit> splits =
getMySqlSplits(new String[] {"customers_even_dist"}, sourceConfig);
String[] expected =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
@ -94,13 +95,8 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
"+U[103, user_3, Hangzhou, 123567891234]",
"-U[103, user_3, Hangzhou, 123567891234]",
"+U[103, user_3, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]"
"+I[104, user_4, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]"
};
List<String> actual =
@ -117,7 +113,7 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
@Test
public void testReadAllBinlogSplitsForOneTable() throws Exception {
customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"});
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
final DataType dataType =
@ -126,39 +122,27 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
List<MySqlSnapshotSplit> splits = getMySqlSplits(new String[] {"customers"}, sourceConfig);
List<MySqlSnapshotSplit> splits =
getMySqlSplits(new String[] {"customers_even_dist"}, sourceConfig);
String[] expected =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"-D[102, user_2, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[104, user_4, Shanghai, 123567891234]",
"+I[105, user_5, Shanghai, 123567891234]",
"+I[106, user_6, Shanghai, 123567891234]",
"+I[107, user_7, Shanghai, 123567891234]",
"+I[108, user_8, Shanghai, 123567891234]",
"+I[109, user_9, Shanghai, 123567891234]",
"+I[110, user_10, Shanghai, 123567891234]",
"-U[103, user_3, Shanghai, 123567891234]",
"+U[103, user_3, Hangzhou, 123567891234]",
"-D[102, user_2, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"-U[103, user_3, Hangzhou, 123567891234]",
"+U[103, user_3, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"-U[1010, user_11, Shanghai, 123567891234]",
"+U[1010, Hangzhou, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]",
"+I[2001, user_22, Shanghai, 123567891234]",
"+I[2002, user_23, Shanghai, 123567891234]",
"+I[2003, user_24, Shanghai, 123567891234]"
@ -603,7 +587,7 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
.hostname(MYSQL_CONTAINER.getHost())
.port(MYSQL_CONTAINER.getDatabasePort())
.username(customerDatabase.getUsername())
.splitSize(10)
.splitSize(4)
.fetchSize(2)
.password(customerDatabase.getPassword())
.createConfig(0);

@ -56,14 +56,14 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
@BeforeClass
public static void init() {
customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"});
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"}, 10);
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
}
@Test
public void testReadSingleSnapshotSplit() throws Exception {
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"});
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"}, 4);
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
@ -77,12 +77,7 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]"
"+I[104, user_4, Shanghai, 123567891234]"
};
List<String> actual = readTableSnapshotSplits(mySqlSplits, sourceConfig, 1, dataType);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
@ -90,7 +85,7 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
@Test
public void testReadAllSnapshotSplitsForOneTable() throws Exception {
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"});
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"}, 4);
final DataType dataType =
DataTypes.ROW(
@ -105,24 +100,13 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]"
"+I[104, user_4, Shanghai, 123567891234]",
"+I[105, user_5, Shanghai, 123567891234]",
"+I[106, user_6, Shanghai, 123567891234]",
"+I[107, user_7, Shanghai, 123567891234]",
"+I[108, user_8, Shanghai, 123567891234]",
"+I[109, user_9, Shanghai, 123567891234]",
"+I[110, user_10, Shanghai, 123567891234]"
};
List<String> actual =
readTableSnapshotSplits(mySqlSplits, sourceConfig, mySqlSplits.size(), dataType);
@ -131,7 +115,7 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
@Test
public void testReadAllSplitForTableWithSingleLine() throws Exception {
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customer_card_single_line"});
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customer_card_single_line"}, 10);
final DataType dataType =
DataTypes.ROW(
@ -149,7 +133,7 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
@Test
public void testReadAllSnapshotSplitsForTables() throws Exception {
MySqlSourceConfig sourceConfig =
getConfig(new String[] {"customer_card", "customer_card_single_line"});
getConfig(new String[] {"customer_card", "customer_card_single_line"}, 10);
DataType dataType =
DataTypes.ROW(
@ -249,7 +233,7 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
return mySqlSplitList;
}
public static MySqlSourceConfig getConfig(String[] captureTables) {
public static MySqlSourceConfig getConfig(String[] captureTables, int splitSize) {
String[] captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
@ -262,7 +246,7 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
.hostname(MYSQL_CONTAINER.getHost())
.port(MYSQL_CONTAINER.getDatabasePort())
.username(customerDatabase.getUsername())
.splitSize(10)
.splitSize(splitSize)
.fetchSize(2)
.password(customerDatabase.getPassword())
.createConfig(0);

@ -32,6 +32,7 @@ import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import java.sql.SQLException;
@ -44,6 +45,8 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState;
/** IT tests for {@link MySqlSource}. */
public class MySqlSourceITCase extends MySqlSourceTestBase {
@ -254,11 +257,12 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
}
private String getTableName(String[] captureCustomerTables) {
checkState(captureCustomerTables.length > 0);
if (captureCustomerTables.length == 1) {
return "customers";
return captureCustomerTables[0];
} else {
// pattern that matches test table: customers and customers_1
return "customers.*";
// pattern that matches multiple tables
return String.format("(%s)", StringUtils.join(captureCustomerTables, "|"));
}
}

@ -38,7 +38,8 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.EVENLY_DISTRIBUTION_FACTOR;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -57,17 +58,15 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
public void testAssignSingleTableSplits() {
List<String> expected =
Arrays.asList(
"customers null [462]",
"customers [462] [823]",
"customers [823] [1184]",
"customers [1184] [1545]",
"customers [1545] [1906]",
"customers [1906] null");
"customers_even_dist null [105]",
"customers_even_dist [105] [109]",
"customers_even_dist [109] null");
List<String> splits =
getTestAssignSnapshotSplits(
4,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".customers"});
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".customers_even_dist"});
assertEquals(expected, splits);
}
@ -77,7 +76,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
List<String> splits =
getTestAssignSnapshotSplits(
2000,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".customers"});
assertEquals(expected, splits);
}
@ -86,25 +86,20 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
public void testAssignMultipleTableSplits() {
List<String> expected =
Arrays.asList(
"customers null [462]",
"customers [462] [823]",
"customers [823] [1184]",
"customers [1184] [1545]",
"customers [1545] [1906]",
"customers [1906] null",
"customers_1 null [462]",
"customers_1 [462] [823]",
"customers_1 [823] [1184]",
"customers_1 [1184] [1545]",
"customers_1 [1545] [1906]",
"customers_1 [1906] null");
"customers_even_dist null [105]",
"customers_even_dist [105] [109]",
"customers_even_dist [109] null",
"customers_sparse_dist null [10]",
"customers_sparse_dist [10] [18]",
"customers_sparse_dist [18] null");
List<String> splits =
getTestAssignSnapshotSplits(
4,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {
customerDatabase.getDatabaseName() + ".customers",
customerDatabase.getDatabaseName() + ".customers_1"
customerDatabase.getDatabaseName() + ".customers_even_dist",
customerDatabase.getDatabaseName() + ".customers_sparse_dist"
});
assertEquals(expected, splits);
}
@ -116,7 +111,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
List<String> splits =
getTestAssignSnapshotSplits(
2,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".shopping_cart_big"});
assertEquals(expected, splits);
}
@ -131,7 +127,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
List<String> splits =
getTestAssignSnapshotSplits(
4,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".address"});
assertEquals(expected, splits);
}
@ -140,53 +137,17 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
public void testAssignSnapshotSplitsWithDecimalKey() {
List<String> expected =
Arrays.asList(
"shopping_cart_dec null [124812.1230]",
"shopping_cart_dec [124812.1230] null");
"shopping_cart_dec null [123458.1230]",
"shopping_cart_dec [123458.1230] null");
List<String> splits =
getTestAssignSnapshotSplits(
2,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".shopping_cart_dec"});
assertEquals(expected, splits);
}
/**
 * Runs a {@link MySqlSnapshotSplitAssigner} over the given tables and returns a printable
 * description of every split it hands out, in the order the assigner produced them.
 *
 * <p>A snapshot split is rendered as {@code "<table> <splitStart> <splitEnd>"}; any other
 * split is rendered with its own {@code toString()}.
 *
 * @param splitSize chunk size forwarded to {@code getConfig}
 * @param evenlyDistributionFactor distribution factor forwarded to {@code getConfig}
 * @param captureTables table identifiers; each one is parsed with {@code TableId.parse}
 * @return one string per assigned split
 */
private List<String> getTestAssignSnapshotSplits(
int splitSize, double evenlyDistributionFactor, String[] captureTables) {
MySqlSourceConfig configuration =
getConfig(splitSize, evenlyDistributionFactor, captureTables);
List<TableId> remainingTables =
Arrays.stream(captureTables).map(TableId::parse).collect(Collectors.toList());
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(
configuration, DEFAULT_PARALLELISM, remainingTables, false);
assigner.open();
List<MySqlSplit> sqlSplits = new ArrayList<>();
// Drain the assigner: keep asking for splits until it returns an empty Optional.
while (true) {
Optional<MySqlSplit> split = assigner.getNext();
if (split.isPresent()) {
sqlSplits.add(split.get());
} else {
break;
}
}
// Render each split as a compact string so tests can compare against literal expectations.
return sqlSplits.stream()
.map(
split -> {
if (split.isSnapshotSplit()) {
// Arrays.toString prints "null" for a null array, which is how an
// absent start/end bound appears in the expected strings.
return split.asSnapshotSplit().getTableId().table()
+ " "
+ Arrays.toString(split.asSnapshotSplit().getSplitStart())
+ " "
+ Arrays.toString(split.asSnapshotSplit().getSplitEnd());
} else {
return split.toString();
}
})
.collect(Collectors.toList());
}
@Test
public void testAssignTableWithMultipleKey() {
List<String> expected =
@ -200,35 +161,77 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
List<String> splits =
getTestAssignSnapshotSplits(
4,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".customer_card"});
assertEquals(expected, splits);
}
@Test
public void testAssignTableWithSparseDistributionSplitKey() {
// test table with sparse split key order like 0,10000,20000,30000 instead of 0,1,2,3
// test table with sparse split key order like 0,2,4,8 instead of 0,1,2,3
// test sparse table with bigger distribution factor upper
List<String> expected =
Arrays.asList(
"customer_card null [26317]",
"customer_card [26317] [32633]",
"customer_card [32633] [38949]",
"customer_card [38949] [45265]",
"customer_card [45265] null");
"customers_sparse_dist null [10]",
"customers_sparse_dist [10] [18]",
"customers_sparse_dist [18] null");
List<String> splits =
getTestAssignSnapshotSplits(
4,
2000.0d,
new String[] {customerDatabase.getDatabaseName() + ".customer_card"});
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {
customerDatabase.getDatabaseName() + ".customers_sparse_dist"
});
assertEquals(expected, splits);
// test table with sparse split key and big chunk size
List<String> expected1 = Arrays.asList("customer_card null null");
// test sparse table with smaller distribution factor upper
List<String> expected1 =
Arrays.asList(
"customers_sparse_dist null [8]",
"customers_sparse_dist [8] [17]",
"customers_sparse_dist [17] null");
List<String> splits1 =
getTestAssignSnapshotSplits(
8096,
10000.0d,
new String[] {customerDatabase.getDatabaseName() + ".customer_card"});
4,
2.0d,
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {
customerDatabase.getDatabaseName() + ".customers_sparse_dist"
});
assertEquals(expected1, splits1);
}
/**
 * Checks how {@code customers_dense_dist} is split with a chunk size of 2 under two different
 * lower bounds of the split-key distribution factor: the option's default, and 0.9.
 */
@Test
public void testAssignTableWithDenseDistributionSplitKey() {
    final String denseTableId = customerDatabase.getDatabaseName() + ".customers_dense_dist";

    // Case 1: both distribution-factor bounds left at their defaults -> three splits.
    List<String> expectedWithDefaultLower =
            Arrays.asList(
                    "customers_dense_dist null [2]",
                    "customers_dense_dist [2] [3]",
                    "customers_dense_dist [3] null");
    List<String> actualWithDefaultLower =
            getTestAssignSnapshotSplits(
                    2,
                    SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                    SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                    new String[] {denseTableId});
    assertEquals(expectedWithDefaultLower, actualWithDefaultLower);

    // Case 2: lower bound raised to 0.9, upper bound still default -> two splits.
    List<String> expectedWithRaisedLower =
            Arrays.asList("customers_dense_dist null [2]", "customers_dense_dist [2] null");
    List<String> actualWithRaisedLower =
            getTestAssignSnapshotSplits(
                    2,
                    SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                    0.9d,
                    new String[] {denseTableId});
    assertEquals(expectedWithRaisedLower, actualWithRaisedLower);
}
@ -238,7 +241,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
List<String> splits =
getTestAssignSnapshotSplits(
4,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {
customerDatabase.getDatabaseName() + ".customer_card_single_line"
});
@ -256,7 +260,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
List<String> splits =
getTestAssignSnapshotSplits(
4,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".shopping_cart"});
assertEquals(expected, splits);
}
@ -272,7 +277,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
List<String> splits =
getTestAssignSnapshotSplits(
4,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".shopping_cart"});
assertEquals(expected, splits);
}
@ -281,33 +287,29 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
public void testAssignMinSplitSize() {
List<String> expected =
Arrays.asList(
"customers null [281]",
"customers [281] [461]",
"customers [461] [641]",
"customers [641] [821]",
"customers [821] [1001]",
"customers [1001] [1181]",
"customers [1181] [1361]",
"customers [1361] [1541]",
"customers [1541] [1721]",
"customers [1721] [1901]",
"customers [1901] null");
"customers_even_dist null [103]",
"customers_even_dist [103] [105]",
"customers_even_dist [105] [107]",
"customers_even_dist [107] [109]",
"customers_even_dist [109] null");
List<String> splits =
getTestAssignSnapshotSplits(
2,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".customers"});
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".customers_even_dist"});
assertEquals(expected, splits);
}
@Test
public void testAssignMaxSplitSize() {
List<String> expected = Collections.singletonList("customers null null");
List<String> expected = Collections.singletonList("customers_even_dist null null");
List<String> splits =
getTestAssignSnapshotSplits(
2000,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".customers"});
8096,
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".customers_even_dist"});
assertEquals(expected, splits);
}
@ -316,7 +318,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
try {
getTestAssignSnapshotSplits(
4,
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {customerDatabase.getDatabaseName() + ".customer_card"});
} catch (Throwable t) {
assertTrue(
@ -327,8 +330,52 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
}
}
/**
 * Creates a {@link MySqlSnapshotSplitAssigner} for the given tables, pulls splits from it until
 * it has none left, and describes each split as a string.
 *
 * <p>Snapshot splits become {@code "<table> <splitStart> <splitEnd>"}; every other split falls
 * back to its {@code toString()}.
 *
 * @param splitSize chunk size passed through to {@code getConfig}
 * @param distributionFactorUpper upper distribution-factor bound passed through to {@code getConfig}
 * @param distributionFactorLower lower distribution-factor bound passed through to {@code getConfig}
 * @param captureTables table identifiers, each parsed with {@code TableId.parse}
 * @return the split descriptions, in the order the assigner returned the splits
 */
private List<String> getTestAssignSnapshotSplits(
        int splitSize,
        double distributionFactorUpper,
        double distributionFactorLower,
        String[] captureTables) {
    final MySqlSourceConfig sourceConfig =
            getConfig(splitSize, distributionFactorUpper, distributionFactorLower, captureTables);

    final List<TableId> tablesToSplit = new ArrayList<>();
    for (String captureTable : captureTables) {
        tablesToSplit.add(TableId.parse(captureTable));
    }

    final MySqlSnapshotSplitAssigner splitAssigner =
            new MySqlSnapshotSplitAssigner(
                    sourceConfig, DEFAULT_PARALLELISM, tablesToSplit, false);
    splitAssigner.open();

    // Phase 1: collect every split until the assigner answers with an empty Optional.
    final List<MySqlSplit> assigned = new ArrayList<>();
    Optional<MySqlSplit> next = splitAssigner.getNext();
    while (next.isPresent()) {
        assigned.add(next.get());
        next = splitAssigner.getNext();
    }

    // Phase 2: turn each collected split into its comparable text form.
    final List<String> rendered = new ArrayList<>();
    for (MySqlSplit split : assigned) {
        if (!split.isSnapshotSplit()) {
            rendered.add(split.toString());
            continue;
        }
        final String tableName = split.asSnapshotSplit().getTableId().table();
        // Arrays.toString yields "null" for a null array, i.e. for an absent bound.
        final String startText = Arrays.toString(split.asSnapshotSplit().getSplitStart());
        final String endText = Arrays.toString(split.asSnapshotSplit().getSplitEnd());
        rendered.add(tableName + " " + startText + " " + endText);
    }
    return rendered;
}
private MySqlSourceConfig getConfig(
int splitSize, double evenlyDistributionFactor, String[] captureTables) {
int splitSize,
double distributionFactorUpper,
double distributionLower,
String[] captureTables) {
return new MySqlSourceConfigFactory()
.startupOptions(StartupOptions.initial())
.databaseList(customerDatabase.getDatabaseName())
@ -337,7 +384,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
.port(MYSQL_CONTAINER.getDatabasePort())
.splitSize(splitSize)
.fetchSize(2)
.evenlyDistributionFactor(evenlyDistributionFactor)
.distributionFactorUpper(distributionFactorUpper)
.distributionFactorLower(distributionLower)
.username(customerDatabase.getUsername())
.password(customerDatabase.getPassword())
.serverTimeZone(ZoneId.of("UTC").toString())

@ -48,10 +48,11 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.EVENLY_DISTRIBUTION_FACTOR;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
import static org.apache.flink.table.api.TableSchema.fromResolvedSchema;
import static org.junit.Assert.assertEquals;
@ -117,7 +118,8 @@ public class MySqlTableSourceFactoryTest {
CONNECT_TIMEOUT.defaultValue(),
CONNECT_MAX_RETRIES.defaultValue(),
CONNECTION_POOL_SIZE.defaultValue(),
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@ -129,7 +131,8 @@ public class MySqlTableSourceFactoryTest {
properties.put("server-id", "123-126");
properties.put("scan.incremental.snapshot.chunk.size", "8000");
properties.put("chunk-meta.group.size", "3000");
properties.put("evenly-distribution.factor", "40.5");
properties.put("split-key.even-distribution.factor.upper-bound", "40.5");
properties.put("split-key.even-distribution.factor.lower-bound", "0.01");
properties.put("scan.snapshot.fetch.size", "100");
properties.put("connect.timeout", "45s");
@ -155,6 +158,7 @@ public class MySqlTableSourceFactoryTest {
CONNECT_MAX_RETRIES.defaultValue(),
CONNECTION_POOL_SIZE.defaultValue(),
40.5d,
0.01d,
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@ -189,7 +193,8 @@ public class MySqlTableSourceFactoryTest {
Duration.ofSeconds(45),
CONNECT_MAX_RETRIES.defaultValue(),
CONNECTION_POOL_SIZE.defaultValue(),
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@ -222,7 +227,8 @@ public class MySqlTableSourceFactoryTest {
CONNECT_TIMEOUT.defaultValue(),
CONNECT_MAX_RETRIES.defaultValue(),
CONNECTION_POOL_SIZE.defaultValue(),
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.latest());
assertEquals(expectedSource, actualSource);
}
@ -257,7 +263,8 @@ public class MySqlTableSourceFactoryTest {
CONNECT_TIMEOUT.defaultValue(),
CONNECT_MAX_RETRIES.defaultValue(),
CONNECTION_POOL_SIZE.defaultValue(),
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@ -312,7 +319,8 @@ public class MySqlTableSourceFactoryTest {
CONNECT_TIMEOUT.defaultValue(),
CONNECT_MAX_RETRIES.defaultValue(),
CONNECTION_POOL_SIZE.defaultValue(),
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@ -376,7 +384,8 @@ public class MySqlTableSourceFactoryTest {
CONNECT_TIMEOUT.defaultValue(),
CONNECT_MAX_RETRIES.defaultValue(),
CONNECTION_POOL_SIZE.defaultValue(),
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.latest());
assertEquals(expectedSource, actualSource);
}
@ -413,7 +422,8 @@ public class MySqlTableSourceFactoryTest {
CONNECT_TIMEOUT.defaultValue(),
CONNECT_MAX_RETRIES.defaultValue(),
CONNECTION_POOL_SIZE.defaultValue(),
EVENLY_DISTRIBUTION_FACTOR.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
@ -504,7 +514,7 @@ public class MySqlTableSourceFactoryTest {
try {
Map<String, String> properties = getAllOptions();
properties.put("scan.incremental.snapshot.enabled", "true");
properties.put("evenly-distribution.factor", "0.8");
properties.put("split-key.even-distribution.factor.upper-bound", "0.8");
createTableSource(properties);
fail("exception expected");
@ -512,7 +522,7 @@ public class MySqlTableSourceFactoryTest {
assertThat(
t,
containsMessage(
"The value of option 'evenly-distribution.factor' must larger than or equals 1.0, but is 0.8"));
"The value of option 'split-key.even-distribution.factor.upper-bound' must larger than or equals 1.0, but is 0.8"));
}
// validate illegal connection pool size

@ -79,6 +79,64 @@ VALUES (101,"user_1","Shanghai","123567891234"),
(1019,"user_20","Shanghai","123567891234"),
(2000,"user_21","Shanghai","123567891234");
-- Create a table whose split key is evenly distributed.
-- The single-column primary key `id` holds ten consecutive values (101..110),
-- one row per key value with no gaps.
CREATE TABLE customers_even_dist (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL ,
address VARCHAR(1024),
phone_number VARCHAR(512)
);
-- Ten rows; only `id` and `name` vary, address and phone_number are constant filler.
INSERT INTO customers_even_dist
VALUES (101,'user_1','Shanghai','123567891234'),
(102,'user_2','Shanghai','123567891234'),
(103,'user_3','Shanghai','123567891234'),
(104,'user_4','Shanghai','123567891234'),
(105,'user_5','Shanghai','123567891234'),
(106,'user_6','Shanghai','123567891234'),
(107,'user_7','Shanghai','123567891234'),
(108,'user_8','Shanghai','123567891234'),
(109,'user_9','Shanghai','123567891234'),
(110,'user_10','Shanghai','123567891234');
-- Create a table whose split key is evenly distributed but sparse.
-- The single-column primary key `id` has gaps: ten rows spread over the range 2..22
-- (2, 4, 6, 8, 10, 16, 17, 18, 20, 22).
CREATE TABLE customers_sparse_dist (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL ,
address VARCHAR(1024),
phone_number VARCHAR(512)
);
-- Ten rows; only `id` and `name` vary, address and phone_number are constant filler.
INSERT INTO customers_sparse_dist
VALUES (2,'user_1','Shanghai','123567891234'),
(4,'user_2','Shanghai','123567891234'),
(6,'user_3','Shanghai','123567891234'),
(8,'user_4','Shanghai','123567891234'),
(10,'user_5','Shanghai','123567891234'),
(16,'user_6','Shanghai','123567891234'),
(17,'user_7','Shanghai','123567891234'),
(18,'user_8','Shanghai','123567891234'),
(20,'user_9','Shanghai','123567891234'),
(22,'user_10','Shanghai','123567891234');
-- Create a table whose split key is evenly distributed and dense.
-- The primary key is composite (id1, id2), so id1 alone is not unique: ten rows share
-- only three distinct id1 values (1, 2, 3).
-- NOTE(review): presumably id1, the first primary-key column, is the split key - confirm.
CREATE TABLE customers_dense_dist (
id1 INTEGER NOT NULL,
id2 VARCHAR(255) NOT NULL ,
address VARCHAR(1024),
phone_number VARCHAR(512),
PRIMARY KEY(id1, id2)
);
-- Row counts per id1 value: 1 -> 4 rows, 2 -> 3 rows, 3 -> 3 rows.
INSERT INTO customers_dense_dist
VALUES (1,'user_1','Shanghai','123567891234'),
(1,'user_2','Shanghai','123567891234'),
(1,'user_3','Shanghai','123567891234'),
(1,'user_4','Shanghai','123567891234'),
(2,'user_5','Shanghai','123567891234'),
(2,'user_6','Shanghai','123567891234'),
(2,'user_7','Shanghai','123567891234'),
(3,'user_8','Shanghai','123567891234'),
(3,'user_9','Shanghai','123567891234'),
(3,'user_10','Shanghai','123567891234');
-- table has combined primary key
CREATE TABLE customer_card (
card_no BIGINT NOT NULL,
@ -170,8 +228,8 @@ CREATE TABLE shopping_cart_dec (
insert into shopping_cart_dec
VALUES (123456.123, 'KIND_001', 'user_1', 'my shopping cart'),
(124456.456, 'KIND_002', 'user_1', 'my shopping cart'),
(125489.6789, 'KIND_003', 'user_1', 'my shopping cart');
(123457.456, 'KIND_002', 'user_2', 'my shopping cart'),
(123458.6789, 'KIND_003', 'user_3', 'my shopping cart');
-- create table whose primary key are produced by snowflake algorithm
CREATE TABLE address (

Loading…
Cancel
Save