@ -44,323 +44,300 @@ import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;
* /
* /
public class ConcurrentBag < T extends com . zaxxer . hikari . util . ConcurrentBag . IBagManagable >
public class ConcurrentBag < T extends com . zaxxer . hikari . util . ConcurrentBag . IBagManagable >
{
{
public static final int STATE_NOT_IN_USE = 0 ;
public static final int STATE_NOT_IN_USE = 0 ;
public static final int STATE_IN_USE = 1 ;
public static final int STATE_IN_USE = 1 ;
private static final int STATE_REMOVED = - 1 ;
private static final int STATE_REMOVED = - 1 ;
private static final int STATE_RESERVED = - 2 ;
private static final int STATE_RESERVED = - 2 ;
/ * *
/ * *
* This interface must be implemented by classes wishing to be managed by
* This interface must be implemented by classes wishing to be managed by
* ConcurrentBag . All implementations must be atomic with respect to state .
* ConcurrentBag . All implementations must be atomic with respect to state .
* The suggested implementation is via AtomicInteger using the methods
* The suggested implementation is via AtomicInteger using the methods
* < code > get ( ) < / code > and < code > compareAndSet ( ) < / code > .
* < code > get ( ) < / code > and < code > compareAndSet ( ) < / code > .
* /
* /
public interface IBagManagable
public interface IBagManagable
{
{
int getState ( ) ;
int getState ( ) ;
boolean compareAndSetState ( int expectedState , int newState ) ;
boolean compareAndSetState ( int expectedState , int newState ) ;
}
}
/ * *
/ * *
* This interface is implemented by a listener to the bag . The listener
* This interface is implemented by a listener to the bag . The listener
* will be informed of when the bag has become empty . The usual course
* will be informed of when the bag has become empty . The usual course
* of action by the listener in this case is to attempt to add an item
* of action by the listener in this case is to attempt to add an item
* to the bag .
* to the bag .
* /
* /
public interface IBagStateListener
public interface IBagStateListener
{
{
void addBagItem ( ) ;
void addBagItem ( ) ;
}
}
private ThreadLocal < FastList < WeakReference < T > > > threadList ;
private ThreadLocal < FastList < WeakReference < T > > > threadList ;
private CopyOnWriteArraySet < T > sharedList ;
private CopyOnWriteArraySet < T > sharedList ;
private Synchronizer synchronizer ;
private Synchronizer synchronizer ;
private IBagStateListener listener ;
private IBagStateListener listener ;
/ * *
/ * *
* Constructor .
* Constructor .
* /
* /
public ConcurrentBag ( )
public ConcurrentBag ( )
{
{
this . sharedList = new CopyOnWriteArraySet < T > ( ) ;
this . sharedList = new CopyOnWriteArraySet < T > ( ) ;
this . synchronizer = new Synchronizer ( ) ;
this . synchronizer = new Synchronizer ( ) ;
this . threadList = new ThreadLocal < FastList < WeakReference < T > > > ( ) ;
this . threadList = new ThreadLocal < FastList < WeakReference < T > > > ( ) ;
}
}
/ * *
/ * *
* The method will borrow an IBagManagable from the bag , blocking for the
* The method will borrow an IBagManagable from the bag , blocking for the
* specified timeout if none are available .
* specified timeout if none are available .
*
*
* @param timeout how long to wait before giving up , in units of unit
* @param timeout how long to wait before giving up , in units of unit
* @param timeUnit a < code > TimeUnit < / code > determining how to interpret the timeout parameter
* @param timeUnit a < code > TimeUnit < / code > determining how to interpret the timeout parameter
* @return a borrowed instance from the bag or null if a timeout occurs
* @return a borrowed instance from the bag or null if a timeout occurs
* @throws InterruptedException if interrupted while waiting
* @throws InterruptedException if interrupted while waiting
* /
* /
public T borrow ( long timeout , TimeUnit timeUnit ) throws InterruptedException
public T borrow ( long timeout , TimeUnit timeUnit ) throws InterruptedException
{
{
// Try the thread-local list first
// Try the thread-local list first
FastList < WeakReference < T > > list = threadList . get ( ) ;
FastList < WeakReference < T > > list = threadList . get ( ) ;
if ( list = = null )
if ( list = = null ) {
{
list = new FastList < WeakReference < T > > ( WeakReference . class ) ;
list = new FastList < WeakReference < T > > ( WeakReference . class ) ;
threadList . set ( list ) ;
threadList . set ( list ) ;
}
}
else {
else
for ( int i = list . size ( ) - 1 ; i > = 0 ; i - - ) {
{
final WeakReference < T > reference = list . removeLast ( ) ;
for ( int i = list . size ( ) - 1 ; i > = 0 ; i - - )
final T element = reference . get ( ) ;
{
if ( element ! = null & & element . compareAndSetState ( STATE_NOT_IN_USE , STATE_IN_USE ) ) {
final WeakReference < T > reference = list . removeLast ( ) ;
return element ;
final T element = reference . get ( ) ;
if ( element ! = null & & element . compareAndSetState ( STATE_NOT_IN_USE , STATE_IN_USE ) )
{
return element ;
}
}
}
// Otherwise, scan the shared list ... for maximum of timeout
timeout = timeUnit . toNanos ( timeout ) ;
do {
final long startScan = System . nanoTime ( ) ;
for ( T reference : sharedList )
{
if ( reference . compareAndSetState ( STATE_NOT_IN_USE , STATE_IN_USE ) )
{
return reference ;
}
}
if ( listener ! = null )
{
listener . addBagItem ( ) ;
}
synchronizer . tryAcquireSharedNanos ( startScan , timeout ) ;
timeout - = ( System . nanoTime ( ) - startScan ) ;
} while ( timeout > 0 ) ;
return null ;
}
/ * *
* This method will return a borrowed object to the bag . Objects
* that are borrowed from the bag but never "requited" will result
* in a memory leak .
*
* @param value the value to return to the bag
* @throws NullPointerException if value is null
* @throws IllegalStateException if the requited value was not borrowed from the bag
* /
public void requite ( final T value )
{
if ( value = = null )
{
throw new NullPointerException ( "Cannot return a null value to the bag" ) ;
}
if ( value . compareAndSetState ( STATE_IN_USE , STATE_NOT_IN_USE ) )
{
FastList < WeakReference < T > > list = threadList . get ( ) ;
if ( list = = null )
{
list = new FastList < WeakReference < T > > ( WeakReference . class ) ;
threadList . set ( list ) ;
}
list . add ( new WeakReference < T > ( value ) ) ;
synchronizer . releaseShared ( System . nanoTime ( ) ) ;
}
else
{
throw new IllegalStateException ( "Value was returned to the bag that was not borrowed: " ) ;
}
}
/ * *
* Add a new object to the bag for others to borrow .
*
* @param value an object to add to the bag
* /
public void add ( final T value )
{
final long addTime = System . nanoTime ( ) ;
sharedList . add ( value ) ;
synchronizer . releaseShared ( addTime ) ;
}
/ * *
* Remove a value from the bag . This method should only be called
* with objects obtained by { @link # borrow ( long , TimeUnit ) } or { @link # reserve ( IBagManagable ) } .
* @param value the value to remove
* @throws IllegalStateException if an attempt is made to remove an object
* from the bag that was not borrowed or reserved first
* /
public void remove ( T value )
{
if ( value . compareAndSetState ( STATE_IN_USE , STATE_REMOVED ) | | value . compareAndSetState ( STATE_RESERVED , STATE_REMOVED ) )
{
if ( ! sharedList . remove ( value ) )
{
throw new IllegalStateException ( "Attempt to remove an object from the bag that does not exist" ) ;
}
}
else
{
throw new IllegalStateException ( "Attempt to remove an object from the bag that was not borrowed or reserved" ) ;
}
}
/ * *
* This method provides a "snaphot" in time of the IBagManagable
* items in the bag in the specified state . It does not "lock"
* or reserve items in any way . Call { @link # reserve ( IBagManagable ) }
* on items in list before performing any action on them .
*
* @param state one of STATE_NOT_IN_USE or STATE_IN_USE
* @return a possibly empty list of objects having the state specified
* /
public List < T > values ( int state )
{
ArrayList < T > list = new ArrayList < T > ( sharedList . size ( ) ) ;
if ( state = = STATE_IN_USE | | state = = STATE_NOT_IN_USE )
{
for ( T reference : sharedList )
{
if ( reference . getState ( ) = = state )
{
list . add ( reference ) ;
}
}
}
return list ;
}
/ * *
* The method is used to make an item in the bag "unavailable" for
* borrowing . It is primarily used when wanting to operate on items
* returned by the { @link # values ( int ) } method . Items that are
* reserved can be removed from the bag via { @link # remove ( IBagManagable ) }
* without the need to unreserve them . Items that are not removed
* from the bag can be make available for borrowing again by calling
* the { @link # unreserve ( IBagManagable ) } method .
*
* @param value the item to reserve
* @return true if the item was able to be reserved , false otherwise
* /
public boolean reserve ( T value )
{
return value . compareAndSetState ( STATE_NOT_IN_USE , STATE_RESERVED ) ;
}
/ * *
* This method is used to make an item reserved via { @link # reserve ( IBagManagable ) }
* available again for borrowing .
*
* @param value the item to unreserve
* /
public void unreserve ( T value )
{
final long checkInTime = System . nanoTime ( ) ;
if ( ! value . compareAndSetState ( STATE_RESERVED , STATE_NOT_IN_USE ) )
{
throw new IllegalStateException ( "Attempt to relinquish an object to the bag that was not reserved" ) ;
}
synchronizer . releaseShared ( checkInTime ) ;
}
/ * *
* Add a listener to the bag . There can only be one . If this method is
* called a second time , the original listener will be evicted .
*
* @param listener a listener to the bag
* /
public void addBagStateListener ( IBagStateListener listener )
{
this . listener = listener ;
}
/ * *
* Get the number of threads pending ( waiting ) for an item from the
* bag to become available .
*
* @return the number of threads waiting for items from the bag
* /
public int getPendingQueue ( )
{
return synchronizer . getQueueLength ( ) ;
}
public int getCount ( int state )
{
int count = 0 ;
for ( T reference : sharedList )
{
if ( reference . getState ( ) = = state )
{
count + + ;
}
}
return count ;
}
/ * *
* Get the total number of items in the bag .
*
* @return the number of items in the bag
* /
public int size ( )
{
return sharedList . size ( ) ;
}
/ * *
* Our private synchronizer that handles notify / wait type semantics .
* /
private static class Synchronizer extends AbstractQueuedLongSynchronizer
{
private static final long serialVersionUID = 104753538004341218L ;
private static final boolean JAVA7 ;
static
{
boolean b = false ;
try
{
b = AbstractQueuedLongSynchronizer . class . getMethod ( "hasQueuedPredecessors" , new Class < ? > [ 0 ] ) ! = null ;
}
}
catch ( Exception e )
}
{
}
// Otherwise, scan the shared list ... for maximum of timeout
timeout = timeUnit . toNanos ( timeout ) ;
do {
final long startScan = System . nanoTime ( ) ;
for ( T reference : sharedList ) {
if ( reference . compareAndSetState ( STATE_NOT_IN_USE , STATE_IN_USE ) ) {
return reference ;
}
}
}
JAVA7 = b ;
}
if ( listener ! = null ) {
listener . addBagItem ( ) ;
@Override
}
protected long tryAcquireShared ( long startScanTime )
{
synchronizer . tryAcquireSharedNanos ( startScan , timeout ) ;
return getState ( ) > = startScanTime & & ! java67hasQueuedPredecessors ( ) ? 1 : - 1 ;
}
timeout - = ( System . nanoTime ( ) - startScan ) ;
}
/** {@inheritDoc} */
while ( timeout > 0 ) ;
@Override
protected boolean tryReleaseShared ( long updateTime )
return null ;
{
}
setState ( updateTime ) ;
/ * *
return true ;
* This method will return a borrowed object to the bag . Objects
}
* that are borrowed from the bag but never "requited" will result
* in a memory leak .
private boolean java67hasQueuedPredecessors ( )
*
{
* @param value the value to return to the bag
if ( JAVA7 )
* @throws NullPointerException if value is null
{
* @throws IllegalStateException if the requited value was not borrowed from the bag
return hasQueuedPredecessors ( ) ;
* /
public void requite ( final T value )
{
if ( value = = null ) {
throw new NullPointerException ( "Cannot return a null value to the bag" ) ;
}
if ( value . compareAndSetState ( STATE_IN_USE , STATE_NOT_IN_USE ) ) {
FastList < WeakReference < T > > list = threadList . get ( ) ;
if ( list = = null ) {
list = new FastList < WeakReference < T > > ( WeakReference . class ) ;
threadList . set ( list ) ;
}
list . add ( new WeakReference < T > ( value ) ) ;
synchronizer . releaseShared ( System . nanoTime ( ) ) ;
}
else {
throw new IllegalStateException ( "Value was returned to the bag that was not borrowed: " ) ;
}
}
/ * *
* Add a new object to the bag for others to borrow .
*
* @param value an object to add to the bag
* /
public void add ( final T value )
{
final long addTime = System . nanoTime ( ) ;
sharedList . add ( value ) ;
synchronizer . releaseShared ( addTime ) ;
}
/ * *
* Remove a value from the bag . This method should only be called
* with objects obtained by { @link # borrow ( long , TimeUnit ) } or { @link # reserve ( IBagManagable ) } .
* @param value the value to remove
* @throws IllegalStateException if an attempt is made to remove an object
* from the bag that was not borrowed or reserved first
* /
public void remove ( T value )
{
if ( value . compareAndSetState ( STATE_IN_USE , STATE_REMOVED ) | | value . compareAndSetState ( STATE_RESERVED , STATE_REMOVED ) ) {
if ( ! sharedList . remove ( value ) ) {
throw new IllegalStateException ( "Attempt to remove an object from the bag that does not exist" ) ;
}
}
else {
throw new IllegalStateException ( "Attempt to remove an object from the bag that was not borrowed or reserved" ) ;
}
}
/ * *
* This method provides a "snaphot" in time of the IBagManagable
* items in the bag in the specified state . It does not "lock"
* or reserve items in any way . Call { @link # reserve ( IBagManagable ) }
* on items in list before performing any action on them .
*
* @param state one of STATE_NOT_IN_USE or STATE_IN_USE
* @return a possibly empty list of objects having the state specified
* /
public List < T > values ( int state )
{
ArrayList < T > list = new ArrayList < T > ( sharedList . size ( ) ) ;
if ( state = = STATE_IN_USE | | state = = STATE_NOT_IN_USE ) {
for ( T reference : sharedList ) {
if ( reference . getState ( ) = = state ) {
list . add ( reference ) ;
}
}
}
return false ;
}
}
return list ;
}
}
/ * *
* The method is used to make an item in the bag "unavailable" for
* borrowing . It is primarily used when wanting to operate on items
* returned by the { @link # values ( int ) } method . Items that are
* reserved can be removed from the bag via { @link # remove ( IBagManagable ) }
* without the need to unreserve them . Items that are not removed
* from the bag can be make available for borrowing again by calling
* the { @link # unreserve ( IBagManagable ) } method .
*
* @param value the item to reserve
* @return true if the item was able to be reserved , false otherwise
* /
public boolean reserve ( T value )
{
return value . compareAndSetState ( STATE_NOT_IN_USE , STATE_RESERVED ) ;
}
/ * *
* This method is used to make an item reserved via { @link # reserve ( IBagManagable ) }
* available again for borrowing .
*
* @param value the item to unreserve
* /
public void unreserve ( T value )
{
final long checkInTime = System . nanoTime ( ) ;
if ( ! value . compareAndSetState ( STATE_RESERVED , STATE_NOT_IN_USE ) ) {
throw new IllegalStateException ( "Attempt to relinquish an object to the bag that was not reserved" ) ;
}
synchronizer . releaseShared ( checkInTime ) ;
}
/ * *
* Add a listener to the bag . There can only be one . If this method is
* called a second time , the original listener will be evicted .
*
* @param listener a listener to the bag
* /
public void addBagStateListener ( IBagStateListener listener )
{
this . listener = listener ;
}
/ * *
* Get the number of threads pending ( waiting ) for an item from the
* bag to become available .
*
* @return the number of threads waiting for items from the bag
* /
public int getPendingQueue ( )
{
return synchronizer . getQueueLength ( ) ;
}
public int getCount ( int state )
{
int count = 0 ;
for ( T reference : sharedList ) {
if ( reference . getState ( ) = = state ) {
count + + ;
}
}
return count ;
}
/ * *
* Get the total number of items in the bag .
*
* @return the number of items in the bag
* /
public int size ( )
{
return sharedList . size ( ) ;
}
/ * *
* Our private synchronizer that handles notify / wait type semantics .
* /
private static class Synchronizer extends AbstractQueuedLongSynchronizer
{
private static final long serialVersionUID = 104753538004341218L ;
private static final boolean JAVA7 ;
static {
boolean b = false ;
try {
b = AbstractQueuedLongSynchronizer . class . getMethod ( "hasQueuedPredecessors" , new Class < ? > [ 0 ] ) ! = null ;
}
catch ( Exception e ) {
}
JAVA7 = b ;
}
@Override
protected long tryAcquireShared ( long startScanTime )
{
return getState ( ) > = startScanTime & & ! java67hasQueuedPredecessors ( ) ? 1 : - 1 ;
}
/** {@inheritDoc} */
@Override
protected boolean tryReleaseShared ( long updateTime )
{
setState ( updateTime ) ;
return true ;
}
private boolean java67hasQueuedPredecessors ( )
{
if ( JAVA7 ) {
return hasQueuedPredecessors ( ) ;
}
return false ;
}
}
}
}