From 930efd9ce44416b5b32a1fa34a86fb2036902d3c Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Sat, 30 Oct 2021 01:19:01 +0800 Subject: [PATCH] [mysql] Introduce Jdbc connection pools --- flink-connector-mysql-cdc/pom.xml | 6 ++ .../source/connection/ConnectionPoolId.java | 57 ++++++++++++++ .../source/connection/ConnectionPools.java | 36 +++++++++ .../connection/JdbcConnectionFactory.java | 78 +++++++++++++++++++ .../connection/JdbcConnectionPools.java | 56 +++++++++++++ .../connection/PooledDataSourceFactory.java | 54 +++++++++++++ flink-sql-connector-mysql-cdc/pom.xml | 7 ++ 7 files changed, 294 insertions(+) create mode 100644 flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPoolId.java create mode 100644 flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPools.java create mode 100644 flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java create mode 100644 flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java create mode 100644 flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java diff --git a/flink-connector-mysql-cdc/pom.xml b/flink-connector-mysql-cdc/pom.xml index 62afdad75..dabc4e884 100644 --- a/flink-connector-mysql-cdc/pom.xml +++ b/flink-connector-mysql-cdc/pom.xml @@ -63,6 +63,12 @@ under the License. + + com.zaxxer + HikariCP + 4.0.3 + + diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPoolId.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPoolId.java new file mode 100644 index 000000000..82d540e34 --- /dev/null +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPoolId.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source.connection; + +import java.io.Serializable; +import java.util.Objects; + +/** The connection pool identifier. */ +public class ConnectionPoolId implements Serializable { + + private static final long serialVersionUID = 1L; + private final String host; + private final int port; + + public ConnectionPoolId(String host, int port) { + this.host = host; + this.port = port; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ConnectionPoolId)) { + return false; + } + ConnectionPoolId that = (ConnectionPoolId) o; + return Objects.equals(host, that.host) && Objects.equals(port, that.port); + } + + @Override + public int hashCode() { + return Objects.hash(host, port); + } + + @Override + public String toString() { + return host + ':' + port; + } +} diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPools.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPools.java new file mode 100644 index 000000000..57c327817 --- /dev/null +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/ConnectionPools.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source.connection; + +import org.apache.flink.annotation.Internal; + +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import com.zaxxer.hikari.HikariDataSource; + +/** A JDBC connection pools that consists of {@link HikariDataSource}. */ +@Internal +public interface ConnectionPools { + + /** + * Gets a connection pool from pools, create a new pool if the pool does not exists in the + * connection pools . + */ + HikariDataSource getOrCreateConnectionPool( + ConnectionPoolId poolId, MySqlSourceConfig sourceConfig); +} diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java new file mode 100644 index 000000000..732cde2e2 --- /dev/null +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source.connection; + +import org.apache.flink.util.FlinkRuntimeException; + +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import com.zaxxer.hikari.HikariDataSource; +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.jdbc.JdbcConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; + +/** A factory to create JDBC connection for MySQL. */ +public class JdbcConnectionFactory implements JdbcConnection.ConnectionFactory { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionFactory.class); + + private final MySqlSourceConfig sourceConfig; + + public JdbcConnectionFactory(MySqlSourceConfig sourceConfig) { + this.sourceConfig = sourceConfig; + } + + @Override + public Connection connect(JdbcConfiguration config) throws SQLException { + final int connectRetryTimes = sourceConfig.getConnectMaxRetries(); + + final ConnectionPoolId connectionPoolId = + new ConnectionPoolId(sourceConfig.getHostname(), sourceConfig.getPort()); + + HikariDataSource dataSource = + JdbcConnectionPools.getInstance() + .getOrCreateConnectionPool(connectionPoolId, sourceConfig); + + int i = 0; + while (i < connectRetryTimes) { + try { + return dataSource.getConnection(); + } catch (SQLException e) { + if (i < connectRetryTimes - 1) { + try { + Thread.sleep(300); + } catch (InterruptedException ie) { + throw new FlinkRuntimeException( + "Failed to get connection, interrupted while doing another attempt", + ie); + } + LOG.warn("Get connection failed, retry times {}", i + 1); + } else { + LOG.error("Get connection failed after retry {} times", i + 1); + throw new FlinkRuntimeException(e); + } + } + i++; + } + return dataSource.getConnection(); + } +} diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java new file mode 100644 index 000000000..d53dbf96a --- /dev/null +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source.connection; + +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +import static com.ververica.cdc.connectors.mysql.source.connection.PooledDataSourceFactory.createPooledDataSource; + +/** A Jdbc Connection pools implementation. */ +public class JdbcConnectionPools implements ConnectionPools { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionPools.class); + + private static final JdbcConnectionPools INSTANCE = new JdbcConnectionPools(); + private final Map pools = new HashMap<>(); + + private JdbcConnectionPools() {} + + public static JdbcConnectionPools getInstance() { + return INSTANCE; + } + + @Override + public HikariDataSource getOrCreateConnectionPool( + ConnectionPoolId poolId, MySqlSourceConfig sourceConfig) { + synchronized (pools) { + if (!pools.containsKey(poolId)) { + LOG.info("Create and register connection pool {}", poolId); + pools.put(poolId, createPooledDataSource(sourceConfig)); + } + return pools.get(poolId); + } + } +} diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java new file mode 100644 index 000000000..1602b4afa --- /dev/null +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source.connection; + +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; + +/** A connection pool factory to create pooled DataSource {@link HikariDataSource}. */ +public class PooledDataSourceFactory { + + public static final String JDBC_URL_PATTERN = + "jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL"; + public static final String CONNECTION_POOL_PREFIX = "connection-pool-"; + + private PooledDataSourceFactory() {} + + public static HikariDataSource createPooledDataSource(MySqlSourceConfig sourceConfig) { + final HikariConfig config = new HikariConfig(); + + String hostName = sourceConfig.getHostname(); + int port = sourceConfig.getPort(); + + config.setPoolName(CONNECTION_POOL_PREFIX + hostName + ":" + port); + config.setJdbcUrl(String.format(JDBC_URL_PATTERN, hostName, port)); + config.setUsername(sourceConfig.getUsername()); + config.setPassword(sourceConfig.getPassword()); + config.setMaximumPoolSize(sourceConfig.getConnectionPoolSize()); + config.setConnectionTimeout(sourceConfig.getConnectTimeout().toMillis()); + + // optional optimization configurations for pooled DataSource + config.addDataSourceProperty("cachePrepStmts", "true"); + config.addDataSourceProperty("prepStmtCacheSize", "250"); + config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048"); + + return new HikariDataSource(config); + } +} diff --git a/flink-sql-connector-mysql-cdc/pom.xml b/flink-sql-connector-mysql-cdc/pom.xml index 8ddb81e49..7581ebdc5 100644 --- a/flink-sql-connector-mysql-cdc/pom.xml +++ b/flink-sql-connector-mysql-cdc/pom.xml @@ -66,6 +66,7 @@ under the License. com.fasterxml.*:* com.google.guava:* com.esri.geometry:esri-geometry-api + com.zaxxer:HikariCP @@ -111,6 +112,12 @@ under the License. com.esri.geometry com.ververica.cdc.connectors.shaded.com.esri.geometry + + com.zaxxer + + com.ververica.cdc.connectors.shaded.com.zaxxer + +