A race condition was observed under high load when a lot of connections expire at the same time, causing the pool to spike unnecessarily high.

pull/323/head
Brett Wooldridge 10 years ago
parent 04a2234fec
commit 43910e61df

@ -25,12 +25,15 @@ import static com.zaxxer.hikari.util.UtilityElf.createInstance;
import static com.zaxxer.hikari.util.UtilityElf.createThreadPoolExecutor;
import static com.zaxxer.hikari.util.UtilityElf.elapsedTimeMs;
import static com.zaxxer.hikari.util.UtilityElf.getTransactionIsolation;
import static com.zaxxer.hikari.util.UtilityElf.quietlySleep;
import static com.zaxxer.hikari.util.UtilityElf.setRemoveOnCancelPolicy;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@ -393,6 +396,32 @@ public abstract class BaseHikariPool implements HikariPoolMBean, IBagStateListen
}
}
// ***********************************************************************
// IBagStateListener callback
// ***********************************************************************
/** {@inheritDoc} */
@Override
public Future<Boolean> addBagItem()
{
FutureTask<Boolean> future = new FutureTask<Boolean>(new Runnable() {
public void run()
{
long sleepBackoff = 200L;
final int minimumIdle = configuration.getMinimumIdle();
final int maxPoolSize = configuration.getMaximumPoolSize();
while (poolState == POOL_RUNNING && totalConnections.get() < maxPoolSize && getIdleConnections() <= minimumIdle && !addConnection()) {
// If we got into the loop, addConnection() failed, so we sleep and retry
quietlySleep(sleepBackoff);
sleepBackoff = Math.min(connectionTimeout / 2, (long) ((double) sleepBackoff * 1.5));
}
}
}, true);
addConnectionExecutor.execute(future);
return future;
}
// ***********************************************************************
// Protected methods
// ***********************************************************************
@ -535,6 +564,7 @@ public abstract class BaseHikariPool implements HikariPoolMBean, IBagStateListen
*
* @return an IConnectionCustomizer instance
*/
@SuppressWarnings("deprecation")
private IConnectionCustomizer initializeCustomizer()
{
if (configuration.getConnectionCustomizerClassName() != null) {

@ -24,6 +24,7 @@ import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;
@ -57,9 +58,9 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
protected final AbstractQueuedLongSynchronizer synchronizer;
protected final CopyOnWriteArrayList<T> sharedList;
protected final AtomicLong sequence;
private final ThreadLocal<ArrayList<WeakReference<IConcurrentBagEntry>>> threadList;
private final AtomicLong sequence;
private final IBagStateListener listener;
private volatile boolean closed;
@ -110,6 +111,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
// Otherwise, scan the shared list ... for maximum of timeout
timeout = timeUnit.toNanos(timeout);
Future<Boolean> addItemFuture = null;
final long startScan = System.nanoTime();
final long originTimeout = timeout;
do {
@ -122,8 +124,10 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
}
}
} while (startSeq < sequence.get());
listener.addBagItem();
if (addItemFuture == null || addItemFuture.isDone()) {
addItemFuture = listener.addBagItem();
}
if (!synchronizer.tryAcquireSharedNanos(startSeq, timeout)) {
return null;

@ -15,6 +15,8 @@
*/
package com.zaxxer.hikari.util;
import java.util.concurrent.Future;
/**
* This interface is implemented by a listener to the ConcurrentBag. The
* listener will be informed of when the bag has become empty. The usual
@ -25,5 +27,5 @@ package com.zaxxer.hikari.util;
*/
public interface IBagStateListener
{
void addBagItem();
Future<Boolean> addBagItem();
}

@ -18,7 +18,6 @@ package com.zaxxer.hikari.pool;
import static com.zaxxer.hikari.util.IConcurrentBagEntry.STATE_IN_USE;
import static com.zaxxer.hikari.util.IConcurrentBagEntry.STATE_NOT_IN_USE;
import static com.zaxxer.hikari.util.UtilityElf.quietlySleep;
import java.sql.Connection;
import java.sql.SQLException;
@ -61,31 +60,6 @@ public final class HikariPool extends BaseHikariPool
super(configuration, username, password);
}
// ***********************************************************************
// IBagStateListener callback
// ***********************************************************************
/** {@inheritDoc} */
@Override
public void addBagItem()
{
class AddConnection implements Runnable
{
public void run()
{
long sleepBackoff = 200L;
final int maxPoolSize = configuration.getMaximumPoolSize();
while (poolState == POOL_RUNNING && totalConnections.get() < maxPoolSize && !addConnection()) {
// If we got into the loop, addConnection() failed, so we sleep and retry
quietlySleep(sleepBackoff);
sleepBackoff = Math.min(connectionTimeout / 2, (long) ((double) sleepBackoff * 1.5));
}
}
}
addConnectionExecutor.execute(new AddConnection());
}
// ***********************************************************************
// HikariPoolMBean methods
// ***********************************************************************

