Fix #198 handle pull shutdown synchronisation issues

pull/201/head
Brett Wooldridge 11 years ago
parent da4ffe6d18
commit c78fc35c27

@ -313,7 +313,7 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
long sleepBackoff = 200L; long sleepBackoff = 200L;
final int maxPoolSize = configuration.getMaximumPoolSize(); final int maxPoolSize = configuration.getMaximumPoolSize();
final int minIdle = configuration.getMinimumIdle(); final int minIdle = configuration.getMinimumIdle();
while (!isShutdown && totalConnections.get() < maxPoolSize && (minIdle == 0 || getIdleConnections() < minIdle)) { while (!isPoolSuspended && !isShutdown && totalConnections.get() < maxPoolSize && (minIdle == 0 || getIdleConnections() < minIdle)) {
if (addConnection()) { if (addConnection()) {
if (minIdle == 0) { if (minIdle == 0) {
break; // This break is here so we only add one connection when there is no min. idle break; // This break is here so we only add one connection when there is no min. idle
@ -398,6 +398,7 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
if (isPoolSuspended) { if (isPoolSuspended) {
acquisitionSemaphore.release(10000); acquisitionSemaphore.release(10000);
isPoolSuspended = false; isPoolSuspended = false;
addBagItem(); // re-populate the pool
} }
} }
@ -412,16 +413,17 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
*/ */
private void closeConnection(final PoolBagEntry bagEntry) private void closeConnection(final PoolBagEntry bagEntry)
{ {
connectionBag.remove(bagEntry); if (connectionBag.remove(bagEntry)) {
final int tc = totalConnections.decrementAndGet(); final int tc = totalConnections.decrementAndGet();
if (tc < 0) { if (tc < 0) {
LOGGER.warn("Internal accounting inconsistency, totalConnections={}", tc, new Exception()); LOGGER.warn("Internal accounting inconsistency, totalConnections={}", tc, new Exception());
}
closeConnectionExecutor.submit(new Runnable() {
public void run() {
quietlyCloseConnection(bagEntry.connection);
} }
}); closeConnectionExecutor.submit(new Runnable() {
public void run() {
quietlyCloseConnection(bagEntry.connection);
}
});
}
} }
/** /**
@ -432,7 +434,7 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
// Speculative increment of totalConnections with expectation of success // Speculative increment of totalConnections with expectation of success
if (totalConnections.incrementAndGet() > configuration.getMaximumPoolSize() || isShutdown || isPoolSuspended) { if (totalConnections.incrementAndGet() > configuration.getMaximumPoolSize() || isShutdown || isPoolSuspended) {
totalConnections.decrementAndGet(); totalConnections.decrementAndGet();
return !isPoolSuspended; return true;
} }
Connection connection = null; Connection connection = null;
@ -535,6 +537,7 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
ExecutorService assassinExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection assassin", configuration.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); ExecutorService assassinExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection assassin", configuration.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
for (PoolBagEntry bagEntry : connectionBag.values(STATE_IN_USE)) { for (PoolBagEntry bagEntry : connectionBag.values(STATE_IN_USE)) {
try { try {
bagEntry.evicted = true;
bagEntry.connection.abort(assassinExecutor); bagEntry.connection.abort(assassinExecutor);
} }
catch (AbstractMethodError e) { catch (AbstractMethodError e) {
@ -547,13 +550,9 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
quietlyCloseConnection(bagEntry.connection); quietlyCloseConnection(bagEntry.connection);
} }
finally { finally {
try { if (connectionBag.remove(bagEntry)) {
connectionBag.remove(bagEntry);
totalConnections.decrementAndGet(); totalConnections.decrementAndGet();
} }
catch (IllegalStateException ise) {
continue;
}
} }
} }

@ -30,7 +30,7 @@ public final class PoolBagEntry extends BagEntry
public final Connection connection; public final Connection connection;
public final long expirationTime; public final long expirationTime;
public long lastOpenTime; public long lastOpenTime;
volatile boolean evicted; public volatile boolean evicted;
long lastAccess; long lastAccess;
public PoolBagEntry(final Connection connection, long maxLifetime) { public PoolBagEntry(final Connection connection, long maxLifetime) {

@ -202,7 +202,10 @@ public abstract class ConnectionProxy implements IHikariConnectionProxy
delegate.clearWarnings(); delegate.clearWarnings();
} }
catch (SQLException e) { catch (SQLException e) {
throw checkException(e); // when connections are evicted, exceptions are often thrown that should not reach the application
if (!bagEntry.evicted) {
throw checkException(e);
}
} }
finally { finally {
delegate = ClosedConnection.CLOSED_CONNECTION; delegate = ClosedConnection.CLOSED_CONNECTION;

@ -194,17 +194,20 @@ public final class ConcurrentBag<T extends BagEntry>
* with objects obtained by {@link #borrow(long, TimeUnit)} or {@link #reserve(BagEntry)}. * with objects obtained by {@link #borrow(long, TimeUnit)} or {@link #reserve(BagEntry)}.
* *
* @param bagEntry the value to remove * @param bagEntry the value to remove
* @return true if the entry was removed, false otherwise
* @throws IllegalStateException if an attempt is made to remove an object * @throws IllegalStateException if an attempt is made to remove an object
* from the bag that was not borrowed or reserved first * from the bag that was not borrowed or reserved first
*/ */
public void remove(final T bagEntry) public boolean remove(final T bagEntry)
{ {
if (!bagEntry.state.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.state.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) { if (!bagEntry.state.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.state.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
throw new IllegalStateException("Attempt to remove an object from the bag that was not borrowed or reserved"); throw new IllegalStateException("Attempt to remove an object from the bag that was not borrowed or reserved");
} }
else if (!sharedList.remove(bagEntry) && !closed) { final boolean removed = sharedList.remove(bagEntry);
if (!removed && !closed) {
throw new IllegalStateException("Attempt to remove an object from the bag that does not exist"); throw new IllegalStateException("Attempt to remove an object from the bag that does not exist");
} }
return removed;
} }
/** /**

@ -307,7 +307,7 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
long sleepBackoff = 200L; long sleepBackoff = 200L;
final int maxPoolSize = configuration.getMaximumPoolSize(); final int maxPoolSize = configuration.getMaximumPoolSize();
final int minIdle = configuration.getMinimumIdle(); final int minIdle = configuration.getMinimumIdle();
while (!isShutdown && totalConnections.get() < maxPoolSize && (minIdle == 0 || getIdleConnections() < minIdle)) { while (!isPoolSuspended && !isShutdown && totalConnections.get() < maxPoolSize && (minIdle == 0 || getIdleConnections() < minIdle)) {
if (addConnection()) { if (addConnection()) {
if (minIdle == 0) { if (minIdle == 0) {
break; // This break is here so we only add one connection when there is no min. idle break; // This break is here so we only add one connection when there is no min. idle
@ -382,6 +382,7 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
if (isPoolSuspended) { if (isPoolSuspended) {
acquisitionSemaphore.release(10000); acquisitionSemaphore.release(10000);
isPoolSuspended = false; isPoolSuspended = false;
addBagItem(); // re-populate the pool
} }
} }
@ -396,12 +397,13 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
*/ */
private void closeConnection(final PoolBagEntry bagEntry) private void closeConnection(final PoolBagEntry bagEntry)
{ {
connectionBag.remove(bagEntry); if (connectionBag.remove(bagEntry)) {
final int tc = totalConnections.decrementAndGet(); final int tc = totalConnections.decrementAndGet();
if (tc < 0) { if (tc < 0) {
LOGGER.warn("Internal accounting inconsistency, totalConnections={}", tc, new Exception()); LOGGER.warn("Internal accounting inconsistency, totalConnections={}", tc, new Exception());
}
closeConnectionExecutor.submit(() -> { quietlyCloseConnection(bagEntry.connection); });
} }
closeConnectionExecutor.submit(() -> { quietlyCloseConnection(bagEntry.connection); });
} }
/** /**
@ -412,7 +414,7 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
// Speculative increment of totalConnections with expectation of success // Speculative increment of totalConnections with expectation of success
if (totalConnections.incrementAndGet() > configuration.getMaximumPoolSize() || isShutdown || isPoolSuspended) { if (totalConnections.incrementAndGet() > configuration.getMaximumPoolSize() || isShutdown || isPoolSuspended) {
totalConnections.decrementAndGet(); totalConnections.decrementAndGet();
return !isPoolSuspended; return true;
} }
Connection connection = null; Connection connection = null;
@ -511,19 +513,16 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
ExecutorService assassinExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection assassin", configuration.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); ExecutorService assassinExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection assassin", configuration.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
connectionBag.values(STATE_IN_USE).stream().forEach(bagEntry -> { connectionBag.values(STATE_IN_USE).stream().forEach(bagEntry -> {
try { try {
bagEntry.evicted = true;
bagEntry.connection.abort(assassinExecutor); bagEntry.connection.abort(assassinExecutor);
} }
catch (SQLException | AbstractMethodError e) { catch (SQLException | AbstractMethodError e) {
quietlyCloseConnection(bagEntry.connection); quietlyCloseConnection(bagEntry.connection);
} }
finally { finally {
try { if (connectionBag.remove(bagEntry)) {
connectionBag.remove(bagEntry);
totalConnections.decrementAndGet(); totalConnections.decrementAndGet();
} }
catch (IllegalStateException ise) {
return;
}
} }
}); });

@ -30,7 +30,7 @@ public final class PoolBagEntry extends BagEntry
public final Connection connection; public final Connection connection;
public final long expirationTime; public final long expirationTime;
public long lastOpenTime; public long lastOpenTime;
volatile boolean evicted; public volatile boolean evicted;
long lastAccess; long lastAccess;
public PoolBagEntry(final Connection connection, long maxLifetime) { public PoolBagEntry(final Connection connection, long maxLifetime) {

@ -202,7 +202,10 @@ public abstract class ConnectionProxy implements IHikariConnectionProxy
delegate.clearWarnings(); delegate.clearWarnings();
} }
catch (SQLException e) { catch (SQLException e) {
throw checkException(e); // when connections are evicted, exceptions are often thrown that should not reach the application
if (!bagEntry.evicted) {
throw checkException(e);
}
} }
finally { finally {
delegate = ClosedConnection.CLOSED_CONNECTION; delegate = ClosedConnection.CLOSED_CONNECTION;

@ -193,17 +193,20 @@ public final class ConcurrentBag<T extends BagEntry>
* with objects obtained by {@link #borrow(long, TimeUnit)} or {@link #reserve(BagEntry)}. * with objects obtained by {@link #borrow(long, TimeUnit)} or {@link #reserve(BagEntry)}.
* *
* @param bagEntry the value to remove * @param bagEntry the value to remove
* @return true if the entry was removed, false otherwise
* @throws IllegalStateException if an attempt is made to remove an object * @throws IllegalStateException if an attempt is made to remove an object
* from the bag that was not borrowed or reserved first * from the bag that was not borrowed or reserved first
*/ */
public void remove(final T bagEntry) public boolean remove(final T bagEntry)
{ {
if (!bagEntry.state.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.state.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) { if (!bagEntry.state.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.state.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
throw new IllegalStateException("Attempt to remove an object from the bag that was not borrowed or reserved"); throw new IllegalStateException("Attempt to remove an object from the bag that was not borrowed or reserved");
} }
else if (!sharedList.remove(bagEntry) && !closed) { final boolean removed = sharedList.remove(bagEntry);
if (!removed && !closed) {
throw new IllegalStateException("Attempt to remove an object from the bag that does not exist"); throw new IllegalStateException("Attempt to remove an object from the bag that does not exist");
} }
return removed;
} }
/** /**

Loading…
Cancel
Save