Fixed - high contention during connection acquisition from connection pool. #3111

pull/3123/head
Nikita Koksharov 4 years ago
parent 3ab7930af3
commit 55abf9d780

@ -15,11 +15,9 @@
*/ */
package org.redisson.pubsub; package org.redisson.pubsub;
import java.util.Iterator; import java.util.*;
import java.util.LinkedHashSet; import java.util.concurrent.*;
import java.util.Set; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/** /**
* *
@ -30,8 +28,8 @@ public class AsyncSemaphore {
private static class Entry { private static class Entry {
private Runnable runnable; private final Runnable runnable;
private int permits; private final int permits;
Entry(Runnable runnable, int permits) { Entry(Runnable runnable, int permits) {
super(); super();
@ -47,131 +45,94 @@ public class AsyncSemaphore {
return runnable; return runnable;
} }
@Override
@SuppressWarnings("AvoidInlineConditionals")
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((runnable == null) ? 0 : runnable.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Entry other = (Entry) obj;
if (runnable == null) {
if (other.runnable != null)
return false;
} else if (!runnable.equals(other.runnable))
return false;
return true;
}
} }
private volatile int counter; private final AtomicInteger counter;
private final Set<Entry> listeners = new LinkedHashSet<Entry>(); private final Queue<Entry> listeners = new ConcurrentLinkedQueue<>();
private final Set<Runnable> removedListeners = Collections.newSetFromMap(new ConcurrentHashMap<>());
public AsyncSemaphore(int permits) { public AsyncSemaphore(int permits) {
counter = permits; counter = new AtomicInteger(permits);
} }
public boolean tryAcquire(long timeoutMillis) { public boolean tryAcquire(long timeoutMillis) {
final CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
final Runnable listener = new Runnable() { Runnable runnable = () -> latch.countDown();
@Override acquire(runnable);
public void run() {
latch.countDown();
}
};
acquire(listener);
try { try {
boolean res = latch.await(timeoutMillis, TimeUnit.MILLISECONDS); boolean r = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
if (!res) { if (!r) {
if (!remove(listener)) { remove(runnable);
release();
}
} }
return res; return r;
} catch (InterruptedException e) { } catch (InterruptedException e) {
remove(runnable);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
if (!remove(listener)) {
release();
}
return false; return false;
} }
} }
public int queueSize() { public int queueSize() {
synchronized (this) { return listeners.size() - removedListeners.size();
return listeners.size();
}
} }
public void removeListeners() { public void removeListeners() {
synchronized (this) { listeners.clear();
listeners.clear(); removedListeners.clear();
}
} }
public void acquire(Runnable listener) { public void acquire(Runnable listener) {
acquire(listener, 1); acquire(listener, 1);
} }
public void acquire(Runnable listener, int permits) { public void acquire(Runnable listener, int permits) {
boolean run = false; if (permits <= 0) {
throw new IllegalArgumentException("permits can't be negative");
synchronized (this) { }
if (counter < permits) { listeners.add(new Entry(listener, permits));
listeners.add(new Entry(listener, permits)); tryRun();
}
private void tryRun() {
Entry entry;
while (true) {
entry = listeners.peek();
if (entry == null) {
return; return;
} else { }
counter -= permits;
run = true; int value = counter.get();
if (entry.getPermits() > value) {
return;
}
if (listeners.peek() == entry
&& counter.compareAndSet(value, value - entry.getPermits())) {
listeners.poll();
if (removedListeners.remove(entry.getRunnable())) {
counter.addAndGet(entry.getPermits());
} else {
break;
}
} }
} }
if (run) { entry.runnable.run();
listener.run();
}
} }
public boolean remove(Runnable listener) { public void remove(Runnable listener) {
synchronized (this) { removedListeners.add(listener);
return listeners.remove(new Entry(listener, 0));
}
} }
public int getCounter() { public int getCounter() {
return counter; return counter.get();
} }
public void release() { public void release() {
Entry entryToAcquire = null; counter.incrementAndGet();
tryRun();
synchronized (this) {
counter++;
Iterator<Entry> iter = listeners.iterator();
if (iter.hasNext()) {
Entry entry = iter.next();
if (entry.getPermits() <= counter) {
iter.remove();
entryToAcquire = entry;
}
}
}
if (entryToAcquire != null) {
acquire(entryToAcquire.getRunnable(), entryToAcquire.getPermits());
}
} }
@Override @Override

@ -15,10 +15,6 @@
*/ */
package org.redisson.pubsub; package org.redisson.pubsub;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.PubSubEntry; import org.redisson.PubSubEntry;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.BaseRedisPubSubListener;
@ -30,6 +26,9 @@ import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener; import org.redisson.misc.TransferListener;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -37,75 +36,63 @@ import org.redisson.misc.TransferListener;
*/ */
abstract class PublishSubscribe<E extends PubSubEntry<E>> { abstract class PublishSubscribe<E extends PubSubEntry<E>> {
private final ConcurrentMap<String, E> entries = new ConcurrentHashMap<>();
private final PublishSubscribeService service; private final PublishSubscribeService service;
PublishSubscribe(PublishSubscribeService service) { PublishSubscribe(PublishSubscribeService service) {
super(); super();
this.service = service; this.service = service;
} }
private final ConcurrentMap<String, E> entries = new ConcurrentHashMap<>();
public void unsubscribe(E entry, String entryName, String channelName) { public void unsubscribe(E entry, String entryName, String channelName) {
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName)); AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
semaphore.acquire(new Runnable() { semaphore.acquire(() -> {
@Override if (entry.release() == 0) {
public void run() { // just an assertion
if (entry.release() == 0) { boolean removed = entries.remove(entryName) == entry;
// just an assertion if (!removed) {
boolean removed = entries.remove(entryName) == entry; throw new IllegalStateException();
if (!removed) {
throw new IllegalStateException();
}
service.unsubscribe(new ChannelName(channelName), semaphore);
} else {
semaphore.release();
} }
service.unsubscribe(new ChannelName(channelName), semaphore);
} else {
semaphore.release();
} }
}); });
} }
public RFuture<E> subscribe(String entryName, String channelName) { public RFuture<E> subscribe(String entryName, String channelName) {
AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName)); AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
RPromise<E> newPromise = new RedissonPromise<E>() { RPromise<E> newPromise = new RedissonPromise<>();
@Override semaphore.acquire(() -> {
public boolean cancel(boolean mayInterruptIfRunning) { if (!newPromise.setUncancellable()) {
return semaphore.remove(listenerHolder.get()); semaphore.release();
return;
} }
};
Runnable listener = new Runnable() { E entry = entries.get(entryName);
if (entry != null) {
entry.acquire();
semaphore.release();
entry.getPromise().onComplete(new TransferListener<E>(newPromise));
return;
}
@Override E value = createEntry(newPromise);
public void run() { value.acquire();
E entry = entries.get(entryName);
if (entry != null) { E oldValue = entries.putIfAbsent(entryName, value);
entry.acquire(); if (oldValue != null) {
semaphore.release(); oldValue.acquire();
entry.getPromise().onComplete(new TransferListener<E>(newPromise)); semaphore.release();
return; oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
} return;
E value = createEntry(newPromise);
value.acquire();
E oldValue = entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.acquire();
semaphore.release();
oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
return;
}
RedisPubSubListener<Object> listener = createListener(channelName, value);
service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
} }
};
semaphore.acquire(listener); RedisPubSubListener<Object> listener = createListener(channelName, value);
listenerHolder.set(listener); service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
});
return newPromise; return newPromise;
} }

Loading…
Cancel
Save