diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/config/SqlServerSourceConfig.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/config/SqlServerSourceConfig.java index 1975f9577..4c56049f3 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/config/SqlServerSourceConfig.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/config/SqlServerSourceConfig.java @@ -20,7 +20,6 @@ import com.ververica.cdc.connectors.base.config.JdbcSourceConfig; import com.ververica.cdc.connectors.base.options.StartupOptions; import io.debezium.config.Configuration; import io.debezium.connector.sqlserver.SqlServerConnectorConfig; -import io.debezium.relational.RelationalTableFilters; import java.time.Duration; import java.util.List; @@ -85,12 +84,4 @@ public class SqlServerSourceConfig extends JdbcSourceConfig { public SqlServerConnectorConfig getDbzConnectorConfig() { return new SqlServerConnectorConfig(getDbzConfiguration()); } - - public Configuration getOriginDbzConnectorConfig() { - return super.getDbzConfiguration(); - } - - public RelationalTableFilters getTableFilters() { - return getDbzConnectorConfig().getTableFilters(); - } } diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java index 00fe53e5f..994239585 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java @@ -21,8 +21,6 @@ import com.ververica.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory; import io.debezium.config.Configuration; import io.debezium.connector.sqlserver.SqlServerConnector; -import java.util.Arrays; -import java.util.List; import java.util.Properties; import java.util.UUID; @@ -34,17 +32,6 @@ public class SqlServerSourceConfigFactory extends JdbcSourceConfigFactory { private static final String DATABASE_SERVER_NAME = "sqlserver_transaction_log_source"; private static final String DRIVER_ClASS_NAME = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; - private List schemaList; - - /** - * An optional list of regular expressions that match schema names to be monitored; any schema - * name not included in the whitelist will be excluded from monitoring. By default, all - * non-system schemas will be monitored. - */ - public JdbcSourceConfigFactory schemaList(String... schemaList) { - this.schemaList = Arrays.asList(schemaList); - return this; - } @Override public SqlServerSourceConfig create(int subtask) { diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerDialect.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerDialect.java index d0bf8bad6..0861324a0 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerDialect.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerDialect.java @@ -50,12 +50,10 @@ import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerUtils public class SqlServerDialect implements JdbcDataSourceDialect { private static final long serialVersionUID = 1L; - private final SqlServerSourceConfigFactory configFactory; private final SqlServerSourceConfig sourceConfig; private transient SqlServerSchema sqlserverSchema; public SqlServerDialect(SqlServerSourceConfigFactory configFactory) { - this.configFactory = configFactory; this.sourceConfig = configFactory.create(0); } @@ -75,7 +73,6 @@ public class SqlServerDialect implements JdbcDataSourceDialect { @Override public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) { - // todo: need to check the case sensitive of the database return true; } @@ -100,7 +97,7 @@ public class SqlServerDialect implements JdbcDataSourceDialect { try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) { return SqlServerConnectionUtils.listTables( jdbcConnection, - sqlserverSourceConfig.getTableFilters(), + sqlserverSourceConfig.getDbzConnectorConfig().getTableFilters(), sqlserverSourceConfig.getDatabaseList()); } catch (SQLException e) { throw new FlinkRuntimeException("Error to discover tables: " + e.getMessage(), e); @@ -131,7 +128,10 @@ public class SqlServerDialect implements JdbcDataSourceDialect { if (sqlserverSchema == null) { sqlserverSchema = new SqlServerSchema(); } - return sqlserverSchema.getTableSchema(jdbc, tableId); + return sqlserverSchema.getTableSchema( + jdbc, + tableId, + sourceConfig.getDbzConnectorConfig().getTableFilters().dataCollectionFilter()); } @Override diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerSchema.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerSchema.java index df83699c7..158a881aa 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerSchema.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerSchema.java @@ -28,9 +28,7 @@ import io.debezium.relational.history.TableChanges.TableChange; import java.sql.SQLException; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** A component used to get schema by table path. */ @@ -42,20 +40,20 @@ public class SqlServerSchema { this.schemasByTableId = new ConcurrentHashMap<>(); } - public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) { + public TableChange getTableSchema( + JdbcConnection jdbc, TableId tableId, Tables.TableFilter tableFilters) { // read schema from cache first TableChange schema = schemasByTableId.get(tableId); if (schema == null) { - schema = readTableSchema(jdbc, tableId); + schema = readTableSchema(jdbc, tableId, tableFilters); schemasByTableId.put(tableId, schema); } return schema; } - private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) { + private TableChange readTableSchema( + JdbcConnection jdbc, TableId tableId, Tables.TableFilter tableFilters) { SqlServerConnection sqlServerConnection = (SqlServerConnection) jdbc; - Set tableIdSet = new HashSet<>(); - tableIdSet.add(tableId); final Map tableChangeMap = new HashMap<>(); Tables tables = new Tables(); @@ -63,7 +61,7 @@ public class SqlServerSchema { try { sqlServerConnection.readSchema( - tables, tableId.catalog(), tableId.schema(), null, null, false); + tables, tableId.catalog(), tableId.schema(), tableFilters, null, false); Table table = tables.forTable(tableId); TableChange tableChange = new TableChange(TableChanges.TableChangeType.CREATE, table); tableChangeMap.put(tableId, tableChange);