From 43910e61dfee9fd69e8b82208c9f76bc05ecfe84 Mon Sep 17 00:00:00 2001 From: Brett Wooldridge Date: Fri, 17 Apr 2015 11:58:24 +0900 Subject: [PATCH] A race condition was observed under high load when a lot of connections expire at the same time, causing the pool to spike unnecessarily high. --- .../zaxxer/hikari/pool/BaseHikariPool.java | 30 +++++++++++++++++++ .../com/zaxxer/hikari/util/ConcurrentBag.java | 10 +++++-- .../zaxxer/hikari/util/IBagStateListener.java | 4 ++- .../com/zaxxer/hikari/pool/HikariPool.java | 26 ---------------- .../hikari/util/Java6ConcurrentBag.java | 10 +++---- .../com/zaxxer/hikari/pool/HikariPool.java | 20 ------------- .../hikari/util/Java8ConcurrentBag.java | 10 +++---- 7 files changed, 50 insertions(+), 60 deletions(-) diff --git a/hikaricp-common/src/main/java/com/zaxxer/hikari/pool/BaseHikariPool.java b/hikaricp-common/src/main/java/com/zaxxer/hikari/pool/BaseHikariPool.java index e0e97cd1..633ac6b7 100644 --- a/hikaricp-common/src/main/java/com/zaxxer/hikari/pool/BaseHikariPool.java +++ b/hikaricp-common/src/main/java/com/zaxxer/hikari/pool/BaseHikariPool.java @@ -25,12 +25,15 @@ import static com.zaxxer.hikari.util.UtilityElf.createInstance; import static com.zaxxer.hikari.util.UtilityElf.createThreadPoolExecutor; import static com.zaxxer.hikari.util.UtilityElf.elapsedTimeMs; import static com.zaxxer.hikari.util.UtilityElf.getTransactionIsolation; +import static com.zaxxer.hikari.util.UtilityElf.quietlySleep; import static com.zaxxer.hikari.util.UtilityElf.setRemoveOnCancelPolicy; import java.sql.Connection; import java.sql.SQLException; import java.sql.SQLTimeoutException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -393,6 +396,32 @@ public abstract class BaseHikariPool implements HikariPoolMBean, IBagStateListen } } + // *********************************************************************** + // IBagStateListener callback + // *********************************************************************** + + /** {@inheritDoc} */ + @Override + public Future addBagItem() + { + FutureTask future = new FutureTask(new Runnable() { + public void run() + { + long sleepBackoff = 200L; + final int minimumIdle = configuration.getMinimumIdle(); + final int maxPoolSize = configuration.getMaximumPoolSize(); + while (poolState == POOL_RUNNING && totalConnections.get() < maxPoolSize && getIdleConnections() <= minimumIdle && !addConnection()) { + // If we got into the loop, addConnection() failed, so we sleep and retry + quietlySleep(sleepBackoff); + sleepBackoff = Math.min(connectionTimeout / 2, (long) ((double) sleepBackoff * 1.5)); + } + } + }, true); + + addConnectionExecutor.execute(future); + return future; + } + // *********************************************************************** // Protected methods // *********************************************************************** @@ -535,6 +564,7 @@ public abstract class BaseHikariPool implements HikariPoolMBean, IBagStateListen * * @return an IConnectionCustomizer instance */ + @SuppressWarnings("deprecation") private IConnectionCustomizer initializeCustomizer() { if (configuration.getConnectionCustomizerClassName() != null) { diff --git a/hikaricp-common/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java b/hikaricp-common/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java index 211aee7b..951aac57 100644 --- a/hikaricp-common/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java +++ b/hikaricp-common/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java @@ -24,6 +24,7 @@ import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.AbstractQueuedLongSynchronizer; @@ -57,9 +58,9 @@ public class ConcurrentBag protected final AbstractQueuedLongSynchronizer synchronizer; protected final CopyOnWriteArrayList sharedList; + protected final AtomicLong sequence; private final ThreadLocal>> threadList; - private final AtomicLong sequence; private final IBagStateListener listener; private volatile boolean closed; @@ -110,6 +111,7 @@ public class ConcurrentBag // Otherwise, scan the shared list ... for maximum of timeout timeout = timeUnit.toNanos(timeout); + Future addItemFuture = null; final long startScan = System.nanoTime(); final long originTimeout = timeout; do { @@ -122,8 +124,10 @@ public class ConcurrentBag } } } while (startSeq < sequence.get()); - - listener.addBagItem(); + + if (addItemFuture == null || addItemFuture.isDone()) { + addItemFuture = listener.addBagItem(); + } if (!synchronizer.tryAcquireSharedNanos(startSeq, timeout)) { return null; diff --git a/hikaricp-common/src/main/java/com/zaxxer/hikari/util/IBagStateListener.java b/hikaricp-common/src/main/java/com/zaxxer/hikari/util/IBagStateListener.java index 24faf2d3..7be62a28 100644 --- a/hikaricp-common/src/main/java/com/zaxxer/hikari/util/IBagStateListener.java +++ b/hikaricp-common/src/main/java/com/zaxxer/hikari/util/IBagStateListener.java @@ -15,6 +15,8 @@ */ package com.zaxxer.hikari.util; +import java.util.concurrent.Future; + /** * This interface is implemented by a listener to the ConcurrentBag. The * listener will be informed of when the bag has become empty. The usual @@ -25,5 +27,5 @@ package com.zaxxer.hikari.util; */ public interface IBagStateListener { - void addBagItem(); + Future addBagItem(); } diff --git a/hikaricp-java6/src/main/java/com/zaxxer/hikari/pool/HikariPool.java b/hikaricp-java6/src/main/java/com/zaxxer/hikari/pool/HikariPool.java index 0615bfee..289d994a 100644 --- a/hikaricp-java6/src/main/java/com/zaxxer/hikari/pool/HikariPool.java +++ b/hikaricp-java6/src/main/java/com/zaxxer/hikari/pool/HikariPool.java @@ -18,7 +18,6 @@ package com.zaxxer.hikari.pool; import static com.zaxxer.hikari.util.IConcurrentBagEntry.STATE_IN_USE; import static com.zaxxer.hikari.util.IConcurrentBagEntry.STATE_NOT_IN_USE; -import static com.zaxxer.hikari.util.UtilityElf.quietlySleep; import java.sql.Connection; import java.sql.SQLException; @@ -61,31 +60,6 @@ public final class HikariPool extends BaseHikariPool super(configuration, username, password); } - // *********************************************************************** - // IBagStateListener callback - // *********************************************************************** - - /** {@inheritDoc} */ - @Override - public void addBagItem() - { - class AddConnection implements Runnable - { - public void run() - { - long sleepBackoff = 200L; - final int maxPoolSize = configuration.getMaximumPoolSize(); - while (poolState == POOL_RUNNING && totalConnections.get() < maxPoolSize && !addConnection()) { - // If we got into the loop, addConnection() failed, so we sleep and retry - quietlySleep(sleepBackoff); - sleepBackoff = Math.min(connectionTimeout / 2, (long) ((double) sleepBackoff * 1.5)); - } - } - } - - addConnectionExecutor.execute(new AddConnection()); - } - // *********************************************************************** // HikariPoolMBean methods // *********************************************************************** diff --git a/hikaricp-java6/src/main/java/com/zaxxer/hikari/util/Java6ConcurrentBag.java b/hikaricp-java6/src/main/java/com/zaxxer/hikari/util/Java6ConcurrentBag.java index 74de51f3..b5c8ccf7 100644 --- a/hikaricp-java6/src/main/java/com/zaxxer/hikari/util/Java6ConcurrentBag.java +++ b/hikaricp-java6/src/main/java/com/zaxxer/hikari/util/Java6ConcurrentBag.java @@ -101,21 +101,21 @@ public final class Java6ConcurrentBag extends ConcurrentBag /** * Our private synchronizer that handles notify/wait type semantics. */ - private static final class Synchronizer extends AbstractQueuedLongSynchronizer + private final class Synchronizer extends AbstractQueuedLongSynchronizer { private static final long serialVersionUID = 104753538004341218L; @Override - protected long tryAcquireShared(long seq) + protected long tryAcquireShared(final long seq) { - return getState() > seq && !java67hasQueuedPredecessors() ? 1L : -1L; + return java67hasQueuedPredecessors() ? -1L : getState() - (seq + 1); } /** {@inheritDoc} */ @Override - protected boolean tryReleaseShared(long updateSeq) + protected boolean tryReleaseShared(final long ignored) { - setState(updateSeq); + setState(sequence.get()); return true; } diff --git a/hikaricp/src/main/java/com/zaxxer/hikari/pool/HikariPool.java b/hikaricp/src/main/java/com/zaxxer/hikari/pool/HikariPool.java index a689d48e..19e26a4d 100644 --- a/hikaricp/src/main/java/com/zaxxer/hikari/pool/HikariPool.java +++ b/hikaricp/src/main/java/com/zaxxer/hikari/pool/HikariPool.java @@ -18,7 +18,6 @@ package com.zaxxer.hikari.pool; import static com.zaxxer.hikari.util.IConcurrentBagEntry.STATE_IN_USE; import static com.zaxxer.hikari.util.IConcurrentBagEntry.STATE_NOT_IN_USE; -import static com.zaxxer.hikari.util.UtilityElf.quietlySleep; import java.sql.Connection; import java.sql.SQLException; @@ -61,25 +60,6 @@ public final class HikariPool extends BaseHikariPool super(configuration, username, password); } - // *********************************************************************** - // IBagStateListener callback - // *********************************************************************** - - /** {@inheritDoc} */ - @Override - public void addBagItem() - { - addConnectionExecutor.execute( () -> { - long sleepBackoff = 200L; - final int maxPoolSize = configuration.getMaximumPoolSize(); - while (poolState == POOL_RUNNING && totalConnections.get() < maxPoolSize && !addConnection()) { - // If we got into the loop, addConnection() failed, so we sleep and retry - quietlySleep(sleepBackoff); - sleepBackoff = Math.min(connectionTimeout / 2, (long) ((double) sleepBackoff * 1.5)); - } - }); - } - // *********************************************************************** // HikariPoolMBean methods // *********************************************************************** diff --git a/hikaricp/src/main/java/com/zaxxer/hikari/util/Java8ConcurrentBag.java b/hikaricp/src/main/java/com/zaxxer/hikari/util/Java8ConcurrentBag.java index 19456c0c..338acafd 100644 --- a/hikaricp/src/main/java/com/zaxxer/hikari/util/Java8ConcurrentBag.java +++ b/hikaricp/src/main/java/com/zaxxer/hikari/util/Java8ConcurrentBag.java @@ -97,21 +97,21 @@ public final class Java8ConcurrentBag extends ConcurrentBag /** * Our private synchronizer that handles notify/wait type semantics. */ - private static final class Synchronizer extends AbstractQueuedLongSynchronizer + private final class Synchronizer extends AbstractQueuedLongSynchronizer { private static final long serialVersionUID = 104753538004341218L; @Override - protected long tryAcquireShared(long seq) + protected long tryAcquireShared(final long seq) { - return getState() > seq && !hasQueuedPredecessors() ? 1L : -1L; + return hasQueuedPredecessors() ? -1L : getState() - (seq + 1); } /** {@inheritDoc} */ @Override - protected boolean tryReleaseShared(long updateSeq) + protected boolean tryReleaseShared(final long ignored) { - setState(updateSeq); + setState(sequence.get()); return true; }