From b81c24d43c493dcae89cc7b5b24a86b5a433d12d Mon Sep 17 00:00:00 2001 From: Brett Wooldridge Date: Tue, 18 Feb 2014 10:19:35 +0900 Subject: [PATCH] Javadoc, renaming, and cleanup. --- .../java/com/zaxxer/hikari/HikariPool.java | 29 +- .../zaxxer/hikari/proxy/ConnectionProxy.java | 7 - .../hikari/proxy/IHikariConnectionProxy.java | 2 +- .../com/zaxxer/hikari/util/ConcurrentBag.java | 253 ++++++++++++++++++ .../com/zaxxer/hikari/util/IBagManagable.java | 34 --- .../hikari/util/SpecializedConcurrentBag.java | 187 ------------- 6 files changed, 266 insertions(+), 246 deletions(-) create mode 100644 src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java delete mode 100644 src/main/java/com/zaxxer/hikari/util/IBagManagable.java delete mode 100644 src/main/java/com/zaxxer/hikari/util/SpecializedConcurrentBag.java diff --git a/src/main/java/com/zaxxer/hikari/HikariPool.java b/src/main/java/com/zaxxer/hikari/HikariPool.java index e7d12689..a71a8e12 100644 --- a/src/main/java/com/zaxxer/hikari/HikariPool.java +++ b/src/main/java/com/zaxxer/hikari/HikariPool.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; import com.zaxxer.hikari.proxy.IHikariConnectionProxy; import com.zaxxer.hikari.proxy.ProxyFactory; import com.zaxxer.hikari.util.PropertyBeanSetter; -import com.zaxxer.hikari.util.SpecializedConcurrentBag; +import com.zaxxer.hikari.util.ConcurrentBag; /** * This is the primary connection pool class that provides the basic @@ -50,7 +50,7 @@ public final class HikariPool implements HikariPoolMBean private final IConnectionCustomizer connectionCustomizer; private final HikariConfig configuration; - private final SpecializedConcurrentBag idleConnections; + private final ConcurrentBag idleConnectionBag; private final Timer houseKeepingTimer; @@ -80,7 +80,7 @@ public final class HikariPool implements HikariPoolMBean this.idleConnectionCount = new AtomicInteger(); this.awaitingConnection = new AtomicInteger(); this.backgroundFillQueued = new AtomicBoolean(); - this.idleConnections = new SpecializedConcurrentBag(configuration.getMaximumPoolSize()); + this.idleConnectionBag = new ConcurrentBag(); this.jdbc4ConnectionTest = configuration.isJdbc4ConnectionTest(); this.leakDetectionThreshold = configuration.getLeakDetectionThreshold(); @@ -168,7 +168,7 @@ public final class HikariPool implements HikariPoolMBean { addConnections(AddConnectionStrategy.ONLY_IF_EMPTY); - IHikariConnectionProxy connectionProxy = idleConnections.poll(timeout, TimeUnit.MILLISECONDS); + IHikariConnectionProxy connectionProxy = idleConnectionBag.borrow(timeout, TimeUnit.MILLISECONDS); if (connectionProxy == null) { // We timed out... break and throw exception @@ -232,10 +232,7 @@ public final class HikariPool implements HikariPoolMBean if (!connectionProxy.isBrokenConnection() && !shutdown) { idleConnectionCount.incrementAndGet(); - if (!idleConnections.offer(connectionProxy)) - { - closeConnection(connectionProxy); - } + idleConnectionBag.requite(connectionProxy); } else { @@ -287,11 +284,10 @@ public final class HikariPool implements HikariPoolMBean /** {@inheritDoc} */ public void closeIdleConnections() { - List list = idleConnections.values(IHikariConnectionProxy.NOT_IN_USE); + List list = idleConnectionBag.values(ConcurrentBag.STATE_NOT_IN_USE); for (IHikariConnectionProxy connectionProxy : list) { - connectionProxy = idleConnections.checkout(connectionProxy); - if (connectionProxy == null) + if (!idleConnectionBag.reserve(connectionProxy)) { continue; } @@ -411,7 +407,7 @@ public final class HikariPool implements HikariPoolMBean { idleConnectionCount.incrementAndGet(); totalConnections.incrementAndGet(); - idleConnections.add(proxyConnection); + idleConnectionBag.add(proxyConnection); } break; } @@ -520,7 +516,7 @@ public final class HikariPool implements HikariPoolMBean } finally { - idleConnections.remove(connectionProxy); + idleConnectionBag.remove(connectionProxy); } } @@ -548,10 +544,9 @@ public final class HikariPool implements HikariPoolMBean final long idleTimeout = configuration.getIdleTimeout(); final long maxLifetime = configuration.getMaxLifetime(); - for (IHikariConnectionProxy connectionProxy : idleConnections.values(IHikariConnectionProxy.NOT_IN_USE)) + for (IHikariConnectionProxy connectionProxy : idleConnectionBag.values(ConcurrentBag.STATE_NOT_IN_USE)) { - connectionProxy = idleConnections.checkout(connectionProxy); - if (connectionProxy == null) + if (!idleConnectionBag.reserve(connectionProxy)) { continue; } @@ -567,7 +562,7 @@ public final class HikariPool implements HikariPoolMBean else { idleConnectionCount.incrementAndGet(); - idleConnections.checkin(connectionProxy); + idleConnectionBag.unreserve(connectionProxy); } } diff --git a/src/main/java/com/zaxxer/hikari/proxy/ConnectionProxy.java b/src/main/java/com/zaxxer/hikari/proxy/ConnectionProxy.java index 68726daa..c14c40a6 100644 --- a/src/main/java/com/zaxxer/hikari/proxy/ConnectionProxy.java +++ b/src/main/java/com/zaxxer/hikari/proxy/ConnectionProxy.java @@ -192,13 +192,6 @@ public abstract class ConnectionProxy implements IHikariConnectionProxy return state.get(); } - /** {@inheritDoc} */ - @Override - public void setState(int newState) - { - state.set(newState); - } - /** {@inheritDoc} */ @Override public boolean compareAndSetState(int expectedState, int newState) diff --git a/src/main/java/com/zaxxer/hikari/proxy/IHikariConnectionProxy.java b/src/main/java/com/zaxxer/hikari/proxy/IHikariConnectionProxy.java index a7660474..58a7c571 100644 --- a/src/main/java/com/zaxxer/hikari/proxy/IHikariConnectionProxy.java +++ b/src/main/java/com/zaxxer/hikari/proxy/IHikariConnectionProxy.java @@ -20,7 +20,7 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.Timer; -import com.zaxxer.hikari.util.IBagManagable; +import com.zaxxer.hikari.util.ConcurrentBag.IBagManagable; /** * diff --git a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java new file mode 100644 index 00000000..ee43276e --- /dev/null +++ b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java @@ -0,0 +1,253 @@ +/* + * Copyright (C) 2013 Brett Wooldridge + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.zaxxer.hikari.util; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.AbstractQueuedLongSynchronizer; + +/** + * This is a specialized concurrent bag that achieves superior performance + * to LinkedBlockingQueue and LinkedTransferQueue for the purposes of a + * connection pool. It uses ThreadLocal storage when possible to avoid + * locks, but resorts to scanning a common collection if there are no + * available connections in the ThreadLocal list. Idle connections in + * ThreadLocal lists can be "stolen" when the poll()ing thread has none + * of its own. It is a "lock-less" implementation using a specialized + * AbstractQueuedLongSynchronizer to manage cross-thread signaling. + * + * Note that objects that are "borrowed" from the bag are not actually + * removed from any collection, so garbage collection will not occur + * even if the reference is abandoned. Thus care must be taken to + * "requite" borrowed objects otherwise a memory leak will result. Only + * the "remove" method can completely remove an object from the bag. + * + * @author Brett Wooldridge + * + * @param the templated type to store in the bag + * @param + */ +public class ConcurrentBag +{ + public static final int STATE_NOT_IN_USE = 0; + public static final int STATE_IN_USE = 1; + private static final int STATE_REMOVED = -1; + private static final int STATE_RESERVED = -2; + + /** + * This interface must be implemented by classes wishing to be managed by + * ConcurrentBag. All implementations must be atomic with respect to state. + * The suggested implementation is via AtomicInteger using the methods + * get() and compareAndSet(). + */ + public interface IBagManagable + { + int getState(); + + boolean compareAndSetState(int expectedState, int newState); + } + + private ThreadLocal> threadList; + private CopyOnWriteArraySet sharedList; + private Synchronizer synchronizer; + + /** + * Constructor. + */ + public ConcurrentBag() + { + this.sharedList = new CopyOnWriteArraySet<>(); + this.synchronizer = new Synchronizer(); + this.threadList = new ThreadLocal>() { + protected LinkedList initialValue() + { + return new LinkedList<>(); + } + }; + } + + /** + * The method will borrow an IBagManagable from the bag, blocking for the + * specified timeout if none are available. + * + * @param timeout how long to wait before giving up, in units of unit + * @param timeUnit a TimeUnit determining how to interpret the timeout parameter + * @return a borrowed instance from the bag or null if a timeout occurs + * @throws InterruptedException if interrupted while waiting + */ + public T borrow(long timeout, TimeUnit timeUnit) throws InterruptedException + { + // Try the thread-local list first + final LinkedList list = threadList.get(); + while (!list.isEmpty()) + { + final T reference = list.removeFirst(); + if (reference.compareAndSetState(STATE_NOT_IN_USE, STATE_IN_USE)) + { + return reference; + } + } + + // Otherwise, scan the shared list ... for maximum of timeout + timeout = timeUnit.toNanos(timeout); + do { + final long startScan = System.nanoTime(); + for (T reference : sharedList) + { + if (reference.compareAndSetState(STATE_NOT_IN_USE, STATE_IN_USE)) + { + return reference; + } + } + + synchronizer.tryAcquireSharedNanos(startScan, timeout); + + timeout -= (System.nanoTime() - startScan); + } while (timeout > 0); + + return null; + } + + /** + * This method will return a borrowed object to the bag. Objects + * that are borrowed from the bag but never "requited" will result + * in a memory leak. + * + * @param value the value to return to the bag + * @throws NullPointerException if value is null + * @throws IllegalStateException if the requited value was not borrowed from the bag + */ + public void requite(T value) + { + if (value == null) + { + throw new NullPointerException("Cannot return a null value to the bag"); + } + + if (value.compareAndSetState(STATE_IN_USE, STATE_NOT_IN_USE)) + { + final long returnTime = System.nanoTime(); + threadList.get().addLast(value); + synchronizer.releaseShared(returnTime); + } + else + { + throw new IllegalStateException("Value was returned to the bag that was not borrowed"); + } + } + + /** + * Add a new object to the bag for others to borrow. + * + * @param value an object to add to the bag + */ + public void add(T value) + { + sharedList.add(value); + synchronizer.releaseShared(1); + } + + /** + * Remove a value from the bag. This method should only be called + * with objects obtained by borrow() or reserve(). + * @param value the value to remove + * @throws IllegalStateException if an attempt is made to remove an object + * from the bag that was not borrowed or reserved first + */ + public void remove(T value) + { + if (value.compareAndSetState(STATE_IN_USE, STATE_REMOVED) || value.compareAndSetState(STATE_RESERVED, STATE_REMOVED)) + { + sharedList.remove(value); + } + else + { + throw new IllegalStateException("Attempt to remove an object from the bag that was not borrowed or reserved"); + } + } + + /** + * This method provides a "snaphot" in time of the IBagManagable + * items in the bag in the specified state. It does not "lock" + * or reserve items in any way. + * + * @param state one of STATE_NOT_IN_USE or STATE_IN_USE + * @return a possibly empty list of objects having the state specified + */ + public List values(int state) + { + ArrayList list = new ArrayList<>(sharedList.size()); + if (state == STATE_IN_USE || state == STATE_NOT_IN_USE) + { + for (T reference : sharedList) + { + if (reference.getState() == state) + { + list.add(reference); + } + } + } + return list; + } + + public boolean reserve(T value) + { + return value.compareAndSetState(STATE_NOT_IN_USE, STATE_RESERVED); + } + + public void unreserve(T value) + { + final long checkInTime = System.nanoTime(); + if (!value.compareAndSetState(STATE_RESERVED, STATE_NOT_IN_USE)) + { + throw new IllegalStateException("Attempt to relinquish an object to the bag that was not reserved"); + } + + synchronizer.releaseShared(checkInTime); + } + + /** + * Our private synchronizer that handles notify/wait type semantics. + */ + private static class Synchronizer extends AbstractQueuedLongSynchronizer + { + private static final long serialVersionUID = 104753538004341218L; + + @Override + protected long tryAcquireShared(long startScanTime) + { + // fairness + if (hasQueuedPredecessors()) + { + return -1; + } + + return getState() > startScanTime ? 1 : -1; + } + + /** {@inheritDoc} */ + @Override + protected boolean tryReleaseShared(long updateTime) + { + setState(updateTime); + + return true; + } + } +} diff --git a/src/main/java/com/zaxxer/hikari/util/IBagManagable.java b/src/main/java/com/zaxxer/hikari/util/IBagManagable.java deleted file mode 100644 index 6a0c91f7..00000000 --- a/src/main/java/com/zaxxer/hikari/util/IBagManagable.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (C) 2013 Brett Wooldridge - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.zaxxer.hikari.util; - -/** - * - * @author Brett Wooldridge - */ -public interface IBagManagable -{ - int NOT_IN_USE = 0; - int IN_USE = 1; - int REMOVED = -1; - - void setState(int newState); - - int getState(); - - boolean compareAndSetState(int expectedState, int newState); -} diff --git a/src/main/java/com/zaxxer/hikari/util/SpecializedConcurrentBag.java b/src/main/java/com/zaxxer/hikari/util/SpecializedConcurrentBag.java deleted file mode 100644 index eebc83b9..00000000 --- a/src/main/java/com/zaxxer/hikari/util/SpecializedConcurrentBag.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Copyright (C) 2013 Brett Wooldridge - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.zaxxer.hikari.util; - -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.AbstractQueuedLongSynchronizer; - -import static com.zaxxer.hikari.util.IBagManagable.NOT_IN_USE; -import static com.zaxxer.hikari.util.IBagManagable.IN_USE; -import static com.zaxxer.hikari.util.IBagManagable.REMOVED; - -/** - * This is a specialized concurrent bag that achieves superior performance - * to LinkedBlockingQueue and LinkedTransferQueue for the purposes of a - * connection pool. It uses ThreadLocal storage when possible to avoid - * locks, but resorts to scanning a common collection if there are no - * available connections in the ThreadLocal list. Idle connections in - * ThreadLocal lists can be "stolen" when the poll()ing thread has none - * of its own. It is a "lock-less" implementation using a specialized - * AbstractQueuedLongSynchronizer to manage cross-thread signaling. - * - * @author Brett Wooldridge - * - * @param the templated type to store in the bag - */ -public class SpecializedConcurrentBag -{ - private ThreadLocal> threadList; - private CopyOnWriteArraySet sharedList; - private Synchronizer synchronizer; - - /** - * Constructor. - * - * @param initialCapacity initial bag capacity - */ - public SpecializedConcurrentBag(int initialCapacity) - { - this.sharedList = new CopyOnWriteArraySet<>(); - this.synchronizer = new Synchronizer(); - this.threadList = new ThreadLocal>() { - protected LinkedList initialValue() - { - return new LinkedList<>(); - } - }; - } - - public T poll(long timeout, TimeUnit timeUnit) throws InterruptedException - { - // Try the thread-local list first - final LinkedList list = threadList.get(); - while (!list.isEmpty()) - { - final T reference = list.removeFirst(); - if (reference.compareAndSetState(NOT_IN_USE, IN_USE)) - { - return reference; - } - } - - // Otherwise, scan the shared list ... for maximum of timeout - timeout = timeUnit.toNanos(timeout); - do { - final long startScan = System.nanoTime(); - for (T reference : sharedList) - { - if (reference.compareAndSetState(NOT_IN_USE, IN_USE)) - { - return reference; - } - } - - synchronizer.tryAcquireSharedNanos(startScan, timeout); - - timeout -= (System.nanoTime() - startScan); - } while (timeout > 0); - - return null; - } - - public void add(T value) - { - sharedList.add(value); - synchronizer.releaseShared(1); - } - - public boolean offer(T value) - { - final long offerTime = System.nanoTime(); - if (value.compareAndSetState(IN_USE, NOT_IN_USE)) - { - threadList.get().addLast(value); - } - else - { - return false; - } - - synchronizer.releaseShared(offerTime); - - return true; - } - - public void remove(T value) - { - value.setState(REMOVED); - sharedList.remove(value); - } - - public List values(int state) - { - ArrayList list = new ArrayList<>(sharedList.size()); - for (T reference : sharedList) - { - if (reference.getState() == state) - { - list.add(reference); - } - } - - return list; - } - - public T checkout(T value) - { - if (value.compareAndSetState(NOT_IN_USE, IN_USE)) - { - return value; - } - - return null; - } - - public void checkin(T value) - { - final long checkInTime = System.nanoTime(); - value.compareAndSetState(IN_USE, NOT_IN_USE); - synchronizer.releaseShared(checkInTime); - } - - /** - * Our private synchronizer that handles notify/wait type semantics. - */ - private static class Synchronizer extends AbstractQueuedLongSynchronizer - { - private static final long serialVersionUID = 104753538004341218L; - - @Override - protected long tryAcquireShared(long startScanTime) - { - // fairness - if (hasQueuedPredecessors()) - { - return -1; - } - - return getState() > startScanTime ? 1 : -1; - } - - /** {@inheritDoc} */ - @Override - protected boolean tryReleaseShared(long updateTime) - { - setState(updateTime); - - return true; - } - } -}