From 55abf9d780591f26537bdff2448f6e1471831711 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 13 Oct 2020 09:31:07 +0300 Subject: [PATCH] Fixed - high contention during connection acquisition from connection pool. #3111 --- .../org/redisson/pubsub/AsyncSemaphore.java | 153 +++++++----------- .../org/redisson/pubsub/PublishSubscribe.java | 93 +++++------ 2 files changed, 97 insertions(+), 149 deletions(-) diff --git a/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java b/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java index 9a7301a6e..5eb939057 100644 --- a/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java +++ b/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java @@ -15,11 +15,9 @@ */ package org.redisson.pubsub; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; /** * @@ -30,8 +28,8 @@ public class AsyncSemaphore { private static class Entry { - private Runnable runnable; - private int permits; + private final Runnable runnable; + private final int permits; Entry(Runnable runnable, int permits) { super(); @@ -47,131 +45,94 @@ public class AsyncSemaphore { return runnable; } - @Override - @SuppressWarnings("AvoidInlineConditionals") - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((runnable == null) ? 0 : runnable.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - Entry other = (Entry) obj; - if (runnable == null) { - if (other.runnable != null) - return false; - } else if (!runnable.equals(other.runnable)) - return false; - return true; - } - - } - - private volatile int counter; - private final Set listeners = new LinkedHashSet(); + + private final AtomicInteger counter; + private final Queue listeners = new ConcurrentLinkedQueue<>(); + private final Set removedListeners = Collections.newSetFromMap(new ConcurrentHashMap<>()); public AsyncSemaphore(int permits) { - counter = permits; + counter = new AtomicInteger(permits); } public boolean tryAcquire(long timeoutMillis) { - final CountDownLatch latch = new CountDownLatch(1); - final Runnable listener = new Runnable() { - @Override - public void run() { - latch.countDown(); - } - }; - acquire(listener); + CountDownLatch latch = new CountDownLatch(1); + Runnable runnable = () -> latch.countDown(); + acquire(runnable); try { - boolean res = latch.await(timeoutMillis, TimeUnit.MILLISECONDS); - if (!res) { - if (!remove(listener)) { - release(); - } + boolean r = latch.await(timeoutMillis, TimeUnit.MILLISECONDS); + if (!r) { + remove(runnable); } - return res; + return r; } catch (InterruptedException e) { + remove(runnable); Thread.currentThread().interrupt(); - if (!remove(listener)) { - release(); - } return false; } } public int queueSize() { - synchronized (this) { - return listeners.size(); - } + return listeners.size() - removedListeners.size(); } public void removeListeners() { - synchronized (this) { - listeners.clear(); - } + listeners.clear(); + removedListeners.clear(); } public void acquire(Runnable listener) { acquire(listener, 1); } - + public void acquire(Runnable listener, int permits) { - boolean run = false; - - synchronized (this) { - if (counter < permits) { - listeners.add(new Entry(listener, permits)); + if (permits <= 0) { + throw new IllegalArgumentException("permits can't be negative"); + } + listeners.add(new Entry(listener, permits)); + tryRun(); + } + + private void tryRun() { + Entry entry; + while (true) { + entry = listeners.peek(); + if (entry == null) { return; - } else { - counter -= permits; - run = true; + } + + int value = counter.get(); + if (entry.getPermits() > value) { + return; + } + + if (listeners.peek() == entry + && counter.compareAndSet(value, value - entry.getPermits())) { + listeners.poll(); + + if (removedListeners.remove(entry.getRunnable())) { + counter.addAndGet(entry.getPermits()); + } else { + break; + } } } - - if (run) { - listener.run(); - } + + entry.runnable.run(); } - public boolean remove(Runnable listener) { - synchronized (this) { - return listeners.remove(new Entry(listener, 0)); - } + public void remove(Runnable listener) { + removedListeners.add(listener); } public int getCounter() { - return counter; + return counter.get(); } public void release() { - Entry entryToAcquire = null; - - synchronized (this) { - counter++; - Iterator iter = listeners.iterator(); - if (iter.hasNext()) { - Entry entry = iter.next(); - if (entry.getPermits() <= counter) { - iter.remove(); - entryToAcquire = entry; - } - } - } - - if (entryToAcquire != null) { - acquire(entryToAcquire.getRunnable(), entryToAcquire.getPermits()); - } + counter.incrementAndGet(); + tryRun(); } @Override diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java index 1769aeaeb..02a32ef45 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java @@ -15,10 +15,6 @@ */ package org.redisson.pubsub; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicReference; - import org.redisson.PubSubEntry; import org.redisson.api.RFuture; import org.redisson.client.BaseRedisPubSubListener; @@ -30,6 +26,9 @@ import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.misc.TransferListener; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + /** * * @author Nikita Koksharov @@ -37,75 +36,63 @@ import org.redisson.misc.TransferListener; */ abstract class PublishSubscribe> { + private final ConcurrentMap entries = new ConcurrentHashMap<>(); private final PublishSubscribeService service; - + PublishSubscribe(PublishSubscribeService service) { super(); this.service = service; } - private final ConcurrentMap entries = new ConcurrentHashMap<>(); - public void unsubscribe(E entry, String entryName, String channelName) { AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName)); - semaphore.acquire(new Runnable() { - @Override - public void run() { - if (entry.release() == 0) { - // just an assertion - boolean removed = entries.remove(entryName) == entry; - if (!removed) { - throw new IllegalStateException(); - } - service.unsubscribe(new ChannelName(channelName), semaphore); - } else { - semaphore.release(); + semaphore.acquire(() -> { + if (entry.release() == 0) { + // just an assertion + boolean removed = entries.remove(entryName) == entry; + if (!removed) { + throw new IllegalStateException(); } + service.unsubscribe(new ChannelName(channelName), semaphore); + } else { + semaphore.release(); } }); } public RFuture subscribe(String entryName, String channelName) { - AtomicReference listenerHolder = new AtomicReference(); AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName)); - RPromise newPromise = new RedissonPromise() { - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return semaphore.remove(listenerHolder.get()); + RPromise newPromise = new RedissonPromise<>(); + semaphore.acquire(() -> { + if (!newPromise.setUncancellable()) { + semaphore.release(); + return; } - }; - Runnable listener = new Runnable() { + E entry = entries.get(entryName); + if (entry != null) { + entry.acquire(); + semaphore.release(); + entry.getPromise().onComplete(new TransferListener(newPromise)); + return; + } - @Override - public void run() { - E entry = entries.get(entryName); - if (entry != null) { - entry.acquire(); - semaphore.release(); - entry.getPromise().onComplete(new TransferListener(newPromise)); - return; - } - - E value = createEntry(newPromise); - value.acquire(); - - E oldValue = entries.putIfAbsent(entryName, value); - if (oldValue != null) { - oldValue.acquire(); - semaphore.release(); - oldValue.getPromise().onComplete(new TransferListener(newPromise)); - return; - } - - RedisPubSubListener listener = createListener(channelName, value); - service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener); + E value = createEntry(newPromise); + value.acquire(); + + E oldValue = entries.putIfAbsent(entryName, value); + if (oldValue != null) { + oldValue.acquire(); + semaphore.release(); + oldValue.getPromise().onComplete(new TransferListener(newPromise)); + return; } - }; - semaphore.acquire(listener); - listenerHolder.set(listener); - + + RedisPubSubListener listener = createListener(channelName, value); + service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener); + }); + return newPromise; }