From 5cf940d160b2e6f711236ea97a7c0a76b7593036 Mon Sep 17 00:00:00 2001 From: luoyuxia <1464567365@qq.com> Date: Thu, 28 Oct 2021 10:54:27 +0800 Subject: [PATCH] [mysql] Validate the regex pattern of database-name and table-name in table factory (#526) --- .../mysql/table/MySqlTableSourceFactory.java | 21 +++++++++++++++++ .../table/MySqlTableSourceFactoryTest.java | 23 +++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index 73ee9d3ce..e0946f030 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -36,6 +36,7 @@ import java.time.Duration; import java.time.ZoneId; import java.util.HashSet; import java.util.Set; +import java.util.regex.Pattern; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.DATABASE_NAME; @@ -72,7 +73,9 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory { String username = config.get(USERNAME); String password = config.get(PASSWORD); String databaseName = config.get(DATABASE_NAME); + validateRegex(DATABASE_NAME.key(), databaseName); String tableName = config.get(TABLE_NAME); + validateRegex(TABLE_NAME.key(), tableName); int port = config.get(PORT); int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE); int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE); @@ -231,4 +234,22 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory { "The value of option '%s' must larger than 1, but is %d", SCAN_SNAPSHOT_FETCH_SIZE.key(), fetchSize)); } + + /** + * Checks the given regular expression's syntax is valid. + * + * @param optionName the option name of the regex + * @param regex The regular expression to be checked + * @throws ValidationException If the expression's syntax is invalid + */ + private void validateRegex(String optionName, String regex) { + try { + Pattern.compile(regex); + } catch (Exception e) { + throw new ValidationException( + String.format( + "The %s '%s' is not a valid regular expression", optionName, regex), + e); + } + } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index f13a66f01..7b6b69f63 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -477,6 +477,29 @@ public class MySqlTableSourceFactoryTest { + "but was: abc"; assertTrue(ExceptionUtils.findThrowableWithMessage(t, msg).isPresent()); } + + // validate invalid database-name + try { + Map properties = getAllOptions(); + properties.put("database-name", "*_invalid_db"); + } catch (Throwable t) { + String msg = + String.format( + "The database-name '%s' is not a valid regular expression", + "*_invalid_db"); + assertTrue(ExceptionUtils.findThrowableWithMessage(t, msg).isPresent()); + } + // validate invalid table-name + try { + Map properties = getAllOptions(); + properties.put("table-name", "*_invalid_table"); + } catch (Throwable t) { + String msg = + String.format( + "The table-name '%s' is not a valid regular expression", + "*_invalid_table"); + assertTrue(ExceptionUtils.findThrowableWithMessage(t, msg).isPresent()); + } } private Map getAllOptions() {