|
|
|
@ -124,27 +124,22 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
|
|
|
|
|
final long startScan = System.nanoTime();
|
|
|
|
|
final long originTimeout = timeout;
|
|
|
|
|
long startSeq;
|
|
|
|
|
try {
|
|
|
|
|
do {
|
|
|
|
|
do {
|
|
|
|
|
do {
|
|
|
|
|
startSeq = synchronizer.currentSequence();
|
|
|
|
|
for (final T bagEntry : sharedList) {
|
|
|
|
|
if (bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
|
|
|
|
|
return bagEntry;
|
|
|
|
|
}
|
|
|
|
|
startSeq = synchronizer.currentSequence();
|
|
|
|
|
for (final T bagEntry : sharedList) {
|
|
|
|
|
if (bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
|
|
|
|
|
return bagEntry;
|
|
|
|
|
}
|
|
|
|
|
} while (startSeq < synchronizer.currentSequence());
|
|
|
|
|
|
|
|
|
|
if (addItemFuture == null || addItemFuture.isDone()) {
|
|
|
|
|
addItemFuture = listener.addBagItem();
|
|
|
|
|
}
|
|
|
|
|
} while (startSeq < synchronizer.currentSequence());
|
|
|
|
|
|
|
|
|
|
timeout = originTimeout - (System.nanoTime() - startScan);
|
|
|
|
|
} while (timeout > 1000L && synchronizer.waitUntilSequenceExceeded(startSeq, timeout));
|
|
|
|
|
}
|
|
|
|
|
finally {
|
|
|
|
|
synchronizer.signal();
|
|
|
|
|
}
|
|
|
|
|
if (addItemFuture == null || addItemFuture.isDone()) {
|
|
|
|
|
addItemFuture = listener.addBagItem();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
timeout = originTimeout - (System.nanoTime() - startScan);
|
|
|
|
|
} while (timeout > 1000L && synchronizer.waitUntilSequenceExceeded(startSeq, timeout));
|
|
|
|
|
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|