Change rejected handler policy for closeConnectionExecutor to CallRunsPolicy.

pull/192/head
Brett Wooldridge 11 years ago
parent 16dbc953d1
commit b140c9ed68

@ -53,10 +53,8 @@ import com.zaxxer.hikari.proxy.ProxyFactory;
import com.zaxxer.hikari.util.ConcurrentBag; import com.zaxxer.hikari.util.ConcurrentBag;
import com.zaxxer.hikari.util.ConcurrentBag.IBagStateListener; import com.zaxxer.hikari.util.ConcurrentBag.IBagStateListener;
import com.zaxxer.hikari.util.DefaultThreadFactory; import com.zaxxer.hikari.util.DefaultThreadFactory;
import com.zaxxer.hikari.util.DriverDataSource;
import com.zaxxer.hikari.util.LeakTask; import com.zaxxer.hikari.util.LeakTask;
import com.zaxxer.hikari.util.PoolUtilities; import com.zaxxer.hikari.util.PoolUtilities;
import com.zaxxer.hikari.util.PropertyBeanSetter;
/** /**
* This is the primary connection pool class that provides the basic * This is the primary connection pool class that provides the basic
@ -153,8 +151,8 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
HikariMBeanElf.registerMBeans(configuration, this); HikariMBeanElf.registerMBeans(configuration, this);
} }
addConnectionExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection filler (pool " + configuration.getPoolName() + ")", configuration.getThreadFactory()); addConnectionExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection filler (pool " + configuration.getPoolName() + ")", configuration.getThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
closeConnectionExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection closer (pool " + configuration.getPoolName() + ")", configuration.getThreadFactory()); closeConnectionExecutor = createThreadPoolExecutor(4, "HikariCP connection closer (pool " + configuration.getPoolName() + ")", configuration.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
fillPool(); fillPool();
@ -525,7 +523,7 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
*/ */
private void abortActiveConnections() throws InterruptedException private void abortActiveConnections() throws InterruptedException
{ {
ExecutorService assassinExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection assassin", configuration.getThreadFactory()); ExecutorService assassinExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection assassin", configuration.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
for (PoolBagEntry bagEntry : connectionBag.values(STATE_IN_USE)) { for (PoolBagEntry bagEntry : connectionBag.values(STATE_IN_USE)) {
try { try {
bagEntry.connection.abort(assassinExecutor); bagEntry.connection.abort(assassinExecutor);

@ -7,6 +7,7 @@ import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -157,7 +158,7 @@ public final class PoolUtilities
* @param threadFactory an optional ThreadFactory * @param threadFactory an optional ThreadFactory
* @return a ThreadPoolExecutor * @return a ThreadPoolExecutor
*/ */
public static ThreadPoolExecutor createThreadPoolExecutor(final int queueSize, final String threadName, ThreadFactory threadFactory) public static ThreadPoolExecutor createThreadPoolExecutor(final int queueSize, final String threadName, ThreadFactory threadFactory, RejectedExecutionHandler policy)
{ {
if (threadFactory == null) { if (threadFactory == null) {
threadFactory = new DefaultThreadFactory(threadName, true); threadFactory = new DefaultThreadFactory(threadName, true);
@ -165,8 +166,7 @@ public final class PoolUtilities
int processors = Math.max(1, Runtime.getRuntime().availableProcessors() / 4); int processors = Math.max(1, Runtime.getRuntime().availableProcessors() / 4);
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueSize); LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueSize);
ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 5, TimeUnit.SECONDS, queue, threadFactory, ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 5, TimeUnit.SECONDS, queue, threadFactory, policy);
new ThreadPoolExecutor.DiscardPolicy());
executor.allowCoreThreadTimeOut(true); executor.allowCoreThreadTimeOut(true);
return executor; return executor;
} }

@ -150,8 +150,8 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
HikariMBeanElf.registerMBeans(configuration, this); HikariMBeanElf.registerMBeans(configuration, this);
} }
addConnectionExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection filler (pool " + configuration.getPoolName() + ")", configuration.getThreadFactory()); addConnectionExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection filler (pool " + configuration.getPoolName() + ")", configuration.getThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
closeConnectionExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection closer (pool " + configuration.getPoolName() + ")", configuration.getThreadFactory()); closeConnectionExecutor = createThreadPoolExecutor(4, "HikariCP connection closer (pool " + configuration.getPoolName() + ")", configuration.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
fillPool(); fillPool();
@ -508,7 +508,7 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener
*/ */
private void abortActiveConnections() throws InterruptedException private void abortActiveConnections() throws InterruptedException
{ {
ExecutorService assassinExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection assassin", configuration.getThreadFactory()); ExecutorService assassinExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection assassin", configuration.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
connectionBag.values(STATE_IN_USE).parallelStream().forEach(bagEntry -> { connectionBag.values(STATE_IN_USE).parallelStream().forEach(bagEntry -> {
try { try {
bagEntry.connection.abort(assassinExecutor); bagEntry.connection.abort(assassinExecutor);

@ -7,6 +7,7 @@ import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -144,7 +145,7 @@ public final class PoolUtilities
* @param threadFactory an optional ThreadFactory * @param threadFactory an optional ThreadFactory
* @return a ThreadPoolExecutor * @return a ThreadPoolExecutor
*/ */
public static ThreadPoolExecutor createThreadPoolExecutor(final int queueSize, final String threadName, ThreadFactory threadFactory) public static ThreadPoolExecutor createThreadPoolExecutor(final int queueSize, final String threadName, ThreadFactory threadFactory, RejectedExecutionHandler policy)
{ {
if (threadFactory == null) { if (threadFactory == null) {
threadFactory = new DefaultThreadFactory(threadName, true); threadFactory = new DefaultThreadFactory(threadName, true);
@ -152,8 +153,7 @@ public final class PoolUtilities
int processors = Math.max(1, Runtime.getRuntime().availableProcessors() / 4); int processors = Math.max(1, Runtime.getRuntime().availableProcessors() / 4);
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueSize); LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueSize);
ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 5, TimeUnit.SECONDS, queue, threadFactory, ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 5, TimeUnit.SECONDS, queue, threadFactory, policy);
new ThreadPoolExecutor.DiscardPolicy());
executor.allowCoreThreadTimeOut(true); executor.allowCoreThreadTimeOut(true);
return executor; return executor;
} }

Loading…
Cancel
Save