diff --git a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
index 735bce3e0..4e34d4da5 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
@@ -324,7 +324,7 @@ Flink SQL> SELECT * FROM orders;
optional |
30s |
Duration |
- 连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。 |
+ 连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。该时长不能少于250毫秒。 |
connect.max-retries |
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
index 9f9465ed7..fa6056cd9 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -215,7 +215,7 @@ pipeline:
optional |
30s |
Duration |
- 连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。 |
+ 连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。该时长不能少于250毫秒。 |
connect.max-retries |
diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
index 6e7f2278d..5c2cbe546 100644
--- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -322,7 +322,8 @@ During a snapshot operation, the connector will query each included table to pro
optional |
30s |
Duration |
- The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. |
+ The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.
+ This value cannot be less than 250ms. |
connect.max-retries |
diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md
index 30feb47f7..9eef3d7a4 100644
--- a/docs/content/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -218,7 +218,8 @@ pipeline:
optional |
30s |
Duration |
- The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. |
+ The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.
+ This value cannot be less than 250ms. |
connect.max-retries |
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
index c00f639af..1a1c3f254 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
@@ -119,7 +119,7 @@ public class MySqlSourceOptions {
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription(
- "The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.");
+ "The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. This value cannot be less than 250ms.");
public static final ConfigOption CONNECTION_POOL_SIZE =
ConfigOptions.key("connection.pool.size")
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
index ae8caf285..e435f946a 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
@@ -113,6 +113,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
validateIntegerOption(MySqlSourceOptions.CONNECT_MAX_RETRIES, connectMaxRetries, 0);
validateDistributionFactorUpper(distributionFactorUpper);
validateDistributionFactorLower(distributionFactorLower);
+ validateDurationOption(
+ MySqlSourceOptions.CONNECT_TIMEOUT, connectTimeout, Duration.ofMillis(250));
}
OptionUtils.printOptions(IDENTIFIER, ((Configuration) config).toMap());
@@ -316,6 +318,16 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
option.key(), exclusiveMin, optionValue));
}
+ /** Checks the value of given duration option is valid. */
+ private void validateDurationOption(
+ ConfigOption option, Duration optionValue, Duration exclusiveMin) {
+ checkState(
+ optionValue.toMillis() > exclusiveMin.toMillis(),
+ String.format(
+ "The value of option '%s' cannot be less than %s, but actual is %s",
+ option.key(), exclusiveMin, optionValue));
+ }
+
/**
* Checks the given regular expression's syntax is valid.
*
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
index 0f89a0b8d..f0f477897 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
@@ -590,6 +590,22 @@ public class MySqlTableSourceFactoryTest {
.isPresent());
}
+ // validate illegal connect.timeout
+ try {
+ Map properties = getAllOptions();
+ properties.put("scan.incremental.snapshot.enabled", "true");
+ properties.put("connect.timeout", "240ms");
+
+ createTableSource(properties);
+ fail("exception expected");
+ } catch (Throwable t) {
+ assertTrue(
+ ExceptionUtils.findThrowableWithMessage(
+ t,
+ "The value of option 'connect.timeout' cannot be less than PT0.25S, but actual is PT0.24S")
+ .isPresent());
+ }
+
// validate illegal split size
try {
Map properties = getAllOptions();