Fixed - high contention during connection acquisition from connection pool. #3111

pull/3193/head
Nikita Koksharov 4 years ago
parent 55abf9d780
commit d53f5ca747

@ -88,40 +88,46 @@ public class AsyncSemaphore {
public void acquire(Runnable listener, int permits) {
if (permits <= 0) {
throw new IllegalArgumentException("permits can't be negative");
throw new IllegalArgumentException("permits should be non-zero");
}
listeners.add(new Entry(listener, permits));
tryRun();
tryRun(1);
}
private void tryRun() {
Entry entry;
while (true) {
entry = listeners.peek();
if (entry == null) {
private void tryRun(int permits) {
if (counter.get() == 0
|| listeners.peek() == null) {
return;
}
if (counter.addAndGet(-permits) >= 0) {
Entry e = listeners.peek();
if (e == null) {
counter.addAndGet(permits);
return;
}
int value = counter.get();
if (entry.getPermits() > value) {
if (e.getPermits() != permits) {
counter.addAndGet(permits);
tryRun(e.getPermits());
return;
}
Entry entry = listeners.poll();
if (entry == null) {
counter.addAndGet(permits);
return;
}
if (listeners.peek() == entry
&& counter.compareAndSet(value, value - entry.getPermits())) {
listeners.poll();
if (removedListeners.remove(entry.getRunnable())) {
counter.addAndGet(entry.getPermits());
} else {
break;
}
if (removedListeners.remove(entry.getRunnable())) {
counter.addAndGet(entry.getPermits());
tryRun(1);
} else {
entry.runnable.run();
}
} else {
counter.addAndGet(permits);
}
entry.runnable.run();
}
public void remove(Runnable listener) {
removedListeners.add(listener);
}
@ -129,10 +135,10 @@ public class AsyncSemaphore {
public int getCounter() {
return counter.get();
}
public void release() {
counter.incrementAndGet();
tryRun();
tryRun(1);
}
@Override

Loading…
Cancel
Save