diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java index e2735c238..867508029 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java @@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory; import java.math.BigDecimal; import java.sql.SQLException; -import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -109,7 +108,7 @@ class ChunkSplitter { "Split table {} into {} chunks, time cost: {}ms.", tableId, splits.size(), - Duration.ofMillis(end - start)); + end - start); return splits; } catch (Exception e) { throw new FlinkRuntimeException( diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java index cb45c5b4e..9b5a1d788 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java @@ -21,6 +21,7 @@ package com.ververica.cdc.connectors.mysql.source.connection; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; +import io.debezium.connector.mysql.MySqlConnectorConfig; /** A connection pool factory to create pooled DataSource {@link HikariDataSource}. */ public class PooledDataSourceFactory { @@ -45,6 +46,8 @@ public class PooledDataSourceFactory { config.setMaximumPoolSize(sourceConfig.getConnectionPoolSize()); config.setConnectionTimeout(sourceConfig.getConnectTimeout().toMillis()); config.addDataSourceProperty(SERVER_TIMEZONE_KEY, sourceConfig.getServerTimeZone()); + config.setDriverClassName( + sourceConfig.getDbzConfiguration().getString(MySqlConnectorConfig.JDBC_DRIVER)); // optional optimization configurations for pooled DataSource config.addDataSourceProperty("cachePrepStmts", "true");