|
|
|
@ -100,7 +100,6 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
|
|
|
|
|
for (int i = list.size() - 1; i >= 0; i--) {
|
|
|
|
|
final IConcurrentBagEntry bagEntry = list.remove(i).get();
|
|
|
|
|
if (bagEntry != null && bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
|
|
|
|
|
// LOGGER.debug("{} fastpath bag item", Thread.currentThread());
|
|
|
|
|
return (T) bagEntry;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -118,14 +117,13 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
|
|
|
|
|
startSeq = sequence.get();
|
|
|
|
|
for (final T bagEntry : sharedList) {
|
|
|
|
|
if (bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
|
|
|
|
|
LOGGER.debug("{} got bag item {}", Thread.currentThread(), startSeq);
|
|
|
|
|
// LOGGER.debug("{} got bag item {}", Thread.currentThread(), startSeq);
|
|
|
|
|
return bagEntry;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} while (startSeq < sequence.get());
|
|
|
|
|
|
|
|
|
|
if (addItemFuture == null || addItemFuture.isDone()) {
|
|
|
|
|
// LOGGER.debug("{} requesting addBagItem()", Thread.currentThread());
|
|
|
|
|
addItemFuture = listener.addBagItem();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -133,8 +131,6 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// LOGGER.debug("{} woke up to try again", Thread.currentThread());
|
|
|
|
|
|
|
|
|
|
final long elapsed = (System.nanoTime() - startScan);
|
|
|
|
|
timeout = originTimeout - Math.max(elapsed, 100L); // don't trust the nanoTime() impl. not to go backwards due to NTP adjustments
|
|
|
|
|
}
|
|
|
|
@ -317,42 +313,22 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
|
|
|
|
|
/**
|
|
|
|
|
* Our private synchronizer that handles notify/wait type semantics.
|
|
|
|
|
*/
|
|
|
|
|
private static final class Synchronizer extends AbstractQueuedLongSynchronizer
|
|
|
|
|
private final class Synchronizer extends AbstractQueuedLongSynchronizer
|
|
|
|
|
{
|
|
|
|
|
private static final long serialVersionUID = 104753538004341218L;
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected long tryAcquireShared(final long seq)
|
|
|
|
|
{
|
|
|
|
|
// if (hasQueuedPredecessors()) {
|
|
|
|
|
// LOGGER.debug("{} had {} queued predecessors ({})", Thread.currentThread(), this.getQueueLength(), seq);
|
|
|
|
|
// return -1L;
|
|
|
|
|
//}
|
|
|
|
|
|
|
|
|
|
// final long ret = getState() > seq ? 0L : -1L;
|
|
|
|
|
// LOGGER.debug("{} tryAcquireShared({}) returned {}", Thread.currentThread(), seq, ret);
|
|
|
|
|
final long delta = getState() - (seq + 1);
|
|
|
|
|
return hasQueuedPredecessors() ? -1L : delta;
|
|
|
|
|
return hasQueuedPredecessors() ? -1L : getState() - (seq + 1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** {@inheritDoc} */
|
|
|
|
|
@Override
|
|
|
|
|
protected boolean tryReleaseShared(final long updateSeq)
|
|
|
|
|
protected boolean tryReleaseShared(final long unreliableSequence)
|
|
|
|
|
{
|
|
|
|
|
long currentSeq = getState();
|
|
|
|
|
while (updateSeq > currentSeq) {
|
|
|
|
|
if (compareAndSetState(currentSeq, updateSeq)) {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// spin
|
|
|
|
|
Thread.yield();
|
|
|
|
|
currentSeq = getState();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// LOGGER.debug("tryReleaseShared({}) succeeded", updateSeq);
|
|
|
|
|
|
|
|
|
|
return false;
|
|
|
|
|
setState(sequence.get());
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|