diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 3072b86a0..9923226d1 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -335,6 +335,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory { options.add(METADATA_LIST); options.add(INCLUDE_COMMENTS_ENABLED); options.add(USE_LEGACY_JSON_FORMAT); + options.add(TREAT_TINYINT1_AS_BOOLEAN_ENABLED); return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java index 277bb74ac..de49c7b7e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java @@ -43,6 +43,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD; @@ -245,6 +246,28 @@ public class MySqlDataSourceFactoryTest extends MySqlSourceTestBase { + "unsupported_key"); } + @Test + public void testOptionalOption() { + inventoryDatabase.createAndInitialize(); + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*"); + + // optional option + options.put(TREAT_TINYINT1_AS_BOOLEAN_ENABLED.key(), "false"); + + Factory.Context context = new MockContext(Configuration.fromMap(options)); + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + assertThat(factory.optionalOptions().contains(TREAT_TINYINT1_AS_BOOLEAN_ENABLED)) + .isEqualTo(true); + + MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); + assertThat(dataSource.getSourceConfig().isTreatTinyInt1AsBoolean()).isEqualTo(false); + } + @Test public void testPrefixRequireOption() { inventoryDatabase.createAndInitialize();