From 2edf6d37782ed190ed372e39b1de9e390287887f Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Wed, 28 Jul 2021 18:13:52 +0800 Subject: [PATCH] [mysql] Enable single server id for MySqlParallelSource --- .../mysql/source/MySqlParallelSource.java | 7 +- .../mysql/source/MySqlSourceOptions.java | 101 ++++++++++-------- .../mysql/table/MySqlTableSource.java | 16 ++- .../source/MySqlParallelSourceTestBase.java | 4 +- .../mysql/table/MySqlConnectorITCase.java | 6 +- .../table/MySqlTableSourceFactoryTest.java | 49 ++++++--- 6 files changed, 111 insertions(+), 72 deletions(-) diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java index 35b95cf03..ca8d29478 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java @@ -48,6 +48,7 @@ import org.apache.kafka.connect.source.SourceRecord; import java.util.ArrayList; import java.util.HashMap; +import java.util.Optional; import java.util.function.Supplier; import static com.alibaba.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME; @@ -112,9 +113,9 @@ public class MySqlParallelSource // set the server id for each reader, will used by debezium reader Configuration readerConfiguration = config.clone(); readerConfiguration.removeConfig(MySqlSourceOptions.SERVER_ID); - readerConfiguration.setString( - DATABASE_SERVER_ID, - getServerIdForSubTask(config, readerContext.getIndexOfSubtask())); + final Optional serverId = + getServerIdForSubTask(config, readerContext.getIndexOfSubtask()); + serverId.ifPresent(s -> readerConfiguration.setString(DATABASE_SERVER_ID, s)); // set the DatabaseHistory name for each reader, will used by debezium reader readerConfiguration.setString( DATABASE_HISTORY_INSTANCE_NAME, diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/MySqlSourceOptions.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/MySqlSourceOptions.java index d19127c21..7dc9820db 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/MySqlSourceOptions.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/MySqlSourceOptions.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.util.Preconditions; import java.time.Duration; +import java.util.Optional; /** Configurations for {@link MySqlParallelSource}. */ public class MySqlSourceOptions { @@ -83,10 +84,10 @@ public class MySqlSourceOptions { .withDescription( "A numeric ID or a numeric ID range of this database client, " + "The numeric ID syntax is like '5400', the numeric ID range syntax " - + "is like '5400-5408', The numeric ID range syntax is required when " + + "is like '5400-5408', The numeric ID range syntax is recommended when " + "'scan.snapshot.parallel-read' enabled. Every ID must be unique across all " + "currently-running database processes in the MySQL cluster. This connector" - + " joins the MySQL database cluster as another server (with this unique ID) " + + " joins the MySQL cluster as another server (with this unique ID) " + "so it can read the binlog. By default, a random number is generated between" + " 5400 and 6400, though we recommend setting an explicit value."); @@ -96,7 +97,7 @@ public class MySqlSourceOptions { .defaultValue(true) .withDescription( "Enable parallel read snapshot of table or not, false by default." - + "The 'server-id' is required to be a range syntax like '5400,5408'."); + + "The 'server-id' is required to be a range syntax like '5400-5408'."); public static final ConfigOption SCAN_SNAPSHOT_CHUNK_SIZE = ConfigOptions.key("scan.snapshot.chunk.size") @@ -152,57 +153,69 @@ public class MySqlSourceOptions { // utils public static String validateAndGetServerId(ReadableConfig configuration) { final String serverIdValue = configuration.get(MySqlSourceOptions.SERVER_ID); - // validate server id range - if (configuration.get(SCAN_SNAPSHOT_PARALLEL_READ)) { - String errMsg = - "The '%s' should be a range syntax like '5400-5404' when enable '%s', " - + "but actual is %s"; - Preconditions.checkState( - serverIdValue != null - && serverIdValue.contains("-") - && serverIdValue.split("-").length == 2, - String.format( - errMsg, - SERVER_ID.key(), - SCAN_SNAPSHOT_PARALLEL_READ.key(), - serverIdValue)); - try { - Integer.parseInt(serverIdValue.split("-")[0].trim()); - Integer.parseInt(serverIdValue.split("-")[1].trim()); - } catch (NumberFormatException e) { - throw new IllegalStateException(String.format(errMsg, serverIdValue), e); - } - } else { - // validate single server id - try { - if (serverIdValue != null) { - Integer.parseInt(serverIdValue); - } - } catch (NumberFormatException e) { - throw new IllegalStateException( + if (serverIdValue != null) { + if (serverIdValue.contains("-")) { + String errMsg = + "The '%s' should be a range syntax like '5400-5404' when enable '%s', " + + "but actual is %s"; + Preconditions.checkState( + serverIdValue.split("-").length == 2, String.format( - "The 'server.id' should contains single numeric ID, but is %s", - serverIdValue), - e); + errMsg, + SERVER_ID.key(), + SCAN_SNAPSHOT_PARALLEL_READ.key(), + serverIdValue)); + checkServerId(serverIdValue.split("-")[0].trim()); + checkServerId(serverIdValue.split("-")[1].trim()); + } else { + checkServerId(serverIdValue); } } return serverIdValue; } + private static void checkServerId(String serverIdValue) { + try { + Integer.parseInt(serverIdValue); + } catch (NumberFormatException e) { + throw new IllegalStateException( + String.format( + "The 'server-id' should contains single numeric ID like '5400' or numeric ID range '5400-5404', but actual is %s", + serverIdValue), + e); + } + } + public static int getServerId(String serverIdValue) { return Integer.parseInt(serverIdValue); } - public static String getServerIdForSubTask(Configuration configuration, int subtaskId) { + public static Optional getServerIdForSubTask( + Configuration configuration, int subtaskId) { String serverIdRange = configuration.getString(MySqlSourceOptions.SERVER_ID); - int serverIdStart = Integer.parseInt(serverIdRange.split("-")[0].trim()); - int serverIdEnd = Integer.parseInt(serverIdRange.split("-")[1].trim()); - int serverId = serverIdStart + subtaskId; - Preconditions.checkState( - serverIdStart <= serverId && serverId <= serverIdEnd, - String.format( - "The server id %s in task %d is out of server id range %s, please keep the job parallelism same with server id num of server id range.", - serverId, subtaskId, serverIdRange)); - return String.valueOf(serverId); + if (serverIdRange == null) { + return Optional.empty(); + } + if (serverIdRange.contains("-")) { + int serverIdStart = Integer.parseInt(serverIdRange.split("-")[0].trim()); + int serverIdEnd = Integer.parseInt(serverIdRange.split("-")[1].trim()); + int serverId = serverIdStart + subtaskId; + Preconditions.checkState( + serverIdStart <= serverId && serverId <= serverIdEnd, + String.format( + "The server id %s in task %d is out of server id range %s, please keep the job parallelism same with server id num of server id range.", + serverId, subtaskId, serverIdRange)); + return Optional.of(String.valueOf(serverId)); + } else { + int serverIdStart = Integer.parseInt(serverIdRange); + if (subtaskId > 0) { + throw new IllegalStateException( + String.format( + "The server id should a range like '5400-5404' when %s enabled , but actual is %s", + SCAN_SNAPSHOT_PARALLEL_READ.key(), serverIdRange)); + } else { + return Optional.of(String.valueOf(serverIdStart)); + } + } } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlTableSource.java index 2bd1fce40..9b975a4b2 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlTableSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlTableSource.java @@ -30,6 +30,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; import com.alibaba.ververica.cdc.connectors.mysql.MySqlSource; import com.alibaba.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory; @@ -53,6 +54,7 @@ import java.util.Properties; import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.DATABASE_SERVER_NAME; import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SNAPSHOT_CHUNK_SIZE; import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SNAPSHOT_PARALLEL_READ; import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SERVER_ID; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -130,7 +132,7 @@ public class MySqlTableSource implements ScanTableSource { new RowDataDebeziumDeserializeSchema( rowType, typeInfo, ((rowData, rowKind) -> {}), serverTimeZone); if (enableParallelRead) { - RowType pkRowType = getPkType(physicalSchema); + RowType pkRowType = validateAndGetPkType(physicalSchema); Configuration configuration = getParallelSourceConf(); MySqlParallelSource parallelSource = new MySqlParallelSource<>(pkRowType, deserializer, configuration); @@ -156,7 +158,13 @@ public class MySqlTableSource implements ScanTableSource { } } - private RowType getPkType(TableSchema tableSchema) { + private RowType validateAndGetPkType(TableSchema tableSchema) { + Preconditions.checkState( + physicalSchema.getPrimaryKey().isPresent(), + String.format( + "The primary key is required when %s enabled, but actual is %s", + SCAN_SNAPSHOT_PARALLEL_READ.key(), physicalSchema.getPrimaryKey())); + List pkFieldNames = physicalSchema.getPrimaryKey().get().getColumns(); LogicalType[] pkFieldTypes = pkFieldNames.stream() @@ -187,7 +195,9 @@ public class MySqlTableSource implements ScanTableSource { * The server id is required, it will be replaced to 'database.server.id' when build {@Link * MySQLSplitReader} */ - properties.put(SERVER_ID.key(), serverId); + if (serverId != null) { + properties.put(SERVER_ID.key(), serverId); + } properties.put(SCAN_SNAPSHOT_CHUNK_SIZE.key(), String.valueOf(splitSize)); properties.put(SCAN_SNAPSHOT_FETCH_SIZE.key(), String.valueOf(fetchSize)); properties.put("connect.timeout.ms", String.valueOf(connectTimeout.toMillis())); diff --git a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/source/MySqlParallelSourceTestBase.java b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/source/MySqlParallelSourceTestBase.java index d86a248d0..5d1139674 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/source/MySqlParallelSourceTestBase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/source/MySqlParallelSourceTestBase.java @@ -242,8 +242,8 @@ public abstract class MySqlParallelSourceTestBase extends TestLogger { private String getServerId() { final Random random = new Random(); - int serverIdStart = random.nextInt(100) + 5400; - return serverIdStart + "-" + (serverIdStart + PARALLELISM); + int serverId = random.nextInt(100) + 5400; + return serverId + "-" + (serverId + PARALLELISM); } private void sleepMs(long millis) { diff --git a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java index 8efe84379..f863c7f6a 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java @@ -632,11 +632,11 @@ public class MySqlConnectorITCase extends MySqlTestBase { private String getServerId() { final Random random = new Random(); - int serverIdStart = random.nextInt(100) + 5400; + int serverId = random.nextInt(100) + 5400; if (parallelRead) { - return serverIdStart + "-" + (serverIdStart + env.getParallelism()); + return serverId + "-" + (serverId + env.getParallelism()); } - return String.valueOf(serverIdStart); + return String.valueOf(serverId); } private int getSplitSize() { diff --git a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index e5af8aef3..ae5f29581 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -130,6 +130,37 @@ public class MySqlTableSourceFactoryTest { assertEquals(expectedSource, actualSource); } + @Test + public void testEnableParallelReadSourceWithSingleServerId() { + Map properties = getAllOptions(); + properties.put("scan.snapshot.parallel-read", "true"); + properties.put("server-id", "123"); + properties.put("scan.snapshot.chunk.size", "8000"); + properties.put("scan.snapshot.fetch.size", "100"); + properties.put("connect.timeout", "45s"); + + // validation for source + DynamicTableSource actualSource = createTableSource(properties); + MySqlTableSource expectedSource = + new MySqlTableSource( + TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)), + 3306, + MY_LOCALHOST, + MY_DATABASE, + MY_TABLE, + MY_USERNAME, + MY_PASSWORD, + ZoneId.of("UTC"), + PROPERTIES, + "123", + true, + 8000, + 100, + Duration.ofSeconds(45), + StartupOptions.initial()); + assertEquals(expectedSource, actualSource); + } + @Test public void testEnableParallelReadSourceLatestOffset() { Map properties = getAllOptions(); @@ -331,23 +362,7 @@ public class MySqlTableSourceFactoryTest { assertTrue( ExceptionUtils.findThrowableWithMessage( t, - "The 'server.id' should contains single numeric ID, but is 123b") - .isPresent()); - } - - // validate illegal server id range - try { - Map properties = getAllOptions(); - properties.put("scan.snapshot.parallel-read", "true"); - properties.put("server-id", "123"); - - createTableSource(properties); - fail("exception expected"); - } catch (Throwable t) { - assertTrue( - ExceptionUtils.findThrowableWithMessage( - t, - "The 'server-id' should be a range syntax like '5400-5404' when enable 'scan.snapshot.parallel-read', but actual is 123") + "The 'server-id' should contains single numeric ID like '5400' or numeric ID range '5400-5404', but actual is 123b") .isPresent()); }