ConcurrentBag optimizations.

pull/41/head
Brett Wooldridge 11 years ago
parent 23fcf89be4
commit d80d793457

@ -23,7 +23,6 @@ import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.sql.DataSource;
@ -33,8 +32,9 @@ import org.slf4j.LoggerFactory;
import com.zaxxer.hikari.proxy.IHikariConnectionProxy;
import com.zaxxer.hikari.proxy.ProxyFactory;
import com.zaxxer.hikari.util.PropertyBeanSetter;
import com.zaxxer.hikari.util.ConcurrentBag;
import com.zaxxer.hikari.util.ConcurrentBag.IBagStateListener;
import com.zaxxer.hikari.util.PropertyBeanSetter;
/**
* This is the primary connection pool class that provides the basic
@ -42,7 +42,7 @@ import com.zaxxer.hikari.util.ConcurrentBag;
*
* @author Brett Wooldridge
*/
public final class HikariPool implements HikariPoolMBean
public final class HikariPool implements HikariPoolMBean, IBagStateListener
{
private static final Logger LOGGER = LoggerFactory.getLogger(HikariPool.class);
@ -55,8 +55,6 @@ public final class HikariPool implements HikariPoolMBean
private final Timer houseKeepingTimer;
private final long leakDetectionThreshold;
private final AtomicBoolean backgroundFillQueued;
private final AtomicInteger idleConnectionCount;
private final AtomicInteger totalConnections;
private final boolean isAutoCommit;
private final boolean jdbc4ConnectionTest;
@ -77,9 +75,8 @@ public final class HikariPool implements HikariPoolMBean
this.configuration = configuration;
this.totalConnections = new AtomicInteger();
this.idleConnectionCount = new AtomicInteger();
this.backgroundFillQueued = new AtomicBoolean();
this.idleConnectionBag = new ConcurrentBag<IHikariConnectionProxy>();
this.idleConnectionBag.addBagStateListener(this);
this.jdbc4ConnectionTest = configuration.isJdbc4ConnectionTest();
this.leakDetectionThreshold = configuration.getLeakDetectionThreshold();
@ -155,13 +152,6 @@ public final class HikariPool implements HikariPoolMBean
throw new SQLException("Pool has been shutdown");
}
// Speculatively decrement idle count
final int idleCount = idleConnectionCount.getAndDecrement();
if (idleCount <= 0)
{
addConnections(AddConnectionStrategy.ONLY_IF_EMPTY);
}
try
{
long timeout = configuration.getConnectionTimeout();
@ -194,9 +184,6 @@ public final class HikariPool implements HikariPoolMBean
} while (timeout > 0);
// Undo speculative decrement of idle count
idleConnectionCount.incrementAndGet();
logPoolState();
String msg = String.format("Timeout of %dms encountered waiting for connection.", configuration.getConnectionTimeout());
@ -209,13 +196,6 @@ public final class HikariPool implements HikariPoolMBean
{
return null;
}
finally
{
if (idleCount <= 1 && backgroundFillQueued.compareAndSet(false, true))
{
addConnections(AddConnectionStrategy.BACKGROUND_FILL);
}
}
}
/**
@ -228,7 +208,6 @@ public final class HikariPool implements HikariPoolMBean
{
if (!connectionProxy.isBrokenConnection() && !shutdown)
{
idleConnectionCount.incrementAndGet();
idleConnectionBag.requite(connectionProxy);
}
else
@ -259,6 +238,16 @@ public final class HikariPool implements HikariPoolMBean
}
}
// ***********************************************************************
// IBagStateListener methods
// ***********************************************************************
@Override
public void bagIsEmpty()
{
addConnections(AddConnectionStrategy.ONLY_IF_EMPTY);
}
// ***********************************************************************
// HikariPoolMBean methods
// ***********************************************************************
@ -266,13 +255,13 @@ public final class HikariPool implements HikariPoolMBean
/** {@inheritDoc} */
public int getActiveConnections()
{
return Math.min(configuration.getMaximumPoolSize(), totalConnections.get() - idleConnectionCount.get());
return Math.min(configuration.getMaximumPoolSize(), totalConnections.get() - getIdleConnections());
}
/** {@inheritDoc} */
public int getIdleConnections()
{
return idleConnectionCount.get();
return idleConnectionBag.values(ConcurrentBag.STATE_NOT_IN_USE).size();
}
/** {@inheritDoc} */
@ -284,8 +273,7 @@ public final class HikariPool implements HikariPoolMBean
/** {@inheritDoc} */
public int getThreadsAwaitingConnection()
{
int idleCount = idleConnectionCount.get();
return (idleCount < 0 ? -idleCount : 0);
return idleConnectionBag.getPendingQueue();
}
/** {@inheritDoc} */
@ -299,8 +287,6 @@ public final class HikariPool implements HikariPoolMBean
continue;
}
idleConnectionCount.decrementAndGet();
closeConnection(connectionProxy);
}
}
@ -340,7 +326,7 @@ public final class HikariPool implements HikariPoolMBean
{
final int max = configuration.getMaximumPoolSize();
final int increment = configuration.getAcquireIncrement();
for (int i = 0; idleConnectionCount.get() < increment && i < increment && totalConnections.get() < max; i++)
for (int i = 0; i < increment && totalConnections.get() < max; i++)
{
addConnection();
}
@ -355,20 +341,6 @@ public final class HikariPool implements HikariPoolMBean
addConnection();
}
break;
case BACKGROUND_FILL:
houseKeepingTimer.schedule(new TimerTask() {
public void run()
{
final int max = configuration.getMaximumPoolSize();
int increment = configuration.getAcquireIncrement();
while (increment-- > 0 && getThreadsAwaitingConnection() > 0 && totalConnections.get() < max)
{
addConnection();
}
backgroundFillQueued.set(false);
}
}, 100/*ms*/);
break;
}
}
@ -414,7 +386,6 @@ public final class HikariPool implements HikariPoolMBean
if (!shutdown)
{
proxyConnection.resetConnectionState();
idleConnectionCount.incrementAndGet();
totalConnections.incrementAndGet();
idleConnectionBag.add(proxyConnection);
}
@ -523,7 +494,7 @@ public final class HikariPool implements HikariPoolMBean
private void logPoolState(String... prefix)
{
int total = totalConnections.get();
int idle = idleConnectionCount.get();
int idle = getIdleConnections();
LOGGER.debug("{}Pool stats (total={}, inUse={}, avail={}, waiting={})",
(prefix.length > 0 ? prefix[0] : ""), total, total - idle, idle, (isRegisteredMbeans ? getThreadsAwaitingConnection() : "n/a"));
}
@ -551,8 +522,6 @@ public final class HikariPool implements HikariPoolMBean
continue;
}
idleConnectionCount.decrementAndGet();
if ((idleTimeout > 0 && now > connectionProxy.getLastAccess() + idleTimeout)
||
(maxLifetime > 0 && now > connectionProxy.getCreationTime() + maxLifetime))
@ -561,7 +530,6 @@ public final class HikariPool implements HikariPoolMBean
}
else
{
idleConnectionCount.incrementAndGet();
idleConnectionBag.unreserve(connectionProxy);
}
}
@ -575,7 +543,6 @@ public final class HikariPool implements HikariPoolMBean
private static enum AddConnectionStrategy
{
ONLY_IF_EMPTY,
BACKGROUND_FILL,
MAINTAIN_MINIMUM
}
}

