From 522717f885154660e7fc60846c22827e4b99a042 Mon Sep 17 00:00:00 2001 From: Brett Wooldridge Date: Wed, 12 Feb 2014 19:09:38 +0900 Subject: [PATCH] New concurrent connection container (moved away from LinkedBlockingQueue and LinkedTransferQueue). --- .../java/com/zaxxer/hikari/HikariPool.java | 41 ++-- .../hikari/SpecializedConcurrentBag.java | 178 ++++++++++++++++++ .../com/zaxxer/hikari/util/ConcurrentBag.java | 117 ------------ 3 files changed, 196 insertions(+), 140 deletions(-) create mode 100644 src/main/java/com/zaxxer/hikari/SpecializedConcurrentBag.java delete mode 100644 src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java diff --git a/src/main/java/com/zaxxer/hikari/HikariPool.java b/src/main/java/com/zaxxer/hikari/HikariPool.java index 6ca5a7b2..354db869 100644 --- a/src/main/java/com/zaxxer/hikari/HikariPool.java +++ b/src/main/java/com/zaxxer/hikari/HikariPool.java @@ -19,9 +19,9 @@ package com.zaxxer.hikari; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; +import java.util.List; import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -49,7 +49,7 @@ public final class HikariPool implements HikariPoolMBean private final IConnectionCustomizer connectionCustomizer; private final HikariConfig configuration; - private final LinkedBlockingQueue idleConnections; + private final SpecializedConcurrentBag idleConnections; private final Timer houseKeepingTimer; @@ -79,7 +79,7 @@ public final class HikariPool implements HikariPoolMBean this.idleConnectionCount = new AtomicInteger(); this.awaitingConnection = new AtomicInteger(); this.backgroundFillQueued = new AtomicBoolean(); - this.idleConnections = new LinkedBlockingQueue(); + this.idleConnections = new SpecializedConcurrentBag(); this.jdbc4ConnectionTest = configuration.isJdbc4ConnectionTest(); this.leakDetectionThreshold = configuration.getLeakDetectionThreshold(); @@ -249,16 +249,8 @@ public final class HikariPool implements HikariPoolMBean shutdown = true; houseKeepingTimer.cancel(); - while (true) - { - IHikariConnectionProxy connection = idleConnections.poll(); - if (connection == null) - { - break; - } - closeConnection(connection); - } - + closeIdleConnections(); + HikariMBeanElf.unregisterMBeans(configuration, this); } @@ -293,13 +285,13 @@ public final class HikariPool implements HikariPoolMBean /** {@inheritDoc} */ public void closeIdleConnections() { - final int idleCount = idleConnectionCount.get(); - for (int i = 0; i < idleCount; i++) + List list = idleConnections.values(SpecializedConcurrentBag.NOT_IN_USE); + for (IHikariConnectionProxy connectionProxy : list) { - IHikariConnectionProxy connectionProxy = idleConnections.poll(); + connectionProxy = idleConnections.checkout(connectionProxy); if (connectionProxy == null) { - break; + continue; } idleConnectionCount.decrementAndGet(); @@ -417,7 +409,7 @@ public final class HikariPool implements HikariPoolMBean { idleConnectionCount.incrementAndGet(); totalConnections.incrementAndGet(); - idleConnections.add(proxyConnection); + idleConnections.offer(proxyConnection); } break; } @@ -524,6 +516,10 @@ public final class HikariPool implements HikariPoolMBean { return; } + finally + { + idleConnections.remove(connectionProxy); + } } private void logPoolState(String... prefix) @@ -549,14 +545,13 @@ public final class HikariPool implements HikariPoolMBean final long now = System.currentTimeMillis(); final long idleTimeout = configuration.getIdleTimeout(); final long maxLifetime = configuration.getMaxLifetime(); - final int idleCount = idleConnectionCount.get(); - for (int i = 0; i < idleCount; i++) + for (IHikariConnectionProxy connectionProxy : idleConnections.values(SpecializedConcurrentBag.NOT_IN_USE)) { - IHikariConnectionProxy connectionProxy = idleConnections.poll(); + connectionProxy = idleConnections.checkout(connectionProxy); if (connectionProxy == null) { - break; + continue; } idleConnectionCount.decrementAndGet(); @@ -570,7 +565,7 @@ public final class HikariPool implements HikariPoolMBean else { idleConnectionCount.incrementAndGet(); - idleConnections.add(connectionProxy); + idleConnections.checkin(connectionProxy); } } diff --git a/src/main/java/com/zaxxer/hikari/SpecializedConcurrentBag.java b/src/main/java/com/zaxxer/hikari/SpecializedConcurrentBag.java new file mode 100644 index 00000000..01890047 --- /dev/null +++ b/src/main/java/com/zaxxer/hikari/SpecializedConcurrentBag.java @@ -0,0 +1,178 @@ +package com.zaxxer.hikari; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicStampedReference; +import java.util.concurrent.locks.AbstractQueuedLongSynchronizer; + +public class SpecializedConcurrentBag +{ + static final int NOT_IN_USE = 0; + static final int IN_USE = 1; + static final int REMOVED = -1; + + private ConcurrentHashMap> map; + private Synchronizer synchronizer; + + private ThreadLocal>> threadList = new ThreadLocal>>() + { + protected LinkedList> initialValue() + { + return new LinkedList<>(); + } + }; + + public SpecializedConcurrentBag() + { + map = new ConcurrentHashMap<>(); + synchronizer = new Synchronizer(); + } + + public T poll(long timeout, TimeUnit timeUnit) throws InterruptedException + { + // Try the thread-local list first + LinkedList> list = threadList.get(); + while (!list.isEmpty()) + { + AtomicStampedReference stampedReference = list.removeLast(); + final T reference = stampedReference.getReference(); + if (stampedReference.compareAndSet(reference, reference, NOT_IN_USE, IN_USE)) + { + return reference; + } + } + + timeout = timeUnit.toNanos(timeout); + do { + final long start = System.nanoTime(); + for (AtomicStampedReference stampedReference : map.values()) + { + final T reference = stampedReference.getReference(); + if (stampedReference.compareAndSet(reference, reference, NOT_IN_USE, IN_USE)) + { + return reference; + } + } + + synchronizer.tryAcquireSharedNanos(1, timeout); + + timeout -= (System.nanoTime() - start); + } while (timeout > 0); + + return null; + } + + public boolean offer(T value) + { + LinkedList> list = threadList.get(); + AtomicStampedReference stampedReference = map.get(value); + if (stampedReference == null) + { + stampedReference = new AtomicStampedReference(value, NOT_IN_USE); + map.put(value, stampedReference); + list.addLast(stampedReference); + } + else + { + final T reference = stampedReference.getReference(); + if (stampedReference.compareAndSet(reference, reference, IN_USE, NOT_IN_USE)) + { + list.addLast(stampedReference); + } + } + + synchronizer.releaseShared(1); + + return true; + } + + public void remove(T value) + { + AtomicStampedReference stampedReference = map.get(value); + if (stampedReference != null) + { + stampedReference.set(stampedReference.getReference(), REMOVED); + map.remove(value); + } + } + + public List values(int state) + { + ArrayList list = new ArrayList<>(map.size()); + for (AtomicStampedReference stampedReference : map.values()) + { + if (stampedReference.getStamp() == state) + { + list.add(stampedReference.getReference()); + } + } + + return list; + } + + T checkout(T value) + { + AtomicStampedReference stampedReference = map.get(value); + if (stampedReference != null && stampedReference.compareAndSet(stampedReference.getReference(), stampedReference.getReference(), NOT_IN_USE, IN_USE)) + { + return value; + } + + return null; + } + + void checkin(T value) + { + AtomicStampedReference stampedReference = map.get(value); + if (stampedReference != null) + { + final T reference = stampedReference.getReference(); + stampedReference.compareAndSet(reference, reference, IN_USE, NOT_IN_USE); + synchronizer.releaseShared(1); + } + } + + private static class Synchronizer extends AbstractQueuedLongSynchronizer + { + private static final long serialVersionUID = 104753538004341218L; + + private static ThreadLocal startTimeStamp = new ThreadLocal() { + protected Long initialValue() + { + return System.nanoTime(); + } + }; + + @Override + protected long tryAcquireShared(long arg) + { + Long waitStart = startTimeStamp.get(); + + // fairness + if (hasQueuedPredecessors()) + { + return -1; + } + + if (getState() > waitStart) + { + startTimeStamp.remove(); + return 1; + } + + return -1; + } + + /** {@inheritDoc} */ + @Override + protected boolean tryReleaseShared(long arg) + { + setState(System.nanoTime()); + + return true; + } + } +} diff --git a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java deleted file mode 100644 index f9a72b3e..00000000 --- a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java +++ /dev/null @@ -1,117 +0,0 @@ -package com.zaxxer.hikari.util; - -import java.lang.reflect.Field; -import java.util.LinkedList; -import java.util.concurrent.locks.ReentrantLock; - -public class ConcurrentBag -{ - private static sun.misc.Unsafe unsafe = getUnsafe(); - - private LinkedList> sharedList; - - private ThreadLocal> threadList = new ThreadLocal>() { - protected java.util.LinkedList initialValue() - { - LinkedList list = new LinkedList(); - sharedList.add(list); - return list; - } - }; - - public ConcurrentBag() - { - sharedList = new LinkedList<>(); - } - - @SuppressWarnings("restriction") - private static sun.misc.Unsafe getUnsafe() - { - try - { - Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (sun.misc.Unsafe) f.get(null); - } - catch (Exception e) - { - throw new RuntimeException("Cannot access sun.misc.Unsafe"); - } - } - - private static class SinglyLinkedList - { - private ReentrantLock putLock = new ReentrantLock(); - private ReentrantLock takeLock = new ReentrantLock(); - - Node head; - Node tail; - - void add(T value) - { - Node node = new Node(value); - final ReentrantLock putLock = this.putLock; - putLock.lock(); - try - { - if (head == null) - { - head = tail = node; - } - else - { - tail.next = node; - } - } - finally - { - putLock.unlock(); - } - } - - void remove(T value) - { - final ReentrantLock putLock = this.putLock; - final ReentrantLock takeLock = this.takeLock; - putLock.lock(); - takeLock.lock(); - try - { - Node node = head; - Node prev = null; - while (node != null) - { - if (node.value == value) - { - if (prev == null) - { - head = node; - } - else - { - prev.next = node.next; - } - break; - } - node = node.next; - } - } - finally - { - takeLock.unlock(); - putLock.unlock(); - } - } - } - - private static class Node - { - E value; - Node next; - - Node(E value) - { - this.value = value; - } - } -}