diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerDialect.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerDialect.java index 0ce8dc4ca..d0bf8bad6 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerDialect.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerDialect.java @@ -99,7 +99,9 @@ public class SqlServerDialect implements JdbcDataSourceDialect { SqlServerSourceConfig sqlserverSourceConfig = (SqlServerSourceConfig) sourceConfig; try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) { return SqlServerConnectionUtils.listTables( - jdbcConnection, sqlserverSourceConfig.getTableFilters()); + jdbcConnection, + sqlserverSourceConfig.getTableFilters(), + sqlserverSourceConfig.getDatabaseList()); } catch (SQLException e) { throw new FlinkRuntimeException("Error to discover tables: " + e.getMessage(), e); } diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/utils/SqlServerConnectionUtils.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/utils/SqlServerConnectionUtils.java index 2d52d5ad4..c689acba1 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/utils/SqlServerConnectionUtils.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/utils/SqlServerConnectionUtils.java @@ -54,7 +54,8 @@ public class SqlServerConnectionUtils { false); } - public static List listTables(JdbcConnection jdbc, RelationalTableFilters tableFilters) + public static List listTables( + JdbcConnection jdbc, RelationalTableFilters tableFilters, List databaseList) throws SQLException { final List capturedTableIds = new ArrayList<>(); // ------------------- @@ -68,7 +69,12 @@ public class SqlServerConnectionUtils { "SELECT name, database_id, create_date \n" + "FROM sys.databases; ", rs -> { while (rs.next()) { - databaseNames.add(rs.getString(1)); + // Because sqlserver table filter cannot filter by database name, we need to + // filter here + String databaseName = rs.getString(1); + if (databaseList.contains(databaseName)) { + databaseNames.add(databaseName); + } } }); LOG.info("\t list of available databases is: {}", databaseNames); diff --git a/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java b/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java index 6539a11ac..891021376 100644 --- a/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java +++ b/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java @@ -84,6 +84,7 @@ public class SqlServerConnectorITCase extends SqlServerTestBase { public void testConsumingAllEvents() throws SQLException, ExecutionException, InterruptedException { initializeSqlServerTable("inventory"); + initializeSqlServerTable("product"); String sourceDDL = String.format( "CREATE TABLE debezium_source (" @@ -142,6 +143,9 @@ public class SqlServerConnectorITCase extends SqlServerTestBase { "UPDATE inventory.dbo.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); statement.execute("UPDATE inventory.dbo.products SET weight='5.17' WHERE id=111;"); statement.execute("DELETE FROM inventory.dbo.products WHERE id=111;"); + + statement.execute( + "INSERT INTO product.dbo.products (name,description,weight) VALUES ('scooter','Big 2-wheel scooter ',5.18);"); } waitForSinkSize("sink", 20); diff --git a/flink-connector-sqlserver-cdc/src/test/resources/ddl/product.sql b/flink-connector-sqlserver-cdc/src/test/resources/ddl/product.sql new file mode 100644 index 000000000..4c309c594 --- /dev/null +++ b/flink-connector-sqlserver-cdc/src/test/resources/ddl/product.sql @@ -0,0 +1,30 @@ +-- Copyright 2023 Ververica Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: product +-- ---------------------------------------------------------------------------------------------------------------- +-- Create the product database +CREATE DATABASE product; + +USE product; +EXEC sys.sp_cdc_enable_db; + +CREATE TABLE products ( + id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + description VARCHAR(512), + weight FLOAT +); +insert into products(name,description,weight)values ('scooter','Small 2-wheel scooter',99.8); +EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products', @role_name = NULL, @supports_net_changes = 0;