@ -101,21 +101,21 @@ public final class Java6ConcurrentBag extends ConcurrentBag<PoolBagEntry>
/**
* Our private synchronizer that handles notify/wait type semantics.
*/
private static final class Synchronizer extends AbstractQueuedLongSynchronizer
private final class Synchronizer extends AbstractQueuedLongSynchronizer
{
private static final long serialVersionUID = 104753538004341218L;
@Override
protected long tryAcquireShared(long seq)
protected long tryAcquireShared(final long seq)
{
return getState() > seq && !java67hasQueuedPredecessors() ? 1L : -1L;
return java67hasQueuedPredecessors() ? -1L : getState() - (seq + 1);
}
/** {@inheritDoc} */
@Override
protected boolean tryReleaseShared(long updateSeq)
protected boolean tryReleaseShared(final long ignored)
{
setState(updateSeq);
setState(sequence.get());
return true;
}

@ -18,7 +18,6 @@ package com.zaxxer.hikari.pool;
import static com.zaxxer.hikari.util.IConcurrentBagEntry.STATE_IN_USE;
import static com.zaxxer.hikari.util.IConcurrentBagEntry.STATE_NOT_IN_USE;
import static com.zaxxer.hikari.util.UtilityElf.quietlySleep;
import java.sql.Connection;
import java.sql.SQLException;
@ -61,25 +60,6 @@ public final class HikariPool extends BaseHikariPool
super(configuration, username, password);
}
// ***********************************************************************
// IBagStateListener callback
// ***********************************************************************
/** {@inheritDoc} */
@Override
public void addBagItem()
{
addConnectionExecutor.execute( () -> {
long sleepBackoff = 200L;
final int maxPoolSize = configuration.getMaximumPoolSize();
while (poolState == POOL_RUNNING && totalConnections.get() < maxPoolSize && !addConnection()) {
// If we got into the loop, addConnection() failed, so we sleep and retry
quietlySleep(sleepBackoff);
sleepBackoff = Math.min(connectionTimeout / 2, (long) ((double) sleepBackoff * 1.5));
}
});
}
// ***********************************************************************
// HikariPoolMBean methods
// ***********************************************************************

@ -97,21 +97,21 @@ public final class Java8ConcurrentBag extends ConcurrentBag<PoolBagEntry>
/**
* Our private synchronizer that handles notify/wait type semantics.
*/
private static final class Synchronizer extends AbstractQueuedLongSynchronizer
private final class Synchronizer extends AbstractQueuedLongSynchronizer
{
private static final long serialVersionUID = 104753538004341218L;
@Override
protected long tryAcquireShared(long seq)
protected long tryAcquireShared(final long seq)
{
return getState() > seq && !hasQueuedPredecessors() ? 1L : -1L;
return hasQueuedPredecessors() ? -1L : getState() - (seq + 1);
}
/** {@inheritDoc} */
@Override
protected boolean tryReleaseShared(long updateSeq)
protected boolean tryReleaseShared(final long ignored)
{
setState(updateSeq);
setState(sequence.get());
return true;
}

Loading…
Cancel
Save