[hotfix] Fix sqlserver monitor same table in other database (#2335)

* [hotfix] Fix sqlserver monitor same table in other database

* Add unit test

---------

Co-authored-by: gongzhongqiang <gongzhongqiang@gigacloudtech.com>
pull/2787/head
gongzhongqiang 2 years ago committed by GitHub
parent 6dba62ece2
commit 823f5a1e21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -99,7 +99,9 @@ public class SqlServerDialect implements JdbcDataSourceDialect {
SqlServerSourceConfig sqlserverSourceConfig = (SqlServerSourceConfig) sourceConfig;
try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
return SqlServerConnectionUtils.listTables(
jdbcConnection, sqlserverSourceConfig.getTableFilters());
jdbcConnection,
sqlserverSourceConfig.getTableFilters(),
sqlserverSourceConfig.getDatabaseList());
} catch (SQLException e) {
throw new FlinkRuntimeException("Error to discover tables: " + e.getMessage(), e);
}

@ -54,7 +54,8 @@ public class SqlServerConnectionUtils {
false);
}
public static List<TableId> listTables(JdbcConnection jdbc, RelationalTableFilters tableFilters)
public static List<TableId> listTables(
JdbcConnection jdbc, RelationalTableFilters tableFilters, List<String> databaseList)
throws SQLException {
final List<TableId> capturedTableIds = new ArrayList<>();
// -------------------
@ -68,7 +69,12 @@ public class SqlServerConnectionUtils {
"SELECT name, database_id, create_date \n" + "FROM sys.databases; ",
rs -> {
while (rs.next()) {
databaseNames.add(rs.getString(1));
// Because sqlserver table filter cannot filter by database name, we need to
// filter here
String databaseName = rs.getString(1);
if (databaseList.contains(databaseName)) {
databaseNames.add(databaseName);
}
}
});
LOG.info("\t list of available databases is: {}", databaseNames);

@ -84,6 +84,7 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {
public void testConsumingAllEvents()
throws SQLException, ExecutionException, InterruptedException {
initializeSqlServerTable("inventory");
initializeSqlServerTable("product");
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
@ -142,6 +143,9 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {
"UPDATE inventory.dbo.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE inventory.dbo.products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM inventory.dbo.products WHERE id=111;");
statement.execute(
"INSERT INTO product.dbo.products (name,description,weight) VALUES ('scooter','Big 2-wheel scooter ',5.18);");
}
waitForSinkSize("sink", 20);

@ -0,0 +1,30 @@
-- Copyright 2023 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: product
-- ----------------------------------------------------------------------------------------------------------------
-- Create the product database
CREATE DATABASE product;
USE product;
EXEC sys.sp_cdc_enable_db;
CREATE TABLE products (
id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);
insert into products(name,description,weight)values ('scooter','Small 2-wheel scooter',99.8);
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products', @role_name = NULL, @supports_net_changes = 0;
Loading…
Cancel
Save