From 80920aaa28608eb107ad61c071199032e6bd2777 Mon Sep 17 00:00:00 2001 From: Shawn Huang Date: Mon, 17 Jun 2024 17:45:53 +0800 Subject: [PATCH] [FLINK-35297][mysql] Add validation for option connect.timeout because of HikariConfig limitation (#3295) --- .../docs/connectors/flink-sources/mysql-cdc.md | 2 +- .../docs/connectors/pipeline-connectors/mysql.md | 2 +- .../docs/connectors/flink-sources/mysql-cdc.md | 3 ++- .../docs/connectors/pipeline-connectors/mysql.md | 3 ++- .../mysql/source/config/MySqlSourceOptions.java | 2 +- .../mysql/table/MySqlTableSourceFactory.java | 12 ++++++++++++ .../mysql/table/MySqlTableSourceFactoryTest.java | 16 ++++++++++++++++ 7 files changed, 35 insertions(+), 5 deletions(-) diff --git a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md index 735bce3e0..4e34d4da5 100644 --- a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md @@ -324,7 +324,7 @@ Flink SQL> SELECT * FROM orders; optional 30s Duration - 连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。 + 连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。该时长不能少于250毫秒。 connect.max-retries diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md index 9f9465ed7..fa6056cd9 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md @@ -215,7 +215,7 @@ pipeline: optional 30s Duration - 连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。 + 连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。该时长不能少于250毫秒。 connect.max-retries diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md index 6e7f2278d..5c2cbe546 100644 --- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md @@ -322,7 +322,8 @@ During a snapshot operation, the connector will query each included table to pro optional 30s Duration - The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. + The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. + This value cannot be less than 250ms. connect.max-retries diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index 30feb47f7..9eef3d7a4 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -218,7 +218,8 @@ pipeline: optional 30s Duration - The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. + The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. + This value cannot be less than 250ms. connect.max-retries diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index c00f639af..1a1c3f254 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -119,7 +119,7 @@ public class MySqlSourceOptions { .durationType() .defaultValue(Duration.ofSeconds(30)) .withDescription( - "The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out."); + "The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. This value cannot be less than 250ms."); public static final ConfigOption CONNECTION_POOL_SIZE = ConfigOptions.key("connection.pool.size") diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index ae8caf285..e435f946a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -113,6 +113,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory { validateIntegerOption(MySqlSourceOptions.CONNECT_MAX_RETRIES, connectMaxRetries, 0); validateDistributionFactorUpper(distributionFactorUpper); validateDistributionFactorLower(distributionFactorLower); + validateDurationOption( + MySqlSourceOptions.CONNECT_TIMEOUT, connectTimeout, Duration.ofMillis(250)); } OptionUtils.printOptions(IDENTIFIER, ((Configuration) config).toMap()); @@ -316,6 +318,16 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory { option.key(), exclusiveMin, optionValue)); } + /** Checks the value of given duration option is valid. */ + private void validateDurationOption( + ConfigOption option, Duration optionValue, Duration exclusiveMin) { + checkState( + optionValue.toMillis() > exclusiveMin.toMillis(), + String.format( + "The value of option '%s' cannot be less than %s, but actual is %s", + option.key(), exclusiveMin, optionValue)); + } + /** * Checks the given regular expression's syntax is valid. * diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index 0f89a0b8d..f0f477897 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -590,6 +590,22 @@ public class MySqlTableSourceFactoryTest { .isPresent()); } + // validate illegal connect.timeout + try { + Map properties = getAllOptions(); + properties.put("scan.incremental.snapshot.enabled", "true"); + properties.put("connect.timeout", "240ms"); + + createTableSource(properties); + fail("exception expected"); + } catch (Throwable t) { + assertTrue( + ExceptionUtils.findThrowableWithMessage( + t, + "The value of option 'connect.timeout' cannot be less than PT0.25S, but actual is PT0.24S") + .isPresent()); + } + // validate illegal split size try { Map properties = getAllOptions();