Project restructuring experiment

pull/212/head
Brett Wooldridge 10 years ago
parent d304c511b3
commit 82f98fc57e

@ -0,0 +1,19 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>HikariCP-common</artifactId>
<packaging>jar</packaging>
<name>hikaricp-common</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<parent>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP-parent</artifactId>
<version>2.2.6-SNAPSHOT</version>
</parent>
</project>

@ -0,0 +1,345 @@
/*
* Copyright (C) 2013, 2014 Brett Wooldridge
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.zaxxer.hikari.util;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.zaxxer.hikari.util.ConcurrentBag.BagEntry;
/**
* This is a specialized concurrent bag that achieves superior performance
* to LinkedBlockingQueue and LinkedTransferQueue for the purposes of a
* connection pool. It uses ThreadLocal storage when possible to avoid
* locks, but resorts to scanning a common collection if there are no
* available items in the ThreadLocal list. Not-in-use items in the
* ThreadLocal lists can be "stolen" when the borrowing thread has none
* of its own. It is a "lock-less" implementation using a specialized
* AbstractQueuedLongSynchronizer to manage cross-thread signaling.
*
* Note that items that are "borrowed" from the bag are not actually
* removed from any collection, so garbage collection will not occur
* even if the reference is abandoned. Thus care must be taken to
* "requite" borrowed objects otherwise a memory leak will result. Only
* the "remove" method can completely remove an object from the bag.
*
* @author Brett Wooldridge
*
* @param <T> the templated type to store in the bag
*/
public final class ConcurrentBag<T extends BagEntry>
{
private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentBag.class);
public static final int STATE_NOT_IN_USE = 0;
public static final int STATE_IN_USE = 1;
public static final int STATE_REMOVED = -1;
private static final int STATE_RESERVED = -2;
public static abstract class BagEntry
{
final AtomicInteger state = new AtomicInteger();
}
/**
* This interface is implemented by a listener to the bag. The listener
* will be informed of when the bag has become empty. The usual course
* of action by the listener in this case is to attempt to add an item
* to the bag.
*/
public static interface IBagStateListener
{
void addBagItem();
}
private final ThreadLocal<ArrayList<WeakReference<BagEntry>>> threadList;
private final CopyOnWriteArrayList<T> sharedList;
private final Synchronizer synchronizer;
private final AtomicLong sequence;
private final IBagStateListener listener;
private volatile boolean closed;
/**
* Construct a ConcurrentBag with the specified listener.
*
* @param listener the IBagStateListener to attach to this bag
*/
public ConcurrentBag(IBagStateListener listener)
{
this.sharedList = new CopyOnWriteArrayList<T>();
this.synchronizer = new Synchronizer();
this.sequence = new AtomicLong(1);
this.listener = listener;
this.threadList = new ThreadLocal<ArrayList<WeakReference<BagEntry>>>();
}
/**
* The method will borrow a BagEntry from the bag, blocking for the
* specified timeout if none are available.
*
* @param timeout how long to wait before giving up, in units of unit
* @param timeUnit a <code>TimeUnit</code> determining how to interpret the timeout parameter
* @return a borrowed instance from the bag or null if a timeout occurs
* @throws InterruptedException if interrupted while waiting
*/
@SuppressWarnings("unchecked")
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
// Try the thread-local list first
final ArrayList<WeakReference<BagEntry>> list = threadList.get();
if (list == null) {
threadList.set(new ArrayList<WeakReference<BagEntry>>(16));
}
else {
for (int i = list.size() - 1; i >= 0; i--) {
final BagEntry bagEntry = list.remove(i).get();
if (bagEntry != null && bagEntry.state.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return (T) bagEntry;
}
}
}
// Otherwise, scan the shared list ... for maximum of timeout
timeout = timeUnit.toNanos(timeout);
do {
final long startScan = System.nanoTime();
long startSeq;
do {
startSeq = sequence.longValue();
for (final T bagEntry : sharedList) {
if (bagEntry.state.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
} while (startSeq < sequence.longValue());
listener.addBagItem();
synchronizer.tryAcquireSharedNanos(startSeq, timeout);
timeout -= (System.nanoTime() - startScan);
}
while (timeout > 0L);
return null;
}
/**
* This method will return a borrowed object to the bag. Objects
* that are borrowed from the bag but never "requited" will result
* in a memory leak.
*
* @param bagEntry the value to return to the bag
* @throws NullPointerException if value is null
* @throws IllegalStateException if the requited value was not borrowed from the bag
*/
public void requite(final T bagEntry)
{
if (bagEntry.state.compareAndSet(STATE_IN_USE, STATE_NOT_IN_USE)) {
final ArrayList<WeakReference<BagEntry>> list = threadList.get();
if (list != null) {
list.add(new WeakReference<BagEntry>(bagEntry));
}
synchronizer.releaseShared(sequence.incrementAndGet());
}
else {
throw new IllegalStateException("Value was returned to the bag that was not borrowed: " + bagEntry);
}
}
/**
* Add a new object to the bag for others to borrow.
*
* @param bagEntry an object to add to the bag
*/
public void add(final T bagEntry)
{
if (closed) {
throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
}
else {
sharedList.add(bagEntry);
synchronizer.releaseShared(sequence.incrementAndGet());
}
}
/**
* Remove a value from the bag. This method should only be called
* with objects obtained by {@link #borrow(long, TimeUnit)} or {@link #reserve(BagEntry)}.
*
* @param bagEntry the value to remove
* @return true if the entry was removed, false otherwise
* @throws IllegalStateException if an attempt is made to remove an object
* from the bag that was not borrowed or reserved first
*/
public boolean remove(final T bagEntry)
{
if (!bagEntry.state.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.state.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
throw new IllegalStateException("Attempt to remove an object from the bag that was not borrowed or reserved");
}
final boolean removed = sharedList.remove(bagEntry);
if (!removed && !closed) {
throw new IllegalStateException("Attempt to remove an object from the bag that does not exist");
}
return removed;
}
/**
* Close the bag to further adds.
*/
public void close()
{
closed = true;
}
/**
* This method provides a "snaphot" in time of the BagEntry
* items in the bag in the specified state. It does not "lock"
* or reserve items in any way. Call {@link #reserve(BagEntry)}
* on items in list before performing any action on them.
*
* @param state one of STATE_NOT_IN_USE or STATE_IN_USE
* @return a possibly empty list of objects having the state specified
*/
public List<T> values(final int state)
{
if (state == STATE_IN_USE || state == STATE_NOT_IN_USE) {
return sharedList.stream()
.filter(reference -> reference.state.get() == state)
.collect(Collectors.toList());
}
return new ArrayList<T>(0);
}
/**
* The method is used to make an item in the bag "unavailable" for
* borrowing. It is primarily used when wanting to operate on items
* returned by the {@link #values(int)} method. Items that are
* reserved can be removed from the bag via {@link #remove(BagEntry)}
* without the need to unreserve them. Items that are not removed
* from the bag can be make available for borrowing again by calling
* the {@link #unreserve(BagEntry)} method.
*
* @param bagEntry the item to reserve
* @return true if the item was able to be reserved, false otherwise
*/
public boolean reserve(final T bagEntry)
{
return bagEntry.state.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);
}
/**
* This method is used to make an item reserved via {@link #reserve(BagEntry)}
* available again for borrowing.
*
* @param bagEntry the item to unreserve
*/
public void unreserve(final T bagEntry)
{
final long checkInSeq = sequence.incrementAndGet();
if (!bagEntry.state.compareAndSet(STATE_RESERVED, STATE_NOT_IN_USE)) {
throw new IllegalStateException("Attempt to relinquish an object to the bag that was not reserved");
}
synchronizer.releaseShared(checkInSeq);
}
/**
* Get the number of threads pending (waiting) for an item from the
* bag to become available.
*
* @return the number of threads waiting for items from the bag
*/
public int getPendingQueue()
{
return synchronizer.getQueueLength();
}
/**
* Get a count of the number of items in the specified state at the time of this call.
*
* @param state the state of the items to count
* @return a count of how many items in the bag are in the specified state
*/
public int getCount(final int state)
{
return (int) sharedList.stream().filter(reference -> reference.state.get() == state).count();
}
/**
* Get the total number of items in the bag.
*
* @return the number of items in the bag
*/
public int size()
{
return sharedList.size();
}
public void dumpState()
{
for (T bagEntry : sharedList) {
switch (bagEntry.state.get()) {
case STATE_IN_USE:
LOGGER.info(bagEntry.toString() + " state IN_USE");
break;
case STATE_NOT_IN_USE:
LOGGER.info(bagEntry.toString() + " state NOT_IN_USE");
break;
case STATE_REMOVED:
LOGGER.info(bagEntry.toString() + " state REMOVED");
break;
case STATE_RESERVED:
LOGGER.info(bagEntry.toString() + " state RESERVED");
break;
}
}
}
/**
* Our private synchronizer that handles notify/wait type semantics.
*/
private static final class Synchronizer extends AbstractQueuedLongSynchronizer
{
private static final long serialVersionUID = 104753538004341218L;
@Override
protected long tryAcquireShared(long seq)
{
return getState() > seq && !hasQueuedPredecessors() ? 1L : -1L;
}
/** {@inheritDoc} */
@Override
protected boolean tryReleaseShared(long updateSeq)
{
setState(updateSeq);
return true;
}
}
}

