Merge branch '2.3.x' into dev

* 2.3.x:
  A race condition was observed under high load when a lot of connections expire at the same time, causing the pool to spike unnecessarily high.
pull/316/merge
Brett Wooldridge 10 years ago
commit 120b2deeba

@ -31,6 +31,8 @@ import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import java.sql.Statement;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@ -347,23 +349,24 @@ public class HikariPool implements HikariPoolMBean, IBagStateListener
/** {@inheritDoc} */
@Override
public void addBagItem()
public Future<Boolean> addBagItem()
{
class AddConnection implements Runnable
{
FutureTask<Boolean> future = new FutureTask<Boolean>(new Runnable() {
public void run()
{
long sleepBackoff = 200L;
final int minimumIdle = configuration.getMinimumIdle();
final int maxPoolSize = configuration.getMaximumPoolSize();
while (poolState == POOL_NORMAL && totalConnections.get() < maxPoolSize && !addConnection()) {
while (poolState == POOL_NORMAL && totalConnections.get() < maxPoolSize && getIdleConnections() <= minimumIdle && !addConnection()) {
// If we got into the loop, addConnection() failed, so we sleep and retry
quietlySleep(sleepBackoff);
sleepBackoff = Math.min(connectionTimeout / 2, (long) ((double) sleepBackoff * 1.5));
}
}
}
}, true);
addConnectionExecutor.execute(new AddConnection());
addConnectionExecutor.execute(future);
return future;
}
// ***********************************************************************

@ -24,6 +24,7 @@ import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;
@ -57,9 +58,9 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
protected final AbstractQueuedLongSynchronizer synchronizer;
protected final CopyOnWriteArrayList<T> sharedList;
protected final AtomicLong sequence;
private final ThreadLocal<ArrayList<WeakReference<IConcurrentBagEntry>>> threadList;
private final AtomicLong sequence;
private final IBagStateListener listener;
private volatile boolean closed;
@ -105,6 +106,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
// Otherwise, scan the shared list ... for maximum of timeout
timeout = timeUnit.toNanos(timeout);
Future<Boolean> addItemFuture = null;
final long startScan = System.nanoTime();
final long originTimeout = timeout;
do {
@ -117,8 +119,10 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
}
}
} while (startSeq < sequence.get());
listener.addBagItem();
if (addItemFuture == null || addItemFuture.isDone()) {
addItemFuture = listener.addBagItem();
}
if (!synchronizer.tryAcquireSharedNanos(startSeq, timeout)) {
return null;

@ -15,6 +15,8 @@
*/
package com.zaxxer.hikari.util;
import java.util.concurrent.Future;
/**
* This interface is implemented by a listener to the ConcurrentBag. The
* listener will be informed of when the bag has become empty. The usual
@ -25,5 +27,5 @@ package com.zaxxer.hikari.util;
*/
public interface IBagStateListener
{
void addBagItem();
Future<Boolean> addBagItem();
}

Loading…
Cancel
Save