features: add heartbeat to keepalive connection if configure it (#1699)

* features: add heartbeat to keepalive connection if configure it

* features: add heartbeat to keepalive connection if configure it

* features: add heartbeat to keepalive connection if configure it

* features: keepalive unit test

* optimize code

* optimize code
pull/1716/head
nothing 4 years ago committed by GitHub
parent a5a38d582f
commit 0561309e32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -55,6 +55,7 @@ public class HikariConfig implements HikariConfigMXBean
private static final long VALIDATION_TIMEOUT = SECONDS.toMillis(5); private static final long VALIDATION_TIMEOUT = SECONDS.toMillis(5);
private static final long IDLE_TIMEOUT = MINUTES.toMillis(10); private static final long IDLE_TIMEOUT = MINUTES.toMillis(10);
private static final long MAX_LIFETIME = MINUTES.toMillis(30); private static final long MAX_LIFETIME = MINUTES.toMillis(30);
private static final long DEFAULT_KEEPALIVE_TIME = 0L;
private static final int DEFAULT_POOL_SIZE = 10; private static final int DEFAULT_POOL_SIZE = 10;
private static boolean unitTest = false; private static boolean unitTest = false;
@ -99,6 +100,8 @@ public class HikariConfig implements HikariConfigMXBean
private Object healthCheckRegistry; private Object healthCheckRegistry;
private Properties healthCheckProperties; private Properties healthCheckProperties;
private long keepaliveTime;
private volatile boolean sealed; private volatile boolean sealed;
/** /**
@ -117,6 +120,7 @@ public class HikariConfig implements HikariConfigMXBean
idleTimeout = IDLE_TIMEOUT; idleTimeout = IDLE_TIMEOUT;
initializationFailTimeout = 1; initializationFailTimeout = 1;
isAutoCommit = true; isAutoCommit = true;
keepaliveTime = DEFAULT_KEEPALIVE_TIME;
String systemProp = System.getProperty("hikaricp.configurationFile"); String systemProp = System.getProperty("hikaricp.configurationFile");
if (systemProp != null) { if (systemProp != null) {
@ -720,6 +724,26 @@ public class HikariConfig implements HikariConfigMXBean
healthCheckProperties.setProperty(key, value); healthCheckProperties.setProperty(key, value);
} }
/**
* This property controls the keepalive interval for a connection in the pool. An in-use connection will never be
* tested by the keepalive thread, only when it is idle will it be tested.
*
* @return the interval in which connections will be tested for aliveness, thus keeping them alive by the act of checking. Value is in milliseconds, default is 0 (disabled).
*/
public long getKeepaliveTime() {
return keepaliveTime;
}
/**
* This property controls the keepalive interval for a connection in the pool. An in-use connection will never be
* tested by the keepalive thread, only when it is idle will it be tested.
*
* @param keepaliveTimeMs the interval in which connections will be tested for aliveness, thus keeping them alive by the act of checking. Value is in milliseconds, default is 0 (disabled).
*/
public void setKeepaliveTime(long keepaliveTimeMs) {
this.keepaliveTime = keepaliveTimeMs;
}
/** /**
* Determine whether the Connections in the pool are in read-only mode. * Determine whether the Connections in the pool are in read-only mode.
* *
@ -1018,6 +1042,18 @@ public class HikariConfig implements HikariConfigMXBean
maxLifetime = MAX_LIFETIME; maxLifetime = MAX_LIFETIME;
} }
// keepalive time must larger then 30 seconds
if (keepaliveTime != 0 && keepaliveTime < SECONDS.toMillis(30)) {
LOGGER.warn("{} - keepaliveTime is less than 30000ms, disabling it.", poolName);
keepaliveTime = DEFAULT_KEEPALIVE_TIME;
}
// keepalive time must be less than maxLifetime (if maxLifetime is enabled)
if (keepaliveTime != 0 && maxLifetime != 0 && keepaliveTime >= maxLifetime) {
LOGGER.warn("{} - keepaliveTime is greater than or equal to maxLifetime, disabling it.", poolName);
keepaliveTime = DEFAULT_KEEPALIVE_TIME;
}
if (leakDetectionThreshold > 0 && !unitTest) { if (leakDetectionThreshold > 0 && !unitTest) {
if (leakDetectionThreshold < SECONDS.toMillis(2) || (leakDetectionThreshold > maxLifetime && maxLifetime > 0)) { if (leakDetectionThreshold < SECONDS.toMillis(2) || (leakDetectionThreshold > maxLifetime && maxLifetime > 0)) {
LOGGER.warn("{} - leakDetectionThreshold is less than 2000ms or more than maxLifetime, disabling it.", poolName); LOGGER.warn("{} - leakDetectionThreshold is less than 2000ms or more than maxLifetime, disabling it.", poolName);

@ -477,6 +477,8 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag
final PoolEntry poolEntry = newPoolEntry(); final PoolEntry poolEntry = newPoolEntry();
final long maxLifetime = config.getMaxLifetime(); final long maxLifetime = config.getMaxLifetime();
final long keepaliveTime = config.getKeepaliveTime();
if (maxLifetime > 0) { if (maxLifetime > 0) {
// variance up to 2.5% of the maxlifetime // variance up to 2.5% of the maxlifetime
final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0; final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
@ -490,6 +492,25 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag
lifetime, MILLISECONDS)); lifetime, MILLISECONDS));
} }
if (keepaliveTime > 0) {
// variance up to 10% of the heartbeat time
final long variance = ThreadLocalRandom.current().nextLong(keepaliveTime / 10);
final long heartbeatTime = keepaliveTime - variance;
poolEntry.setKeepalive(houseKeepingExecutorService.scheduleWithFixedDelay(
() -> {
if (connectionBag.reserve(poolEntry)) {
if (!isConnectionAlive(poolEntry.connection)) {
softEvictConnection(poolEntry, DEAD_CONNECTION_MESSAGE, true);
addBagItem(connectionBag.getWaitingThreadCount());
}
else {
connectionBag.unreserve(poolEntry);
logger.debug("{} - keepalive: connection {} is alive", poolName, poolEntry.connection);
}
}
}, heartbeatTime, heartbeatTime, MILLISECONDS));
}
return poolEntry; return poolEntry;
} }
catch (ConnectionSetupException e) { catch (ConnectionSetupException e) {

@ -47,6 +47,7 @@ final class PoolEntry implements IConcurrentBagEntry
private volatile boolean evict; private volatile boolean evict;
private volatile ScheduledFuture<?> endOfLife; private volatile ScheduledFuture<?> endOfLife;
private volatile ScheduledFuture<?> keepalive;
private final FastList<Statement> openStatements; private final FastList<Statement> openStatements;
private final HikariPool hikariPool; private final HikariPool hikariPool;
@ -92,6 +93,10 @@ final class PoolEntry implements IConcurrentBagEntry
this.endOfLife = endOfLife; this.endOfLife = endOfLife;
} }
public void setKeepalive(ScheduledFuture<?> keepalive) {
this.keepalive = keepalive;
}
Connection createProxyConnection(final ProxyLeakTask leakTask, final long now) Connection createProxyConnection(final ProxyLeakTask leakTask, final long now)
{ {
return ProxyFactory.getProxyConnection(this, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit); return ProxyFactory.getProxyConnection(this, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit);
@ -175,9 +180,15 @@ final class PoolEntry implements IConcurrentBagEntry
LOGGER.warn("{} - maxLifeTime expiration task cancellation unexpectedly returned false for connection {}", getPoolName(), connection); LOGGER.warn("{} - maxLifeTime expiration task cancellation unexpectedly returned false for connection {}", getPoolName(), connection);
} }
ScheduledFuture<?> ka = keepalive;
if (ka != null && !ka.isDone() && !ka.cancel(false)) {
LOGGER.warn("{} - keepalive task cancellation unexpectedly returned false for connection {}", getPoolName(), connection);
}
Connection con = connection; Connection con = connection;
connection = null; connection = null;
endOfLife = null; endOfLife = null;
keepalive = null;
return con; return con;
} }

@ -33,7 +33,7 @@ import java.sql.Statement;
import java.sql.Struct; import java.sql.Struct;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.Executor; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import com.zaxxer.hikari.util.UtilityElf; import com.zaxxer.hikari.util.UtilityElf;
@ -47,11 +47,16 @@ public class StubConnection extends StubBaseConnection implements Connection
public static final AtomicInteger count = new AtomicInteger(); public static final AtomicInteger count = new AtomicInteger();
public static volatile boolean slowCreate; public static volatile boolean slowCreate;
public static volatile boolean oldDriver; public static volatile boolean oldDriver;
private volatile boolean isClosed = false;
private static long foo; private static long foo;
private boolean autoCommit; private boolean autoCommit;
private int isolation = Connection.TRANSACTION_READ_COMMITTED; private int isolation = Connection.TRANSACTION_READ_COMMITTED;
private String catalog; private String catalog;
private long waitTimeout;
private static ScheduledExecutorService connectionWaitTimeout = new ScheduledThreadPoolExecutor(1);
private ScheduledFuture<?> waitTimeoutTask;
static { static {
foo = System.currentTimeMillis(); foo = System.currentTimeMillis();
@ -64,6 +69,21 @@ public class StubConnection extends StubBaseConnection implements Connection
} }
} }
public StubConnection(long waitTimeout) {
this.waitTimeout = waitTimeout;
count.incrementAndGet();
if (slowCreate) {
UtilityElf.quietlySleep(1000);
}
try {
refreshConnectionWaitTimeout();
} catch (Exception e){
//ignore
}
}
/** {@inheritDoc} */ /** {@inheritDoc} */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
@ -128,7 +148,21 @@ public class StubConnection extends StubBaseConnection implements Connection
@Override @Override
public void commit() throws SQLException public void commit() throws SQLException
{ {
refreshConnectionWaitTimeout();
}
private void refreshConnectionWaitTimeout() throws SQLException {
if (this.isClosed) {
throw new SQLException("connection has been closed");
}
if (waitTimeoutTask != null) {
waitTimeoutTask.cancel(true);
}
if (waitTimeout > 0) {
waitTimeoutTask = connectionWaitTimeout.schedule(() -> { this.isClosed = true;}, waitTimeout, TimeUnit.MILLISECONDS);
}
} }
/** {@inheritDoc} */ /** {@inheritDoc} */
@ -152,7 +186,7 @@ public class StubConnection extends StubBaseConnection implements Connection
if (throwException) { if (throwException) {
throw new SQLException(); throw new SQLException();
} }
return false; return isClosed;
} }
/** {@inheritDoc} */ /** {@inheritDoc} */
@ -408,7 +442,8 @@ public class StubConnection extends StubBaseConnection implements Connection
if (throwException) { if (throwException) {
throw new SQLException(); throw new SQLException();
} }
return true; refreshConnectionWaitTimeout();
return !isClosed;
} }
/** {@inheritDoc} */ /** {@inheritDoc} */

