diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/ConnectionPoolId.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/ConnectionPoolId.java index b6ea1db73..ae1b300d7 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/ConnectionPoolId.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/ConnectionPoolId.java @@ -27,10 +27,12 @@ public class ConnectionPoolId implements Serializable { private static final long serialVersionUID = 1L; private final String host; private final int port; + private final String username; - public ConnectionPoolId(String host, int port) { + public ConnectionPoolId(String host, int port, String username) { this.host = host; this.port = port; + this.username = username; } @Override @@ -42,16 +44,18 @@ public class ConnectionPoolId implements Serializable { return false; } ConnectionPoolId that = (ConnectionPoolId) o; - return Objects.equals(host, that.host) && Objects.equals(port, that.port); + return Objects.equals(host, that.host) + && Objects.equals(port, that.port) + && Objects.equals(username, that.username); } @Override public int hashCode() { - return Objects.hash(host, port); + return Objects.hash(host, port, username); } @Override public String toString() { - return host + ':' + port; + return username + '@' + host + ':' + port; } } diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/JdbcConnectionFactory.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/JdbcConnectionFactory.java index 8519099ac..ecbd0df9b 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/JdbcConnectionFactory.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/JdbcConnectionFactory.java @@ -49,7 +49,10 @@ public class JdbcConnectionFactory implements JdbcConnection.ConnectionFactory { final int connectRetryTimes = sourceConfig.getConnectMaxRetries(); final ConnectionPoolId connectionPoolId = - new ConnectionPoolId(sourceConfig.getHostname(), sourceConfig.getPort()); + new ConnectionPoolId( + sourceConfig.getHostname(), + sourceConfig.getPort(), + sourceConfig.getUsername()); HikariDataSource dataSource = JdbcConnectionPools.getInstance(jdbcConnectionPoolFactory) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPoolId.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPoolId.java index 82d540e34..d339a36e8 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPoolId.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPoolId.java @@ -27,10 +27,12 @@ public class ConnectionPoolId implements Serializable { private static final long serialVersionUID = 1L; private final String host; private final int port; + private final String username; - public ConnectionPoolId(String host, int port) { + public ConnectionPoolId(String host, int port, String username) { this.host = host; this.port = port; + this.username = username; } @Override @@ -42,16 +44,18 @@ public class ConnectionPoolId implements Serializable { return false; } ConnectionPoolId that = (ConnectionPoolId) o; - return Objects.equals(host, that.host) && Objects.equals(port, that.port); + return Objects.equals(host, that.host) + && Objects.equals(port, that.port) + && Objects.equals(username, that.username); } @Override public int hashCode() { - return Objects.hash(host, port); + return Objects.hash(host, port, username); } @Override public String toString() { - return host + ':' + port; + return username + '@' + host + ':' + port; } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java index 732cde2e2..10c1ea812 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java @@ -46,7 +46,10 @@ public class JdbcConnectionFactory implements JdbcConnection.ConnectionFactory { final int connectRetryTimes = sourceConfig.getConnectMaxRetries(); final ConnectionPoolId connectionPoolId = - new ConnectionPoolId(sourceConfig.getHostname(), sourceConfig.getPort()); + new ConnectionPoolId( + sourceConfig.getHostname(), + sourceConfig.getPort(), + sourceConfig.getUsername()); HikariDataSource dataSource = JdbcConnectionPools.getInstance()