|
|
|
@ -58,7 +58,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
|
|
|
|
|
|
|
|
|
|
private final AbstractQueuedLongSynchronizer synchronizer;
|
|
|
|
|
private final CopyOnWriteArrayList<T> sharedList;
|
|
|
|
|
private final boolean weakThreadLocal;
|
|
|
|
|
private final boolean weakThreadLocals;
|
|
|
|
|
|
|
|
|
|
private final ThreadLocal<List> threadList;
|
|
|
|
|
private final Sequence sequence;
|
|
|
|
@ -71,15 +71,15 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
|
|
|
|
|
*
|
|
|
|
|
* @param listener the IBagStateListener to attach to this bag
|
|
|
|
|
*/
|
|
|
|
|
public ConcurrentBag(IBagStateListener listener, boolean weakThreadLocal)
|
|
|
|
|
public ConcurrentBag(IBagStateListener listener)
|
|
|
|
|
{
|
|
|
|
|
this.listener = listener;
|
|
|
|
|
this.weakThreadLocal = weakThreadLocal;
|
|
|
|
|
this.weakThreadLocals = useWeakThreadLocals();
|
|
|
|
|
|
|
|
|
|
this.sharedList = new CopyOnWriteArrayList<>();
|
|
|
|
|
this.synchronizer = new Synchronizer();
|
|
|
|
|
this.sequence = Sequence.Factory.create();
|
|
|
|
|
if (weakThreadLocal) {
|
|
|
|
|
if (weakThreadLocals) {
|
|
|
|
|
this.threadList = new ThreadLocal<>();
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
@ -108,24 +108,14 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
|
|
|
|
|
// Try the thread-local list first, if there are no blocked threads waiting already
|
|
|
|
|
if (!synchronizer.hasQueuedThreads()) {
|
|
|
|
|
List<?> list = threadList.get();
|
|
|
|
|
if (weakThreadLocal && list == null) {
|
|
|
|
|
if (weakThreadLocals && list == null) {
|
|
|
|
|
list = new ArrayList<>(16);
|
|
|
|
|
threadList.set(list);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int i = list.size() - 1; i >= 0; i--) {
|
|
|
|
|
final T bagEntry;
|
|
|
|
|
if (weakThreadLocal) {
|
|
|
|
|
bagEntry = (T) ((WeakReference) list.remove(i)).get();
|
|
|
|
|
if (bagEntry == null) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
bagEntry = (T) list.remove(i);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
|
|
|
|
|
final T bagEntry = (T) (weakThreadLocals ? ((WeakReference) list.remove(i)).get() : list.remove(i));
|
|
|
|
|
if (bagEntry != null & bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
|
|
|
|
|
return bagEntry;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -177,7 +167,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
|
|
|
|
|
if (bagEntry.state().compareAndSet(STATE_IN_USE, STATE_NOT_IN_USE)) {
|
|
|
|
|
final List threadLocalList = threadList.get();
|
|
|
|
|
if (threadLocalList != null) {
|
|
|
|
|
threadLocalList.add((weakThreadLocal ? new WeakReference<>(bagEntry) : bagEntry));
|
|
|
|
|
threadLocalList.add((weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sequence.increment();
|
|
|
|
@ -338,6 +328,23 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Determine whether to use WeakReferences based on whether there is a
|
|
|
|
|
* custom ClassLoader implementation sitting between this class and the
|
|
|
|
|
* System ClassLoader.
|
|
|
|
|
*
|
|
|
|
|
* @return true if we should use WeakReferences in our ThreadLocals, false otherwise
|
|
|
|
|
*/
|
|
|
|
|
private boolean useWeakThreadLocals()
|
|
|
|
|
{
|
|
|
|
|
try {
|
|
|
|
|
return !(this.getClass().getClassLoader().toString().startsWith("sun.misc") || ClassLoader.getSystemClassLoader() == this.getClass().getClassLoader());
|
|
|
|
|
}
|
|
|
|
|
catch (SecurityException se) {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Our private synchronizer that handles notify/wait type semantics.
|
|
|
|
|
*/
|
|
|
|
|