From d71db82715782f7121132aea982478f7b6c77cd1 Mon Sep 17 00:00:00 2001 From: Brett Wooldridge Date: Tue, 18 Nov 2014 23:56:33 +0900 Subject: [PATCH] Fix #198 improve shutdown handling with respect to asynchronous close() calls that might be occurring. --- .../com/zaxxer/hikari/pool/HikariPool.java | 1 + .../com/zaxxer/hikari/util/ConcurrentBag.java | 19 +++--- .../java/com/zaxxer/hikari/ShutdownTest.java | 68 +++++++++++++++++++ .../com/zaxxer/hikari/TestConcurrentBag.java | 5 +- .../com/zaxxer/hikari/pool/HikariPool.java | 1 + .../com/zaxxer/hikari/util/ConcurrentBag.java | 19 +++--- .../java/com/zaxxer/hikari/ShutdownTest.java | 68 +++++++++++++++++++ .../com/zaxxer/hikari/TestConcurrentBag.java | 5 +- 8 files changed, 162 insertions(+), 24 deletions(-) diff --git a/hikaricp-java6/src/main/java/com/zaxxer/hikari/pool/HikariPool.java b/hikaricp-java6/src/main/java/com/zaxxer/hikari/pool/HikariPool.java index a427355b..3a2956be 100644 --- a/hikaricp-java6/src/main/java/com/zaxxer/hikari/pool/HikariPool.java +++ b/hikaricp-java6/src/main/java/com/zaxxer/hikari/pool/HikariPool.java @@ -539,6 +539,7 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener ExecutorService assassinExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection assassin", configuration.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); for (PoolBagEntry bagEntry : connectionBag.values(STATE_IN_USE)) { try { + bagEntry.evicted = true; bagEntry.connection.abort(assassinExecutor); } catch (AbstractMethodError e) { diff --git a/hikaricp-java6/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java b/hikaricp-java6/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java index 321bc0cd..95192b27 100644 --- a/hikaricp-java6/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java +++ b/hikaricp-java6/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java @@ -181,11 +181,12 @@ public final class ConcurrentBag public void add(final T bagEntry) { if (closed) { - throw new IllegalStateException("ConcurrentBag has been closed"); + LOGGER.warn("ConcurrentBag has been closed, ignoring add()"); + } + else { + sharedList.add(bagEntry); + synchronizer.releaseShared(sequence.incrementAndGet()); } - - sharedList.add(bagEntry); - synchronizer.releaseShared(sequence.incrementAndGet()); } /** @@ -198,14 +199,12 @@ public final class ConcurrentBag */ public void remove(final T bagEntry) { - if (bagEntry.state.compareAndSet(STATE_IN_USE, STATE_REMOVED) || bagEntry.state.compareAndSet(STATE_RESERVED, STATE_REMOVED)) { - if (!sharedList.remove(bagEntry)) { - throw new IllegalStateException("Attempt to remove an object from the bag that does not exist"); - } - } - else { + if (!bagEntry.state.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.state.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) { throw new IllegalStateException("Attempt to remove an object from the bag that was not borrowed or reserved"); } + else if (!sharedList.remove(bagEntry) && !closed) { + throw new IllegalStateException("Attempt to remove an object from the bag that does not exist"); + } } /** diff --git a/hikaricp-java6/src/test/java/com/zaxxer/hikari/ShutdownTest.java b/hikaricp-java6/src/test/java/com/zaxxer/hikari/ShutdownTest.java index 390e88bf..c11434f5 100644 --- a/hikaricp-java6/src/test/java/com/zaxxer/hikari/ShutdownTest.java +++ b/hikaricp-java6/src/test/java/com/zaxxer/hikari/ShutdownTest.java @@ -17,6 +17,7 @@ package com.zaxxer.hikari; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.concurrent.TimeUnit; @@ -233,6 +234,73 @@ public class ShutdownTest } } + @Test + public void testThreadedShutdown() throws Exception + { + HikariConfig config = new HikariConfig(); + config.setMinimumIdle(5); + config.setMaximumPoolSize(5); + config.setConnectionTimeout(200); + config.setInitializationFailFast(true); + config.setConnectionTestQuery("VALUES 1"); + config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource"); + + for (int i = 0; i < 4; i++) { + final HikariDataSource ds = new HikariDataSource(config); + Thread t = new Thread() { + public void run() { + try { + Connection connection = ds.getConnection(); + for (int i = 0; i < 10; i++) { + Connection connection2 = null; + try { + connection2 = ds.getConnection(); + PreparedStatement stmt = connection2.prepareStatement("SOMETHING"); + PoolUtilities.quietlySleep(20); + stmt.getMaxFieldSize(); + } + catch (SQLException e) { + try { + if (connection2 != null) { + connection2.close(); + } + } + catch (SQLException e2) { + if (e2.getMessage().contains("shutdown") || e2.getMessage().contains("evicted")) { + break; + } + } + } + } + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + finally { + ds.shutdown(); + } + }; + }; + t.start(); + + Thread t2 = new Thread() { + public void run() { + PoolUtilities.quietlySleep(100); + try { + ds.shutdown(); + } + catch (IllegalStateException e) { + Assert.fail(e.getMessage()); + } + }; + }; + t2.start(); + + t.join(); + t2.join(); + } + } + private int threadCount() { Thread[] threads = new Thread[Thread.activeCount() * 2]; diff --git a/hikaricp-java6/src/test/java/com/zaxxer/hikari/TestConcurrentBag.java b/hikaricp-java6/src/test/java/com/zaxxer/hikari/TestConcurrentBag.java index a2f3aa1a..8642656c 100644 --- a/hikaricp-java6/src/test/java/com/zaxxer/hikari/TestConcurrentBag.java +++ b/hikaricp-java6/src/test/java/com/zaxxer/hikari/TestConcurrentBag.java @@ -84,8 +84,9 @@ public class TestConcurrentBag bag.close(); try { - bag.add(new PoolBagEntry(null, 0)); - Assert.fail(); + PoolBagEntry bagEntry = new PoolBagEntry(null, 0); + bag.add(bagEntry); + Assert.assertNotEquals(bagEntry, bag.borrow(100, TimeUnit.MILLISECONDS)); } catch (IllegalStateException e) { // pass diff --git a/hikaricp/src/main/java/com/zaxxer/hikari/pool/HikariPool.java b/hikaricp/src/main/java/com/zaxxer/hikari/pool/HikariPool.java index d0f68111..c6941f7f 100644 --- a/hikaricp/src/main/java/com/zaxxer/hikari/pool/HikariPool.java +++ b/hikaricp/src/main/java/com/zaxxer/hikari/pool/HikariPool.java @@ -515,6 +515,7 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener ExecutorService assassinExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection assassin", configuration.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); connectionBag.values(STATE_IN_USE).stream().forEach(bagEntry -> { try { + bagEntry.evicted = true; bagEntry.connection.abort(assassinExecutor); } catch (SQLException | AbstractMethodError e) { diff --git a/hikaricp/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java b/hikaricp/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java index 9d691364..01ecba53 100644 --- a/hikaricp/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java +++ b/hikaricp/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java @@ -180,11 +180,12 @@ public final class ConcurrentBag public void add(final T bagEntry) { if (closed) { - throw new IllegalStateException("ConcurrentBag has been closed"); + LOGGER.warn("ConcurrentBag has been closed, ignoring add()"); + } + else { + sharedList.add(bagEntry); + synchronizer.releaseShared(sequence.incrementAndGet()); } - - sharedList.add(bagEntry); - synchronizer.releaseShared(sequence.incrementAndGet()); } /** @@ -197,14 +198,12 @@ public final class ConcurrentBag */ public void remove(final T bagEntry) { - if (bagEntry.state.compareAndSet(STATE_IN_USE, STATE_REMOVED) || bagEntry.state.compareAndSet(STATE_RESERVED, STATE_REMOVED)) { - if (!sharedList.remove(bagEntry)) { - throw new IllegalStateException("Attempt to remove an object from the bag that does not exist"); - } - } - else { + if (!bagEntry.state.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.state.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) { throw new IllegalStateException("Attempt to remove an object from the bag that was not borrowed or reserved"); } + else if (!sharedList.remove(bagEntry) && !closed) { + throw new IllegalStateException("Attempt to remove an object from the bag that does not exist"); + } } /** diff --git a/hikaricp/src/test/java/com/zaxxer/hikari/ShutdownTest.java b/hikaricp/src/test/java/com/zaxxer/hikari/ShutdownTest.java index 390e88bf..c11434f5 100644 --- a/hikaricp/src/test/java/com/zaxxer/hikari/ShutdownTest.java +++ b/hikaricp/src/test/java/com/zaxxer/hikari/ShutdownTest.java @@ -17,6 +17,7 @@ package com.zaxxer.hikari; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.concurrent.TimeUnit; @@ -233,6 +234,73 @@ public class ShutdownTest } } + @Test + public void testThreadedShutdown() throws Exception + { + HikariConfig config = new HikariConfig(); + config.setMinimumIdle(5); + config.setMaximumPoolSize(5); + config.setConnectionTimeout(200); + config.setInitializationFailFast(true); + config.setConnectionTestQuery("VALUES 1"); + config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource"); + + for (int i = 0; i < 4; i++) { + final HikariDataSource ds = new HikariDataSource(config); + Thread t = new Thread() { + public void run() { + try { + Connection connection = ds.getConnection(); + for (int i = 0; i < 10; i++) { + Connection connection2 = null; + try { + connection2 = ds.getConnection(); + PreparedStatement stmt = connection2.prepareStatement("SOMETHING"); + PoolUtilities.quietlySleep(20); + stmt.getMaxFieldSize(); + } + catch (SQLException e) { + try { + if (connection2 != null) { + connection2.close(); + } + } + catch (SQLException e2) { + if (e2.getMessage().contains("shutdown") || e2.getMessage().contains("evicted")) { + break; + } + } + } + } + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + finally { + ds.shutdown(); + } + }; + }; + t.start(); + + Thread t2 = new Thread() { + public void run() { + PoolUtilities.quietlySleep(100); + try { + ds.shutdown(); + } + catch (IllegalStateException e) { + Assert.fail(e.getMessage()); + } + }; + }; + t2.start(); + + t.join(); + t2.join(); + } + } + private int threadCount() { Thread[] threads = new Thread[Thread.activeCount() * 2]; diff --git a/hikaricp/src/test/java/com/zaxxer/hikari/TestConcurrentBag.java b/hikaricp/src/test/java/com/zaxxer/hikari/TestConcurrentBag.java index a2f3aa1a..8642656c 100644 --- a/hikaricp/src/test/java/com/zaxxer/hikari/TestConcurrentBag.java +++ b/hikaricp/src/test/java/com/zaxxer/hikari/TestConcurrentBag.java @@ -84,8 +84,9 @@ public class TestConcurrentBag bag.close(); try { - bag.add(new PoolBagEntry(null, 0)); - Assert.fail(); + PoolBagEntry bagEntry = new PoolBagEntry(null, 0); + bag.add(bagEntry); + Assert.assertNotEquals(bagEntry, bag.borrow(100, TimeUnit.MILLISECONDS)); } catch (IllegalStateException e) { // pass