[sqlserver] Add table filter to speed up SqlServerSchema read (#2369)

Co-authored-by: pgm-rookie <669768751@qq.com>
pull/2416/head
gongzhongqiang 1 year ago committed by GitHub
parent 420fe11c2f
commit 54df7e93f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -20,7 +20,6 @@ import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.options.StartupOptions; import com.ververica.cdc.connectors.base.options.StartupOptions;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig; import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.relational.RelationalTableFilters;
import java.time.Duration; import java.time.Duration;
import java.util.List; import java.util.List;
@ -85,12 +84,4 @@ public class SqlServerSourceConfig extends JdbcSourceConfig {
public SqlServerConnectorConfig getDbzConnectorConfig() { public SqlServerConnectorConfig getDbzConnectorConfig() {
return new SqlServerConnectorConfig(getDbzConfiguration()); return new SqlServerConnectorConfig(getDbzConfiguration());
} }
public Configuration getOriginDbzConnectorConfig() {
return super.getDbzConfiguration();
}
public RelationalTableFilters getTableFilters() {
return getDbzConnectorConfig().getTableFilters();
}
} }

@ -21,8 +21,6 @@ import com.ververica.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnector; import io.debezium.connector.sqlserver.SqlServerConnector;
import java.util.Arrays;
import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.UUID; import java.util.UUID;
@ -34,17 +32,6 @@ public class SqlServerSourceConfigFactory extends JdbcSourceConfigFactory {
private static final String DATABASE_SERVER_NAME = "sqlserver_transaction_log_source"; private static final String DATABASE_SERVER_NAME = "sqlserver_transaction_log_source";
private static final String DRIVER_ClASS_NAME = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; private static final String DRIVER_ClASS_NAME = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
private List<String> schemaList;
/**
* An optional list of regular expressions that match schema names to be monitored; any schema
* name not included in the whitelist will be excluded from monitoring. By default, all
* non-system schemas will be monitored.
*/
public JdbcSourceConfigFactory schemaList(String... schemaList) {
this.schemaList = Arrays.asList(schemaList);
return this;
}
@Override @Override
public SqlServerSourceConfig create(int subtask) { public SqlServerSourceConfig create(int subtask) {

@ -50,12 +50,10 @@ import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerUtils
public class SqlServerDialect implements JdbcDataSourceDialect { public class SqlServerDialect implements JdbcDataSourceDialect {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private final SqlServerSourceConfigFactory configFactory;
private final SqlServerSourceConfig sourceConfig; private final SqlServerSourceConfig sourceConfig;
private transient SqlServerSchema sqlserverSchema; private transient SqlServerSchema sqlserverSchema;
public SqlServerDialect(SqlServerSourceConfigFactory configFactory) { public SqlServerDialect(SqlServerSourceConfigFactory configFactory) {
this.configFactory = configFactory;
this.sourceConfig = configFactory.create(0); this.sourceConfig = configFactory.create(0);
} }
@ -75,7 +73,6 @@ public class SqlServerDialect implements JdbcDataSourceDialect {
@Override @Override
public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) { public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) {
// todo: need to check the case sensitive of the database
return true; return true;
} }
@ -100,7 +97,7 @@ public class SqlServerDialect implements JdbcDataSourceDialect {
try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) { try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
return SqlServerConnectionUtils.listTables( return SqlServerConnectionUtils.listTables(
jdbcConnection, jdbcConnection,
sqlserverSourceConfig.getTableFilters(), sqlserverSourceConfig.getDbzConnectorConfig().getTableFilters(),
sqlserverSourceConfig.getDatabaseList()); sqlserverSourceConfig.getDatabaseList());
} catch (SQLException e) { } catch (SQLException e) {
throw new FlinkRuntimeException("Error to discover tables: " + e.getMessage(), e); throw new FlinkRuntimeException("Error to discover tables: " + e.getMessage(), e);
@ -131,7 +128,10 @@ public class SqlServerDialect implements JdbcDataSourceDialect {
if (sqlserverSchema == null) { if (sqlserverSchema == null) {
sqlserverSchema = new SqlServerSchema(); sqlserverSchema = new SqlServerSchema();
} }
return sqlserverSchema.getTableSchema(jdbc, tableId); return sqlserverSchema.getTableSchema(
jdbc,
tableId,
sourceConfig.getDbzConnectorConfig().getTableFilters().dataCollectionFilter());
} }
@Override @Override

@ -28,9 +28,7 @@ import io.debezium.relational.history.TableChanges.TableChange;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** A component used to get schema by table path. */ /** A component used to get schema by table path. */
@ -42,20 +40,20 @@ public class SqlServerSchema {
this.schemasByTableId = new ConcurrentHashMap<>(); this.schemasByTableId = new ConcurrentHashMap<>();
} }
public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) { public TableChange getTableSchema(
JdbcConnection jdbc, TableId tableId, Tables.TableFilter tableFilters) {
// read schema from cache first // read schema from cache first
TableChange schema = schemasByTableId.get(tableId); TableChange schema = schemasByTableId.get(tableId);
if (schema == null) { if (schema == null) {
schema = readTableSchema(jdbc, tableId); schema = readTableSchema(jdbc, tableId, tableFilters);
schemasByTableId.put(tableId, schema); schemasByTableId.put(tableId, schema);
} }
return schema; return schema;
} }
private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) { private TableChange readTableSchema(
JdbcConnection jdbc, TableId tableId, Tables.TableFilter tableFilters) {
SqlServerConnection sqlServerConnection = (SqlServerConnection) jdbc; SqlServerConnection sqlServerConnection = (SqlServerConnection) jdbc;
Set<TableId> tableIdSet = new HashSet<>();
tableIdSet.add(tableId);
final Map<TableId, TableChange> tableChangeMap = new HashMap<>(); final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
Tables tables = new Tables(); Tables tables = new Tables();
@ -63,7 +61,7 @@ public class SqlServerSchema {
try { try {
sqlServerConnection.readSchema( sqlServerConnection.readSchema(
tables, tableId.catalog(), tableId.schema(), null, null, false); tables, tableId.catalog(), tableId.schema(), tableFilters, null, false);
Table table = tables.forTable(tableId); Table table = tables.forTable(tableId);
TableChange tableChange = new TableChange(TableChanges.TableChangeType.CREATE, table); TableChange tableChange = new TableChange(TableChanges.TableChangeType.CREATE, table);
tableChangeMap.put(tableId, tableChange); tableChangeMap.put(tableId, tableChange);

Loading…
Cancel
Save