[mysql] Simplify the options for Mysql Source

pull/270/head
Leonard Xu 4 years ago
parent ff6b638262
commit 75c01c530e

@ -126,11 +126,15 @@ public class StatefulTaskContext {
this.taskContext =
new MySqlTaskContextImpl(connectorConfig, databaseSchema, binaryLogClient);
final int queueSize =
mySqlSplit.isSnapshotSplit()
? Integer.MAX_VALUE
: connectorConfig.getMaxQueueSize();
this.queue =
new ChangeEventQueue.Builder<DataChangeEvent>()
.pollInterval(connectorConfig.getPollInterval())
.maxBatchSize(connectorConfig.getMaxBatchSize())
.maxQueueSize(connectorConfig.getMaxQueueSize())
.maxQueueSize(queueSize)
.maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
.loggingContextSupplier(
() ->
@ -365,7 +369,7 @@ public class StatefulTaskContext {
.with("database.responseBuffering", "adaptive")
.with(
"database.fetchSize",
configuration.getInteger(MySqlSourceOptions.SCAN_FETCH_SIZE))
configuration.getInteger(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE))
.build();
}
}
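The hunk above lifts the queue-size cap only for snapshot splits. A minimal sketch of that rule, with a hypothetical helper name; the rationale (a snapshot split drains one bounded chunk, so an unbounded queue keeps the producer from blocking, while binlog reading keeps the cap for back-pressure) is inferred from the change, not stated in the commit:

    // Hypothetical helper (not in the commit) spelling out the sizing rule above.
    private static int queueSizeFor(MySqlSplit split, MySqlConnectorConfig config) {
        // snapshot split: bounded chunk, no cap; binlog split: keep the configured cap
        return split.isSnapshotSplit() ? Integer.MAX_VALUE : config.getMaxQueueSize();
    }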

@ -24,6 +24,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.Preconditions;
import java.time.Duration;
/** Configurations for {@link MySqlParallelSource}. */
public class MySqlSourceOptions {
@ -81,41 +83,41 @@ public class MySqlSourceOptions {
.withDescription(
"A numeric ID or a numeric ID range of this database client, "
+ "The numeric ID syntax is like '5400', the numeric ID range syntax "
+ "is like '5400,5408', The numeric ID range syntax is required when "
+ "'snapshot.parallel-read' enabled. Every ID must be unique across all "
+ "is like '5400-5408', The numeric ID range syntax is required when "
+ "'scan.snapshot.parallel-read' enabled. Every ID must be unique across all "
+ "currently-running database processes in the MySQL cluster. This connector"
+ " joins the MySQL database cluster as another server (with this unique ID) "
+ "so it can read the binlog. By default, a random number is generated between"
+ " 5400 and 6400, though we recommend setting an explicit value.");
public static final ConfigOption<Boolean> SNAPSHOT_PARALLEL_SCAN =
ConfigOptions.key("snapshot.parallel-scan")
public static final ConfigOption<Boolean> SCAN_SNAPSHOT_PARALLEL_READ =
ConfigOptions.key("scan.snapshot.parallel-read")
.booleanType()
.defaultValue(false)
.defaultValue(true)
.withDescription(
"Enable parallel scan snapshot of table or not, false by default."
"Enable parallel read snapshot of table or not, false by default."
+ "The 'server-id' is required to be a range syntax like '5400,5408'.");
public static final ConfigOption<Integer> SCAN_SPLIT_SIZE =
ConfigOptions.key("scan.split.size")
public static final ConfigOption<Integer> SCAN_SNAPSHOT_CHUNK_SIZE =
ConfigOptions.key("scan.snapshot.chunk.size")
.intType()
.defaultValue(8096)
.withDescription("The split size used to cut splits for table.");
public static final ConfigOption<Integer> SCAN_FETCH_SIZE =
ConfigOptions.key("scan.fetch.size")
.withDescription(
"The chunk size of table snapshot, the table is cut to multiple chunks when read the snapshot of table.");
public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
ConfigOptions.key("scan.snapshot.fetch.size")
.intType()
.defaultValue(1024)
.withDescription("The fetch size for per poll.");
.withDescription(
"The maximum fetch size for per poll when read table snapshot.");
public static final ConfigOption<String> SCAN_SPLIT_COLUMN =
ConfigOptions.key("scan.split.column")
.stringType()
.noDefaultValue()
public static final ConfigOption<Duration> CONNECT_TIMEOUT =
ConfigOptions.key("connect.timeout")
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"The single column used to cut splits for table,"
+ " the default value is primary key. If the primary key contains"
+ " multiple columns, this option is required to configure,"
+ " the configured column should make the splits as small as possible.");
"The maximum time in milliseconds that the connector should wait after trying to connect to the MySQL database server before timing out.");
public static final ConfigOption<String> SCAN_STARTUP_MODE =
ConfigOptions.key("scan.startup.mode")
@ -147,30 +149,26 @@ public class MySqlSourceOptions {
.withDescription(
"Optional timestamp used in case of \"timestamp\" startup mode");
public static final ConfigOption<Boolean> SCAN_OPTIMIZE_INTEGRAL_KEY =
ConfigOptions.key("scan.optimize.integral-key")
.booleanType()
.defaultValue(true)
.withDescription(
"Optimization to calculate the boundary of table snapshot split base on numeric value rather than querying the DB,"
+ " by default this option is enabled.");
// utils
public static String validateAndGetServerId(ReadableConfig configuration) {
final String serverIdValue = configuration.get(MySqlSourceOptions.SERVER_ID);
// validate server id range
if (configuration.get(MySqlSourceOptions.SNAPSHOT_PARALLEL_SCAN)) {
if (configuration.get(SCAN_SNAPSHOT_PARALLEL_READ)) {
String errMsg =
"The server id should be a range syntax like '5400,5404' when enable 'snapshot.parallel-scan' to 'true', "
"The '%s' should be a range syntax like '5400-5404' when enable '%s', "
+ "but actual is %s";
Preconditions.checkState(
serverIdValue != null
&& serverIdValue.contains(",")
&& serverIdValue.split(",").length == 2,
String.format(errMsg, serverIdValue));
&& serverIdValue.contains("-")
&& serverIdValue.split("-").length == 2,
String.format(
errMsg,
SERVER_ID.key(),
SCAN_SNAPSHOT_PARALLEL_READ.key(),
serverIdValue));
try {
Integer.parseInt(serverIdValue.split(",")[0].trim());
Integer.parseInt(serverIdValue.split(",")[1].trim());
Integer.parseInt(serverIdValue.split("-")[0].trim());
Integer.parseInt(serverIdValue.split("-")[1].trim());
} catch (NumberFormatException e) {
throw new IllegalStateException(String.format(errMsg, SERVER_ID.key(), SCAN_SNAPSHOT_PARALLEL_READ.key(), serverIdValue), e);
}
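For illustration, a hedged sketch of how the new validation behaves; the Configuration setup is assumed:

    Configuration conf = new Configuration();
    conf.set(MySqlSourceOptions.SCAN_SNAPSHOT_PARALLEL_READ, true);
    conf.set(MySqlSourceOptions.SERVER_ID, "5400-5408");
    String serverId = MySqlSourceOptions.validateAndGetServerId(conf); // ok: "5400-5408"

    // A comma-separated value now fails the checkState above:
    conf.set(MySqlSourceOptions.SERVER_ID, "5400,5408");
    // MySqlSourceOptions.validateAndGetServerId(conf) -> IllegalStateException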
@ -197,8 +195,8 @@ public class MySqlSourceOptions {
public static String getServerIdForSubTask(Configuration configuration, int subtaskId) {
String serverIdRange = configuration.getString(MySqlSourceOptions.SERVER_ID);
int serverIdStart = Integer.parseInt(serverIdRange.split(",")[0].trim());
int serverIdEnd = Integer.parseInt(serverIdRange.split(",")[1].trim());
int serverIdStart = Integer.parseInt(serverIdRange.split("-")[0].trim());
int serverIdEnd = Integer.parseInt(serverIdRange.split("-")[1].trim());
int serverId = serverIdStart + subtaskId;
Preconditions.checkState(
serverIdStart <= serverId && serverId <= serverIdEnd,

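Putting the renamed options together, a hedged DDL example in the string-concatenation style of the tests further down; the table, connection values, and tEnv are placeholders:

    // tEnv: an existing StreamTableEnvironment (assumed)
    tEnv.executeSql(
            "CREATE TABLE orders ("
                    + " order_id BIGINT,"
                    + " price DECIMAL(10, 2),"
                    + " PRIMARY KEY (order_id) NOT ENFORCED"
                    + ") WITH ("
                    + " 'connector' = 'mysql-cdc',"
                    + " 'hostname' = 'localhost',"
                    + " 'port' = '3306',"
                    + " 'username' = 'flinkuser',"
                    + " 'password' = 'flinkpw',"
                    + " 'database-name' = 'mydb',"
                    + " 'table-name' = 'orders',"
                    // replaces 'snapshot.parallel-scan'
                    + " 'scan.snapshot.parallel-read' = 'true',"
                    // replaces 'scan.split.size'
                    + " 'scan.snapshot.chunk.size' = '8096',"
                    // replaces 'scan.fetch.size'
                    + " 'scan.snapshot.fetch.size' = '1024',"
                    // new option in this commit, 30s default
                    + " 'connect.timeout' = '30s',"
                    // range separator is now '-'; the range must cover the parallelism
                    + " 'server-id' = '5400-5404'"
                    + ")");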
@ -20,6 +20,7 @@ package com.alibaba.ververica.cdc.connectors.mysql.source.assigner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
@ -31,6 +32,7 @@ import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlDatabaseSchema;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
import io.debezium.schema.SchemaChangeEvent;
@ -58,10 +60,11 @@ import java.util.Set;
import static com.alibaba.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.toDebeziumConfig;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isOptimizedKeyType;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.RecordUtils.rowToArray;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.SplitKeyUtils.getSplitKeyType;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.SplitKeyUtils.splitKeyIsAutoIncremented;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.StatementUtils.buildMaxSplitKeyQuery;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.StatementUtils.buildMinMaxSplitKeyQuery;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.StatementUtils.buildSplitBoundaryQuery;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.StatementUtils.getSplitKey;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.StatementUtils.quote;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.StatementUtils.readTableSplitStatement;
@ -75,36 +78,34 @@ public class MySqlSnapshotSplitAssigner {
private final LinkedList<TableId> remainingTables;
private final Configuration configuration;
private final int splitSize;
private final RowType splitKeyType;
private final boolean enableIntegralOptimization;
private final RowType definedPkType;
private TableId currentTableId;
private int currentTableSplitSeq;
private Object[] currentTableMaxSplitKey;
private RelationalTableFilters tableFilters;
private MySqlConnection jdbc;
private Map<TableId, TableChange> cachedTableSchemas;
private Map<TableId, TableChange> tableSchemas;
private MySqlDatabaseSchema databaseSchema;
private RowType splitKeyType;
public MySqlSnapshotSplitAssigner(
Configuration configuration,
RowType pkRowType,
RowType definedPkType,
Collection<TableId> alreadyProcessedTables,
Collection<MySqlSplit> remainingSplits) {
this.configuration = configuration;
this.splitKeyType = getSplitKey(configuration, pkRowType);
this.definedPkType = definedPkType;
this.alreadyProcessedTables = alreadyProcessedTables;
this.remainingSplits = remainingSplits;
this.remainingTables = new LinkedList<>();
this.splitSize = configuration.getInteger(MySqlSourceOptions.SCAN_SPLIT_SIZE);
this.splitSize = configuration.getInteger(MySqlSourceOptions.SCAN_SNAPSHOT_CHUNK_SIZE);
Preconditions.checkState(
splitSize > 1,
String.format(
"The value of option 'scan.split.size' must bigger than 1, but is %d",
"The value of option 'scan.snapshot.chunk.size' must bigger than 1, but is %d",
splitSize));
this.enableIntegralOptimization =
configuration.getBoolean(MySqlSourceOptions.SCAN_OPTIMIZE_INTEGRAL_KEY);
this.cachedTableSchemas = new HashMap<>();
this.tableSchemas = new HashMap<>();
}
public void open() {
@ -152,9 +153,14 @@ public class MySqlSnapshotSplitAssigner {
int splitCnt = 0;
long start = System.currentTimeMillis();
List<MySqlSplit> splitsForCurrentTable = new ArrayList<>();
LOG.info("Begin to analyze splits for table {} ", tableId);
tableSchemas.put(tableId, getTableSchema(tableId));
splitKeyType = getSplitKeyType(definedPkType, tableSchemas.get(tableId).getTable());
// optimization for integral, bigDecimal type
if (enableIntegralOptimization
final Table currentTable = tableSchemas.get(currentTableId).getTable();
if (splitKeyIsAutoIncremented(splitKeyType, currentTable)
&& isOptimizedKeyType(splitKeyType.getTypeAt(0).getTypeRoot())) {
String splitKeyFieldName = splitKeyType.getFieldNames().get(0);
Object[] minMaxSplitKey = new Object[2];
@ -321,11 +327,7 @@ public class MySqlSnapshotSplitAssigner {
private MySqlSplit createSnapshotSplit(Object[] splitStart, Object[] splitEnd) {
Map<TableId, TableChange> tableChangeMap = new HashMap<>();
// cache for optimization
if (!cachedTableSchemas.containsKey(currentTableId)) {
cachedTableSchemas.putAll(getTableSchema());
}
tableChangeMap.put(currentTableId, cachedTableSchemas.get(currentTableId));
tableChangeMap.put(currentTableId, tableSchemas.get(currentTableId));
return new MySqlSnapshotSplit(
currentTableId,
@ -455,12 +457,11 @@ public class MySqlSnapshotSplitAssigner {
}
}
private Map<TableId, TableChange> getTableSchema() {
private TableChange getTableSchema(final TableId tableId) {
final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
try {
jdbc.query(
"SHOW CREATE TABLE " + quote(currentTableId),
"SHOW CREATE TABLE " + quote(tableId),
rs -> {
if (rs.next()) {
final String ddl = rs.getString(2);
@ -470,21 +471,26 @@ public class MySqlSnapshotSplitAssigner {
toDebeziumConfig(configuration)));
List<SchemaChangeEvent> schemaChangeEvents =
databaseSchema.parseSnapshotDdl(
ddl,
currentTableId.catalog(),
offsetContext,
Instant.now());
ddl, tableId.catalog(), offsetContext, Instant.now());
for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
for (TableChange tableChange :
schemaChangeEvent.getTableChanges()) {
tableChangeMap.put(currentTableId, tableChange);
tableChangeMap.put(tableId, tableChange);
}
}
}
});
} catch (SQLException e) {
LOG.error("Get table schema error.", e);
throw new FlinkRuntimeException(
String.format("Get schema of table %s error.", tableId), e);
}
if (tableChangeMap.isEmpty()) {
throw new FlinkRuntimeException(
String.format(
"Can not get schema of table %s, please check the configured table name.",
tableId));
}
return tableChangeMap;
return tableChangeMap.get(tableId);
}
}
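A minimal sketch of the boundary arithmetic behind the optimized path for auto-incremented integral keys, reverse-engineered from the expected splits in the assigner tests further down; the class, helper name, and Long[]-pair representation are illustrative only:

    import java.util.ArrayList;
    import java.util.List;

    public class ChunkBoundarySketch {

        /** null marks an open-ended boundary, as in the expected test output. */
        static List<Long[]> splitIntoChunks(long min, long max, long chunkSize) {
            List<Long[]> splits = new ArrayList<>();
            splits.add(new Long[] {null, min}); // all keys before the observed min
            long start = min;
            while (start + chunkSize < max) {
                splits.add(new Long[] {start, start + chunkSize});
                start += chunkSize;
            }
            splits.add(new Long[] {start, max}); // last bounded chunk, possibly smaller
            splits.add(new Long[] {max, null}); // keys at/after max, e.g. new inserts
            return splits;
        }

        public static void main(String[] args) {
            // min=101, max=2000, chunkSize=1000 reproduces the expected splits in
            // testEnableAutoIncrementedKeyOptimization: null|101, 101|1101, 1101|2000, 2000|null
            for (Long[] s : splitIntoChunks(101, 2000, 1000)) {
                System.out.println(s[0] + " | " + s[1]);
            }
        }
    }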

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.connectors.mysql.source.utils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import io.debezium.relational.Table;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/** Utils to obtain and validate the split key of a table. */
public class SplitKeyUtils {
private SplitKeyUtils() {}
/**
* Returns the split key type; the split key should be a single field.
*
* <p>The split key is the primary key when the primary key contains a single field; the split
* key is inferred when the primary key contains multiple fields.
*
* @param definedPkType the defined primary key type in Flink.
* @param actualTable the table schema in MySQL database.
*/
public static RowType getSplitKeyType(RowType definedPkType, Table actualTable) {
Preconditions.checkState(
definedPkType.getFieldCount() >= 1,
"The primary key is required in Flink SQL Table definition.");
Preconditions.checkState(
!actualTable.primaryKeyColumnNames().isEmpty(),
String.format(
"Only supports capture table with primary key, but the table %s has no primary key.",
actualTable.id()));
validatePrimaryKey(definedPkType.getFieldNames(), actualTable.primaryKeyColumnNames());
if (definedPkType.getFieldCount() == 1) {
return definedPkType;
} else {
// use the first field of the defined primary key as the split key.
return new RowType(Arrays.asList(definedPkType.getFields().get(0)));
}
}
public static boolean splitKeyIsAutoIncremented(RowType splitKeyType, Table actualTable) {
final String splitKeyName = unquoteColumnName(splitKeyType.getFieldNames().get(0));
return !actualTable.primaryKeyColumnNames().isEmpty()
&& actualTable.isAutoIncremented(splitKeyName);
}
private static void validatePrimaryKey(
List<String> definedPkFieldnames, List<String> actualPkFieldNames) {
List<String> formattedDefinedPk =
definedPkFieldnames.stream()
.map(SplitKeyUtils::unquoteColumnName)
.sorted()
.collect(Collectors.toList());
List<String> formattedActualPk =
actualPkFieldNames.stream()
.map(SplitKeyUtils::unquoteColumnName)
.sorted()
.collect(Collectors.toList());
String exceptionMsg =
String.format(
"The defined primary key %s in Flink is not matched with actual primary key %s in MySQL",
definedPkFieldnames, actualPkFieldNames);
Preconditions.checkState(
formattedDefinedPk.size() == formattedActualPk.size()
&& formattedDefinedPk.containsAll(formattedActualPk),
exceptionMsg);
}
public static String unquoteColumnName(String columnName) {
if (columnName == null) {
return null;
}
if (columnName.length() < 2) {
return columnName.toLowerCase();
}
Character quotingChar = deriveQuotingChar(columnName);
if (quotingChar != null) {
columnName = columnName.substring(1, columnName.length() - 1);
columnName =
columnName.replace(
quotingChar.toString() + quotingChar.toString(),
quotingChar.toString());
}
return columnName.toLowerCase();
}
private static Character deriveQuotingChar(String columnName) {
char first = columnName.charAt(0);
char last = columnName.charAt(columnName.length() - 1);
if (first == last && (first == '"' || first == '\'' || first == '`')) {
return first;
}
return null;
}
}
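Hypothetical usage of the unquoting helper above, showing the quoting rules it handles (back-ticks, quotes, doubled-quote escapes, lower-casing):

    SplitKeyUtils.unquoteColumnName("`id`");     // -> "id"
    SplitKeyUtils.unquoteColumnName("\"User\""); // -> "user" (also lower-cased)
    SplitKeyUtils.unquoteColumnName("`a``b`");   // -> "a`b" (doubled quote unescaped)
    SplitKeyUtils.unquoteColumnName("plain");    // -> "plain"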

@ -18,11 +18,8 @@
package com.alibaba.ververica.cdc.connectors.mysql.source.utils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
@ -38,29 +35,6 @@ public class StatementUtils {
private StatementUtils() {}
/**
* Returns the split key, the split key should always single field
*
* <p>The split key is primary key when primary key contains single field, the split key will be
* inferred when the primary key contains multiple field.
*
* @param pkType the primary key type
* @return
*/
public static RowType getSplitKey(Configuration configuration, RowType pkType) {
Preconditions.checkState(
pkType.getFieldCount() >= 1, "The primary key is required in table definition.");
if (pkType.getFieldCount() == 1) {
return pkType;
} else {
String splitColumnName = configuration.getString(MySqlSourceOptions.SCAN_SPLIT_COLUMN);
return new RowType(
pkType.getFields().stream()
.filter(r -> splitColumnName.equalsIgnoreCase(r.getName()))
.collect(Collectors.toList()));
}
}
public static String buildSplitBoundaryQuery(
TableId tableId,
RowType pkRowType,

@ -41,6 +41,7 @@ import com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema
import javax.annotation.Nullable;
import java.time.Duration;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
@ -50,9 +51,6 @@ import java.util.Optional;
import java.util.Properties;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.DATABASE_SERVER_NAME;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_OPTIMIZE_INTEGRAL_KEY;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SPLIT_COLUMN;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SERVER_ID;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@ -71,11 +69,10 @@ public class MySqlTableSource implements ScanTableSource {
private final String tableName;
private final ZoneId serverTimeZone;
private final Properties dbzProperties;
private final boolean enableIntegralOptimization;
private final boolean enableParallelRead;
private final int splitSize;
private final int fetchSize;
private final String splitColumn;
private final Duration connectTimeout;
private final StartupOptions startupOptions;
public MySqlTableSource(
@ -89,11 +86,10 @@ public class MySqlTableSource implements ScanTableSource {
ZoneId serverTimeZone,
Properties dbzProperties,
@Nullable String serverId,
boolean enableIntegralOptimization,
boolean enableParallelRead,
int splitSize,
int fetchSize,
@Nullable String splitColumn,
Duration connectTimeout,
StartupOptions startupOptions) {
this.physicalSchema = physicalSchema;
this.port = port;
@ -105,11 +101,10 @@ public class MySqlTableSource implements ScanTableSource {
this.serverId = serverId;
this.serverTimeZone = serverTimeZone;
this.dbzProperties = dbzProperties;
this.enableIntegralOptimization = enableIntegralOptimization;
this.enableParallelRead = enableParallelRead;
this.splitSize = splitSize;
this.fetchSize = fetchSize;
this.splitColumn = splitColumn;
this.connectTimeout = connectTimeout;
this.startupOptions = startupOptions;
}
@ -133,7 +128,7 @@ public class MySqlTableSource implements ScanTableSource {
rowType, typeInfo, ((rowData, rowKind) -> {}), serverTimeZone);
if (enableParallelRead) {
RowType pkRowType = getPkType(physicalSchema);
Configuration configuration = getParallelSourceConf(pkRowType);
Configuration configuration = getParallelSourceConf();
MySqlParallelSource<RowData> parallelSource =
new MySqlParallelSource<>(pkRowType, deserializer, configuration);
return SourceProvider.of(parallelSource);
@ -172,8 +167,11 @@ public class MySqlTableSource implements ScanTableSource {
return RowType.of(pkFieldTypes, pkFieldNames.toArray(new String[0]));
}
private Configuration getParallelSourceConf(RowType pkRowType) {
private Configuration getParallelSourceConf() {
Map<String, String> properties = new HashMap<>();
if (dbzProperties != null) {
dbzProperties.forEach((k, v) -> properties.put(k.toString(), v.toString()));
}
properties.put("database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName());
properties.put("database.hostname", checkNotNull(hostname));
properties.put("database.user", checkNotNull(username));
@ -186,9 +184,10 @@ public class MySqlTableSource implements ScanTableSource {
* The server id is required; it will be replaced with 'database.server.id' when building the
* {@link MySQLSplitReader}.
*/
properties.put(SERVER_ID.key(), serverId);
properties.put("server-id", serverId);
properties.put("scan.split.size", String.valueOf(splitSize));
properties.put("scan.fetch.size", String.valueOf(fetchSize));
properties.put("connect.timeout.ms", String.valueOf(connectTimeout.toMillis()));
if (database != null) {
properties.put("database.whitelist", database);
@ -203,14 +202,6 @@ public class MySqlTableSource implements ScanTableSource {
// set mode
properties.put("snapshot.mode", "initial");
properties.put(
SCAN_OPTIMIZE_INTEGRAL_KEY.key(), String.valueOf(enableIntegralOptimization));
// set split key
if (pkRowType.getFieldCount() > 1) {
properties.put(SCAN_SPLIT_COLUMN.key(), splitColumn);
}
return Configuration.fromMap(properties);
}
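For reference, 'connect.timeout.ms' above is Debezium's connection-timeout property; the new Flink-facing 'connect.timeout' Duration is simply flattened to milliseconds at hand-off, e.g.:

    Duration connectTimeout = Duration.ofSeconds(30); // CONNECT_TIMEOUT's default
    properties.put("connect.timeout.ms", String.valueOf(connectTimeout.toMillis())); // "30000"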
@ -227,11 +218,10 @@ public class MySqlTableSource implements ScanTableSource {
serverTimeZone,
dbzProperties,
serverId,
enableIntegralOptimization,
enableParallelRead,
splitSize,
fetchSize,
splitColumn,
connectTimeout,
startupOptions);
}
@ -245,7 +235,6 @@ public class MySqlTableSource implements ScanTableSource {
}
MySqlTableSource that = (MySqlTableSource) o;
return port == that.port
&& enableIntegralOptimization == that.enableIntegralOptimization
&& enableParallelRead == that.enableParallelRead
&& splitSize == that.splitSize
&& fetchSize == that.fetchSize
@ -258,7 +247,7 @@ public class MySqlTableSource implements ScanTableSource {
&& Objects.equals(tableName, that.tableName)
&& Objects.equals(serverTimeZone, that.serverTimeZone)
&& Objects.equals(dbzProperties, that.dbzProperties)
&& Objects.equals(splitColumn, that.splitColumn)
&& Objects.equals(connectTimeout, that.connectTimeout)
&& Objects.equals(startupOptions, that.startupOptions);
}
@ -275,11 +264,10 @@ public class MySqlTableSource implements ScanTableSource {
tableName,
serverTimeZone,
dbzProperties,
enableIntegralOptimization,
enableParallelRead,
splitSize,
fetchSize,
splitColumn,
connectTimeout,
startupOptions);
}

@ -30,25 +30,25 @@ import org.apache.flink.util.Preconditions;
import com.alibaba.ververica.cdc.debezium.table.DebeziumOptions;
import java.time.Duration;
import java.time.ZoneId;
import java.util.HashSet;
import java.util.Set;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.CONNECT_TIMEOUT;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.DATABASE_NAME;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.HOSTNAME;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.PASSWORD;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.PORT;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_FETCH_SIZE;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_OPTIMIZE_INTEGRAL_KEY;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SPLIT_COLUMN;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SPLIT_SIZE;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SNAPSHOT_CHUNK_SIZE;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SNAPSHOT_PARALLEL_READ;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_STARTUP_MODE;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SERVER_ID;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SERVER_TIME_ZONE;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SNAPSHOT_PARALLEL_SCAN;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.TABLE_NAME;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.USERNAME;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.validateAndGetServerId;
@ -72,22 +72,20 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
String databaseName = config.get(DATABASE_NAME);
String tableName = config.get(TABLE_NAME);
int port = config.get(PORT);
int splitSize = config.get(SCAN_SPLIT_SIZE);
int fetchSize = config.get(SCAN_FETCH_SIZE);
int splitSize = config.get(SCAN_SNAPSHOT_CHUNK_SIZE);
int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
String serverId = validateAndGetServerId(config);
boolean enableIntegralOptimization = config.get(SCAN_OPTIMIZE_INTEGRAL_KEY);
boolean enableParallelRead = config.get(SNAPSHOT_PARALLEL_SCAN);
String splitColumn = null;
boolean enableParallelRead = config.get(SCAN_SNAPSHOT_PARALLEL_READ);
StartupOptions startupOptions = getStartupOptions(config);
if (enableParallelRead) {
validatePrimaryKeyIfEnableParallel(physicalSchema);
splitColumn = validateAndGetSplitColumn(config.get(SCAN_SPLIT_COLUMN), physicalSchema);
validateStartupOptionIfEnableParallel(startupOptions);
}
Duration connectTimeout = config.get(CONNECT_TIMEOUT);
return new MySqlTableSource(
physicalSchema,
@ -100,11 +98,10 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
serverTimeZone,
getDebeziumProperties(context.getCatalogTable().getOptions()),
serverId,
enableIntegralOptimization,
enableParallelRead,
splitSize,
fetchSize,
splitColumn,
connectTimeout,
startupOptions);
}
@ -134,11 +131,10 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
options.add(SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
options.add(SCAN_STARTUP_SPECIFIC_OFFSET_POS);
options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
options.add(SNAPSHOT_PARALLEL_SCAN);
options.add(SCAN_SPLIT_SIZE);
options.add(SCAN_FETCH_SIZE);
options.add(SCAN_SPLIT_COLUMN);
options.add(SCAN_OPTIMIZE_INTEGRAL_KEY);
options.add(SCAN_SNAPSHOT_PARALLEL_READ);
options.add(SCAN_SNAPSHOT_CHUNK_SIZE);
options.add(SCAN_SNAPSHOT_FETCH_SIZE);
options.add(CONNECT_TIMEOUT);
return options;
}
@ -187,47 +183,12 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
private void validatePrimaryKeyIfEnableParallel(TableSchema physicalSchema) {
if (!physicalSchema.getPrimaryKey().isPresent()) {
throw new ValidationException(
"The primary key is necessary when enable 'snapshot.parallel-scan' to 'true'");
String.format(
"The primary key is necessary when enable '%s' to 'true'",
SCAN_SNAPSHOT_PARALLEL_READ));
}
}
private String validateAndGetSplitColumn(String splitColumn, TableSchema physicalSchema) {
String validatedSplitColumn = splitColumn;
if (physicalSchema.getPrimaryKey().isPresent()) {
int pkSize = physicalSchema.getPrimaryKey().get().getColumns().size();
if (pkSize > 1) {
Preconditions.checkState(
splitColumn != null,
"The 'scan.split.column' option is required if the primary key contains multiple fields");
Preconditions.checkState(
physicalSchema.getPrimaryKey().get().getColumns().contains(splitColumn),
String.format(
"The 'scan.split.column' value %s should be one field of the primary key %s, but it does not.",
splitColumn, physicalSchema.getPrimaryKey().get().getColumns()));
return splitColumn;
}
// single primary key field
else {
// use primary key by default
if (splitColumn == null) {
validatedSplitColumn = physicalSchema.getPrimaryKey().get().getColumns().get(0);
} else {
// validate configured split column
Preconditions.checkState(
physicalSchema.getPrimaryKey().get().getColumns().contains(splitColumn),
String.format(
"The 'scan.split.column' value %s should be one field of the primary key %s, but it does not.",
splitColumn,
physicalSchema.getPrimaryKey().get().getColumns()));
}
}
} else {
throw new ValidationException(
"The primary key is necessary when enable 'snapshot.parallel-scan' to 'true'");
}
return validatedSplitColumn;
}
private void validateStartupOptionIfEnableParallel(StartupOptions startupOptions) {
// validate mode
Preconditions.checkState(

@ -33,7 +33,6 @@ import com.alibaba.ververica.cdc.connectors.mysql.MySqlTestBase;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions;
import com.alibaba.ververica.cdc.connectors.mysql.source.assigner.MySqlSnapshotSplitAssigner;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
@ -51,14 +50,11 @@ import io.debezium.relational.history.TableChanges.TableChange;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -67,7 +63,6 @@ import java.util.Optional;
import java.util.stream.Collectors;
import static com.alibaba.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_OPTIMIZE_INTEGRAL_KEY;
import static com.alibaba.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.BINLOG_SPLIT_ID;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
@ -77,32 +72,20 @@ import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.RecordUtil
import static org.junit.Assert.assertEquals;
/** Tests for {@link BinlogSplitReader}. */
@RunWith(Parameterized.class)
public class BinlogSplitReaderTest extends MySqlTestBase {
private final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
private final BinaryLogClient binaryLogClient;
private final MySqlConnection mySqlConnection;
private final boolean useIntegralTypeOptimization;
@Parameterized.Parameters(name = "useIntegralTypeOptimization: {0}")
public static Collection<Boolean> parameters() {
return Arrays.asList(false, true);
}
public BinlogSplitReaderTest(boolean useIntegralTypeOptimization) {
this.useIntegralTypeOptimization = useIntegralTypeOptimization;
Configuration configuration = getConfig(new String[] {"customers"});
this.binaryLogClient = StatefulTaskContext.getBinaryClient(configuration);
this.mySqlConnection = StatefulTaskContext.getConnection(configuration);
}
private BinaryLogClient binaryLogClient;
private MySqlConnection mySqlConnection;
@Test
public void testReadSingleBinlogSplit() throws Exception {
customerDatabase.createAndInitialize();
Configuration configuration = getConfig(new String[] {"customers"});
binaryLogClient = StatefulTaskContext.getBinaryClient(configuration);
mySqlConnection = StatefulTaskContext.getConnection(configuration);
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
@ -113,32 +96,23 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
(RowType) DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT())).getLogicalType();
List<MySqlSnapshotSplit> splits = getMySQLSplits(configuration, pkType);
String[] expected =
useIntegralTypeOptimization
? new String[] {
"+U[103, user_3, Hangzhou, 123567891234]",
"+U[103, user_3, Shanghai, 123567891234]",
"-D[102, user_2, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"-U[103, user_3, Hangzhou, 123567891234]",
"-U[103, user_3, Shanghai, 123567891234]"
}
: new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"-D[102, user_2, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"-U[103, user_3, Shanghai, 123567891234]",
"+U[103, user_3, Hangzhou, 123567891234]",
"-U[103, user_3, Hangzhou, 123567891234]",
"+U[103, user_3, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]"
};
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"-D[102, user_2, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"-U[103, user_3, Shanghai, 123567891234]",
"+U[103, user_3, Hangzhou, 123567891234]",
"-U[103, user_3, Hangzhou, 123567891234]",
"+U[103, user_3, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]"
};
List<String> actual =
readBinlogSplits(splits, dataType, pkType, configuration, 1, expected.length);
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual);
@ -148,6 +122,8 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
public void testReadAllBinlogSplitsForOneTable() throws Exception {
customerDatabase.createAndInitialize();
Configuration configuration = getConfig(new String[] {"customers"});
binaryLogClient = StatefulTaskContext.getBinaryClient(configuration);
mySqlConnection = StatefulTaskContext.getConnection(configuration);
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
@ -203,6 +179,8 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
public void testReadAllBinlogForTableWithSingleLine() throws Exception {
customerDatabase.createAndInitialize();
Configuration configuration = getConfig(new String[] {"customer_card_single_line"});
binaryLogClient = StatefulTaskContext.getBinaryClient(configuration);
mySqlConnection = StatefulTaskContext.getConnection(configuration);
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("card_no", DataTypes.BIGINT()),
@ -215,7 +193,6 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
DataTypes.FIELD("card_no", DataTypes.BIGINT()),
DataTypes.FIELD("level", DataTypes.STRING()))
.getLogicalType();
configuration.set(MySqlSourceOptions.SCAN_SPLIT_COLUMN, "card_no");
List<MySqlSnapshotSplit> splits = getMySQLSplits(configuration, pkType);
String[] expected =
@ -237,6 +214,8 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
customerDatabase.createAndInitialize();
Configuration configuration =
getConfig(new String[] {"customer_card", "customer_card_single_line"});
binaryLogClient = StatefulTaskContext.getBinaryClient(configuration);
mySqlConnection = StatefulTaskContext.getConnection(configuration);
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("card_no", DataTypes.BIGINT()),
@ -249,7 +228,6 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
DataTypes.FIELD("card_no", DataTypes.BIGINT()),
DataTypes.FIELD("level", DataTypes.STRING()))
.getLogicalType();
configuration.set(MySqlSourceOptions.SCAN_SPLIT_COLUMN, "card_no");
List<MySqlSnapshotSplit> splits = getMySQLSplits(configuration, pkType);
String[] expected =
new String[] {
@ -525,7 +503,7 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
properties.put("database.password", customerDatabase.getPassword());
properties.put("database.whitelist", customerDatabase.getDatabaseName());
properties.put("database.history.skip.unparseable.ddl", "true");
properties.put("server-id-range", "1001, 1002");
properties.put("server-id-range", "1001-1002");
properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
properties.put("snapshot.mode", "initial");
properties.put("database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName());
@ -535,15 +513,9 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
.map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
.collect(Collectors.toList());
properties.put("table.whitelist", String.join(",", captureTableIds));
properties.put(
SCAN_OPTIMIZE_INTEGRAL_KEY.key(), String.valueOf(useIntegralTypeOptimization));
if (useIntegralTypeOptimization) {
properties.put("scan.split.size", "1024");
properties.put("scan.fetch.size", "1024");
} else {
properties.put("scan.split.size", "10");
properties.put("scan.fetch.size", "2");
}
properties.put("scan.snapshot.chunk.size", "10");
properties.put("scan.snapshot.fetch.size", "2");
return Configuration.fromMap(properties);
}

@ -32,7 +32,6 @@ import org.apache.flink.util.Collector;
import com.alibaba.ververica.cdc.connectors.mysql.MySqlTestBase;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions;
import com.alibaba.ververica.cdc.connectors.mysql.source.assigner.MySqlSnapshotSplitAssigner;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.alibaba.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
@ -43,13 +42,10 @@ import io.debezium.connector.mysql.MySqlConnection;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -58,36 +54,24 @@ import java.util.Optional;
import java.util.stream.Collectors;
import static com.alibaba.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_OPTIMIZE_INTEGRAL_KEY;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isWatermarkEvent;
import static org.junit.Assert.assertEquals;
/** Tests for {@link SnapshotSplitReader}. */
@RunWith(Parameterized.class)
public class SnapshotSplitReaderTest extends MySqlTestBase {
private static final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
private final boolean useIntegralTypeOptimization;
private final BinaryLogClient binaryLogClient;
private final MySqlConnection mySqlConnection;
@Parameterized.Parameters(name = "useIntegralTypeOptimization: {0}")
public static Collection<Boolean> parameters() {
return Arrays.asList(false, true);
}
public SnapshotSplitReaderTest(boolean useIntegralTypeOptimization) {
this.useIntegralTypeOptimization = useIntegralTypeOptimization;
Configuration configuration = getConfig(new String[] {"customers"});
this.binaryLogClient = StatefulTaskContext.getBinaryClient(configuration);
this.mySqlConnection = StatefulTaskContext.getConnection(configuration);
}
private static BinaryLogClient binaryLogClient;
private static MySqlConnection mySqlConnection;
@BeforeClass
public static void init() {
customerDatabase.createAndInitialize();
Configuration configuration = getConfig(new String[] {"customers"});
binaryLogClient = StatefulTaskContext.getBinaryClient(configuration);
mySqlConnection = StatefulTaskContext.getConnection(configuration);
}
@Test
@ -104,19 +88,17 @@ public class SnapshotSplitReaderTest extends MySqlTestBase {
List<MySqlSplit> mySqlSplits = getMySQLSplits(configuration, pkType);
String[] expected =
useIntegralTypeOptimization
? new String[] {}
: new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]"
};
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]"
};
List<String> actual = readTableSnapshotSplits(mySqlSplits, configuration, 1, dataType);
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual);
}
@ -166,7 +148,6 @@ public class SnapshotSplitReaderTest extends MySqlTestBase {
@Test
public void testReadAllSplitForTableWithSingleLine() throws Exception {
Configuration configuration = getConfig(new String[] {"customer_card_single_line"});
configuration.set(MySqlSourceOptions.SCAN_SPLIT_COLUMN, "card_no");
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("card_no", DataTypes.BIGINT()),
@ -190,7 +171,6 @@ public class SnapshotSplitReaderTest extends MySqlTestBase {
public void testReadAllSnapshotSplitsForTables() throws Exception {
Configuration configuration =
getConfig(new String[] {"customer_card", "customer_card_single_line"});
configuration.set(MySqlSourceOptions.SCAN_SPLIT_COLUMN, "card_no");
DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("card_no", DataTypes.BIGINT()),
@ -314,7 +294,7 @@ public class SnapshotSplitReaderTest extends MySqlTestBase {
return mySqlSplitList;
}
private Configuration getConfig(String[] captureTables) {
private static Configuration getConfig(String[] captureTables) {
Map<String, String> properties = new HashMap<>();
properties.put("database.server.name", "embedded-test");
properties.put("database.hostname", MYSQL_CONTAINER.getHost());
@ -323,7 +303,7 @@ public class SnapshotSplitReaderTest extends MySqlTestBase {
properties.put("database.password", customerDatabase.getPassword());
properties.put("database.whitelist", customerDatabase.getDatabaseName());
properties.put("database.history.skip.unparseable.ddl", "true");
properties.put("server-id-range", "1001, 1002");
properties.put("server-id-range", "1001-1002");
properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
properties.put("snapshot.mode", "initial");
properties.put("database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName());
@ -333,15 +313,9 @@ public class SnapshotSplitReaderTest extends MySqlTestBase {
.map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
.collect(Collectors.toList());
properties.put("table.whitelist", String.join(",", captureTableIds));
properties.put(
SCAN_OPTIMIZE_INTEGRAL_KEY.key(), String.valueOf(useIntegralTypeOptimization));
if (useIntegralTypeOptimization) {
properties.put("scan.split.size", "1000");
properties.put("scan.fetch.size", "1024");
} else {
properties.put("scan.split.size", "10");
properties.put("scan.fetch.size", "2");
}
properties.put("scan.snapshot.chunk.size", "10");
properties.put("scan.snapshot.fetch.size", "2");
return Configuration.fromMap(properties);
}

@ -123,14 +123,14 @@ public abstract class MySqlParallelSourceTestBase extends TestLogger {
+ " primary key (id) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'snapshot.parallel-scan' = 'true',"
+ " 'scan.snapshot.parallel-read' = 'true',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.split.size' = '1024',"
+ " 'scan.snapshot.chunk.size' = '1024',"
+ " 'server-id' = '%s'"
+ ")",
MYSQL_CONTAINER.getHost(),
@ -243,7 +243,7 @@ public abstract class MySqlParallelSourceTestBase extends TestLogger {
private String getServerId() {
final Random random = new Random();
int serverIdStart = random.nextInt(100) + 5400;
return serverIdStart + "," + (serverIdStart + PARALLELISM);
return serverIdStart + "-" + (serverIdStart + PARALLELISM);
}
private void sleepMs(long millis) {

@ -21,6 +21,7 @@ package com.alibaba.ververica.cdc.connectors.mysql.source.assigner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.ExceptionUtils;
import com.alibaba.ververica.cdc.connectors.mysql.MySqlTestBase;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory;
@ -39,10 +40,10 @@ import java.util.Optional;
import java.util.stream.Collectors;
import static com.alibaba.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_OPTIMIZE_INTEGRAL_KEY;
import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** Tests for {@link MySqlSnapshotSplitAssigner}. */
@ -104,69 +105,28 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
}
@Test
public void testEnableIntegralKeyOptimization() {
public void testEnableAutoIncrementedKeyOptimization() {
String[] expected =
new String[] {
"customers null [101]",
"customers [101] [1101]",
"customers [1101] [2000]",
"customers [2000] null"
};
final RowType pkType =
(RowType) DataTypes.ROW(DataTypes.FIELD("id", DataTypes.INT())).getLogicalType();
List<String> splits =
getTestAssignSnapshotSplits(1000, pkType, new String[] {"customers"}, true);
assertArrayEquals(expected, splits.toArray());
}
@Test
public void testEnableIntegralKeyOptimizationWithMultipleTable() {
String[] expected =
new String[] {
"customers null [101]",
"customers [101] [1101]",
"customers [1101] [2000]",
"customers [2000] null",
"customers_1 null [101]",
"customers_1 [101] [1101]",
"customers_1 [1101] [2000]",
"customers_1 [2000] null"
"shopping_cart_big null [1]",
"shopping_cart_big [1] [3]",
"shopping_cart_big [3] null"
};
final RowType pkType =
(RowType) DataTypes.ROW(DataTypes.FIELD("id", DataTypes.INT())).getLogicalType();
List<String> splits =
getTestAssignSnapshotSplits(
1000, pkType, new String[] {"customers", "customers_1"}, true);
assertArrayEquals(expected, splits.toArray());
}
@Test
public void testEnableBigIntKeyOptimization() {
String[] expected =
new String[] {
"shopping_cart_big null [9223372036854773807]",
"shopping_cart_big [9223372036854773807] [9223372036854774807]",
"shopping_cart_big [9223372036854774807] [9223372036854775807]",
"shopping_cart_big [9223372036854775807] null"
};
// MySQL BIGINT UNSIGNED <=> Flink DECIMAL(20, 0)
final RowType pkType =
(RowType)
DataTypes.ROW(DataTypes.FIELD("product_no", DataTypes.DECIMAL(20, 0)))
.getLogicalType();
List<String> splits =
getTestAssignSnapshotSplits(1000, pkType, new String[] {"shopping_cart_big"}, true);
getTestAssignSnapshotSplits(2, pkType, new String[] {"shopping_cart_big"});
assertArrayEquals(expected, splits.toArray());
}
@Test
public void testEnableDecimalKeyOptimization() {
public void testAssignSnapshotSplitsWithDecimalKey() {
String[] expected =
new String[] {
"shopping_cart_dec null [123456.1230]",
"shopping_cart_dec [123456.1230] [124456.1230]",
"shopping_cart_dec [124456.1230] [125456.1230]",
"shopping_cart_dec [125456.1230] [125489.6789]",
"shopping_cart_dec null [124456.4560]",
"shopping_cart_dec [124456.4560] [125489.6789]",
"shopping_cart_dec [125489.6789] null"
};
final RowType pkType =
@ -174,23 +134,14 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
DataTypes.ROW(DataTypes.FIELD("product_no", DataTypes.DECIMAL(10, 4)))
.getLogicalType();
List<String> splits =
getTestAssignSnapshotSplits(1000, pkType, new String[] {"shopping_cart_dec"}, true);
getTestAssignSnapshotSplits(2, pkType, new String[] {"shopping_cart_dec"});
assertArrayEquals(expected, splits.toArray());
}
private List<String> getTestAssignSnapshotSplits(
int splitSize, RowType pkType, String[] captureTables) {
return getTestAssignSnapshotSplits(splitSize, pkType, captureTables, false);
}
private List<String> getTestAssignSnapshotSplits(
int splitSize,
RowType pkType,
String[] captureTables,
boolean enableIntegralOptimization) {
Configuration configuration = getConfig();
configuration.setString("scan.split.size", String.valueOf(splitSize));
configuration.setBoolean(SCAN_OPTIMIZE_INTEGRAL_KEY.key(), enableIntegralOptimization);
configuration.setString("scan.snapshot.chunk.size", String.valueOf(splitSize));
List<String> captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
@ -241,7 +192,9 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
};
final RowType pkType =
(RowType)
DataTypes.ROW(DataTypes.FIELD("card_no", DataTypes.BIGINT()))
DataTypes.ROW(
DataTypes.FIELD("card_no", DataTypes.BIGINT()),
DataTypes.FIELD("level", DataTypes.STRING()))
.getLogicalType();
List<String> splits =
getTestAssignSnapshotSplits(4, pkType, new String[] {"customer_card"});
@ -257,7 +210,9 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
};
final RowType pkType =
(RowType)
DataTypes.ROW(DataTypes.FIELD("card_no", DataTypes.BIGINT()))
DataTypes.ROW(
DataTypes.FIELD("card_no", DataTypes.BIGINT()),
DataTypes.FIELD("level", DataTypes.STRING()))
.getLogicalType();
List<String> splits =
getTestAssignSnapshotSplits(4, pkType, new String[] {"customer_card_single_line"});
@ -265,7 +220,7 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
}
@Test
public void testAssignTableWithConfiguredIntSplitKey() {
public void testAssignTableWithCombinedIntSplitKey() {
String[] expected =
new String[] {
"shopping_cart null [102]",
@ -277,7 +232,10 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
};
final RowType pkType =
(RowType)
DataTypes.ROW(DataTypes.FIELD("product_no", DataTypes.BIGINT()))
DataTypes.ROW(
DataTypes.FIELD("product_no", DataTypes.INT()),
DataTypes.FIELD("user_id", DataTypes.STRING()),
DataTypes.FIELD("product_kind", DataTypes.STRING()))
.getLogicalType();
List<String> splits =
getTestAssignSnapshotSplits(4, pkType, new String[] {"shopping_cart"});
@ -296,7 +254,10 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
};
final RowType pkType =
(RowType)
DataTypes.ROW(DataTypes.FIELD("user_id", DataTypes.STRING()))
DataTypes.ROW(
DataTypes.FIELD("user_id", DataTypes.STRING()),
DataTypes.FIELD("product_no", DataTypes.INT()),
DataTypes.FIELD("product_kind", DataTypes.STRING()))
.getLogicalType();
List<String> splits =
getTestAssignSnapshotSplits(4, pkType, new String[] {"shopping_cart"});
@ -357,7 +318,24 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
assertThat(
e,
containsMessage(
"The value of option 'scan.split.size' must bigger than 1, but is 1"));
"The value of option 'scan.snapshot.chunk.size' must bigger than 1, but is 1"));
}
}
@Test
public void testUnMatchedPrimaryKey() {
final RowType pkType =
(RowType)
DataTypes.ROW(DataTypes.FIELD("card_no", DataTypes.BIGINT()))
.getLogicalType();
try {
getTestAssignSnapshotSplits(4, pkType, new String[] {"customer_card"});
fail("Expected an exception for the unmatched primary key, but the split assignment succeeded.");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t,
"The defined primary key [card_no] in Flink is not matched with actual primary key [card_no, level] in MySQL")
.isPresent());
}
}
@ -371,7 +349,7 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
properties.put("database.password", customerDatabase.getPassword());
properties.put("database.history.skip.unparseable.ddl", "true");
properties.put("server-id.range", "1001,1004");
properties.put("scan.fetch.size", "2");
properties.put("scan.snapshot.fetch.size", "2");
properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
properties.put("snapshot.mode", "initial");
properties.put("database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName());

@ -117,9 +117,9 @@ public class MySqlConnectorITCase extends MySqlTestBase {
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'debezium.internal.implementation' = '%s',"
+ " 'snapshot.parallel-scan' = '%s',"
+ " 'scan.snapshot.parallel-read' = '%s',"
+ " 'server-id' = '%s',"
+ " 'scan.split.size' = '%s'"
+ " 'scan.snapshot.chunk.size' = '%s'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
@ -246,9 +246,9 @@ public class MySqlConnectorITCase extends MySqlTestBase {
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'debezium.internal.implementation' = '%s',"
+ " 'snapshot.parallel-scan' = '%s',"
+ " 'scan.snapshot.parallel-read' = '%s',"
+ " 'server-id' = '%s',"
+ " 'scan.split.size' = '%s'"
+ " 'scan.snapshot.chunk.size' = '%s'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
@ -666,7 +666,7 @@ public class MySqlConnectorITCase extends MySqlTestBase {
final Random random = new Random();
int serverIdStart = random.nextInt(100) + 5400;
if (parallelRead) {
return serverIdStart + "," + (serverIdStart + env.getParallelism());
return serverIdStart + "-" + (serverIdStart + env.getParallelism());
}
return String.valueOf(serverIdStart);
}

@ -35,6 +35,7 @@ import org.apache.flink.util.ExceptionUtils;
import org.junit.Test;
import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
@ -42,10 +43,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_FETCH_SIZE;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_OPTIMIZE_INTEGRAL_KEY;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SPLIT_SIZE;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SNAPSHOT_PARALLEL_SCAN;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.CONNECT_TIMEOUT;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SNAPSHOT_CHUNK_SIZE;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.table.api.TableSchema.fromResolvedSchema;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -90,11 +90,10 @@ public class MySqlTableSourceFactoryTest {
ZoneId.of("UTC"),
PROPERTIES,
null,
SCAN_OPTIMIZE_INTEGRAL_KEY.defaultValue(),
SNAPSHOT_PARALLEL_SCAN.defaultValue(),
SCAN_SPLIT_SIZE.defaultValue(),
SCAN_FETCH_SIZE.defaultValue(),
null,
false,
SCAN_SNAPSHOT_CHUNK_SIZE.defaultValue(),
SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
CONNECT_TIMEOUT.defaultValue(),
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@ -102,11 +101,12 @@ public class MySqlTableSourceFactoryTest {
@Test
public void testEnableParallelReadSource() {
Map<String, String> properties = getAllOptions();
properties.put("snapshot.parallel-scan", "true");
properties.put("server-id", "123,126");
properties.put("scan.split.column", "aaa");
properties.put("scan.split.size", "8000");
properties.put("scan.fetch.size", "100");
properties.put("scan.snapshot.parallel-read", "true");
properties.put("server-id", "123-126");
properties.put("scan.snapshot.chunk.size", "8000");
properties.put("scan.snapshot.fetch.size", "100");
properties.put("scan.snapshot.parallel-read", String.valueOf(true));
properties.put("connect.timeout", "45s");
// validation for source
DynamicTableSource actualSource = createTableSource(properties);
@ -121,45 +121,11 @@ public class MySqlTableSourceFactoryTest {
MY_PASSWORD,
ZoneId.of("UTC"),
PROPERTIES,
"123,126",
SCAN_OPTIMIZE_INTEGRAL_KEY.defaultValue(),
"123-126",
true,
8000,
100,
"aaa",
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@Test
public void testDisableIntegralOptimization() {
Map<String, String> properties = getAllOptions();
properties.put("snapshot.parallel-scan", "true");
properties.put("server-id", "123,126");
properties.put("scan.split.column", "aaa");
properties.put("scan.split.size", "8000");
properties.put("scan.fetch.size", "100");
properties.put("scan.optimize.integral-key", "false");
// validation for source
DynamicTableSource actualSource = createTableSource(properties);
MySqlTableSource expectedSource =
new MySqlTableSource(
TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
3306,
MY_LOCALHOST,
MY_DATABASE,
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
ZoneId.of("UTC"),
PROPERTIES,
"123,126",
false,
true,
8000,
100,
"aaa",
Duration.ofSeconds(45),
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
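Taken together, the renamed options read as below in a Flink SQL DDL. This is a hypothetical example using the new option names; the connector identifier 'mysql-cdc' and all connection values are placeholders, not taken from the tests above:

    // Hypothetical DDL string in the same style the ITCase builds above.
    String ddl =
            "CREATE TABLE orders_source ("
                    + "  id BIGINT NOT NULL,"
                    + "  product_no BIGINT,"
                    + "  PRIMARY KEY (id) NOT ENFORCED"
                    + ") WITH ("
                    + "  'connector' = 'mysql-cdc',"
                    + "  'hostname' = 'localhost',"
                    + "  'port' = '3306',"
                    + "  'username' = 'flinkuser',"
                    + "  'password' = 'flinkpw',"
                    + "  'database-name' = 'mydb',"
                    + "  'table-name' = 'orders',"
                    + "  'scan.snapshot.parallel-read' = 'true',"
                    + "  'scan.snapshot.chunk.size' = '8096',"
                    + "  'scan.snapshot.fetch.size' = '1024',"
                    + "  'server-id' = '5400-5408',"
                    + "  'connect.timeout' = '30s'"
                    + ")";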
@ -187,11 +153,10 @@ public class MySqlTableSourceFactoryTest {
ZoneId.of("Asia/Shanghai"),
dbzProperties,
"4321",
SCAN_OPTIMIZE_INTEGRAL_KEY.defaultValue(),
SNAPSHOT_PARALLEL_SCAN.defaultValue(),
SCAN_SPLIT_SIZE.defaultValue(),
SCAN_FETCH_SIZE.defaultValue(),
null,
false,
SCAN_SNAPSHOT_CHUNK_SIZE.defaultValue(),
SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
CONNECT_TIMEOUT.defaultValue(),
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@ -221,11 +186,10 @@ public class MySqlTableSourceFactoryTest {
ZoneId.of("UTC"),
PROPERTIES,
"4321",
SCAN_OPTIMIZE_INTEGRAL_KEY.defaultValue(),
SNAPSHOT_PARALLEL_SCAN.defaultValue(),
SCAN_SPLIT_SIZE.defaultValue(),
SCAN_FETCH_SIZE.defaultValue(),
null,
false,
SCAN_SNAPSHOT_CHUNK_SIZE.defaultValue(),
SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
CONNECT_TIMEOUT.defaultValue(),
StartupOptions.specificOffset(offsetFile, offsetPos));
assertEquals(expectedSource, actualSource);
}
@ -249,11 +213,10 @@ public class MySqlTableSourceFactoryTest {
ZoneId.of("UTC"),
PROPERTIES,
null,
SCAN_OPTIMIZE_INTEGRAL_KEY.defaultValue(),
SNAPSHOT_PARALLEL_SCAN.defaultValue(),
SCAN_SPLIT_SIZE.defaultValue(),
SCAN_FETCH_SIZE.defaultValue(),
null,
false,
SCAN_SNAPSHOT_CHUNK_SIZE.defaultValue(),
SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
CONNECT_TIMEOUT.defaultValue(),
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@ -277,11 +240,10 @@ public class MySqlTableSourceFactoryTest {
ZoneId.of("UTC"),
PROPERTIES,
null,
SCAN_OPTIMIZE_INTEGRAL_KEY.defaultValue(),
SNAPSHOT_PARALLEL_SCAN.defaultValue(),
SCAN_SPLIT_SIZE.defaultValue(),
SCAN_FETCH_SIZE.defaultValue(),
null,
false,
SCAN_SNAPSHOT_CHUNK_SIZE.defaultValue(),
SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
CONNECT_TIMEOUT.defaultValue(),
StartupOptions.earliest());
assertEquals(expectedSource, actualSource);
}
@ -305,11 +267,10 @@ public class MySqlTableSourceFactoryTest {
ZoneId.of("UTC"),
PROPERTIES,
null,
SCAN_OPTIMIZE_INTEGRAL_KEY.defaultValue(),
SNAPSHOT_PARALLEL_SCAN.defaultValue(),
SCAN_SPLIT_SIZE.defaultValue(),
SCAN_FETCH_SIZE.defaultValue(),
null,
false,
SCAN_SNAPSHOT_CHUNK_SIZE.defaultValue(),
SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
CONNECT_TIMEOUT.defaultValue(),
StartupOptions.latest());
assertEquals(expectedSource, actualSource);
}
@ -348,7 +309,7 @@ public class MySqlTableSourceFactoryTest {
// validate illegal server id range
try {
Map<String, String> properties = getAllOptions();
properties.put("snapshot.parallel-scan", "true");
properties.put("scan.snapshot.parallel-read", "true");
properties.put("server-id", "123");
createTableSource(properties);
@ -357,38 +318,7 @@ public class MySqlTableSourceFactoryTest {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t,
"The server id should be a range syntax like '5400,5404' when enable 'snapshot.parallel-scan' to 'true', but actual is 123")
.isPresent());
}
// validate split key when use combined primary key
try {
Map<String, String> properties = getAllOptions();
properties.put("snapshot.parallel-scan", "true");
properties.put("server-id", "123,126");
createTableSource(properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t,
"The 'scan.split.column' option is required if the primary key contains multiple fields")
.isPresent());
}
// validate split key must belong to primary key
try {
Map<String, String> properties = getAllOptions();
properties.put("snapshot.parallel-scan", "true");
properties.put("server-id", "123,126");
properties.put("scan.split.column", "ddd");
createTableSource(properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t,
"The 'scan.split.column' value ddd should be one field of the primary key [bbb, aaa], but it does not.")
"The 'server-id' should be a range syntax like '5400-5404' when enable 'scan.snapshot.parallel-read', but actual is 123")
.isPresent());
}
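The surviving validation collapses the three old checks into one rule: when 'scan.snapshot.parallel-read' is enabled, 'server-id' must be a hyphenated numeric range. A sketch of that check, with the helper name assumed; the message mirrors the assertion above:

    // Sketch of the remaining rule; a single numeric id stays legal only
    // when parallel read is disabled.
    static void validateServerId(boolean parallelRead, String serverId) {
        if (parallelRead && (serverId == null || !serverId.matches("\\d+-\\d+"))) {
            throw new IllegalArgumentException(
                    String.format(
                            "The 'server-id' should be a range syntax like '5400-5404' "
                                    + "when enable 'scan.snapshot.parallel-read', but actual is %s",
                            serverId));
        }
    }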
@ -447,6 +377,7 @@ public class MySqlTableSourceFactoryTest {
options.put("table-name", MY_TABLE);
options.put("username", MY_USERNAME);
options.put("password", MY_PASSWORD);
options.put("scan.snapshot.parallel-read", String.valueOf(false));
return options;
}

@ -145,9 +145,9 @@ VALUES (101, 'KIND_001', 'user_1', 'my shopping cart'),
(404, 'KIND_008', 'user_5', 'leo list'),
(600, 'KIND_009', 'user_6', 'my shopping cart');
-- table has bigint unsigned primary key
-- table has bigint unsigned auto increment primary key
CREATE TABLE shopping_cart_big (
product_no BIGINT UNSIGNED NOT NULL,
product_no BIGINT UNSIGNED AUTO_INCREMENT NOT NULL,
product_kind VARCHAR(255),
user_id VARCHAR(255) NOT NULL,
description VARCHAR(255) NOT NULL,
@ -155,9 +155,9 @@ CREATE TABLE shopping_cart_big (
);
insert into shopping_cart_big
VALUES (9223372036854773807, 'KIND_001', 'user_1', 'my shopping cart'),
(9223372036854774807, 'KIND_002', 'user_1', 'my shopping cart'),
(9223372036854775807, 'KIND_003', 'user_1', 'my shopping cart');
VALUES (default, 'KIND_001', 'user_1', 'my shopping cart'),
(default, 'KIND_002', 'user_1', 'my shopping cart'),
(default, 'KIND_003', 'user_1', 'my shopping cart');
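Switching these rows from hard-coded values near Long.MAX_VALUE to AUTO_INCREMENT defaults matters because product_no is BIGINT UNSIGNED: its legal values exceed Java's signed long, so chunk-boundary arithmetic over such a key can overflow if done with plain longs. A sketch of overflow-safe boundary math with BigInteger; this is assumed, not the connector's actual splitter:

    import java.math.BigInteger;

    // Sketch: advance a chunk boundary over a BIGINT UNSIGNED key. Keys above
    // 9223372036854775807 (Long.MAX_VALUE) are valid for this column type, so
    // long arithmetic would wrap; BigInteger cannot overflow.
    static BigInteger nextChunkEnd(BigInteger chunkStart, int chunkSize, BigInteger maxKey) {
        BigInteger candidate = chunkStart.add(BigInteger.valueOf(chunkSize));
        return candidate.min(maxKey); // clamp the final chunk to the table's max key
    }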
-- table has decimal primary key
CREATE TABLE shopping_cart_dec (
