[mysql] Enable single server id for MySqlParallelSource

pull/287/head
Leonard Xu 4 years ago committed by Leonard Xu
parent e894b0a69e
commit 2edf6d3778

@ -48,6 +48,7 @@ import org.apache.kafka.connect.source.SourceRecord;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Optional;
import java.util.function.Supplier;
import static com.alibaba.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME;
@ -112,9 +113,9 @@ public class MySqlParallelSource<T>
// set the server id for each reader; it will be used by the Debezium reader
Configuration readerConfiguration = config.clone();
readerConfiguration.removeConfig(MySqlSourceOptions.SERVER_ID);
readerConfiguration.setString(
DATABASE_SERVER_ID,
getServerIdForSubTask(config, readerContext.getIndexOfSubtask()));
final Optional<String> serverId =
getServerIdForSubTask(config, readerContext.getIndexOfSubtask());
serverId.ifPresent(s -> readerConfiguration.setString(DATABASE_SERVER_ID, s));
// set the DatabaseHistory name for each reader; it will be used by the Debezium reader
readerConfiguration.setString(
DATABASE_HISTORY_INSTANCE_NAME,

@ -25,6 +25,7 @@ import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.Preconditions;
import java.time.Duration;
import java.util.Optional;
/** Configurations for {@link MySqlParallelSource}. */
public class MySqlSourceOptions {
@ -83,10 +84,10 @@ public class MySqlSourceOptions {
.withDescription(
"A numeric ID or a numeric ID range of this database client, "
+ "The numeric ID syntax is like '5400', the numeric ID range syntax "
+ "is like '5400-5408', The numeric ID range syntax is required when "
+ "is like '5400-5408', The numeric ID range syntax is recommended when "
+ "'scan.snapshot.parallel-read' enabled. Every ID must be unique across all "
+ "currently-running database processes in the MySQL cluster. This connector"
+ " joins the MySQL database cluster as another server (with this unique ID) "
+ " joins the MySQL cluster as another server (with this unique ID) "
+ "so it can read the binlog. By default, a random number is generated between"
+ " 5400 and 6400, though we recommend setting an explicit value.");
@ -96,7 +97,7 @@ public class MySqlSourceOptions {
.defaultValue(true)
.withDescription(
"Enable parallel read snapshot of table or not, false by default."
+ "The 'server-id' is required to be a range syntax like '5400,5408'.");
+ "The 'server-id' is required to be a range syntax like '5400-5408'.");
public static final ConfigOption<Integer> SCAN_SNAPSHOT_CHUNK_SIZE =
ConfigOptions.key("scan.snapshot.chunk.size")
@ -152,57 +153,69 @@ public class MySqlSourceOptions {
// utils
public static String validateAndGetServerId(ReadableConfig configuration) {
final String serverIdValue = configuration.get(MySqlSourceOptions.SERVER_ID);
// validate server id range
if (configuration.get(SCAN_SNAPSHOT_PARALLEL_READ)) {
String errMsg =
"The '%s' should be a range syntax like '5400-5404' when enable '%s', "
+ "but actual is %s";
Preconditions.checkState(
serverIdValue != null
&& serverIdValue.contains("-")
&& serverIdValue.split("-").length == 2,
String.format(
errMsg,
SERVER_ID.key(),
SCAN_SNAPSHOT_PARALLEL_READ.key(),
serverIdValue));
try {
Integer.parseInt(serverIdValue.split("-")[0].trim());
Integer.parseInt(serverIdValue.split("-")[1].trim());
} catch (NumberFormatException e) {
throw new IllegalStateException(String.format(errMsg, serverIdValue), e);
}
} else {
// validate single server id
try {
if (serverIdValue != null) {
Integer.parseInt(serverIdValue);
}
} catch (NumberFormatException e) {
throw new IllegalStateException(
if (serverIdValue != null) {
if (serverIdValue.contains("-")) {
String errMsg =
"The '%s' should be a range syntax like '5400-5404' when enable '%s', "
+ "but actual is %s";
Preconditions.checkState(
serverIdValue.split("-").length == 2,
String.format(
"The 'server.id' should contains single numeric ID, but is %s",
serverIdValue),
e);
errMsg,
SERVER_ID.key(),
SCAN_SNAPSHOT_PARALLEL_READ.key(),
serverIdValue));
checkServerId(serverIdValue.split("-")[0].trim());
checkServerId(serverIdValue.split("-")[1].trim());
} else {
checkServerId(serverIdValue);
}
}
return serverIdValue;
}
/**
 * Verifies that the given text parses as an integer server id.
 *
 * @param serverIdValue the candidate server id text
 * @throws IllegalStateException if the text cannot be parsed by {@link
 *     Integer#parseInt(String)}; the {@link NumberFormatException} is kept as the cause
 */
private static void checkServerId(String serverIdValue) {
    try {
        Integer.parseInt(serverIdValue);
    } catch (NumberFormatException cause) {
        final String message =
                String.format(
                        "The 'server-id' should contains single numeric ID like '5400' or numeric ID range '5400-5404', but actual is %s",
                        serverIdValue);
        throw new IllegalStateException(message, cause);
    }
}
/**
 * Converts a single numeric server id from its textual form.
 *
 * @param serverIdValue the server id text, e.g. '5400'
 * @return the parsed server id
 * @throws NumberFormatException if the text is not a parsable integer
 */
public static int getServerId(String serverIdValue) {
    final int parsedServerId = Integer.parseInt(serverIdValue);
    return parsedServerId;
}
public static String getServerIdForSubTask(Configuration configuration, int subtaskId) {
public static Optional<String> getServerIdForSubTask(
Configuration configuration, int subtaskId) {
String serverIdRange = configuration.getString(MySqlSourceOptions.SERVER_ID);
int serverIdStart = Integer.parseInt(serverIdRange.split("-")[0].trim());
int serverIdEnd = Integer.parseInt(serverIdRange.split("-")[1].trim());
int serverId = serverIdStart + subtaskId;
Preconditions.checkState(
serverIdStart <= serverId && serverId <= serverIdEnd,
String.format(
"The server id %s in task %d is out of server id range %s, please keep the job parallelism same with server id num of server id range.",
serverId, subtaskId, serverIdRange));
return String.valueOf(serverId);
if (serverIdRange == null) {
return Optional.empty();
}
if (serverIdRange.contains("-")) {
int serverIdStart = Integer.parseInt(serverIdRange.split("-")[0].trim());
int serverIdEnd = Integer.parseInt(serverIdRange.split("-")[1].trim());
int serverId = serverIdStart + subtaskId;
Preconditions.checkState(
serverIdStart <= serverId && serverId <= serverIdEnd,
String.format(
"The server id %s in task %d is out of server id range %s, please keep the job parallelism same with server id num of server id range.",
serverId, subtaskId, serverIdRange));
return Optional.of(String.valueOf(serverId));
} else {
int serverIdStart = Integer.parseInt(serverIdRange);
if (subtaskId > 0) {
throw new IllegalStateException(
String.format(
"The server id should a range like '5400-5404' when %s enabled , but actual is %s",
SCAN_SNAPSHOT_PARALLEL_READ.key(), serverIdRange));
} else {
return Optional.of(String.valueOf(serverIdStart));
}
}
}
}

@ -30,6 +30,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import com.alibaba.ververica.cdc.connectors.mysql.MySqlSource;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory;
@ -53,6 +54,7 @@ import java.util.Properties;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.DATABASE_SERVER_NAME;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SNAPSHOT_CHUNK_SIZE;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SNAPSHOT_PARALLEL_READ;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SERVER_ID;
import static org.apache.flink.util.Preconditions.checkNotNull;
@ -130,7 +132,7 @@ public class MySqlTableSource implements ScanTableSource {
new RowDataDebeziumDeserializeSchema(
rowType, typeInfo, ((rowData, rowKind) -> {}), serverTimeZone);
if (enableParallelRead) {
RowType pkRowType = getPkType(physicalSchema);
RowType pkRowType = validateAndGetPkType(physicalSchema);
Configuration configuration = getParallelSourceConf();
MySqlParallelSource<RowData> parallelSource =
new MySqlParallelSource<>(pkRowType, deserializer, configuration);
@ -156,7 +158,13 @@ public class MySqlTableSource implements ScanTableSource {
}
}
private RowType getPkType(TableSchema tableSchema) {
private RowType validateAndGetPkType(TableSchema tableSchema) {
Preconditions.checkState(
physicalSchema.getPrimaryKey().isPresent(),
String.format(
"The primary key is required when %s enabled, but actual is %s",
SCAN_SNAPSHOT_PARALLEL_READ.key(), physicalSchema.getPrimaryKey()));
List<String> pkFieldNames = physicalSchema.getPrimaryKey().get().getColumns();
LogicalType[] pkFieldTypes =
pkFieldNames.stream()
@ -187,7 +195,9 @@ public class MySqlTableSource implements ScanTableSource {
* The server id is optional; when configured, it is rewritten as 'database.server.id' when
* building {@link MySQLSplitReader}
*/
properties.put(SERVER_ID.key(), serverId);
if (serverId != null) {
properties.put(SERVER_ID.key(), serverId);
}
properties.put(SCAN_SNAPSHOT_CHUNK_SIZE.key(), String.valueOf(splitSize));
properties.put(SCAN_SNAPSHOT_FETCH_SIZE.key(), String.valueOf(fetchSize));
properties.put("connect.timeout.ms", String.valueOf(connectTimeout.toMillis()));

@ -242,8 +242,8 @@ public abstract class MySqlParallelSourceTestBase extends TestLogger {
/**
 * Builds a server id range for the test source.
 *
 * @return a range of the form 'start-end', where start is a random value in [5400, 5500) and
 *     end is start plus {@code PARALLELISM}
 */
private String getServerId() {
    final Random random = new Random();
    final int serverId = random.nextInt(100) + 5400;
    return serverId + "-" + (serverId + PARALLELISM);
}
private void sleepMs(long millis) {

@ -632,11 +632,11 @@ public class MySqlConnectorITCase extends MySqlTestBase {
/**
 * Builds the server id option value for the test source.
 *
 * @return a range 'start-end' (end is start plus the environment parallelism) when {@code
 *     parallelRead} is set, otherwise a single ID; start is a random value in [5400, 5500)
 */
private String getServerId() {
    final Random random = new Random();
    final int serverId = random.nextInt(100) + 5400;
    if (parallelRead) {
        return serverId + "-" + (serverId + env.getParallelism());
    }
    return String.valueOf(serverId);
}
private int getSplitSize() {

@ -130,6 +130,37 @@ public class MySqlTableSourceFactoryTest {
assertEquals(expectedSource, actualSource);
}
/**
 * Checks that the factory accepts a single (non-range) 'server-id' while
 * 'scan.snapshot.parallel-read' is enabled, and builds the expected {@link MySqlTableSource}.
 */
@Test
public void testEnableParallelReadSourceWithSingleServerId() {
    final Map<String, String> options = getAllOptions();
    options.put("server-id", "123");
    options.put("scan.snapshot.parallel-read", "true");
    options.put("connect.timeout", "45s");
    options.put("scan.snapshot.fetch.size", "100");
    options.put("scan.snapshot.chunk.size", "8000");

    // the source the factory is expected to produce from the options above
    final MySqlTableSource expected =
            new MySqlTableSource(
                    TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
                    3306,
                    MY_LOCALHOST,
                    MY_DATABASE,
                    MY_TABLE,
                    MY_USERNAME,
                    MY_PASSWORD,
                    ZoneId.of("UTC"),
                    PROPERTIES,
                    "123",
                    true,
                    8000,
                    100,
                    Duration.ofSeconds(45),
                    StartupOptions.initial());

    // validation for source
    final DynamicTableSource actual = createTableSource(options);
    assertEquals(expected, actual);
}
@Test
public void testEnableParallelReadSourceLatestOffset() {
Map<String, String> properties = getAllOptions();
@ -331,23 +362,7 @@ public class MySqlTableSourceFactoryTest {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t,
"The 'server.id' should contains single numeric ID, but is 123b")
.isPresent());
}
// validate illegal server id range
try {
Map<String, String> properties = getAllOptions();
properties.put("scan.snapshot.parallel-read", "true");
properties.put("server-id", "123");
createTableSource(properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t,
"The 'server-id' should be a range syntax like '5400-5404' when enable 'scan.snapshot.parallel-read', but actual is 123")
"The 'server-id' should contains single numeric ID like '5400' or numeric ID range '5400-5404', but actual is 123b")
.isPresent());
}

Loading…
Cancel
Save