diff --git a/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java b/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java index 5eb939057..499ef4a08 100644 --- a/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java +++ b/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java @@ -88,40 +88,46 @@ public class AsyncSemaphore { public void acquire(Runnable listener, int permits) { if (permits <= 0) { - throw new IllegalArgumentException("permits can't be negative"); + throw new IllegalArgumentException("permits should be non-zero"); } listeners.add(new Entry(listener, permits)); - tryRun(); + tryRun(1); } - private void tryRun() { - Entry entry; - while (true) { - entry = listeners.peek(); - if (entry == null) { + private void tryRun(int permits) { + if (counter.get() == 0 + || listeners.peek() == null) { + return; + } + + if (counter.addAndGet(-permits) >= 0) { + Entry e = listeners.peek(); + if (e == null) { + counter.addAndGet(permits); return; } - - int value = counter.get(); - if (entry.getPermits() > value) { + if (e.getPermits() != permits) { + counter.addAndGet(permits); + tryRun(e.getPermits()); + return; + } + Entry entry = listeners.poll(); + if (entry == null) { + counter.addAndGet(permits); return; } - if (listeners.peek() == entry - && counter.compareAndSet(value, value - entry.getPermits())) { - listeners.poll(); - - if (removedListeners.remove(entry.getRunnable())) { - counter.addAndGet(entry.getPermits()); - } else { - break; - } + if (removedListeners.remove(entry.getRunnable())) { + counter.addAndGet(entry.getPermits()); + tryRun(1); + } else { + entry.runnable.run(); } + } else { + counter.addAndGet(permits); } - - entry.runnable.run(); } - + public void remove(Runnable listener) { removedListeners.add(listener); } @@ -129,10 +135,10 @@ public class AsyncSemaphore { public int getCounter() { return counter.get(); } - + public void release() { counter.incrementAndGet(); - tryRun(); + tryRun(1); } @Override