@ -0,0 +1,301 @@
package com.zaxxer.hikari.util;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import org.slf4j.Logger;
public final class PoolUtilities
{
private static volatile boolean IS_JDBC40;
private static volatile boolean IS_JDBC41;
private static volatile boolean jdbc40checked;
private static volatile boolean jdbc41checked;
private static volatile boolean queryTimeoutSupported = true;
/**
* Close connection and eat any exception.
*
* @param connection the connection to close
*/
public static void quietlyCloseConnection(final Connection connection)
{
if (connection != null) {
try {
connection.close();
}
catch (SQLException e) {
return;
}
}
}
/**
* Get the elapsed time in millisecond between the specified start time and now.
*
* @param start the start time
* @return the elapsed milliseconds
*/
public static long elapsedTimeMs(final long start)
{
return System.currentTimeMillis() - start;
}
/**
* Execute the user-specified init SQL.
*
* @param connection the connection to initialize
* @param sql the SQL to execute
* @throws SQLException throws if the init SQL execution fails
*/
public static void executeSql(final Connection connection, final String sql, final boolean isAutoCommit) throws SQLException
{
if (sql != null) {
try (Statement statement = connection.createStatement()) {
statement.execute(sql);
if (!isAutoCommit) {
connection.commit();
}
}
}
}
/**
* Sleep and transform an InterruptedException into a RuntimeException.
*
* @param millis the number of milliseconds to sleep
*/
public static void quietlySleep(final long millis)
{
try {
Thread.sleep(millis);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* Create and instance of the specified class using the constructor matching the specified
* arguments.
*
* @param className the name of the classto instantiate
* @param clazz a class to cast the result as
* @param args arguments to a constructor
* @return an instance of the specified class
*/
@SuppressWarnings("unchecked")
public static <T> T createInstance(final String className, final Class<T> clazz, final Object... args)
{
if (className == null) {
return null;
}
try {
Class<?> loaded = PoolUtilities.class.getClassLoader().loadClass(className);
Class<?>[] argClasses = new Class<?>[args.length];
for (int i = 0; i < args.length; i++) {
argClasses[i] = args[i].getClass();
}
if (args.length > 0) {
Constructor<?> constructor = loaded.getConstructor(argClasses);
return (T) constructor.newInstance(args);
}
return (T) loaded.newInstance();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* Create/initialize the underlying DataSource.
*
* @return the DataSource
*/
public static DataSource initializeDataSource(final String dsClassName, DataSource dataSource, final Properties dataSourceProperties, final String jdbcUrl, final String username, final String password)
{
if (dataSource == null && dsClassName != null) {
dataSource = createInstance(dsClassName, DataSource.class);
PropertyBeanSetter.setTargetFromProperties(dataSource, dataSourceProperties);
return dataSource;
}
else if (jdbcUrl != null) {
return new DriverDataSource(jdbcUrl, dataSourceProperties, username, password);
}
return dataSource;
}
/**
* Get the int value of a transaction isolation level by name.
*
* @param transactionIsolationName the name of the transaction isolation level
* @return the int value of the isolation level or -1
*/
public static int getTransactionIsolation(final String transactionIsolationName)
{
if (transactionIsolationName != null) {
try {
Field field = Connection.class.getField(transactionIsolationName);
return field.getInt(null);
}
catch (Exception e) {
throw new IllegalArgumentException("Invalid transaction isolation value: " + transactionIsolationName);
}
}
return -1;
}
/**
* Setup a connection intial state.
*
* @param connection a Connection
* @param isAutoCommit auto-commit state
* @param isReadOnly read-only state
* @param transactionIsolation transaction isolation
* @param catalog default catalog
* @throws SQLException thrown from driver
*/
public static void setupConnection(final Connection connection, final boolean isAutoCommit, final boolean isReadOnly, final int transactionIsolation, final String catalog) throws SQLException
{
connection.setAutoCommit(isAutoCommit);
connection.setReadOnly(isReadOnly);
if (transactionIsolation != connection.getTransactionIsolation()) {
connection.setTransactionIsolation(transactionIsolation);
}
if (catalog != null) {
connection.setCatalog(catalog);
}
}
/**
* Create a ThreadPoolExecutor.
*
* @param queueSize the queue size
* @param threadName the thread name
* @param threadFactory an optional ThreadFactory
* @return a ThreadPoolExecutor
*/
public static ThreadPoolExecutor createThreadPoolExecutor(final int queueSize, final String threadName, ThreadFactory threadFactory, final RejectedExecutionHandler policy)
{
if (threadFactory == null) {
threadFactory = new DefaultThreadFactory(threadName, true);
}
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueSize);
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, queue, threadFactory, policy);
executor.allowCoreThreadTimeOut(true);
return executor;
}
/**
* Return true if the driver appears to be JDBC 4.0 compliant.
*
* @param connection a Connection to check
* @return true if JDBC 4.1 compliance, false otherwise
* @throws SQLException re-thrown exception from Connection.getNetworkTimeout()
*/
public static boolean isJdbc40Compliant(final Connection connection) throws SQLException
{
if (!jdbc40checked) {
try {
connection.isValid(5); // This will throw AbstractMethodError or SQLException in the case of a non-JDBC 41 compliant driver
IS_JDBC40 = true;
}
catch (NoSuchMethodError | AbstractMethodError | SQLFeatureNotSupportedException e) {
IS_JDBC40 = false;
}
jdbc40checked = true;
}
return IS_JDBC40;
}
/**
* Set the query timeout, if it is supported by the driver.
*
* @param statement a statement to set the query timeout on
* @param timeoutSec the number of seconds before timeout
* @throws SQLException re-thrown exception from Statement.setQueryTimeout()
*/
public static void setQueryTimeout(final Statement statement, final int timeoutSec) throws SQLException
{
if (queryTimeoutSupported) {
try {
statement.setQueryTimeout(timeoutSec);
}
catch (NoSuchMethodError | AbstractMethodError | SQLFeatureNotSupportedException e) {
queryTimeoutSupported = false;
}
}
}
/**
* Set the network timeout, if <code>isUseNetworkTimeout</code> is <code>true</code>, and return the
* pre-existing value of the network timeout.
*
* @param executor an Executor
* @param connection the connection to set the network timeout on
* @param timeoutMs the number of milliseconds before timeout
* @param isUseNetworkTimeout true if the network timeout should be set, false otherwise
* @return the pre-existing network timeout value
* @throws SQLException thrown if the network timeout cannot be set
*/
public static int setNetworkTimeout(final Executor executor, final Connection connection, final long timeoutMs, final boolean isUseNetworkTimeout) throws SQLException
{
if ((IS_JDBC41 || !jdbc41checked) && isUseNetworkTimeout) {
try {
final int networkTimeout = connection.getNetworkTimeout();
connection.setNetworkTimeout(executor, (int) timeoutMs);
IS_JDBC41 = true;
return networkTimeout;
}
catch (SQLFeatureNotSupportedException | AbstractMethodError | NoSuchMethodError e) {
IS_JDBC41 = false;
}
finally {
jdbc41checked = true;
}
}
return 0;
}
/**
* Set the loginTimeout on the specified DataSource.
*
* @param dataSource the DataSource
* @param connectionTimeout the timeout in milliseconds
* @param logger a logger to use for a warning
*/
public static void setLoginTimeout(final DataSource dataSource, final long connectionTimeout, final Logger logger)
{
if (connectionTimeout != Integer.MAX_VALUE) {
try {
dataSource.setLoginTimeout((int) TimeUnit.MILLISECONDS.toSeconds(Math.max(1000L, connectionTimeout)));
}
catch (SQLException e) {
logger.warn("Unable to set DataSource login timeout", e);
}
}
}
}

@ -50,6 +50,7 @@
<version>2.2.6-SNAPSHOT</version>
</parent>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
@ -302,4 +303,11 @@
</dependencies>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>hikaricp-common</artifactId>
<version>2.2.6-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

@ -54,6 +54,7 @@
<modules>
<module>hikaricp</module>
<module>hikaricp-java6</module>
<module>hikaricp-common</module>
</modules>
<build>

Loading…
Cancel
Save