@ -38,6 +38,7 @@ public class StubDataSource implements DataSource
private SQLException throwException; private SQLException throwException;
private long connectionAcquistionTime = 0; private long connectionAcquistionTime = 0;
private int loginTimeout; private int loginTimeout;
private int waitTimeout = 30000;
public String getUser() public String getUser()
{ {
@ -59,6 +60,14 @@ public class StubDataSource implements DataSource
this.password = password; this.password = password;
} }
public int getWaitTimeout() {
return waitTimeout;
}
public void setWaitTimeout(int waitTimeout) {
this.waitTimeout = waitTimeout;
}
public void setURL(String url) public void setURL(String url)
{ {
// we don't care // we don't care
@ -127,15 +136,14 @@ public class StubDataSource implements DataSource
if (connectionAcquistionTime > 0) { if (connectionAcquistionTime > 0) {
UtilityElf.quietlySleep(connectionAcquistionTime); UtilityElf.quietlySleep(connectionAcquistionTime);
} }
return new StubConnection(waitTimeout);
return new StubConnection();
} }
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
public Connection getConnection(String username, String password) throws SQLException public Connection getConnection(String username, String password) throws SQLException
{ {
return new StubConnection(); return new StubConnection(waitTimeout);
} }
public void setThrowException(SQLException e) public void setThrowException(SQLException e)

