New concurrent connection container (moved away from LinkedBlockingQueue and LinkedTransferQueue).

pull/41/head
Brett Wooldridge 11 years ago
parent 71ef0b6b8d
commit 522717f885

@ -19,9 +19,9 @@ package com.zaxxer.hikari;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -49,7 +49,7 @@ public final class HikariPool implements HikariPoolMBean
private final IConnectionCustomizer connectionCustomizer;
private final HikariConfig configuration;
private final LinkedBlockingQueue<IHikariConnectionProxy> idleConnections;
private final SpecializedConcurrentBag<IHikariConnectionProxy> idleConnections;
private final Timer houseKeepingTimer;
@ -79,7 +79,7 @@ public final class HikariPool implements HikariPoolMBean
this.idleConnectionCount = new AtomicInteger();
this.awaitingConnection = new AtomicInteger();
this.backgroundFillQueued = new AtomicBoolean();
this.idleConnections = new LinkedBlockingQueue<IHikariConnectionProxy>();
this.idleConnections = new SpecializedConcurrentBag<IHikariConnectionProxy>();
this.jdbc4ConnectionTest = configuration.isJdbc4ConnectionTest();
this.leakDetectionThreshold = configuration.getLeakDetectionThreshold();
@ -249,15 +249,7 @@ public final class HikariPool implements HikariPoolMBean
shutdown = true;
houseKeepingTimer.cancel();
while (true)
{
IHikariConnectionProxy connection = idleConnections.poll();
if (connection == null)
{
break;
}
closeConnection(connection);
}
closeIdleConnections();
HikariMBeanElf.unregisterMBeans(configuration, this);
}
@ -293,13 +285,13 @@ public final class HikariPool implements HikariPoolMBean
/** {@inheritDoc} */
public void closeIdleConnections()
{
final int idleCount = idleConnectionCount.get();
for (int i = 0; i < idleCount; i++)
List<IHikariConnectionProxy> list = idleConnections.values(SpecializedConcurrentBag.NOT_IN_USE);
for (IHikariConnectionProxy connectionProxy : list)
{
IHikariConnectionProxy connectionProxy = idleConnections.poll();
connectionProxy = idleConnections.checkout(connectionProxy);
if (connectionProxy == null)
{
break;
continue;
}
idleConnectionCount.decrementAndGet();
@ -417,7 +409,7 @@ public final class HikariPool implements HikariPoolMBean
{
idleConnectionCount.incrementAndGet();
totalConnections.incrementAndGet();
idleConnections.add(proxyConnection);
idleConnections.offer(proxyConnection);
}
break;
}
@ -524,6 +516,10 @@ public final class HikariPool implements HikariPoolMBean
{
return;
}
finally
{
idleConnections.remove(connectionProxy);
}
}
private void logPoolState(String... prefix)
@ -549,14 +545,13 @@ public final class HikariPool implements HikariPoolMBean
final long now = System.currentTimeMillis();
final long idleTimeout = configuration.getIdleTimeout();
final long maxLifetime = configuration.getMaxLifetime();
final int idleCount = idleConnectionCount.get();
for (int i = 0; i < idleCount; i++)
for (IHikariConnectionProxy connectionProxy : idleConnections.values(SpecializedConcurrentBag.NOT_IN_USE))
{
IHikariConnectionProxy connectionProxy = idleConnections.poll();
connectionProxy = idleConnections.checkout(connectionProxy);
if (connectionProxy == null)
{
break;
continue;
}
idleConnectionCount.decrementAndGet();
@ -570,7 +565,7 @@ public final class HikariPool implements HikariPoolMBean
else
{
idleConnectionCount.incrementAndGet();
idleConnections.add(connectionProxy);
idleConnections.checkin(connectionProxy);
}
}

