|
|
|
@ -123,15 +123,13 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
|
|
|
|
|
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
|
|
|
|
|
{
|
|
|
|
|
// Try the thread-local list first
|
|
|
|
|
if (waiters.get() == 0) {
|
|
|
|
|
final List<Object> list = threadList.get();
|
|
|
|
|
for (int i = list.size() - 1; i >= 0; i--) {
|
|
|
|
|
final Object entry = list.remove(i);
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
|
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
|
|
|
|
|
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
|
|
|
|
|
return bagEntry;
|
|
|
|
|
}
|
|
|
|
|
final List<Object> list = threadList.get();
|
|
|
|
|
for (int i = list.size() - 1; i >= 0; i--) {
|
|
|
|
|
final Object entry = list.remove(i);
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
|
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
|
|
|
|
|
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
|
|
|
|
|
return bagEntry;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -149,7 +147,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
listener.addBagItem(waiting);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
timeout = timeUnit.toNanos(timeout);
|
|
|
|
|
do {
|
|
|
|
|
final long start = currentTime();
|
|
|
|
@ -181,7 +179,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
|
|
|
|
|
{
|
|
|
|
|
bagEntry.setState(STATE_NOT_IN_USE);
|
|
|
|
|
|
|
|
|
|
while (waiters.get() > 0) {
|
|
|
|
|
while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE) {
|
|
|
|
|
if (handoffQueue.offer(bagEntry)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|