|
|
|
@ -43,6 +43,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
|
|
|
|
|
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
|
|
|
|
|
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
|
|
|
|
|
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
|
|
|
|
|
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED;
|
|
|
|
|
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
|
|
|
|
|
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
|
|
|
|
|
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
|
|
|
|
@ -245,6 +246,28 @@ public class MySqlDataSourceFactoryTest extends MySqlSourceTestBase {
|
|
|
|
|
+ "unsupported_key");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testOptionalOption() {
|
|
|
|
|
inventoryDatabase.createAndInitialize();
|
|
|
|
|
Map<String, String> options = new HashMap<>();
|
|
|
|
|
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
|
|
|
|
|
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
|
|
|
|
|
options.put(USERNAME.key(), TEST_USER);
|
|
|
|
|
options.put(PASSWORD.key(), TEST_PASSWORD);
|
|
|
|
|
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*");
|
|
|
|
|
|
|
|
|
|
// optional option
|
|
|
|
|
options.put(TREAT_TINYINT1_AS_BOOLEAN_ENABLED.key(), "false");
|
|
|
|
|
|
|
|
|
|
Factory.Context context = new MockContext(Configuration.fromMap(options));
|
|
|
|
|
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
|
|
|
|
|
assertThat(factory.optionalOptions().contains(TREAT_TINYINT1_AS_BOOLEAN_ENABLED))
|
|
|
|
|
.isEqualTo(true);
|
|
|
|
|
|
|
|
|
|
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
|
|
|
|
|
assertThat(dataSource.getSourceConfig().isTreatTinyInt1AsBoolean()).isEqualTo(false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testPrefixRequireOption() {
|
|
|
|
|
inventoryDatabase.createAndInitialize();
|
|
|
|
|