Fix connection timeout/retry handling and add tests.

pull/60/head
Brett Wooldridge 11 years ago
parent 5e1b7be484
commit 41d7750133

@ -140,11 +140,13 @@ public class HikariConfig implements HikariConfigMBean
} }
} }
@Deprecated
public int getAcquireIncrement() public int getAcquireIncrement()
{ {
return 0; return 0;
} }
@Deprecated
public void setAcquireIncrement(int acquireIncrement) public void setAcquireIncrement(int acquireIncrement)
{ {
LOGGER.warn("The acquireIncrement property has been retired, remove it from your pool configuration to avoid this warning."); LOGGER.warn("The acquireIncrement property has been retired, remove it from your pool configuration to avoid this warning.");
@ -166,11 +168,13 @@ public class HikariConfig implements HikariConfigMBean
this.acquireRetries = acquireRetries; this.acquireRetries = acquireRetries;
} }
@Deprecated
public long getAcquireRetryDelay() public long getAcquireRetryDelay()
{ {
return 0; return 0;
} }
@Deprecated
public void setAcquireRetryDelay(long acquireRetryDelayMs) public void setAcquireRetryDelay(long acquireRetryDelayMs)
{ {
LOGGER.warn("The acquireRetryDelay property has been retired, remove it from your pool configuration to avoid this warning."); LOGGER.warn("The acquireRetryDelay property has been retired, remove it from your pool configuration to avoid this warning.");

@ -143,10 +143,11 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
try try
{ {
long timeout = configuration.getConnectionTimeout(); long timeout = configuration.getConnectionTimeout();
final int retries = configuration.getAcquireRetries();
final long start = System.currentTimeMillis(); final long start = System.currentTimeMillis();
do do
{ {
IHikariConnectionProxy connectionProxy = idleConnectionBag.borrow(timeout, TimeUnit.MILLISECONDS); IHikariConnectionProxy connectionProxy = idleConnectionBag.borrow(retries, timeout);
if (connectionProxy == null) if (connectionProxy == null)
{ {
// We timed out... break and throw exception // We timed out... break and throw exception
@ -234,9 +235,10 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
public void bagIsEmpty() public void addBagItem(long timeout)
{ {
addConnections(AddConnectionStrategy.ONLY_IF_EMPTY); // addConnections(AddConnectionStrategy.ONLY_IF_EMPTY);
addConnection(timeout);
} }
// *********************************************************************** // ***********************************************************************
@ -301,7 +303,7 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
while (maxIters-- > 0 && totalConnections.get() < configuration.getMinimumPoolSize()) while (maxIters-- > 0 && totalConnections.get() < configuration.getMinimumPoolSize())
{ {
int beforeCount = totalConnections.get(); int beforeCount = totalConnections.get();
addConnection(); addConnection(configuration.getConnectionTimeout());
if (configuration.isInitializationFailFast() && beforeCount == totalConnections.get()) if (configuration.isInitializationFailFast() && beforeCount == totalConnections.get())
{ {
throw new RuntimeException("Fail-fast during pool initialization"); throw new RuntimeException("Fail-fast during pool initialization");
@ -311,48 +313,21 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
logPoolState("Initial fill "); logPoolState("Initial fill ");
} }
/**
* Add connections to the pool, not exceeding the maximum allowed.
*/
private void addConnections(AddConnectionStrategy strategy)
{
switch (strategy)
{
case ONLY_IF_EMPTY:
addConnection();
break;
case MAINTAIN_MINIMUM:
final int min = configuration.getMinimumPoolSize();
for (int maxIterations = 0; maxIterations < min && totalConnections.get() < min; maxIterations++)
{
addConnection();
}
break;
}
}
/** /**
* Create and add a single connection to the pool. * Create and add a single connection to the pool.
*/ */
private void addConnection() private void addConnection(final long loginTimeout)
{
final int acquireTimeout = (int) configuration.getConnectionTimeout();
final int acquireRetries = configuration.getAcquireRetries();
final int loginTimeout = (acquireRetries > 0 && acquireTimeout > 0) ? Math.max((acquireTimeout / (acquireRetries + 1)), 50) : acquireTimeout;
for (int retries = 0; retries <= acquireRetries && !shutdown; retries++)
{ {
long startMs = System.currentTimeMillis();
try try
{ {
// Speculative increment of totalConnections with expectation of success (first time through) // Speculative increment of totalConnections with expectation of success (first time through)
if (retries == 0 && totalConnections.incrementAndGet() > configuration.getMaximumPoolSize()) if (totalConnections.incrementAndGet() > configuration.getMaximumPoolSize())
{ {
totalConnections.decrementAndGet(); totalConnections.decrementAndGet();
break; return;
} }
dataSource.setLoginTimeout(loginTimeout); dataSource.setLoginTimeout((int) loginTimeout);
Connection connection = dataSource.getConnection(); Connection connection = dataSource.getConnection();
transactionIsolation = (transactionIsolation < 0 ? connection.getTransactionIsolation() : transactionIsolation); transactionIsolation = (transactionIsolation < 0 ? connection.getTransactionIsolation() : transactionIsolation);
@ -371,18 +346,11 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
} }
catch (Exception e) catch (Exception e)
{ {
LOGGER.warn("Maximum connection creation retries exceeded: {}", e.getMessage(), (debug ? e : null));
long delay = loginTimeout - (System.currentTimeMillis() - startMs);
if (retries < acquireRetries && !sleepQuietly(delay))
{
break;
}
}
}
// We failed, so undo speculative increment of totalConnections // We failed, so undo speculative increment of totalConnections
totalConnections.decrementAndGet(); totalConnections.decrementAndGet();
LOGGER.warn("Maximum connection creation retries exceeded: {}", e.getMessage(), (debug ? e : null));
}
} }
/** /**
@ -478,22 +446,6 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
} }
} }
private boolean sleepQuietly(long delay)
{
try
{
if (delay > 0)
{
Thread.sleep(delay);
}
return true;
}
catch (InterruptedException e1)
{
return false;
}
}
private void logPoolState(String... prefix) private void logPoolState(String... prefix)
{ {
int total = totalConnections.get(); int total = totalConnections.get();
@ -537,15 +489,14 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
} }
} }
addConnections(AddConnectionStrategy.MAINTAIN_MINIMUM); // TRY to maintain minimum connections (best effort, no retries)
final int min = configuration.getMinimumPoolSize();
for (int maxIterations = 0; maxIterations < min && totalConnections.get() < min; maxIterations++)
{
addConnection(configuration.getConnectionTimeout());
}
logPoolState("After pool cleanup "); logPoolState("After pool cleanup ");
} }
} }
private static enum AddConnectionStrategy
{
ONLY_IF_EMPTY,
MAINTAIN_MINIMUM
}
} }

@ -72,7 +72,7 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
*/ */
public interface IBagStateListener public interface IBagStateListener
{ {
void bagIsEmpty(); void addBagItem(long timeout);
} }
private ThreadLocal<LinkedList<WeakReference<T>>> threadList; private ThreadLocal<LinkedList<WeakReference<T>>> threadList;
@ -99,7 +99,7 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
* @return a borrowed instance from the bag or null if a timeout occurs * @return a borrowed instance from the bag or null if a timeout occurs
* @throws InterruptedException if interrupted while waiting * @throws InterruptedException if interrupted while waiting
*/ */
public T borrow(long timeout, TimeUnit timeUnit) throws InterruptedException public T borrow(int retries, long timeoutMillis) throws InterruptedException
{ {
// Try the thread-local list first // Try the thread-local list first
LinkedList<WeakReference<T>> list = threadList.get(); LinkedList<WeakReference<T>> list = threadList.get();
@ -120,9 +120,12 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
} }
// Otherwise, scan the shared list ... for maximum of timeout // Otherwise, scan the shared list ... for maximum of timeout
timeout = timeUnit.toNanos(timeout); final long retryTimeout = (retries > 0 && timeoutMillis > 0) ? Math.max((timeoutMillis / (retries + 1)), 50) : timeoutMillis;
do { long totalTimeoutNs = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
final long startScan = System.nanoTime(); boolean tryAddItem = true;
while (totalTimeoutNs > 0)
{
final long startAttempt = System.nanoTime();
for (T reference : sharedList) for (T reference : sharedList)
{ {
if (reference.compareAndSetState(STATE_NOT_IN_USE, STATE_IN_USE)) if (reference.compareAndSetState(STATE_NOT_IN_USE, STATE_IN_USE))
@ -131,15 +134,34 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
} }
} }
if (listener != null) if (listener != null && tryAddItem)
{ {
listener.bagIsEmpty(); listener.addBagItem(retryTimeout);
} }
synchronizer.tryAcquireSharedNanos(startScan, timeout); totalTimeoutNs -= (System.nanoTime() - startAttempt);
final long startTryAcquire = System.nanoTime();
try
{
long timeoutNs = TimeUnit.MILLISECONDS.toNanos(retryTimeout) - (System.nanoTime() - startAttempt);
if (synchronizer.tryAcquireSharedNanos(startAttempt, timeoutNs))
{
tryAddItem = false;
continue;
}
}
finally
{
totalTimeoutNs -= (System.nanoTime() - startTryAcquire);
}
if (retries-- == 0)
{
break;
}
timeout -= (System.nanoTime() - startScan); tryAddItem = true;
} while (timeout > 0); }
return null; return null;
} }
@ -163,7 +185,14 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
if (value.compareAndSetState(STATE_IN_USE, STATE_NOT_IN_USE)) if (value.compareAndSetState(STATE_IN_USE, STATE_NOT_IN_USE))
{ {
final long returnTime = System.nanoTime(); final long returnTime = System.nanoTime();
threadList.get().addLast(new WeakReference<T>(value)); LinkedList<WeakReference<T>> list = threadList.get();
if (list == null)
{
list = new LinkedList<WeakReference<T>>();
threadList.set(list);
}
list.addLast(new WeakReference<T>(value));
synchronizer.releaseShared(returnTime); synchronizer.releaseShared(returnTime);
} }
else else

@ -1,85 +0,0 @@
package com.zaxxer.hikari;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import com.zaxxer.hikari.mocks.StubDataSource;
public class FailedRetryTest
{
@Test
public void testConnectionRetries() throws SQLException
{
HikariConfig config = new HikariConfig();
config.setMinimumPoolSize(0);
config.setMaximumPoolSize(1);
config.setConnectionTimeout(2800);
config.setConnectionTestQuery("VALUES 1");
config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource");
HikariDataSource ds = new HikariDataSource(config);
StubDataSource stubDataSource = ds.unwrap(StubDataSource.class);
stubDataSource.setThrowException(new SQLException("Connection refused"));
long start = System.currentTimeMillis();
try
{
Connection connection = ds.getConnection();
connection.close();
Assert.fail("Should not have been able to get a connection.");
}
catch (SQLException e)
{
long elapsed = System.currentTimeMillis() - start;
System.err.printf("Elapsed time for connection attempt %dms\n", elapsed);
Assert.assertTrue("Didn't wait long enough for timeout", (elapsed > config.getConnectionTimeout()));
}
}
@Test
public void testConnectionRetries2() throws SQLException
{
HikariConfig config = new HikariConfig();
config.setMinimumPoolSize(0);
config.setMaximumPoolSize(1);
config.setConnectionTimeout(2800);
config.setConnectionTestQuery("VALUES 1");
config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource");
HikariDataSource ds = new HikariDataSource(config);
final long timePerTry = config.getConnectionTimeout() / (config.getAcquireRetries() + 1);
final StubDataSource stubDataSource = ds.unwrap(StubDataSource.class);
stubDataSource.setThrowException(new SQLException("Connection refused"));
Executors.newScheduledThreadPool(1).schedule(new Runnable() {
public void run()
{
stubDataSource.setThrowException(null);
System.err.println("Turned off exception throwing.");
}
}, (timePerTry * 2) + 100, TimeUnit.MILLISECONDS);
long start = System.currentTimeMillis();
try
{
Connection connection = ds.getConnection();
System.err.println("Got a connection!");
// connection.close();
Assert.fail("Should not have been able to get a connection.");
}
catch (SQLException e)
{
long elapsed = System.currentTimeMillis() - start;
System.err.printf("Elapsed time for connection attempt %dms\n", elapsed);
Assert.assertTrue("Didn't wait long enough for timeout", (elapsed > timePerTry * 3) && (elapsed < timePerTry * 4));
}
}
}

@ -0,0 +1,250 @@
package com.zaxxer.hikari;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import com.zaxxer.hikari.mocks.StubDataSource;
public class TestConnectionTimeoutRetry
{
@Test
public void testConnectionRetries() throws SQLException
{
HikariConfig config = new HikariConfig();
config.setMinimumPoolSize(0);
config.setMaximumPoolSize(1);
config.setConnectionTimeout(2800);
config.setConnectionTestQuery("VALUES 1");
config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource");
HikariDataSource ds = new HikariDataSource(config);
StubDataSource stubDataSource = ds.unwrap(StubDataSource.class);
stubDataSource.setThrowException(new SQLException("Connection refused"));
long start = System.currentTimeMillis();
try
{
Connection connection = ds.getConnection();
connection.close();
Assert.fail("Should not have been able to get a connection.");
}
catch (SQLException e)
{
long elapsed = System.currentTimeMillis() - start;
Assert.assertTrue("Didn't wait long enough for timeout", (elapsed > config.getConnectionTimeout()));
}
finally
{
ds.shutdown();
}
}
@Test
public void testConnectionRetries2() throws SQLException
{
HikariConfig config = new HikariConfig();
config.setMinimumPoolSize(0);
config.setMaximumPoolSize(1);
config.setConnectionTimeout(2800);
config.setConnectionTestQuery("VALUES 1");
config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource");
HikariDataSource ds = new HikariDataSource(config);
final StubDataSource stubDataSource = ds.unwrap(StubDataSource.class);
stubDataSource.setThrowException(new SQLException("Connection refused"));
final long timePerTry = config.getConnectionTimeout() / (config.getAcquireRetries() + 1);
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.schedule(new Runnable() {
public void run()
{
stubDataSource.setThrowException(null);
}
}, (timePerTry * 2) + 100, TimeUnit.MILLISECONDS);
long start = System.currentTimeMillis();
try
{
Connection connection = ds.getConnection();
connection.close();
long elapsed = System.currentTimeMillis() - start;
Assert.assertTrue("Waited too long to get a connection.", (elapsed >= timePerTry * 3) && (elapsed < config.getConnectionTimeout()));
}
catch (SQLException e)
{
Assert.fail("Should not have timed out.");
}
finally
{
scheduler.shutdownNow();
ds.shutdown();
}
}
@Test
public void testConnectionRetries3() throws SQLException
{
HikariConfig config = new HikariConfig();
config.setMinimumPoolSize(0);
config.setMaximumPoolSize(2);
config.setConnectionTimeout(2800);
config.setConnectionTestQuery("VALUES 1");
config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource");
HikariDataSource ds = new HikariDataSource(config);
final Connection connection1 = ds.getConnection();
final Connection connection2 = ds.getConnection();
Assert.assertNotNull(connection1);
Assert.assertNotNull(connection2);
final long timePerTry = config.getConnectionTimeout() / (config.getAcquireRetries() + 1);
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
scheduler.schedule(new Runnable() {
public void run()
{
try
{
connection1.close();
}
catch (Exception e)
{
e.printStackTrace(System.err);
}
}
}, timePerTry + 100, TimeUnit.MILLISECONDS);
scheduler.schedule(new Runnable() {
public void run()
{
try
{
connection2.close();
}
catch (Exception e)
{
e.printStackTrace(System.err);
}
}
}, (timePerTry * 2) + 100, TimeUnit.MILLISECONDS);
long start = System.currentTimeMillis();
try
{
Connection connection3 = ds.getConnection();
connection3.close();
long elapsed = System.currentTimeMillis() - start;
Assert.assertTrue("Waited too long to get a connection.", (elapsed >= timePerTry) && (elapsed < timePerTry * 2));
}
catch (SQLException e)
{
Assert.fail("Should not have timed out.");
}
finally
{
scheduler.shutdownNow();
ds.shutdown();
}
}
@Test
public void testConnectionRetries4() throws SQLException
{
HikariConfig config = new HikariConfig();
config.setMinimumPoolSize(0);
config.setMaximumPoolSize(1);
config.setAcquireRetries(0);
config.setConnectionTimeout(1000);
config.setConnectionTestQuery("VALUES 1");
config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource");
HikariDataSource ds = new HikariDataSource(config);
StubDataSource stubDataSource = ds.unwrap(StubDataSource.class);
stubDataSource.setThrowException(new SQLException("Connection refused"));
long start = System.currentTimeMillis();
try
{
Connection connection = ds.getConnection();
connection.close();
Assert.fail("Should not have been able to get a connection.");
}
catch (SQLException e)
{
long elapsed = System.currentTimeMillis() - start;
Assert.assertTrue("Didn't wait long enough for timeout", (elapsed > config.getConnectionTimeout()));
}
finally
{
ds.shutdown();
}
}
@Test
public void testConnectionRetries5() throws SQLException
{
HikariConfig config = new HikariConfig();
config.setMinimumPoolSize(0);
config.setMaximumPoolSize(2);
config.setAcquireRetries(0);
config.setConnectionTimeout(1000);
config.setConnectionTestQuery("VALUES 1");
config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource");
HikariDataSource ds = new HikariDataSource(config);
final Connection connection1 = ds.getConnection();
long start = System.currentTimeMillis();
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
scheduler.schedule(new Runnable() {
public void run()
{
try
{
connection1.close();
}
catch (Exception e)
{
e.printStackTrace(System.err);
}
}
}, 250, TimeUnit.MILLISECONDS);
StubDataSource stubDataSource = ds.unwrap(StubDataSource.class);
stubDataSource.setThrowException(new SQLException("Connection refused"));
try
{
Connection connection2 = ds.getConnection();
connection2.close();
long elapsed = System.currentTimeMillis() - start;
Assert.assertTrue("Waited too long to get a connection.", (elapsed >= 250) && (elapsed < config.getConnectionTimeout()));
}
catch (SQLException e)
{
Assert.fail("Should not have timed out.");
}
finally
{
scheduler.shutdownNow();
ds.shutdown();
}
}
}

@ -32,7 +32,7 @@ import org.junit.Test;
* *
* @author Brett Wooldridge * @author Brett Wooldridge
*/ */
public class CreationTest public class TestConnections
{ {
@Test @Test
public void testCreate() throws SQLException public void testCreate() throws SQLException
Loading…
Cancel
Save