[mysql] Introduce Jdbc connection pools

pull/543/head
Leonard Xu 3 years ago committed by Leonard Xu
parent eeae69cdf9
commit 930efd9ce4

@ -63,6 +63,12 @@ under the License.
</exclusions>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>4.0.3</version>
</dependency>
<!-- test dependencies on Debezium -->
<dependency>

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.connection;
import java.io.Serializable;
import java.util.Objects;
/** The connection pool identifier. */
public class ConnectionPoolId implements Serializable {
private static final long serialVersionUID = 1L;
private final String host;
private final int port;
public ConnectionPoolId(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ConnectionPoolId)) {
return false;
}
ConnectionPoolId that = (ConnectionPoolId) o;
return Objects.equals(host, that.host) && Objects.equals(port, that.port);
}
@Override
public int hashCode() {
return Objects.hash(host, port);
}
@Override
public String toString() {
return host + ':' + port;
}
}

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.connection;
import org.apache.flink.annotation.Internal;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.zaxxer.hikari.HikariDataSource;
/** A JDBC connection pools that consists of {@link HikariDataSource}. */
@Internal
public interface ConnectionPools {
/**
* Gets a connection pool from pools, create a new pool if the pool does not exists in the
* connection pools .
*/
HikariDataSource getOrCreateConnectionPool(
ConnectionPoolId poolId, MySqlSourceConfig sourceConfig);
}

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.connection;
import org.apache.flink.util.FlinkRuntimeException;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.zaxxer.hikari.HikariDataSource;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
/** A factory to create JDBC connection for MySQL. */
public class JdbcConnectionFactory implements JdbcConnection.ConnectionFactory {
private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionFactory.class);
private final MySqlSourceConfig sourceConfig;
public JdbcConnectionFactory(MySqlSourceConfig sourceConfig) {
this.sourceConfig = sourceConfig;
}
@Override
public Connection connect(JdbcConfiguration config) throws SQLException {
final int connectRetryTimes = sourceConfig.getConnectMaxRetries();
final ConnectionPoolId connectionPoolId =
new ConnectionPoolId(sourceConfig.getHostname(), sourceConfig.getPort());
HikariDataSource dataSource =
JdbcConnectionPools.getInstance()
.getOrCreateConnectionPool(connectionPoolId, sourceConfig);
int i = 0;
while (i < connectRetryTimes) {
try {
return dataSource.getConnection();
} catch (SQLException e) {
if (i < connectRetryTimes - 1) {
try {
Thread.sleep(300);
} catch (InterruptedException ie) {
throw new FlinkRuntimeException(
"Failed to get connection, interrupted while doing another attempt",
ie);
}
LOG.warn("Get connection failed, retry times {}", i + 1);
} else {
LOG.error("Get connection failed after retry {} times", i + 1);
throw new FlinkRuntimeException(e);
}
}
i++;
}
return dataSource.getConnection();
}
}

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.connection;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import static com.ververica.cdc.connectors.mysql.source.connection.PooledDataSourceFactory.createPooledDataSource;
/** A Jdbc Connection pools implementation. */
public class JdbcConnectionPools implements ConnectionPools {
private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionPools.class);
private static final JdbcConnectionPools INSTANCE = new JdbcConnectionPools();
private final Map<ConnectionPoolId, HikariDataSource> pools = new HashMap<>();
private JdbcConnectionPools() {}
public static JdbcConnectionPools getInstance() {
return INSTANCE;
}
@Override
public HikariDataSource getOrCreateConnectionPool(
ConnectionPoolId poolId, MySqlSourceConfig sourceConfig) {
synchronized (pools) {
if (!pools.containsKey(poolId)) {
LOG.info("Create and register connection pool {}", poolId);
pools.put(poolId, createPooledDataSource(sourceConfig));
}
return pools.get(poolId);
}
}
}

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.connection;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
/** A connection pool factory to create pooled DataSource {@link HikariDataSource}. */
public class PooledDataSourceFactory {
public static final String JDBC_URL_PATTERN =
"jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL";
public static final String CONNECTION_POOL_PREFIX = "connection-pool-";
private PooledDataSourceFactory() {}
public static HikariDataSource createPooledDataSource(MySqlSourceConfig sourceConfig) {
final HikariConfig config = new HikariConfig();
String hostName = sourceConfig.getHostname();
int port = sourceConfig.getPort();
config.setPoolName(CONNECTION_POOL_PREFIX + hostName + ":" + port);
config.setJdbcUrl(String.format(JDBC_URL_PATTERN, hostName, port));
config.setUsername(sourceConfig.getUsername());
config.setPassword(sourceConfig.getPassword());
config.setMaximumPoolSize(sourceConfig.getConnectionPoolSize());
config.setConnectionTimeout(sourceConfig.getConnectTimeout().toMillis());
// optional optimization configurations for pooled DataSource
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
return new HikariDataSource(config);
}
}

@ -66,6 +66,7 @@ under the License.
<include>com.fasterxml.*:*</include>
<include>com.google.guava:*</include>
<include>com.esri.geometry:esri-geometry-api</include>
<include>com.zaxxer:HikariCP</include>
</includes>
</artifactSet>
<filters>
@ -111,6 +112,12 @@ under the License.
<pattern>com.esri.geometry</pattern>
<shadedPattern>com.ververica.cdc.connectors.shaded.com.esri.geometry</shadedPattern>
</relocation>
<relocation>
<pattern>com.zaxxer</pattern>
<shadedPattern>
com.ververica.cdc.connectors.shaded.com.zaxxer
</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>

Loading…
Cancel
Save