@ -68,6 +68,7 @@ public class StubStatement implements Statement
{ {
checkClosed(); checkClosed();
StubResultSet resultSet = new StubResultSet(); StubResultSet resultSet = new StubResultSet();
connection.commit();
return resultSet; return resultSet;
} }
@ -179,6 +180,7 @@ public class StubStatement implements Statement
if (simulatedQueryTime > 0) { if (simulatedQueryTime > 0) {
quietlySleep(simulatedQueryTime); quietlySleep(simulatedQueryTime);
} }
connection.commit();
return false; return false;
} }

@ -217,6 +217,76 @@ public class TestConnections
} }
} }
@Test
public void testKeepalive() throws Exception{
HikariConfig config = newHikariConfig();
config.setMinimumIdle(0);
config.setMaximumPoolSize(1);
config.setConnectionTimeout(2500);
config.setConnectionTestQuery("VALUES 1");
StubDataSource sds = new StubDataSource();
sds.setWaitTimeout(700);
config.setDataSource(sds);
System.setProperty("com.zaxxer.hikari.housekeeping.periodMs", "100");
setConfigUnitTest(true);
try (HikariDataSource ds = new HikariDataSource(config)) {
getUnsealedConfig(ds).setKeepaliveTime(500);
HikariPool pool = getPool(ds);
Connection conn = pool.getConnection();
Connection unwrap = conn.unwrap(Connection.class);
//recycle, change IN_USE state
conn.close();
assertFalse("Connection should be open", unwrap.isClosed());
quietlySleep(1200);
assertFalse("Connection should be open", unwrap.isClosed());
}
finally {
setConfigUnitTest(false);
}
}
@Test
public void testKeepalive2() throws Exception{
HikariConfig config = newHikariConfig();
config.setMinimumIdle(0);
config.setMaximumPoolSize(1);
config.setConnectionTimeout(2500);
config.setConnectionTestQuery("VALUES 1");
StubDataSource sds = new StubDataSource();
sds.setWaitTimeout(500);
config.setDataSource(sds);
System.setProperty("com.zaxxer.hikari.housekeeping.periodMs", "100");
setConfigUnitTest(true);
try (HikariDataSource ds = new HikariDataSource(config)) {
getUnsealedConfig(ds).setKeepaliveTime(700);
HikariPool pool = getPool(ds);
Connection conn = pool.getConnection();
Connection unwrap = conn.unwrap(Connection.class);
//recycle, change IN_USE state
conn.close();
assertFalse("Connection should be open", unwrap.isClosed());
quietlySleep(1200);
assertTrue("Connection should have closed:" + unwrap, unwrap.isClosed());
Connection conn2 = pool.getConnection();
Connection unwrap2 = conn2.unwrap(Connection.class);
assertNotSame("Expected a different connection", unwrap, unwrap2);
assertFalse("Connection should be open", unwrap2.isClosed());
conn2.close();
}
finally {
setConfigUnitTest(false);
}
}
@Test @Test
public void testDoubleClose() throws Exception public void testDoubleClose() throws Exception
{ {

Loading…
Cancel
Save