Experimental concurrentbag fairness changes. Starvation was observed under high load.

pull/316/merge
Brett Wooldridge 10 years ago
parent 7f50428eff
commit 72b92390bf

@ -89,16 +89,19 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{ {
// Try the thread-local list first // Try the thread-local list first if nobody is queued
final ArrayList<WeakReference<IConcurrentBagEntry>> list = threadList.get(); if (!synchronizer.hasQueuedThreads()) {
if (list == null) { final ArrayList<WeakReference<IConcurrentBagEntry>> list = threadList.get();
threadList.set(new ArrayList<WeakReference<IConcurrentBagEntry>>(16)); if (list == null) {
} threadList.set(new ArrayList<WeakReference<IConcurrentBagEntry>>(16));
else { }
for (int i = list.size() - 1; i >= 0; i--) { else {
final IConcurrentBagEntry bagEntry = list.remove(i).get(); for (int i = list.size() - 1; i >= 0; i--) {
if (bagEntry != null && bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { final IConcurrentBagEntry bagEntry = list.remove(i).get();
return (T) bagEntry; if (bagEntry != null && bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
LOGGER.debug("{} fastpath bag item", Thread.currentThread());
return (T) bagEntry;
}
} }
} }
} }
@ -107,6 +110,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
timeout = timeUnit.toNanos(timeout); timeout = timeUnit.toNanos(timeout);
final long startScan = System.nanoTime(); final long startScan = System.nanoTime();
final long originTimeout = timeout; final long originTimeout = timeout;
final long claimedSeq = sequence.get();
do { do {
long startSeq; long startSeq;
do { do {
@ -122,7 +126,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
LOGGER.debug("{} requesting addBagItem()", Thread.currentThread()); LOGGER.debug("{} requesting addBagItem()", Thread.currentThread());
listener.addBagItem(); listener.addBagItem();
if (!synchronizer.tryAcquireSharedNanos(startSeq, timeout)) { if (!synchronizer.tryAcquireSharedNanos(claimedSeq, timeout)) {
return null; return null;
} }
@ -334,6 +338,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry>
long currentSeq = getState(); long currentSeq = getState();
while (updateSeq > currentSeq && !compareAndSetState(currentSeq, updateSeq)) { while (updateSeq > currentSeq && !compareAndSetState(currentSeq, updateSeq)) {
// spin // spin
currentSeq = getState();
} }
LOGGER.debug("tryReleaseShared({}) succeeded", updateSeq); LOGGER.debug("tryReleaseShared({}) succeeded", updateSeq);

Loading…
Cancel
Save