Merge selected parts of pull request.

pull/316/merge
Brett Wooldridge 10 years ago
parent 0837a818bb
commit ccdeeb0746

@ -79,7 +79,7 @@ public class HikariDataSource extends HikariConfig implements DataSource, Closea
@Override @Override
public Connection getConnection() throws SQLException public Connection getConnection() throws SQLException
{ {
if (isShutdown.get()) { if (isClosed()) {
throw new SQLException("Pool " + pool.getConfiguration().getPoolName() + " has been shutdown"); throw new SQLException("Pool " + pool.getConfiguration().getPoolName() + " has been shutdown");
} }

@ -37,10 +37,11 @@ public final class CodaHaleMetricsTracker extends MetricsTracker
super(pool); super(pool);
this.registry = registry; this.registry = registry;
this.connectionObtainTimer = registry.timer(MetricRegistry.name(pool.getConfiguration().getPoolName(), "pool", "Wait")); final String poolName = pool.getConfiguration().getPoolName();
this.connectionUsage = registry.histogram(MetricRegistry.name(pool.getConfiguration().getPoolName(), "pool", "Usage")); this.connectionObtainTimer = registry.timer(MetricRegistry.name(poolName, "pool", "Wait"));
this.connectionUsage = registry.histogram(MetricRegistry.name(poolName, "pool", "Usage"));
registry.register(MetricRegistry.name(pool.getConfiguration().getPoolName(), "pool", "TotalConnections"), registry.register(MetricRegistry.name(poolName, "pool", "TotalConnections"),
new CachedGauge<Integer>(10, TimeUnit.SECONDS) { new CachedGauge<Integer>(10, TimeUnit.SECONDS) {
@Override @Override
protected Integer loadValue() protected Integer loadValue()
@ -49,7 +50,7 @@ public final class CodaHaleMetricsTracker extends MetricsTracker
} }
}); });
registry.register(MetricRegistry.name(pool.getConfiguration().getPoolName(), "pool", "IdleConnections"), registry.register(MetricRegistry.name(poolName, "pool", "IdleConnections"),
new CachedGauge<Integer>(10, TimeUnit.SECONDS) { new CachedGauge<Integer>(10, TimeUnit.SECONDS) {
@Override @Override
protected Integer loadValue() protected Integer loadValue()
@ -58,7 +59,7 @@ public final class CodaHaleMetricsTracker extends MetricsTracker
} }
}); });
registry.register(MetricRegistry.name(pool.getConfiguration().getPoolName(), "pool", "ActiveConnections"), registry.register(MetricRegistry.name(poolName, "pool", "ActiveConnections"),
new CachedGauge<Integer>(10, TimeUnit.SECONDS) { new CachedGauge<Integer>(10, TimeUnit.SECONDS) {
@Override @Override
protected Integer loadValue() protected Integer loadValue()
@ -67,7 +68,7 @@ public final class CodaHaleMetricsTracker extends MetricsTracker
} }
}); });
registry.register(MetricRegistry.name(pool.getConfiguration().getPoolName(), "pool", "PendingConnections"), registry.register(MetricRegistry.name(poolName, "pool", "PendingConnections"),
new CachedGauge<Integer>(10, TimeUnit.SECONDS) { new CachedGauge<Integer>(10, TimeUnit.SECONDS) {
@Override @Override
protected Integer loadValue() protected Integer loadValue()
@ -81,12 +82,13 @@ public final class CodaHaleMetricsTracker extends MetricsTracker
@Override @Override
public void close() public void close()
{ {
registry.remove(MetricRegistry.name(pool.getConfiguration().getPoolName(), "pool", "Wait")); final String poolName = pool.getConfiguration().getPoolName();
registry.remove(MetricRegistry.name(pool.getConfiguration().getPoolName(), "pool", "Usage")); registry.remove(MetricRegistry.name(poolName, "pool", "Wait"));
registry.remove(MetricRegistry.name(pool.getConfiguration().getPoolName(), "pool", "TotalConnections")); registry.remove(MetricRegistry.name(poolName, "pool", "Usage"));
registry.remove(MetricRegistry.name(pool.getConfiguration().getPoolName(), "pool", "IdleConnections")); registry.remove(MetricRegistry.name(poolName, "pool", "TotalConnections"));
registry.remove(MetricRegistry.name(pool.getConfiguration().getPoolName(), "pool", "ActiveConnections")); registry.remove(MetricRegistry.name(poolName, "pool", "IdleConnections"));
registry.remove(MetricRegistry.name(pool.getConfiguration().getPoolName(), "pool", "PendingConnections")); registry.remove(MetricRegistry.name(poolName, "pool", "ActiveConnections"));
registry.remove(MetricRegistry.name(poolName, "pool", "PendingConnections"));
} }
/** {@inheritDoc} */ /** {@inheritDoc} */

@ -24,7 +24,7 @@ import com.zaxxer.hikari.pool.PoolBagEntry;
* *
* @author Brett Wooldridge * @author Brett Wooldridge
*/ */
public class MetricsTracker public class MetricsTracker implements AutoCloseable
{ {
public static final MetricsContext NO_CONTEXT = new MetricsContext(); public static final MetricsContext NO_CONTEXT = new MetricsContext();
@ -44,6 +44,7 @@ public class MetricsTracker
{ {
} }
@Override
public void close() public void close()
{ {
} }

@ -251,16 +251,19 @@ public class HikariPool implements HikariPoolMBean, IBagStateListener
addConnectionExecutor.awaitTermination(5L, TimeUnit.SECONDS); addConnectionExecutor.awaitTermination(5L, TimeUnit.SECONDS);
final ExecutorService assassinExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection assassin", final ExecutorService assassinExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection assassin",
configuration.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); configuration.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
final long start = System.currentTimeMillis(); try {
do { final long start = System.currentTimeMillis();
softEvictConnections(); do {
abortActiveConnections(assassinExecutor); softEvictConnections();
abortActiveConnections(assassinExecutor);
}
while (getTotalConnections() > 0 && elapsedTimeMs(start) < TimeUnit.SECONDS.toNanos(5));
} finally {
assassinExecutor.shutdown();
assassinExecutor.awaitTermination(5L, TimeUnit.SECONDS);
} }
while (getTotalConnections() > 0 && elapsedTimeMs(start) < TimeUnit.SECONDS.toMillis(5));
assassinExecutor.shutdown();
assassinExecutor.awaitTermination(5L, TimeUnit.SECONDS);
closeConnectionExecutor.shutdown(); closeConnectionExecutor.shutdown();
closeConnectionExecutor.awaitTermination(5L, TimeUnit.SECONDS); closeConnectionExecutor.awaitTermination(5L, TimeUnit.SECONDS);
} }
@ -352,6 +355,7 @@ public class HikariPool implements HikariPoolMBean, IBagStateListener
public Future<Boolean> addBagItem() public Future<Boolean> addBagItem()
{ {
FutureTask<Boolean> future = new FutureTask<Boolean>(new Runnable() { FutureTask<Boolean> future = new FutureTask<Boolean>(new Runnable() {
@Override
public void run() public void run()
{ {
long sleepBackoff = 200L; long sleepBackoff = 200L;
@ -447,7 +451,7 @@ public class HikariPool implements HikariPoolMBean, IBagStateListener
/** /**
* Permanently close the real (underlying) connection (eat any exception). * Permanently close the real (underlying) connection (eat any exception).
* *
* @param connectionProxy the connection to actually close * @param bagEntry the connection to actually close
*/ */
void closeConnection(final PoolBagEntry bagEntry, final String closureReason) void closeConnection(final PoolBagEntry bagEntry, final String closureReason)
{ {
@ -461,6 +465,7 @@ public class HikariPool implements HikariPoolMBean, IBagStateListener
} }
closeConnectionExecutor.execute(new Runnable() { closeConnectionExecutor.execute(new Runnable() {
@Override
public void run() { public void run() {
poolUtils.quietlyCloseConnection(connection, closureReason); poolUtils.quietlyCloseConnection(connection, closureReason);
} }
@ -475,7 +480,7 @@ public class HikariPool implements HikariPoolMBean, IBagStateListener
/** /**
* Create and add a single connection to the pool. * Create and add a single connection to the pool.
*/ */
private final boolean addConnection() private boolean addConnection()
{ {
// Speculative increment of totalConnections with expectation of success // Speculative increment of totalConnections with expectation of success
if (totalConnections.incrementAndGet() > configuration.getMaximumPoolSize()) { if (totalConnections.incrementAndGet() > configuration.getMaximumPoolSize()) {
@ -526,6 +531,7 @@ public class HikariPool implements HikariPoolMBean, IBagStateListener
if (connectionsToAdd > 0 && LOGGER.isDebugEnabled()) { if (connectionsToAdd > 0 && LOGGER.isDebugEnabled()) {
addConnectionExecutor.execute(new Runnable() { addConnectionExecutor.execute(new Runnable() {
@Override
public void run() { public void run() {
logPoolState("After fill "); logPoolState("After fill ");
} }
@ -537,7 +543,6 @@ public class HikariPool implements HikariPoolMBean, IBagStateListener
* Check whether the connection is alive or not. * Check whether the connection is alive or not.
* *
* @param connection the connection to test * @param connection the connection to test
* @param timeoutMs the timeout before we consider the test a failure
* @return true if the connection is alive, false if it is not alive or we timed out * @return true if the connection is alive, false if it is not alive or we timed out
*/ */
private boolean isConnectionAlive(final Connection connection) private boolean isConnectionAlive(final Connection connection)
@ -572,8 +577,6 @@ public class HikariPool implements HikariPoolMBean, IBagStateListener
/** /**
* Attempt to abort() active connections, or close() them. * Attempt to abort() active connections, or close() them.
*
* @throws InterruptedException
*/ */
private void abortActiveConnections(final ExecutorService assassinExecutor) private void abortActiveConnections(final ExecutorService assassinExecutor)
{ {

@ -49,6 +49,7 @@ public final class PoolBagEntry implements IConcurrentBagEntry
final long maxLifetime = pool.configuration.getMaxLifetime() - variance; final long maxLifetime = pool.configuration.getMaxLifetime() - variance;
if (maxLifetime > 0) { if (maxLifetime > 0) {
endOfLife = pool.houseKeepingExecutorService.schedule(new Runnable() { endOfLife = pool.houseKeepingExecutorService.schedule(new Runnable() {
@Override
public void run() public void run()
{ {
// If we can reserve it, close it // If we can reserve it, close it

@ -112,8 +112,8 @@ public abstract class ConnectionProxy implements IHikariConnectionProxy
boolean isForceClose = sqlState.startsWith("08") | SQL_ERRORS.contains(sqlState); boolean isForceClose = sqlState.startsWith("08") | SQL_ERRORS.contains(sqlState);
if (isForceClose) { if (isForceClose) {
bagEntry.evicted = true; bagEntry.evicted = true;
LOGGER.warn("Connection {} ({}) marked as broken because of SQLSTATE({}), ErrorCode({}).", delegate.toString(), LOGGER.warn("Connection {} ({}) marked as broken because of SQLSTATE({}), ErrorCode({}).", delegate,
parentPool.toString(), sqlState, sqle.getErrorCode(), sqle); parentPool, sqlState, sqle.getErrorCode(), sqle);
} }
else { else {
SQLException nse = sqle.getNextException(); SQLException nse = sqle.getNextException();

@ -52,7 +52,7 @@ import org.slf4j.LoggerFactory;
* *
* @param <T> the templated type to store in the bag * @param <T> the templated type to store in the bag
*/ */
public class ConcurrentBag<T extends IConcurrentBagEntry> public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseable
{ {
private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentBag.class); private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentBag.class);
@ -155,7 +155,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
synchronizer.releaseShared(sequence.incrementAndGet()); synchronizer.releaseShared(sequence.incrementAndGet());
} }
else { else {
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry.toString()); LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
} }
} }
@ -187,13 +187,13 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
public boolean remove(final T bagEntry) public boolean remove(final T bagEntry)
{ {
if (!bagEntry.state().compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.state().compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) { if (!bagEntry.state().compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.state().compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry.toString()); LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
return false; return false;
} }
final boolean removed = sharedList.remove(bagEntry); final boolean removed = sharedList.remove(bagEntry);
if (!removed && !closed) { if (!removed && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry.toString()); LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
} }
return removed; return removed;
} }
@ -201,6 +201,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
/** /**
* Close the bag to further adds. * Close the bag to further adds.
*/ */
@Override
public void close() public void close()
{ {
closed = true; closed = true;
@ -258,7 +259,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
synchronizer.releaseShared(checkInSeq); synchronizer.releaseShared(checkInSeq);
} }
else { else {
LOGGER.warn("Attempt to relinquish an object to the bag that was not reserved: {}", bagEntry.toString()); LOGGER.warn("Attempt to relinquish an object to the bag that was not reserved: {}", bagEntry);
} }
} }

@ -209,7 +209,7 @@ public class TestConnections
Assert.assertTrue("Connection should have closed", connection.isClosed()); Assert.assertTrue("Connection should have closed", connection.isClosed());
Assert.assertFalse("Connection should have closed", connection.isValid(5)); Assert.assertFalse("Connection should have closed", connection.isValid(5));
Assert.assertTrue("Expected to contain ClosedConnection, but was " + connection.toString(), connection.toString().contains("ClosedConnection")); Assert.assertTrue("Expected to contain ClosedConnection, but was " + connection, connection.toString().contains("ClosedConnection"));
connection.close(); connection.close();
} }

Loading…
Cancel
Save