@ -0,0 +1,178 @@
package com.zaxxer.hikari;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;
public class SpecializedConcurrentBag<T>
{
static final int NOT_IN_USE = 0;
static final int IN_USE = 1;
static final int REMOVED = -1;
private ConcurrentHashMap<T, AtomicStampedReference<T>> map;
private Synchronizer synchronizer;
private ThreadLocal<LinkedList<AtomicStampedReference<T>>> threadList = new ThreadLocal<LinkedList<AtomicStampedReference<T>>>()
{
protected LinkedList<AtomicStampedReference<T>> initialValue()
{
return new LinkedList<>();
}
};
public SpecializedConcurrentBag()
{
map = new ConcurrentHashMap<>();
synchronizer = new Synchronizer();
}
public T poll(long timeout, TimeUnit timeUnit) throws InterruptedException
{
// Try the thread-local list first
LinkedList<AtomicStampedReference<T>> list = threadList.get();
while (!list.isEmpty())
{
AtomicStampedReference<T> stampedReference = list.removeLast();
final T reference = stampedReference.getReference();
if (stampedReference.compareAndSet(reference, reference, NOT_IN_USE, IN_USE))
{
return reference;
}
}
timeout = timeUnit.toNanos(timeout);
do {
final long start = System.nanoTime();
for (AtomicStampedReference<T> stampedReference : map.values())
{
final T reference = stampedReference.getReference();
if (stampedReference.compareAndSet(reference, reference, NOT_IN_USE, IN_USE))
{
return reference;
}
}
synchronizer.tryAcquireSharedNanos(1, timeout);
timeout -= (System.nanoTime() - start);
} while (timeout > 0);
return null;
}
public boolean offer(T value)
{
LinkedList<AtomicStampedReference<T>> list = threadList.get();
AtomicStampedReference<T> stampedReference = map.get(value);
if (stampedReference == null)
{
stampedReference = new AtomicStampedReference<T>(value, NOT_IN_USE);
map.put(value, stampedReference);
list.addLast(stampedReference);
}
else
{
final T reference = stampedReference.getReference();
if (stampedReference.compareAndSet(reference, reference, IN_USE, NOT_IN_USE))
{
list.addLast(stampedReference);
}
}
synchronizer.releaseShared(1);
return true;
}
public void remove(T value)
{
AtomicStampedReference<T> stampedReference = map.get(value);
if (stampedReference != null)
{
stampedReference.set(stampedReference.getReference(), REMOVED);
map.remove(value);
}
}
public List<T> values(int state)
{
ArrayList<T> list = new ArrayList<>(map.size());
for (AtomicStampedReference<T> stampedReference : map.values())
{
if (stampedReference.getStamp() == state)
{
list.add(stampedReference.getReference());
}
}
return list;
}
T checkout(T value)
{
AtomicStampedReference<T> stampedReference = map.get(value);
if (stampedReference != null && stampedReference.compareAndSet(stampedReference.getReference(), stampedReference.getReference(), NOT_IN_USE, IN_USE))
{
return value;
}
return null;
}
void checkin(T value)
{
AtomicStampedReference<T> stampedReference = map.get(value);
if (stampedReference != null)
{
final T reference = stampedReference.getReference();
stampedReference.compareAndSet(reference, reference, IN_USE, NOT_IN_USE);
synchronizer.releaseShared(1);
}
}
private static class Synchronizer extends AbstractQueuedLongSynchronizer
{
private static final long serialVersionUID = 104753538004341218L;
private static ThreadLocal<Long> startTimeStamp = new ThreadLocal<Long>() {
protected Long initialValue()
{
return System.nanoTime();
}
};
@Override
protected long tryAcquireShared(long arg)
{
Long waitStart = startTimeStamp.get();
// fairness
if (hasQueuedPredecessors())
{
return -1;
}
if (getState() > waitStart)
{
startTimeStamp.remove();
return 1;
}
return -1;
}
/** {@inheritDoc} */
@Override
protected boolean tryReleaseShared(long arg)
{
setState(System.nanoTime());
return true;
}
}
}

@ -1,117 +0,0 @@
package com.zaxxer.hikari.util;
import java.lang.reflect.Field;
import java.util.LinkedList;
import java.util.concurrent.locks.ReentrantLock;
public class ConcurrentBag<T>
{
private static sun.misc.Unsafe unsafe = getUnsafe();
private LinkedList<LinkedList<T>> sharedList;
private ThreadLocal<LinkedList<T>> threadList = new ThreadLocal<LinkedList<T>>() {
protected java.util.LinkedList<T> initialValue()
{
LinkedList<T> list = new LinkedList<T>();
sharedList.add(list);
return list;
}
};
public ConcurrentBag()
{
sharedList = new LinkedList<>();
}
@SuppressWarnings("restriction")
private static sun.misc.Unsafe getUnsafe()
{
try
{
Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
return (sun.misc.Unsafe) f.get(null);
}
catch (Exception e)
{
throw new RuntimeException("Cannot access sun.misc.Unsafe");
}
}
private static class SinglyLinkedList<T>
{
private ReentrantLock putLock = new ReentrantLock();
private ReentrantLock takeLock = new ReentrantLock();
Node<T> head;
Node<T> tail;
void add(T value)
{
Node<T> node = new Node<T>(value);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try
{
if (head == null)
{
head = tail = node;
}
else
{
tail.next = node;
}
}
finally
{
putLock.unlock();
}
}
void remove(T value)
{
final ReentrantLock putLock = this.putLock;
final ReentrantLock takeLock = this.takeLock;
putLock.lock();
takeLock.lock();
try
{
Node<T> node = head;
Node<T> prev = null;
while (node != null)
{
if (node.value == value)
{
if (prev == null)
{
head = node;
}
else
{
prev.next = node.next;
}
break;
}
node = node.next;
}
}
finally
{
takeLock.unlock();
putLock.unlock();
}
}
}
private static class Node<E>
{
E value;
Node<E> next;
Node(E value)
{
this.value = value;
}
}
}
Loading…
Cancel
Save