From 7f50428effbbfb3cffaccc22bc853b2cc01c4092 Mon Sep 17 00:00:00 2001 From: Brett Wooldridge Date: Wed, 15 Apr 2015 18:20:33 +0900 Subject: [PATCH 1/5] merge --- .../com/zaxxer/hikari/util/ConcurrentBag.java | 34 ++++++++++++++++--- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java index fecc90ee..a1545db1 100644 --- a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java +++ b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java @@ -113,17 +113,21 @@ public class ConcurrentBag startSeq = sequence.get(); for (final T bagEntry : sharedList) { if (bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { + LOGGER.debug("{} got bag item {}", Thread.currentThread(), startSeq); return bagEntry; } } } while (startSeq < sequence.get()); - + + LOGGER.debug("{} requesting addBagItem()", Thread.currentThread()); listener.addBagItem(); if (!synchronizer.tryAcquireSharedNanos(startSeq, timeout)) { return null; } + LOGGER.debug("{} woke up to try again", Thread.currentThread()); + final long elapsed = (System.nanoTime() - startScan); timeout = originTimeout - Math.max(elapsed, 100L); // don't trust the nanoTime() impl. not to go backwards due to NTP adjustments } @@ -311,18 +315,38 @@ public class ConcurrentBag private static final long serialVersionUID = 104753538004341218L; @Override - protected long tryAcquireShared(long seq) + protected long tryAcquireShared(final long seq) { - return getState() > seq && !hasQueuedPredecessors() ? 1L : -1L; + if (hasQueuedPredecessors()) { + LOGGER.debug("{} had {} queued predecessors ({})", Thread.currentThread(), this.getQueueLength(), seq); + return -1L; + } + + final long ret = getState() > seq ? 0L : -1L; + LOGGER.debug("{} tryAcquireShared({}) returned {}", Thread.currentThread(), seq, ret); + return ret; } /** {@inheritDoc} */ @Override - protected boolean tryReleaseShared(long updateSeq) + protected boolean tryReleaseShared(final long updateSeq) { - setState(updateSeq); + long currentSeq = getState(); + while (updateSeq > currentSeq && !compareAndSetState(currentSeq, updateSeq)) { + // spin + } + + LOGGER.debug("tryReleaseShared({}) succeeded", updateSeq); return true; +// final long currentSeq = getState(); +// if (updateSeq > currentSeq && compareAndSetState(currentSeq, updateSeq)) { +// LOGGER.debug("tryReleaseShared({}) returned 'true'", updateSeq); +// return true; +// } +// +// LOGGER.debug("tryReleaseShared({}) returned 'false'", updateSeq); +// return false; } } } From 72b92390bf86793575ec6bbec7da70211fa0ae2f Mon Sep 17 00:00:00 2001 From: Brett Wooldridge Date: Wed, 15 Apr 2015 19:21:42 +0900 Subject: [PATCH 2/5] Experimental concurrentbag fairness changes. Starvation was observed under high load. --- .../com/zaxxer/hikari/util/ConcurrentBag.java | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java index a1545db1..bc7a3136 100644 --- a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java +++ b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java @@ -89,16 +89,19 @@ public class ConcurrentBag @SuppressWarnings("unchecked") public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException { - // Try the thread-local list first - final ArrayList> list = threadList.get(); - if (list == null) { - threadList.set(new ArrayList>(16)); - } - else { - for (int i = list.size() - 1; i >= 0; i--) { - final IConcurrentBagEntry bagEntry = list.remove(i).get(); - if (bagEntry != null && bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { - return (T) bagEntry; + // Try the thread-local list first if nobody is queued + if (!synchronizer.hasQueuedThreads()) { + final ArrayList> list = threadList.get(); + if (list == null) { + threadList.set(new ArrayList>(16)); + } + else { + for (int i = list.size() - 1; i >= 0; i--) { + final IConcurrentBagEntry bagEntry = list.remove(i).get(); + if (bagEntry != null && bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { + LOGGER.debug("{} fastpath bag item", Thread.currentThread()); + return (T) bagEntry; + } } } } @@ -107,6 +110,7 @@ public class ConcurrentBag timeout = timeUnit.toNanos(timeout); final long startScan = System.nanoTime(); final long originTimeout = timeout; + final long claimedSeq = sequence.get(); do { long startSeq; do { @@ -122,7 +126,7 @@ public class ConcurrentBag LOGGER.debug("{} requesting addBagItem()", Thread.currentThread()); listener.addBagItem(); - if (!synchronizer.tryAcquireSharedNanos(startSeq, timeout)) { + if (!synchronizer.tryAcquireSharedNanos(claimedSeq, timeout)) { return null; } @@ -334,6 +338,7 @@ public class ConcurrentBag long currentSeq = getState(); while (updateSeq > currentSeq && !compareAndSetState(currentSeq, updateSeq)) { // spin + currentSeq = getState(); } LOGGER.debug("tryReleaseShared({}) succeeded", updateSeq); From b24cc00f05b0197817b3f21e967423fb3395ad3e Mon Sep 17 00:00:00 2001 From: Brett Wooldridge Date: Thu, 16 Apr 2015 00:50:43 +0900 Subject: [PATCH 3/5] Improvements. Performance still off of where it needs to be in unconstrained pools. --- .../com/zaxxer/hikari/pool/HikariPool.java | 15 ++++--- .../com/zaxxer/hikari/pool/PoolBagEntry.java | 4 +- .../com/zaxxer/hikari/util/ConcurrentBag.java | 41 ++++++++----------- .../zaxxer/hikari/util/IBagStateListener.java | 4 +- .../java/com/zaxxer/hikari/PostgresTest.java | 18 ++++---- 5 files changed, 42 insertions(+), 40 deletions(-) diff --git a/src/main/java/com/zaxxer/hikari/pool/HikariPool.java b/src/main/java/com/zaxxer/hikari/pool/HikariPool.java index b982e0c1..8787e48e 100644 --- a/src/main/java/com/zaxxer/hikari/pool/HikariPool.java +++ b/src/main/java/com/zaxxer/hikari/pool/HikariPool.java @@ -31,6 +31,8 @@ import java.sql.SQLException; import java.sql.SQLTimeoutException; import java.sql.Statement; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -347,23 +349,24 @@ public class HikariPool implements HikariPoolMBean, IBagStateListener /** {@inheritDoc} */ @Override - public void addBagItem() + public Future addBagItem() { - class AddConnection implements Runnable - { + FutureTask future = new FutureTask<>(new Runnable() { public void run() { long sleepBackoff = 200L; + final int minimumIdle = configuration.getMinimumIdle(); final int maxPoolSize = configuration.getMaximumPoolSize(); - while (poolState == POOL_NORMAL && totalConnections.get() < maxPoolSize && !addConnection()) { + while (poolState == POOL_NORMAL && totalConnections.get() < maxPoolSize && getIdleConnections() <= minimumIdle && !addConnection()) { // If we got into the loop, addConnection() failed, so we sleep and retry quietlySleep(sleepBackoff); sleepBackoff = Math.min(connectionTimeout / 2, (long) ((double) sleepBackoff * 1.5)); } } - } + }, true); - addConnectionExecutor.execute(new AddConnection()); + addConnectionExecutor.execute(future); + return future; } // *********************************************************************** diff --git a/src/main/java/com/zaxxer/hikari/pool/PoolBagEntry.java b/src/main/java/com/zaxxer/hikari/pool/PoolBagEntry.java index 40d5d3a7..fd6fb5a6 100644 --- a/src/main/java/com/zaxxer/hikari/pool/PoolBagEntry.java +++ b/src/main/java/com/zaxxer/hikari/pool/PoolBagEntry.java @@ -17,6 +17,7 @@ package com.zaxxer.hikari.pool; import java.sql.Connection; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -44,7 +45,8 @@ public final class PoolBagEntry implements IConcurrentBagEntry this.connection = connection; this.lastAccess = System.currentTimeMillis(); - final long maxLifetime = pool.configuration.getMaxLifetime(); + final long variance = pool.configuration.getMaxLifetime() > 300_000 ? ThreadLocalRandom.current().nextLong(100_000) : 0; + final long maxLifetime = pool.configuration.getMaxLifetime() - variance; if (maxLifetime > 0) { endOfLife = pool.houseKeepingExecutorService.schedule(new Runnable() { public void run() diff --git a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java index bc7a3136..324687c7 100644 --- a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java +++ b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java @@ -24,6 +24,7 @@ import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.AbstractQueuedLongSynchronizer; @@ -99,7 +100,7 @@ public class ConcurrentBag for (int i = list.size() - 1; i >= 0; i--) { final IConcurrentBagEntry bagEntry = list.remove(i).get(); if (bagEntry != null && bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { - LOGGER.debug("{} fastpath bag item", Thread.currentThread()); + // LOGGER.debug("{} fastpath bag item", Thread.currentThread()); return (T) bagEntry; } } @@ -108,9 +109,9 @@ public class ConcurrentBag // Otherwise, scan the shared list ... for maximum of timeout timeout = timeUnit.toNanos(timeout); + Future addItemFuture = null; final long startScan = System.nanoTime(); final long originTimeout = timeout; - final long claimedSeq = sequence.get(); do { long startSeq; do { @@ -123,14 +124,16 @@ public class ConcurrentBag } } while (startSeq < sequence.get()); - LOGGER.debug("{} requesting addBagItem()", Thread.currentThread()); - listener.addBagItem(); + if (addItemFuture == null || addItemFuture.isDone()) { + // LOGGER.debug("{} requesting addBagItem()", Thread.currentThread()); + addItemFuture = listener.addBagItem(); + } - if (!synchronizer.tryAcquireSharedNanos(claimedSeq, timeout)) { + if (!synchronizer.tryAcquireSharedNanos(startSeq, timeout)) { return null; } - LOGGER.debug("{} woke up to try again", Thread.currentThread()); + // LOGGER.debug("{} woke up to try again", Thread.currentThread()); final long elapsed = (System.nanoTime() - startScan); timeout = originTimeout - Math.max(elapsed, 100L); // don't trust the nanoTime() impl. not to go backwards due to NTP adjustments @@ -321,14 +324,14 @@ public class ConcurrentBag @Override protected long tryAcquireShared(final long seq) { - if (hasQueuedPredecessors()) { - LOGGER.debug("{} had {} queued predecessors ({})", Thread.currentThread(), this.getQueueLength(), seq); - return -1L; - } - - final long ret = getState() > seq ? 0L : -1L; - LOGGER.debug("{} tryAcquireShared({}) returned {}", Thread.currentThread(), seq, ret); - return ret; + // if (hasQueuedPredecessors()) { + // LOGGER.debug("{} had {} queued predecessors ({})", Thread.currentThread(), this.getQueueLength(), seq); + // return -1L; + //} + + // final long ret = getState() > seq ? 0L : -1L; + // LOGGER.debug("{} tryAcquireShared({}) returned {}", Thread.currentThread(), seq, ret); + return getState() <= seq || hasQueuedPredecessors() ? -1L : 0L; } /** {@inheritDoc} */ @@ -341,17 +344,9 @@ public class ConcurrentBag currentSeq = getState(); } - LOGGER.debug("tryReleaseShared({}) succeeded", updateSeq); + // LOGGER.debug("tryReleaseShared({}) succeeded", updateSeq); return true; -// final long currentSeq = getState(); -// if (updateSeq > currentSeq && compareAndSetState(currentSeq, updateSeq)) { -// LOGGER.debug("tryReleaseShared({}) returned 'true'", updateSeq); -// return true; -// } -// -// LOGGER.debug("tryReleaseShared({}) returned 'false'", updateSeq); -// return false; } } } diff --git a/src/main/java/com/zaxxer/hikari/util/IBagStateListener.java b/src/main/java/com/zaxxer/hikari/util/IBagStateListener.java index 24faf2d3..7be62a28 100644 --- a/src/main/java/com/zaxxer/hikari/util/IBagStateListener.java +++ b/src/main/java/com/zaxxer/hikari/util/IBagStateListener.java @@ -15,6 +15,8 @@ */ package com.zaxxer.hikari.util; +import java.util.concurrent.Future; + /** * This interface is implemented by a listener to the ConcurrentBag. The * listener will be informed of when the bag has become empty. The usual @@ -25,5 +27,5 @@ package com.zaxxer.hikari.util; */ public interface IBagStateListener { - void addBagItem(); + Future addBagItem(); } diff --git a/src/test/java/com/zaxxer/hikari/PostgresTest.java b/src/test/java/com/zaxxer/hikari/PostgresTest.java index e8338c20..9de5d4cf 100644 --- a/src/test/java/com/zaxxer/hikari/PostgresTest.java +++ b/src/test/java/com/zaxxer/hikari/PostgresTest.java @@ -153,15 +153,15 @@ public class PostgresTest } } -// @Test + // @Test public void testCase4() throws Exception { HikariConfig config = new HikariConfig(); config.setMinimumIdle(0); - config.setMaximumPoolSize(15); + config.setMaximumPoolSize(50); config.setConnectionTimeout(10000); - config.setIdleTimeout(TimeUnit.MINUTES.toMillis(2)); - config.setMaxLifetime(TimeUnit.MINUTES.toMillis(6)); + config.setIdleTimeout(TimeUnit.MINUTES.toMillis(1)); + config.setMaxLifetime(TimeUnit.MINUTES.toMillis(2)); config.setRegisterMbeans(true); config.setJdbcUrl("jdbc:postgresql://localhost:5432/netld"); @@ -178,16 +178,16 @@ public class PostgresTest final long start = System.currentTimeMillis(); do { try (Connection conn = ds.getConnection(); Statement stmt = conn.createStatement()) { - try (ResultSet rs = stmt.executeQuery("SELECT * FROM device")) { + final double sleep = Math.random() * 1.0; + try (ResultSet rs = stmt.executeQuery("SELECT pg_sleep(" + sleep + ")")) { rs.next(); } - UtilityElf.quietlySleep(Math.max(200L, (long)(Math.random() * 2500L))); } catch (SQLException e) { - throw new RuntimeException(e); + e.printStackTrace(); } - - } while (UtilityElf.elapsedTimeMs(start) < TimeUnit.MINUTES.toMillis(42)); + // UtilityElf.quietlySleep((long)(Math.random() * 500L)); + } while (UtilityElf.elapsedTimeMs(start) < TimeUnit.MINUTES.toMillis(4)); }; }); } From dc91fcc1b94f815f4ed9e113c222591a5ae2fa88 Mon Sep 17 00:00:00 2001 From: Brett Wooldridge Date: Thu, 16 Apr 2015 22:54:33 +0900 Subject: [PATCH 4/5] More concurrency tests. --- .../java/com/zaxxer/hikari/util/ConcurrentBag.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java index 324687c7..f181da22 100644 --- a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java +++ b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java @@ -331,7 +331,8 @@ public class ConcurrentBag // final long ret = getState() > seq ? 0L : -1L; // LOGGER.debug("{} tryAcquireShared({}) returned {}", Thread.currentThread(), seq, ret); - return getState() <= seq || hasQueuedPredecessors() ? -1L : 0L; + final long delta = getState() - (seq + 1); + return hasQueuedPredecessors() ? -1L : delta; } /** {@inheritDoc} */ @@ -339,14 +340,19 @@ public class ConcurrentBag protected boolean tryReleaseShared(final long updateSeq) { long currentSeq = getState(); - while (updateSeq > currentSeq && !compareAndSetState(currentSeq, updateSeq)) { + while (updateSeq > currentSeq) { + if (compareAndSetState(currentSeq, updateSeq)) { + return true; + } + // spin + Thread.yield(); currentSeq = getState(); } // LOGGER.debug("tryReleaseShared({}) succeeded", updateSeq); - return true; + return false; } } } From 2f5ea9073b3783d0380ed3bc574638daf0dec59b Mon Sep 17 00:00:00 2001 From: Brett Wooldridge Date: Thu, 16 Apr 2015 23:41:13 +0900 Subject: [PATCH 5/5] Better. --- .../com/zaxxer/hikari/util/ConcurrentBag.java | 36 ++++--------------- 1 file changed, 6 insertions(+), 30 deletions(-) diff --git a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java index f181da22..73a78997 100644 --- a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java +++ b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java @@ -100,7 +100,6 @@ public class ConcurrentBag for (int i = list.size() - 1; i >= 0; i--) { final IConcurrentBagEntry bagEntry = list.remove(i).get(); if (bagEntry != null && bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { - // LOGGER.debug("{} fastpath bag item", Thread.currentThread()); return (T) bagEntry; } } @@ -118,14 +117,13 @@ public class ConcurrentBag startSeq = sequence.get(); for (final T bagEntry : sharedList) { if (bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { - LOGGER.debug("{} got bag item {}", Thread.currentThread(), startSeq); + // LOGGER.debug("{} got bag item {}", Thread.currentThread(), startSeq); return bagEntry; } } } while (startSeq < sequence.get()); if (addItemFuture == null || addItemFuture.isDone()) { - // LOGGER.debug("{} requesting addBagItem()", Thread.currentThread()); addItemFuture = listener.addBagItem(); } @@ -133,8 +131,6 @@ public class ConcurrentBag return null; } - // LOGGER.debug("{} woke up to try again", Thread.currentThread()); - final long elapsed = (System.nanoTime() - startScan); timeout = originTimeout - Math.max(elapsed, 100L); // don't trust the nanoTime() impl. not to go backwards due to NTP adjustments } @@ -317,42 +313,22 @@ public class ConcurrentBag /** * Our private synchronizer that handles notify/wait type semantics. */ - private static final class Synchronizer extends AbstractQueuedLongSynchronizer + private final class Synchronizer extends AbstractQueuedLongSynchronizer { private static final long serialVersionUID = 104753538004341218L; @Override protected long tryAcquireShared(final long seq) { - // if (hasQueuedPredecessors()) { - // LOGGER.debug("{} had {} queued predecessors ({})", Thread.currentThread(), this.getQueueLength(), seq); - // return -1L; - //} - - // final long ret = getState() > seq ? 0L : -1L; - // LOGGER.debug("{} tryAcquireShared({}) returned {}", Thread.currentThread(), seq, ret); - final long delta = getState() - (seq + 1); - return hasQueuedPredecessors() ? -1L : delta; + return hasQueuedPredecessors() ? -1L : getState() - (seq + 1); } /** {@inheritDoc} */ @Override - protected boolean tryReleaseShared(final long updateSeq) + protected boolean tryReleaseShared(final long unreliableSequence) { - long currentSeq = getState(); - while (updateSeq > currentSeq) { - if (compareAndSetState(currentSeq, updateSeq)) { - return true; - } - - // spin - Thread.yield(); - currentSeq = getState(); - } - - // LOGGER.debug("tryReleaseShared({}) succeeded", updateSeq); - - return false; + setState(sequence.get()); + return true; } } }