From b24cc00f05b0197817b3f21e967423fb3395ad3e Mon Sep 17 00:00:00 2001 From: Brett Wooldridge Date: Thu, 16 Apr 2015 00:50:43 +0900 Subject: [PATCH] Improvements. Performance still off of where it needs to be in unconstrained pools. --- .../com/zaxxer/hikari/pool/HikariPool.java | 15 ++++--- .../com/zaxxer/hikari/pool/PoolBagEntry.java | 4 +- .../com/zaxxer/hikari/util/ConcurrentBag.java | 41 ++++++++----------- .../zaxxer/hikari/util/IBagStateListener.java | 4 +- .../java/com/zaxxer/hikari/PostgresTest.java | 18 ++++---- 5 files changed, 42 insertions(+), 40 deletions(-) diff --git a/src/main/java/com/zaxxer/hikari/pool/HikariPool.java b/src/main/java/com/zaxxer/hikari/pool/HikariPool.java index b982e0c1..8787e48e 100644 --- a/src/main/java/com/zaxxer/hikari/pool/HikariPool.java +++ b/src/main/java/com/zaxxer/hikari/pool/HikariPool.java @@ -31,6 +31,8 @@ import java.sql.SQLException; import java.sql.SQLTimeoutException; import java.sql.Statement; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -347,23 +349,24 @@ public class HikariPool implements HikariPoolMBean, IBagStateListener /** {@inheritDoc} */ @Override - public void addBagItem() + public Future addBagItem() { - class AddConnection implements Runnable - { + FutureTask future = new FutureTask<>(new Runnable() { public void run() { long sleepBackoff = 200L; + final int minimumIdle = configuration.getMinimumIdle(); final int maxPoolSize = configuration.getMaximumPoolSize(); - while (poolState == POOL_NORMAL && totalConnections.get() < maxPoolSize && !addConnection()) { + while (poolState == POOL_NORMAL && totalConnections.get() < maxPoolSize && getIdleConnections() <= minimumIdle && !addConnection()) { // If we got into the loop, addConnection() failed, so we sleep and retry quietlySleep(sleepBackoff); sleepBackoff = Math.min(connectionTimeout / 2, (long) ((double) sleepBackoff * 1.5)); } } - } + }, true); - addConnectionExecutor.execute(new AddConnection()); + addConnectionExecutor.execute(future); + return future; } // *********************************************************************** diff --git a/src/main/java/com/zaxxer/hikari/pool/PoolBagEntry.java b/src/main/java/com/zaxxer/hikari/pool/PoolBagEntry.java index 40d5d3a7..fd6fb5a6 100644 --- a/src/main/java/com/zaxxer/hikari/pool/PoolBagEntry.java +++ b/src/main/java/com/zaxxer/hikari/pool/PoolBagEntry.java @@ -17,6 +17,7 @@ package com.zaxxer.hikari.pool; import java.sql.Connection; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -44,7 +45,8 @@ public final class PoolBagEntry implements IConcurrentBagEntry this.connection = connection; this.lastAccess = System.currentTimeMillis(); - final long maxLifetime = pool.configuration.getMaxLifetime(); + final long variance = pool.configuration.getMaxLifetime() > 300_000 ? ThreadLocalRandom.current().nextLong(100_000) : 0; + final long maxLifetime = pool.configuration.getMaxLifetime() - variance; if (maxLifetime > 0) { endOfLife = pool.houseKeepingExecutorService.schedule(new Runnable() { public void run() diff --git a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java index bc7a3136..324687c7 100644 --- a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java +++ b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java @@ -24,6 +24,7 @@ import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.AbstractQueuedLongSynchronizer; @@ -99,7 +100,7 @@ public class ConcurrentBag for (int i = list.size() - 1; i >= 0; i--) { final IConcurrentBagEntry bagEntry = list.remove(i).get(); if (bagEntry != null && bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { - LOGGER.debug("{} fastpath bag item", Thread.currentThread()); + // LOGGER.debug("{} fastpath bag item", Thread.currentThread()); return (T) bagEntry; } } @@ -108,9 +109,9 @@ public class ConcurrentBag // Otherwise, scan the shared list ... for maximum of timeout timeout = timeUnit.toNanos(timeout); + Future addItemFuture = null; final long startScan = System.nanoTime(); final long originTimeout = timeout; - final long claimedSeq = sequence.get(); do { long startSeq; do { @@ -123,14 +124,16 @@ public class ConcurrentBag } } while (startSeq < sequence.get()); - LOGGER.debug("{} requesting addBagItem()", Thread.currentThread()); - listener.addBagItem(); + if (addItemFuture == null || addItemFuture.isDone()) { + // LOGGER.debug("{} requesting addBagItem()", Thread.currentThread()); + addItemFuture = listener.addBagItem(); + } - if (!synchronizer.tryAcquireSharedNanos(claimedSeq, timeout)) { + if (!synchronizer.tryAcquireSharedNanos(startSeq, timeout)) { return null; } - LOGGER.debug("{} woke up to try again", Thread.currentThread()); + // LOGGER.debug("{} woke up to try again", Thread.currentThread()); final long elapsed = (System.nanoTime() - startScan); timeout = originTimeout - Math.max(elapsed, 100L); // don't trust the nanoTime() impl. not to go backwards due to NTP adjustments @@ -321,14 +324,14 @@ public class ConcurrentBag @Override protected long tryAcquireShared(final long seq) { - if (hasQueuedPredecessors()) { - LOGGER.debug("{} had {} queued predecessors ({})", Thread.currentThread(), this.getQueueLength(), seq); - return -1L; - } - - final long ret = getState() > seq ? 0L : -1L; - LOGGER.debug("{} tryAcquireShared({}) returned {}", Thread.currentThread(), seq, ret); - return ret; + // if (hasQueuedPredecessors()) { + // LOGGER.debug("{} had {} queued predecessors ({})", Thread.currentThread(), this.getQueueLength(), seq); + // return -1L; + //} + + // final long ret = getState() > seq ? 0L : -1L; + // LOGGER.debug("{} tryAcquireShared({}) returned {}", Thread.currentThread(), seq, ret); + return getState() <= seq || hasQueuedPredecessors() ? -1L : 0L; } /** {@inheritDoc} */ @@ -341,17 +344,9 @@ public class ConcurrentBag currentSeq = getState(); } - LOGGER.debug("tryReleaseShared({}) succeeded", updateSeq); + // LOGGER.debug("tryReleaseShared({}) succeeded", updateSeq); return true; -// final long currentSeq = getState(); -// if (updateSeq > currentSeq && compareAndSetState(currentSeq, updateSeq)) { -// LOGGER.debug("tryReleaseShared({}) returned 'true'", updateSeq); -// return true; -// } -// -// LOGGER.debug("tryReleaseShared({}) returned 'false'", updateSeq); -// return false; } } } diff --git a/src/main/java/com/zaxxer/hikari/util/IBagStateListener.java b/src/main/java/com/zaxxer/hikari/util/IBagStateListener.java index 24faf2d3..7be62a28 100644 --- a/src/main/java/com/zaxxer/hikari/util/IBagStateListener.java +++ b/src/main/java/com/zaxxer/hikari/util/IBagStateListener.java @@ -15,6 +15,8 @@ */ package com.zaxxer.hikari.util; +import java.util.concurrent.Future; + /** * This interface is implemented by a listener to the ConcurrentBag. The * listener will be informed of when the bag has become empty. The usual @@ -25,5 +27,5 @@ package com.zaxxer.hikari.util; */ public interface IBagStateListener { - void addBagItem(); + Future addBagItem(); } diff --git a/src/test/java/com/zaxxer/hikari/PostgresTest.java b/src/test/java/com/zaxxer/hikari/PostgresTest.java index e8338c20..9de5d4cf 100644 --- a/src/test/java/com/zaxxer/hikari/PostgresTest.java +++ b/src/test/java/com/zaxxer/hikari/PostgresTest.java @@ -153,15 +153,15 @@ public class PostgresTest } } -// @Test + // @Test public void testCase4() throws Exception { HikariConfig config = new HikariConfig(); config.setMinimumIdle(0); - config.setMaximumPoolSize(15); + config.setMaximumPoolSize(50); config.setConnectionTimeout(10000); - config.setIdleTimeout(TimeUnit.MINUTES.toMillis(2)); - config.setMaxLifetime(TimeUnit.MINUTES.toMillis(6)); + config.setIdleTimeout(TimeUnit.MINUTES.toMillis(1)); + config.setMaxLifetime(TimeUnit.MINUTES.toMillis(2)); config.setRegisterMbeans(true); config.setJdbcUrl("jdbc:postgresql://localhost:5432/netld"); @@ -178,16 +178,16 @@ public class PostgresTest final long start = System.currentTimeMillis(); do { try (Connection conn = ds.getConnection(); Statement stmt = conn.createStatement()) { - try (ResultSet rs = stmt.executeQuery("SELECT * FROM device")) { + final double sleep = Math.random() * 1.0; + try (ResultSet rs = stmt.executeQuery("SELECT pg_sleep(" + sleep + ")")) { rs.next(); } - UtilityElf.quietlySleep(Math.max(200L, (long)(Math.random() * 2500L))); } catch (SQLException e) { - throw new RuntimeException(e); + e.printStackTrace(); } - - } while (UtilityElf.elapsedTimeMs(start) < TimeUnit.MINUTES.toMillis(42)); + // UtilityElf.quietlySleep((long)(Math.random() * 500L)); + } while (UtilityElf.elapsedTimeMs(start) < TimeUnit.MINUTES.toMillis(4)); }; }); }