Improvements. Performance is still short of where it needs to be in unconstrained pools.

pull/316/merge
Brett Wooldridge 10 years ago
parent 72b92390bf
commit b24cc00f05

@@ -31,6 +31,8 @@ import java.sql.SQLException;
 import java.sql.SQLTimeoutException;
 import java.sql.Statement;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -347,23 +349,24 @@ public class HikariPool implements HikariPoolMBean, IBagStateListener
    /** {@inheritDoc} */
    @Override
-   public void addBagItem()
+   public Future<Boolean> addBagItem()
    {
-      class AddConnection implements Runnable
-      {
+      FutureTask<Boolean> future = new FutureTask<>(new Runnable() {
         public void run()
         {
            long sleepBackoff = 200L;
+           final int minimumIdle = configuration.getMinimumIdle();
            final int maxPoolSize = configuration.getMaximumPoolSize();
-           while (poolState == POOL_NORMAL && totalConnections.get() < maxPoolSize && !addConnection()) {
+           while (poolState == POOL_NORMAL && totalConnections.get() < maxPoolSize && getIdleConnections() <= minimumIdle && !addConnection()) {
               // If we got into the loop, addConnection() failed, so we sleep and retry
               quietlySleep(sleepBackoff);
               sleepBackoff = Math.min(connectionTimeout / 2, (long) ((double) sleepBackoff * 1.5));
            }
         }
-      }
+      }, true);
 
-      addConnectionExecutor.execute(new AddConnection());
+      addConnectionExecutor.execute(future);
+      return future;
    }
 
    // ***********************************************************************
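The rewrite above turns addBagItem() from fire-and-forget into a request the caller can observe: the add-connection work is wrapped in a JDK FutureTask, and the fill loop now also stops once idle connections exceed minimumIdle. A minimal standalone sketch of the FutureTask(Runnable, result) pattern follows (names and timings are illustrative, not HikariCP code):

// Standalone sketch, not HikariCP code: the JDK FutureTask(Runnable, result) pattern
// the new addBagItem() relies on. Wrapping the runnable lets a caller poll isDone()
// to see whether a previously requested "add" is still in flight, and get() yields
// the fixed result (here: true) once the work completes.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class AddItemFutureSketch
{
   public static void main(String[] args) throws Exception
   {
      ExecutorService addExecutor = Executors.newSingleThreadExecutor();

      Runnable fill = () -> {
         // stand-in for the addConnection() retry loop
         try { Thread.sleep(250L); } catch (InterruptedException ignored) { }
      };

      // the second argument is the value returned by get() when the runnable finishes
      FutureTask<Boolean> future = new FutureTask<>(fill, true);
      addExecutor.execute(future);

      System.out.println("still in flight: " + !future.isDone());
      System.out.println("completed with: " + future.get());
      addExecutor.shutdown();
   }
}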

@@ -17,6 +17,7 @@ package com.zaxxer.hikari.pool;
 import java.sql.Connection;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -44,7 +45,8 @@ public final class PoolBagEntry implements IConcurrentBagEntry
       this.connection = connection;
       this.lastAccess = System.currentTimeMillis();
 
-      final long maxLifetime = pool.configuration.getMaxLifetime();
+      final long variance = pool.configuration.getMaxLifetime() > 300_000 ? ThreadLocalRandom.current().nextLong(100_000) : 0;
+      final long maxLifetime = pool.configuration.getMaxLifetime() - variance;
       if (maxLifetime > 0) {
          endOfLife = pool.houseKeepingExecutorService.schedule(new Runnable() {
            public void run()
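The variance above shaves up to 100 seconds off each connection's scheduled end-of-life (only when maxLifetime exceeds 5 minutes), so connections created in a burst do not all retire, and reconnect, at the same instant. A standalone sketch with an assumed 30-minute maxLifetime (not HikariCP code):

// Standalone sketch, not HikariCP code: how the lifetime variance spreads out
// connection retirement across a batch of connections created together.
import java.util.concurrent.ThreadLocalRandom;

public class LifetimeVarianceSketch
{
   public static void main(String[] args)
   {
      final long maxLifetime = 1_800_000L;   // assumed 30-minute setting
      for (int i = 0; i < 5; i++) {
         final long variance = maxLifetime > 300_000 ? ThreadLocalRandom.current().nextLong(100_000) : 0;
         System.out.printf("connection %d retires after %d ms%n", i, maxLifetime - variance);
      }
   }
}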

@@ -24,6 +24,7 @@ import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;
@@ -99,7 +100,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
       for (int i = list.size() - 1; i >= 0; i--) {
          final IConcurrentBagEntry bagEntry = list.remove(i).get();
          if (bagEntry != null && bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
-            LOGGER.debug("{} fastpath bag item", Thread.currentThread());
+            // LOGGER.debug("{} fastpath bag item", Thread.currentThread());
             return (T) bagEntry;
          }
       }
@@ -108,9 +109,9 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
       // Otherwise, scan the shared list ... for maximum of timeout
       timeout = timeUnit.toNanos(timeout);
+      Future<Boolean> addItemFuture = null;
       final long startScan = System.nanoTime();
       final long originTimeout = timeout;
-      final long claimedSeq = sequence.get();
       do {
          long startSeq;
          do {
@@ -123,14 +124,16 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
            }
         } while (startSeq < sequence.get());
 
-        LOGGER.debug("{} requesting addBagItem()", Thread.currentThread());
-        listener.addBagItem();
+        if (addItemFuture == null || addItemFuture.isDone()) {
+           // LOGGER.debug("{} requesting addBagItem()", Thread.currentThread());
+           addItemFuture = listener.addBagItem();
+        }
 
-        if (!synchronizer.tryAcquireSharedNanos(claimedSeq, timeout)) {
+        if (!synchronizer.tryAcquireSharedNanos(startSeq, timeout)) {
           return null;
        }
 
-        LOGGER.debug("{} woke up to try again", Thread.currentThread());
+        // LOGGER.debug("{} woke up to try again", Thread.currentThread());
 
        final long elapsed = (System.nanoTime() - startScan);
        timeout = originTimeout - Math.max(elapsed, 100L);  // don't trust the nanoTime() impl. not to go backwards due to NTP adjustments
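Because the listener now returns a Future<Boolean>, the borrow loop above submits a new addBagItem() request only when the previous one has completed, instead of queuing another add on every pass through the wait loop. A standalone sketch of that at-most-one-outstanding-request pattern (executor, timings, and names are illustrative, not the ConcurrentBag implementation):

// Standalone sketch, not the ConcurrentBag implementation: at most one outstanding
// "add" request per waiter. A new request is submitted only when the previous
// Future has completed, rather than on every pass through the wait loop.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CoalescedRequestSketch
{
   public static void main(String[] args) throws Exception
   {
      ExecutorService addExecutor = Executors.newSingleThreadExecutor();
      Future<Boolean> pending = null;
      int requests = 0;

      // ten passes through a simulated wait loop; each simulated "add" takes ~120 ms
      for (int pass = 0; pass < 10; pass++) {
         if (pending == null || pending.isDone()) {
            requests++;
            pending = addExecutor.submit(() -> { Thread.sleep(120L); return true; });
         }
         Thread.sleep(50L);   // stand-in for the acquire attempt timing out
      }

      System.out.println("10 loop passes, only " + requests + " add requests submitted");
      addExecutor.shutdown();
   }
}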
@@ -321,14 +324,14 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
      @Override
      protected long tryAcquireShared(final long seq)
      {
-        if (hasQueuedPredecessors()) {
-           LOGGER.debug("{} had {} queued predecessors ({})", Thread.currentThread(), this.getQueueLength(), seq);
-           return -1L;
-        }
-
-        final long ret = getState() > seq ? 0L : -1L;
-        LOGGER.debug("{} tryAcquireShared({}) returned {}", Thread.currentThread(), seq, ret);
-        return ret;
+        // if (hasQueuedPredecessors()) {
+        //    LOGGER.debug("{} had {} queued predecessors ({})", Thread.currentThread(), this.getQueueLength(), seq);
+        //    return -1L;
+        //}
+
+        // final long ret = getState() > seq ? 0L : -1L;
+        // LOGGER.debug("{} tryAcquireShared({}) returned {}", Thread.currentThread(), seq, ret);
+        return getState() <= seq || hasQueuedPredecessors() ? -1L : 0L;
      }
 
      /** {@inheritDoc} */
@@ -341,17 +344,9 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
            currentSeq = getState();
         }
 
-        LOGGER.debug("tryReleaseShared({}) succeeded", updateSeq);
+        // LOGGER.debug("tryReleaseShared({}) succeeded", updateSeq);
         return true;
 
-        // final long currentSeq = getState();
-        // if (updateSeq > currentSeq && compareAndSetState(currentSeq, updateSeq)) {
-        //    LOGGER.debug("tryReleaseShared({}) returned 'true'", updateSeq);
-        //    return true;
-        // }
-        //
-        // LOGGER.debug("tryReleaseShared({}) returned 'false'", updateSeq);
-        // return false;
      }
   }
}
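The two synchronizer hunks above strip debug logging from the acquire/release hot path and collapse tryAcquireShared() to a single expression: a waiter stays parked while the bag's sequence has not advanced past the value it last observed, or while a queued predecessor is ahead of it. A reduced sketch of that sequence-barrier idea on AbstractQueuedLongSynchronizer (my own simplification, assuming a single releaser; the real code updates state with a CAS loop):

// Simplified sketch of the sequence-barrier idea, not ConcurrentBag itself.
// Waiters pass the sequence value they last observed; a release bumps the state, and
// tryAcquireShared() admits a waiter only once the state has moved past that value
// and no queued predecessor is ahead of it.
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;

public class SequenceBarrierSketch extends AbstractQueuedLongSynchronizer
{
   @Override
   protected long tryAcquireShared(final long seq)
   {
      // -1 keeps the caller parked; 0 lets this waiter through
      return getState() <= seq || hasQueuedPredecessors() ? -1L : 0L;
   }

   @Override
   protected boolean tryReleaseShared(final long updateSeq)
   {
      setState(updateSeq);   // plain write is enough for this single-releaser sketch
      return true;
   }

   public static void main(String[] args) throws Exception
   {
      final SequenceBarrierSketch barrier = new SequenceBarrierSketch();
      final long observed = barrier.getState();    // sequence seen before waiting

      Thread releaser = new Thread(() -> {
         try { Thread.sleep(100L); } catch (InterruptedException ignored) { }
         barrier.releaseShared(observed + 1);       // something was returned to the "bag"
      });
      releaser.start();

      boolean woke = barrier.tryAcquireSharedNanos(observed, TimeUnit.SECONDS.toNanos(1));
      System.out.println("woken by release within timeout: " + woke);
      releaser.join();
   }
}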

@@ -15,6 +15,8 @@
  */
 package com.zaxxer.hikari.util;
 
+import java.util.concurrent.Future;
+
 /**
  * This interface is implemented by a listener to the ConcurrentBag. The
  * listener will be informed of when the bag has become empty. The usual
@@ -25,5 +27,5 @@ package com.zaxxer.hikari.util;
  */
 public interface IBagStateListener
 {
-   void addBagItem();
+   Future<Boolean> addBagItem();
 }
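IBagStateListener implementers must now hand back a Future<Boolean> instead of returning void; HikariPool does so with the FutureTask shown earlier. A hypothetical no-op stub under the new contract (class name is illustrative, not part of the commit):

// Hypothetical no-op implementation, not part of the commit: a listener that has
// nothing to add can return an already-completed Future, which the bag's isDone()
// check treats as "no request in flight".
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class NoOpBagStateListener   // would declare "implements IBagStateListener" in the real code base
{
   public Future<Boolean> addBagItem()
   {
      return CompletableFuture.completedFuture(Boolean.FALSE);   // nothing was added
   }

   public static void main(String[] args) throws Exception
   {
      Future<Boolean> f = new NoOpBagStateListener().addBagItem();
      System.out.println("done: " + f.isDone() + ", added: " + f.get());
   }
}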

@@ -158,10 +158,10 @@ public class PostgresTest
    {
       HikariConfig config = new HikariConfig();
       config.setMinimumIdle(0);
-      config.setMaximumPoolSize(15);
+      config.setMaximumPoolSize(50);
       config.setConnectionTimeout(10000);
-      config.setIdleTimeout(TimeUnit.MINUTES.toMillis(2));
-      config.setMaxLifetime(TimeUnit.MINUTES.toMillis(6));
+      config.setIdleTimeout(TimeUnit.MINUTES.toMillis(1));
+      config.setMaxLifetime(TimeUnit.MINUTES.toMillis(2));
       config.setRegisterMbeans(true);
       config.setJdbcUrl("jdbc:postgresql://localhost:5432/netld");
@@ -178,16 +178,16 @@ public class PostgresTest
            final long start = System.currentTimeMillis();
            do {
               try (Connection conn = ds.getConnection(); Statement stmt = conn.createStatement()) {
-                 try (ResultSet rs = stmt.executeQuery("SELECT * FROM device")) {
+                 final double sleep = Math.random() * 1.0;
+                 try (ResultSet rs = stmt.executeQuery("SELECT pg_sleep(" + sleep + ")")) {
                    rs.next();
                 }
-
-                 UtilityElf.quietlySleep(Math.max(200L, (long)(Math.random() * 2500L)));
              }
              catch (SQLException e) {
-                 throw new RuntimeException(e);
+                 e.printStackTrace();
              }
-           } while (UtilityElf.elapsedTimeMs(start) < TimeUnit.MINUTES.toMillis(42));
+              // UtilityElf.quietlySleep((long)(Math.random() * 500L));
+           } while (UtilityElf.elapsedTimeMs(start) < TimeUnit.MINUTES.toMillis(4));
            };
         });
      }
