[FLINK-35297][mysql] Add validation for option connect.timeout because of HikariConfig limitation (#3295)

pull/3419/head
Shawn Huang 8 months ago committed by GitHub
parent 2bd2e4ce24
commit 80920aaa28
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -324,7 +324,7 @@ Flink SQL> SELECT * FROM orders;
<td>optional</td>
<td style="word-wrap: break-word;">30s</td>
<td>Duration</td>
<td>连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。</td>
<td>连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。该时长不能少于250毫秒。</td>
</tr>
<tr>
<td>connect.max-retries</td>

@ -215,7 +215,7 @@ pipeline:
<td>optional</td>
<td style="word-wrap: break-word;">30s</td>
<td>Duration</td>
<td>连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。</td>
<td>连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。该时长不能少于250毫秒。</td>
</tr>
<tr>
<td>connect.max-retries</td>

@ -322,7 +322,8 @@ During a snapshot operation, the connector will query each included table to pro
<td>optional</td>
<td style="word-wrap: break-word;">30s</td>
<td>Duration</td>
<td>The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.</td>
<td>The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.
This value cannot be less than 250ms.</td>
</tr>
<tr>
<td>connect.max-retries</td>

@ -218,7 +218,8 @@ pipeline:
<td>optional</td>
<td style="word-wrap: break-word;">30s</td>
<td>Duration</td>
<td>The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.</td>
<td>The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.
This value cannot be less than 250ms.</td>
</tr>
<tr>
<td>connect.max-retries</td>

@ -119,7 +119,7 @@ public class MySqlSourceOptions {
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.");
"The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. This value cannot be less than 250ms.");
public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
ConfigOptions.key("connection.pool.size")

@ -113,6 +113,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
validateIntegerOption(MySqlSourceOptions.CONNECT_MAX_RETRIES, connectMaxRetries, 0);
validateDistributionFactorUpper(distributionFactorUpper);
validateDistributionFactorLower(distributionFactorLower);
validateDurationOption(
MySqlSourceOptions.CONNECT_TIMEOUT, connectTimeout, Duration.ofMillis(250));
}
OptionUtils.printOptions(IDENTIFIER, ((Configuration) config).toMap());
@ -316,6 +318,16 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
option.key(), exclusiveMin, optionValue));
}
/** Checks the value of given duration option is valid. */
private void validateDurationOption(
ConfigOption<Duration> option, Duration optionValue, Duration exclusiveMin) {
checkState(
optionValue.toMillis() > exclusiveMin.toMillis(),
String.format(
"The value of option '%s' cannot be less than %s, but actual is %s",
option.key(), exclusiveMin, optionValue));
}
/**
* Checks the given regular expression's syntax is valid.
*

@ -590,6 +590,22 @@ public class MySqlTableSourceFactoryTest {
.isPresent());
}
// validate illegal connect.timeout
try {
Map<String, String> properties = getAllOptions();
properties.put("scan.incremental.snapshot.enabled", "true");
properties.put("connect.timeout", "240ms");
createTableSource(properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t,
"The value of option 'connect.timeout' cannot be less than PT0.25S, but actual is PT0.24S")
.isPresent());
}
// validate illegal split size
try {
Map<String, String> properties = getAllOptions();

Loading…
Cancel
Save