diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java index 962bf62a3..a656b5e0f 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java @@ -57,7 +57,10 @@ public class TableDiscoveryUtils { "SHOW DATABASES", rs -> { while (rs.next()) { - databaseNames.add(rs.getString(1)); + String databaseName = rs.getString(1); + if (tableFilters.databaseFilter().test(databaseName)) { + databaseNames.add(databaseName); + } } }); LOG.info("\t list of available databases is: {}", databaseNames); @@ -79,9 +82,11 @@ public class TableDiscoveryUtils { TableId tableId = new TableId(dbName, null, rs.getString(1)); if (tableFilters.dataCollectionFilter().isIncluded(tableId)) { capturedTableIds.add(tableId); - LOG.info("\t including '{}' for further processing", tableId); + LOG.info( + "\t including table '{}' for further processing", + tableId); } else { - LOG.info("\t '{}' is filtered out of capturing", tableId); + LOG.info("\t '{}' is filtered out of table capturing", tableId); } } });