diff --git a/flink-connector-mysql-cdc/pom.xml b/flink-connector-mysql-cdc/pom.xml
index 62afdad75..dabc4e884 100644
--- a/flink-connector-mysql-cdc/pom.xml
+++ b/flink-connector-mysql-cdc/pom.xml
@@ -63,6 +63,12 @@ under the License.
+
+ com.zaxxer
+ HikariCP
+ 4.0.3
+
+
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPoolId.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPoolId.java
new file mode 100644
index 000000000..82d540e34
--- /dev/null
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPoolId.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.mysql.source.connection;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** The connection pool identifier. */
+public class ConnectionPoolId implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private final String host;
+ private final int port;
+
+ public ConnectionPoolId(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ConnectionPoolId)) {
+ return false;
+ }
+ ConnectionPoolId that = (ConnectionPoolId) o;
+ return Objects.equals(host, that.host) && Objects.equals(port, that.port);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(host, port);
+ }
+
+ @Override
+ public String toString() {
+ return host + ':' + port;
+ }
+}
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPools.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPools.java
new file mode 100644
index 000000000..57c327817
--- /dev/null
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPools.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.mysql.source.connection;
+
+import org.apache.flink.annotation.Internal;
+
+import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
+import com.zaxxer.hikari.HikariDataSource;
+
+/** A JDBC connection pools that consists of {@link HikariDataSource}. */
+@Internal
+public interface ConnectionPools {
+
+ /**
+ * Gets a connection pool from pools, create a new pool if the pool does not exists in the
+ * connection pools .
+ */
+ HikariDataSource getOrCreateConnectionPool(
+ ConnectionPoolId poolId, MySqlSourceConfig sourceConfig);
+}
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java
new file mode 100644
index 000000000..732cde2e2
--- /dev/null
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.mysql.source.connection;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/** A factory to create JDBC connection for MySQL. */
+public class JdbcConnectionFactory implements JdbcConnection.ConnectionFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionFactory.class);
+
+ private final MySqlSourceConfig sourceConfig;
+
+ public JdbcConnectionFactory(MySqlSourceConfig sourceConfig) {
+ this.sourceConfig = sourceConfig;
+ }
+
+ @Override
+ public Connection connect(JdbcConfiguration config) throws SQLException {
+ final int connectRetryTimes = sourceConfig.getConnectMaxRetries();
+
+ final ConnectionPoolId connectionPoolId =
+ new ConnectionPoolId(sourceConfig.getHostname(), sourceConfig.getPort());
+
+ HikariDataSource dataSource =
+ JdbcConnectionPools.getInstance()
+ .getOrCreateConnectionPool(connectionPoolId, sourceConfig);
+
+ int i = 0;
+ while (i < connectRetryTimes) {
+ try {
+ return dataSource.getConnection();
+ } catch (SQLException e) {
+ if (i < connectRetryTimes - 1) {
+ try {
+ Thread.sleep(300);
+ } catch (InterruptedException ie) {
+ throw new FlinkRuntimeException(
+ "Failed to get connection, interrupted while doing another attempt",
+ ie);
+ }
+ LOG.warn("Get connection failed, retry times {}", i + 1);
+ } else {
+ LOG.error("Get connection failed after retry {} times", i + 1);
+ throw new FlinkRuntimeException(e);
+ }
+ }
+ i++;
+ }
+ return dataSource.getConnection();
+ }
+}
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java
new file mode 100644
index 000000000..d53dbf96a
--- /dev/null
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.mysql.source.connection;
+
+import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.ververica.cdc.connectors.mysql.source.connection.PooledDataSourceFactory.createPooledDataSource;
+
+/** A Jdbc Connection pools implementation. */
+public class JdbcConnectionPools implements ConnectionPools {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionPools.class);
+
+ private static final JdbcConnectionPools INSTANCE = new JdbcConnectionPools();
+ private final Map pools = new HashMap<>();
+
+ private JdbcConnectionPools() {}
+
+ public static JdbcConnectionPools getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public HikariDataSource getOrCreateConnectionPool(
+ ConnectionPoolId poolId, MySqlSourceConfig sourceConfig) {
+ synchronized (pools) {
+ if (!pools.containsKey(poolId)) {
+ LOG.info("Create and register connection pool {}", poolId);
+ pools.put(poolId, createPooledDataSource(sourceConfig));
+ }
+ return pools.get(poolId);
+ }
+ }
+}
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java
new file mode 100644
index 000000000..1602b4afa
--- /dev/null
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.mysql.source.connection;
+
+import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+
+/** A connection pool factory to create pooled DataSource {@link HikariDataSource}. */
+public class PooledDataSourceFactory {
+
+ public static final String JDBC_URL_PATTERN =
+ "jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL";
+ public static final String CONNECTION_POOL_PREFIX = "connection-pool-";
+
+ private PooledDataSourceFactory() {}
+
+ public static HikariDataSource createPooledDataSource(MySqlSourceConfig sourceConfig) {
+ final HikariConfig config = new HikariConfig();
+
+ String hostName = sourceConfig.getHostname();
+ int port = sourceConfig.getPort();
+
+ config.setPoolName(CONNECTION_POOL_PREFIX + hostName + ":" + port);
+ config.setJdbcUrl(String.format(JDBC_URL_PATTERN, hostName, port));
+ config.setUsername(sourceConfig.getUsername());
+ config.setPassword(sourceConfig.getPassword());
+ config.setMaximumPoolSize(sourceConfig.getConnectionPoolSize());
+ config.setConnectionTimeout(sourceConfig.getConnectTimeout().toMillis());
+
+ // optional optimization configurations for pooled DataSource
+ config.addDataSourceProperty("cachePrepStmts", "true");
+ config.addDataSourceProperty("prepStmtCacheSize", "250");
+ config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
+
+ return new HikariDataSource(config);
+ }
+}
diff --git a/flink-sql-connector-mysql-cdc/pom.xml b/flink-sql-connector-mysql-cdc/pom.xml
index 8ddb81e49..7581ebdc5 100644
--- a/flink-sql-connector-mysql-cdc/pom.xml
+++ b/flink-sql-connector-mysql-cdc/pom.xml
@@ -66,6 +66,7 @@ under the License.
com.fasterxml.*:*
com.google.guava:*
com.esri.geometry:esri-geometry-api
+ com.zaxxer:HikariCP
@@ -111,6 +112,12 @@ under the License.
com.esri.geometry
com.ververica.cdc.connectors.shaded.com.esri.geometry
+
+ com.zaxxer
+
+ com.ververica.cdc.connectors.shaded.com.zaxxer
+
+