|
|
|
@ -26,7 +26,7 @@ import java.util.List;
|
|
|
|
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
|
|
|
import java.util.concurrent.Future;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
import java.util.concurrent.atomic.LongAdder;
|
|
|
|
|
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;
|
|
|
|
|
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
@ -60,7 +60,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
|
|
|
|
|
protected final CopyOnWriteArrayList<T> sharedList;
|
|
|
|
|
|
|
|
|
|
private final ThreadLocal<ArrayList<WeakReference<IConcurrentBagEntry>>> threadList;
|
|
|
|
|
private final AtomicLong sequence;
|
|
|
|
|
private final LongAdder sequence;
|
|
|
|
|
private final IBagStateListener listener;
|
|
|
|
|
private volatile boolean closed;
|
|
|
|
|
|
|
|
|
@ -73,7 +73,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
|
|
|
|
|
{
|
|
|
|
|
this.sharedList = new CopyOnWriteArrayList<T>();
|
|
|
|
|
this.synchronizer = new Synchronizer();
|
|
|
|
|
this.sequence = new AtomicLong(1);
|
|
|
|
|
this.sequence = new LongAdder();
|
|
|
|
|
this.listener = listener;
|
|
|
|
|
this.threadList = new ThreadLocal<ArrayList<WeakReference<IConcurrentBagEntry>>>();
|
|
|
|
|
}
|
|
|
|
@ -114,24 +114,23 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
|
|
|
|
|
do {
|
|
|
|
|
long startSeq;
|
|
|
|
|
do {
|
|
|
|
|
startSeq = sequence.get();
|
|
|
|
|
startSeq = sequence.sum();
|
|
|
|
|
for (final T bagEntry : sharedList) {
|
|
|
|
|
if (bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
|
|
|
|
|
return bagEntry;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} while (startSeq < sequence.get());
|
|
|
|
|
} while (startSeq < sequence.sum());
|
|
|
|
|
|
|
|
|
|
if (addItemFuture == null || addItemFuture.isDone()) {
|
|
|
|
|
addItemFuture = listener.addBagItem();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!synchronizer.tryAcquireSharedNanos(startSeq, timeout)) {
|
|
|
|
|
return null;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
final long elapsed = (System.nanoTime() - startScan);
|
|
|
|
|
timeout = originTimeout - Math.max(elapsed, 100L); // don't trust the nanoTime() impl. not to go backwards due to NTP adjustments
|
|
|
|
|
timeout = originTimeout - (System.nanoTime() - startScan);
|
|
|
|
|
}
|
|
|
|
|
while (timeout > 1000L); // 1000ns is the minimum resolution on many systems
|
|
|
|
|
|
|
|
|
@ -154,7 +153,9 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
|
|
|
|
|
if (list != null) {
|
|
|
|
|
list.add(new WeakReference<IConcurrentBagEntry>(bagEntry));
|
|
|
|
|
}
|
|
|
|
|
synchronizer.releaseShared(sequence.incrementAndGet());
|
|
|
|
|
|
|
|
|
|
sequence.increment();
|
|
|
|
|
synchronizer.releaseShared(1);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
|
|
|
|
@ -174,7 +175,8 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sharedList.add(bagEntry);
|
|
|
|
|
synchronizer.releaseShared(sequence.incrementAndGet());
|
|
|
|
|
sequence.increment();
|
|
|
|
|
synchronizer.releaseShared(1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -256,9 +258,9 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
|
|
|
|
|
*/
|
|
|
|
|
public void unreserve(final T bagEntry)
|
|
|
|
|
{
|
|
|
|
|
final long checkInSeq = sequence.incrementAndGet();
|
|
|
|
|
sequence.increment();
|
|
|
|
|
if (bagEntry.state().compareAndSet(STATE_RESERVED, STATE_NOT_IN_USE)) {
|
|
|
|
|
synchronizer.releaseShared(checkInSeq);
|
|
|
|
|
synchronizer.releaseShared(1);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
LOGGER.warn("Attempt to relinquish an object to the bag that was not reserved: {}", bagEntry);
|
|
|
|
@ -320,14 +322,13 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
|
|
|
|
|
@Override
|
|
|
|
|
protected long tryAcquireShared(final long seq)
|
|
|
|
|
{
|
|
|
|
|
return hasQueuedPredecessors() ? -1L : getState() - (seq + 1);
|
|
|
|
|
return hasQueuedPredecessors() ? -1L : sequence.sum() - (seq + 1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** {@inheritDoc} */
|
|
|
|
|
@Override
|
|
|
|
|
protected boolean tryReleaseShared(final long unreliableSequence)
|
|
|
|
|
{
|
|
|
|
|
setState(sequence.get());
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|