Fix #198 improve shutdown handling with respect to asynchronous close() calls that might be occurring.

pull/201/head
Brett Wooldridge 10 years ago
parent d097828d9d
commit d71db82715

@ -539,6 +539,7 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
ExecutorService assassinExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection assassin", configuration.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
for (PoolBagEntry bagEntry : connectionBag.values(STATE_IN_USE)) {
try {
bagEntry.evicted = true;
bagEntry.connection.abort(assassinExecutor);
}
catch (AbstractMethodError e) {

@ -181,11 +181,12 @@ public final class ConcurrentBag<T extends BagEntry>
public void add(final T bagEntry)
{
if (closed) {
throw new IllegalStateException("ConcurrentBag has been closed");
LOGGER.warn("ConcurrentBag has been closed, ignoring add()");
}
else {
sharedList.add(bagEntry);
synchronizer.releaseShared(sequence.incrementAndGet());
}
sharedList.add(bagEntry);
synchronizer.releaseShared(sequence.incrementAndGet());
}
/**
@ -198,14 +199,12 @@ public final class ConcurrentBag<T extends BagEntry>
*/
public void remove(final T bagEntry)
{
if (bagEntry.state.compareAndSet(STATE_IN_USE, STATE_REMOVED) || bagEntry.state.compareAndSet(STATE_RESERVED, STATE_REMOVED)) {
if (!sharedList.remove(bagEntry)) {
throw new IllegalStateException("Attempt to remove an object from the bag that does not exist");
}
}
else {
if (!bagEntry.state.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.state.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
throw new IllegalStateException("Attempt to remove an object from the bag that was not borrowed or reserved");
}
else if (!sharedList.remove(bagEntry) && !closed) {
throw new IllegalStateException("Attempt to remove an object from the bag that does not exist");
}
}
/**

@ -17,6 +17,7 @@
package com.zaxxer.hikari;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
@ -233,6 +234,73 @@ public class ShutdownTest
}
}
@Test
public void testThreadedShutdown() throws Exception
{
HikariConfig config = new HikariConfig();
config.setMinimumIdle(5);
config.setMaximumPoolSize(5);
config.setConnectionTimeout(200);
config.setInitializationFailFast(true);
config.setConnectionTestQuery("VALUES 1");
config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource");
for (int i = 0; i < 4; i++) {
final HikariDataSource ds = new HikariDataSource(config);
Thread t = new Thread() {
public void run() {
try {
Connection connection = ds.getConnection();
for (int i = 0; i < 10; i++) {
Connection connection2 = null;
try {
connection2 = ds.getConnection();
PreparedStatement stmt = connection2.prepareStatement("SOMETHING");
PoolUtilities.quietlySleep(20);
stmt.getMaxFieldSize();
}
catch (SQLException e) {
try {
if (connection2 != null) {
connection2.close();
}
}
catch (SQLException e2) {
if (e2.getMessage().contains("shutdown") || e2.getMessage().contains("evicted")) {
break;
}
}
}
}
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
finally {
ds.shutdown();
}
};
};
t.start();
Thread t2 = new Thread() {
public void run() {
PoolUtilities.quietlySleep(100);
try {
ds.shutdown();
}
catch (IllegalStateException e) {
Assert.fail(e.getMessage());
}
};
};
t2.start();
t.join();
t2.join();
}
}
private int threadCount()
{
Thread[] threads = new Thread[Thread.activeCount() * 2];

@ -84,8 +84,9 @@ public class TestConcurrentBag
bag.close();
try {
bag.add(new PoolBagEntry(null, 0));
Assert.fail();
PoolBagEntry bagEntry = new PoolBagEntry(null, 0);
bag.add(bagEntry);
Assert.assertNotEquals(bagEntry, bag.borrow(100, TimeUnit.MILLISECONDS));
}
catch (IllegalStateException e) {
// pass

@ -515,6 +515,7 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
ExecutorService assassinExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection assassin", configuration.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
connectionBag.values(STATE_IN_USE).stream().forEach(bagEntry -> {
try {
bagEntry.evicted = true;
bagEntry.connection.abort(assassinExecutor);
}
catch (SQLException | AbstractMethodError e) {

@ -180,11 +180,12 @@ public final class ConcurrentBag<T extends BagEntry>
public void add(final T bagEntry)
{
if (closed) {
throw new IllegalStateException("ConcurrentBag has been closed");
LOGGER.warn("ConcurrentBag has been closed, ignoring add()");
}
else {
sharedList.add(bagEntry);
synchronizer.releaseShared(sequence.incrementAndGet());
}
sharedList.add(bagEntry);
synchronizer.releaseShared(sequence.incrementAndGet());
}
/**
@ -197,14 +198,12 @@ public final class ConcurrentBag<T extends BagEntry>
*/
public void remove(final T bagEntry)
{
if (bagEntry.state.compareAndSet(STATE_IN_USE, STATE_REMOVED) || bagEntry.state.compareAndSet(STATE_RESERVED, STATE_REMOVED)) {
if (!sharedList.remove(bagEntry)) {
throw new IllegalStateException("Attempt to remove an object from the bag that does not exist");
}
}
else {
if (!bagEntry.state.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.state.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
throw new IllegalStateException("Attempt to remove an object from the bag that was not borrowed or reserved");
}
else if (!sharedList.remove(bagEntry) && !closed) {
throw new IllegalStateException("Attempt to remove an object from the bag that does not exist");
}
}
/**

@ -17,6 +17,7 @@
package com.zaxxer.hikari;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
@ -233,6 +234,73 @@ public class ShutdownTest
}
}
@Test
public void testThreadedShutdown() throws Exception
{
HikariConfig config = new HikariConfig();
config.setMinimumIdle(5);
config.setMaximumPoolSize(5);
config.setConnectionTimeout(200);
config.setInitializationFailFast(true);
config.setConnectionTestQuery("VALUES 1");
config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource");
for (int i = 0; i < 4; i++) {
final HikariDataSource ds = new HikariDataSource(config);
Thread t = new Thread() {
public void run() {
try {
Connection connection = ds.getConnection();
for (int i = 0; i < 10; i++) {
Connection connection2 = null;
try {
connection2 = ds.getConnection();
PreparedStatement stmt = connection2.prepareStatement("SOMETHING");
PoolUtilities.quietlySleep(20);
stmt.getMaxFieldSize();
}
catch (SQLException e) {
try {
if (connection2 != null) {
connection2.close();
}
}
catch (SQLException e2) {
if (e2.getMessage().contains("shutdown") || e2.getMessage().contains("evicted")) {
break;
}
}
}
}
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
finally {
ds.shutdown();
}
};
};
t.start();
Thread t2 = new Thread() {
public void run() {
PoolUtilities.quietlySleep(100);
try {
ds.shutdown();
}
catch (IllegalStateException e) {
Assert.fail(e.getMessage());
}
};
};
t2.start();
t.join();
t2.join();
}
}
private int threadCount()
{
Thread[] threads = new Thread[Thread.activeCount() * 2];

@ -84,8 +84,9 @@ public class TestConcurrentBag
bag.close();
try {
bag.add(new PoolBagEntry(null, 0));
Assert.fail();
PoolBagEntry bagEntry = new PoolBagEntry(null, 0);
bag.add(bagEntry);
Assert.assertNotEquals(bagEntry, bag.borrow(100, TimeUnit.MILLISECONDS));
}
catch (IllegalStateException e) {
// pass

Loading…
Cancel
Save