@ -64,9 +64,15 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
boolean compareAndSetState(int expectedState, int newState);
}
public interface IBagStateListener
{
void bagIsEmpty();
}
private ThreadLocal<LinkedList<WeakReference<T>>> threadList;
private CopyOnWriteArraySet<T> sharedList;
private Synchronizer synchronizer;
private IBagStateListener listener;
/**
* Constructor.
@ -119,6 +125,11 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
}
}
if (listener != null)
{
listener.bagIsEmpty();
}
synchronizer.tryAcquireSharedNanos(startScan, timeout);
timeout -= (System.nanoTime() - startScan);
@ -162,8 +173,9 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
*/
public void add(T value)
{
final long addTime = System.nanoTime();
sharedList.add(value);
synchronizer.releaseShared(1);
synchronizer.releaseShared(addTime);
}
/**
@ -225,6 +237,16 @@ public class ConcurrentBag<T extends com.zaxxer.hikari.util.ConcurrentBag.IBagMa
synchronizer.releaseShared(checkInTime);
}
public void addBagStateListener(IBagStateListener listener)
{
this.listener = listener;
}
public int getPendingQueue()
{
return synchronizer.getQueueLength();
}
/**
* Our private synchronizer that handles notify/wait type semantics.
*/

Loading…
Cancel
Save