PubSub handling has been reimplemented. #543

pull/555/head
Nikita 9 years ago
parent 7f0cc3efa0
commit 8fd41252b9

@ -17,6 +17,7 @@ package org.redisson;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
@ -70,26 +71,21 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
@Override
public void removeListener(int listenerId) {
Semaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
semaphore.release();
return;
}
entry.lock();
try {
if (entry.isActive()) {
entry.removeListener(name, listenerId);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().punsubscribe(name);
}
return;
}
} finally {
entry.unlock();
entry.removeListener(name, listenerId);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().punsubscribe(name, semaphore);
} else {
semaphore.release();
}
// listener has been re-attached
removeListener(listenerId);
}
@Override

@ -17,11 +17,13 @@ package org.redisson;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.MessageListener;
import org.redisson.core.RTopic;
@ -85,26 +87,21 @@ public class RedissonTopic<M> implements RTopic<M> {
@Override
public void removeListener(int listenerId) {
Semaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
semaphore.release();
return;
}
entry.lock();
try {
if (entry.isActive()) {
entry.removeListener(name, listenerId);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().unsubscribe(name);
}
return;
}
} finally {
entry.unlock();
}
// listener has been re-attached
removeListener(listenerId);
entry.removeListener(name, listenerId);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().unsubscribe(name, semaphore);
} else {
semaphore.release();
}
}
}

@ -18,6 +18,7 @@ package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.redisson.MasterSlaveServersConfig;
@ -45,6 +46,8 @@ public interface ConnectionManager {
boolean isClusterMode();
Semaphore getSemaphore(String channelName);
<R> Future<R> newSucceededFuture(R value);
ConnectionEventsHub getConnectionEventsHub();
@ -53,8 +56,10 @@ public interface ConnectionManager {
boolean isShuttingDown();
Promise<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener);
Future<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener);
Future<PubSubConnectionEntry> subscribe(Codec codec, String channelName, final RedisPubSubListener<?> listener, Semaphore semaphore);
ConnectionInitializer getConnectListener();
IdleConnectionWatcher getConnectionWatcher();
@ -92,11 +97,17 @@ public interface ConnectionManager {
PubSubConnectionEntry getPubSubEntry(String channelName);
Future<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, RedisPubSubListener<?> listener);
Future<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, RedisPubSubListener<?> listener, Semaphore semaphore);
Codec unsubscribe(final String channelName, Semaphore lock);
Codec unsubscribe(String channelName);
Codec punsubscribe(String channelName);
Codec punsubscribe(final String channelName, Semaphore lock);
void shutdown();
void shutdown(long quietPeriod, long timeout, TimeUnit unit);

@ -24,9 +24,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.redisson.BaseMasterSlaveServersConfig;
@ -117,6 +119,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected Class<? extends SocketChannel> socketChannelClass;
protected final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = PlatformDependent.newConcurrentHashMap();
protected final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();
protected MasterSlaveServersConfig config;
@ -137,6 +141,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) {
this(config);
init(cfg);
for (int i = 0; i < locks.length; i++) {
locks[i] = new Semaphore(1);
}
}
public MasterSlaveConnectionManager(Config cfg) {
@ -302,108 +310,170 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
subscribe(codec, channelName, listener, promise, PubSubType.PSUBSCRIBE);
return promise;
}
public Future<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?> listener, Semaphore semaphore) {
Promise<PubSubConnectionEntry> promise = newPromise();
subscribe(codec, channelName, listener, promise, PubSubType.PSUBSCRIBE, semaphore);
return promise;
}
public Promise<PubSubConnectionEntry> subscribe(Codec codec, String channelName, final RedisPubSubListener<?> listener) {
public Future<PubSubConnectionEntry> subscribe(Codec codec, String channelName, final RedisPubSubListener<?> listener) {
Promise<PubSubConnectionEntry> promise = newPromise();
subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE);
return promise;
}
public Future<PubSubConnectionEntry> subscribe(Codec codec, String channelName, final RedisPubSubListener<?> listener, Semaphore semaphore) {
Promise<PubSubConnectionEntry> promise = newPromise();
subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE, semaphore);
return promise;
}
private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise<PubSubConnectionEntry> promise, PubSubType type) {
final PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
сonnEntry.lock();
if (name2PubSubConnection.get(channelName) != сonnEntry) {
сonnEntry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
}
if (сonnEntry.isActive()) {
сonnEntry.addListener(channelName, listener);
сonnEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(сonnEntry);
}
});
сonnEntry.unlock();
return;
}
сonnEntry.unlock();
connect(codec, channelName, listener, promise, type);
return;
}
Set<PubSubConnectionEntry> entries = new HashSet<PubSubConnectionEntry>(name2PubSubConnection.values());
for (final PubSubConnectionEntry entry : entries) {
if (entry.tryAcquire()) {
final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
entry.release();
oldEntry.lock();
if (name2PubSubConnection.get(channelName) != oldEntry) {
oldEntry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
}
if (oldEntry.isActive()) {
oldEntry.addListener(channelName, listener);
oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(oldEntry);
}
});
oldEntry.unlock();
return;
}
oldEntry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
}
entry.lock();
if (name2PubSubConnection.get(channelName) != entry) {
entry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
private Semaphore[] locks = new Semaphore[50];
private Semaphore freePubSubLock = new Semaphore(1);
public Semaphore getSemaphore(String channelName) {
return locks[Math.abs(channelName.hashCode() % locks.length)];
}
private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise<PubSubConnectionEntry> promise, PubSubType type, Semaphore lock) {
final PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
сonnEntry.addListener(channelName, listener);
сonnEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(сonnEntry);
}
if (!entry.isActive()) {
entry.release();
entry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
});
lock.release();
return;
}
freePubSubLock.acquireUninterruptibly();
PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
if (freeEntry == null) {
connect(codec, channelName, listener, promise, type, lock);
return;
}
int remainFreeAmount = freeEntry.tryAcquire();
if (remainFreeAmount == -1) {
throw new IllegalStateException();
}
final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
if (oldEntry != null) {
freeEntry.release();
freePubSubLock.release();
oldEntry.addListener(channelName, listener);
oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(oldEntry);
}
entry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(entry);
}
});
});
lock.release();
return;
}
if (remainFreeAmount == 0) {
freePubSubConnections.poll();
}
freePubSubLock.release();
freeEntry.addListener(channelName, listener);
freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(freeEntry);
lock.release();
}
});
if (PubSubType.PSUBSCRIBE == type) {
freeEntry.psubscribe(codec, channelName);
} else {
freeEntry.subscribe(codec, channelName);
}
}
entry.addListener(channelName, listener);
if (PubSubType.PSUBSCRIBE == type) {
entry.psubscribe(codec, channelName);
} else {
entry.subscribe(codec, channelName);
private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise<PubSubConnectionEntry> promise, PubSubType type) {
Semaphore lock = locks[Math.abs(channelName.hashCode() % locks.length)];
lock.acquireUninterruptibly();
final PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
сonnEntry.addListener(channelName, listener);
сonnEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(сonnEntry);
}
entry.unlock();
});
lock.release();
return;
}
}
connect(codec, channelName, listener, promise, type);
freePubSubLock.acquireUninterruptibly();
PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
if (freeEntry == null) {
connect(codec, channelName, listener, promise, type, lock);
return;
}
int remainFreeAmount = freeEntry.tryAcquire();
if (remainFreeAmount == -1) {
throw new IllegalStateException();
}
final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
if (oldEntry != null) {
freeEntry.release();
freePubSubLock.release();
oldEntry.addListener(channelName, listener);
oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(oldEntry);
}
});
lock.release();
return;
}
if (remainFreeAmount == 0) {
freePubSubConnections.poll();
}
freePubSubLock.release();
freeEntry.addListener(channelName, listener);
freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(freeEntry);
lock.release();
}
});
if (PubSubType.PSUBSCRIBE == type) {
freeEntry.psubscribe(codec, channelName);
} else {
freeEntry.subscribe(codec, channelName);
}
}
private void connect(final Codec codec, final String channelName, final RedisPubSubListener listener,
final Promise<PubSubConnectionEntry> promise, final PubSubType type) {
final Promise<PubSubConnectionEntry> promise, final PubSubType type, final Semaphore lock) {
final int slot = 0;
Future<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
@ -412,144 +482,141 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
freePubSubLock.release();
lock.release();
return;
}
RedisPubSubConnection conn = future.getNow();
final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
oldEntry.lock();
if (name2PubSubConnection.get(channelName) != oldEntry) {
oldEntry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
}
if (oldEntry.isActive()) {
oldEntry.addListener(channelName, listener);
oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(oldEntry);
}
});
oldEntry.unlock();
return;
}
oldEntry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
}
entry.lock();
if (name2PubSubConnection.get(channelName) != entry) {
entry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
}
if (!entry.isActive()) {
entry.release();
entry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
}
freePubSubLock.release();
entry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
oldEntry.addListener(channelName, listener);
oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(entry);
promise.setSuccess(oldEntry);
}
});
entry.addListener(channelName, listener);
if (PubSubType.PSUBSCRIBE == type) {
entry.psubscribe(codec, channelName);
} else {
entry.subscribe(codec, channelName);
lock.release();
return;
}
freePubSubConnections.add(entry);
freePubSubLock.release();
entry.addListener(channelName, listener);
entry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(entry);
lock.release();
}
entry.unlock();
return;
});
if (PubSubType.PSUBSCRIBE == type) {
entry.psubscribe(codec, channelName);
} else {
entry.subscribe(codec, channelName);
}
}
});
}
@Override
public Codec unsubscribe(final String channelName) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return null;
}
Codec entryCodec = entry.getConnection().getChannels().get(channelName);
final CountDownLatch latch = new CountDownLatch(1);
entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
latch.countDown();
return true;
public Codec unsubscribe(final String channelName, Semaphore lock) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
lock.release();
return null;
}
Codec entryCodec = entry.getConnection().getChannels().get(channelName);
freePubSubLock.acquireUninterruptibly();
entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
if (entry.release() == 1) {
freePubSubConnections.add(entry);
}
return false;
freePubSubLock.release();
lock.release();
return true;
}
});
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
// same thread should be used
entry.lock();
try {
if (entry.tryClose()) {
releaseSubscribeConnection(0, entry);
}
} finally {
entry.unlock();
}
return entryCodec;
});
return entryCodec;
}
@Override
public Codec punsubscribe(final String channelName) {
public Codec unsubscribe(String channelName) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return null;
}
Codec entryCodec = entry.getConnection().getChannels().get(channelName);
entry.unsubscribe(channelName, null);
return entryCodec;
}
public Codec punsubscribe(final String channelName, Semaphore lock) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
lock.release();
return null;
}
Codec entryCodec = entry.getConnection().getPatternChannels().get(channelName);
final CountDownLatch latch = new CountDownLatch(1);
freePubSubLock.acquireUninterruptibly();
entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) {
latch.countDown();
if (entry.release() == 1) {
freePubSubConnections.add(entry);
}
freePubSubLock.release();
lock.release();
return true;
}
return false;
}
});
// same thread should be used
entry.lock();
try {
if (entry.tryClose()) {
releaseSubscribeConnection(0, entry);
}
} finally {
entry.unlock();
}
return entryCodec;
}
@Override
public Codec punsubscribe(final String channelName) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return null;
}
Codec entryCodec = entry.getConnection().getPatternChannels().get(channelName);
entry.punsubscribe(channelName, null);
return entryCodec;
}

@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.MasterSlaveServersConfig;
@ -157,41 +158,31 @@ public class MasterSlaveEntry {
private void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
for (String channelName : redisPubSubConnection.getChannels().keySet()) {
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
Semaphore semaphore = connectionManager.getSemaphore(channelName);
semaphore.acquireUninterruptibly();
pubSubEntry.lock();
try {
pubSubEntry.close();
Collection<RedisPubSubListener> listeners = pubSubEntry.getListeners(channelName);
reattachPubSubListeners(channelName, listeners);
} finally {
pubSubEntry.unlock();
}
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
Collection<RedisPubSubListener> listeners = pubSubEntry.getListeners(channelName);
reattachPubSubListeners(channelName, listeners, semaphore);
}
for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) {
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
Semaphore semaphore = connectionManager.getSemaphore(channelName);
semaphore.acquireUninterruptibly();
pubSubEntry.lock();
try {
pubSubEntry.close();
Collection<RedisPubSubListener> listeners = pubSubEntry.getListeners(channelName);
reattachPatternPubSubListeners(channelName, listeners);
} finally {
pubSubEntry.unlock();
}
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
Collection<RedisPubSubListener> listeners = pubSubEntry.getListeners(channelName);
reattachPatternPubSubListeners(channelName, listeners, semaphore);
}
}
private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener> listeners) {
private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener> listeners, Semaphore semaphore) {
Codec subscribeCodec = connectionManager.unsubscribe(channelName);
if (listeners.isEmpty()) {
return;
}
Future<PubSubConnectionEntry> subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null);
Future<PubSubConnectionEntry> subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null, semaphore);
subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
@ -211,10 +202,10 @@ public class MasterSlaveEntry {
}
private void reattachPatternPubSubListeners(final String channelName,
final Collection<RedisPubSubListener> listeners) {
final Collection<RedisPubSubListener> listeners, Semaphore semaphore) {
Codec subscribeCodec = connectionManager.punsubscribe(channelName);
if (!listeners.isEmpty()) {
Future<PubSubConnectionEntry> future = connectionManager.psubscribe(channelName, subscribeCodec, null);
Future<PubSubConnectionEntry> future = connectionManager.psubscribe(channelName, subscribeCodec, null, semaphore);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)

@ -15,15 +15,13 @@
*/
package org.redisson.connection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisPubSubConnection;
@ -38,11 +36,8 @@ public class PubSubConnectionEntry {
public enum Status {ACTIVE, INACTIVE}
private ReentrantLock lock = new ReentrantLock();
private volatile Status status = Status.ACTIVE;
private final Semaphore subscribedChannelsAmount;
private final AtomicInteger subscribedChannelsAmount;
private final RedisPubSubConnection conn;
private final int subscriptionsPerConnection;
private final ConcurrentMap<String, SubscribeListener> subscribeChannelListeners = new ConcurrentHashMap<String, SubscribeListener>();
private final ConcurrentMap<String, Queue<RedisPubSubListener>> channelListeners = new ConcurrentHashMap<String, Queue<RedisPubSubListener>>();
@ -50,8 +45,7 @@ public class PubSubConnectionEntry {
public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) {
super();
this.conn = conn;
this.subscriptionsPerConnection = subscriptionsPerConnection;
this.subscribedChannelsAmount = new Semaphore(subscriptionsPerConnection);
this.subscribedChannelsAmount = new AtomicInteger(subscriptionsPerConnection);
}
public boolean hasListeners(String channelName) {
@ -92,14 +86,6 @@ public class PubSubConnectionEntry {
conn.addListener(listener);
}
public boolean isActive() {
return status == Status.ACTIVE;
}
public void close() {
status = Status.INACTIVE;
}
// TODO optimize
public boolean removeListener(String channelName, int listenerId) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channelName);
@ -122,12 +108,21 @@ public class PubSubConnectionEntry {
conn.removeListener(listener);
}
public boolean tryAcquire() {
return subscribedChannelsAmount.tryAcquire();
public int tryAcquire() {
while (true) {
int value = subscribedChannelsAmount.get();
if (value == 0) {
return -1;
}
if (subscribedChannelsAmount.compareAndSet(value, value - 1)) {
return value - 1;
}
}
}
public void release() {
subscribedChannelsAmount.release();
public int release() {
return subscribedChannelsAmount.incrementAndGet();
}
public void subscribe(Codec codec, String channelName) {
@ -162,9 +157,11 @@ public class PubSubConnectionEntry {
@Override
public boolean onStatus(PubSubType type, String ch) {
if (type == PubSubType.UNSUBSCRIBE && channel.equals(ch)) {
removeListeners(channel);
listener.onStatus(type, channel);
conn.removeListener(this);
removeListeners(channel);
if (listener != null) {
listener.onStatus(type, channel);
}
return true;
}
return false;
@ -186,7 +183,6 @@ public class PubSubConnectionEntry {
conn.removeListener(listener);
}
}
subscribedChannelsAmount.release();
}
public void punsubscribe(final String channel, final RedisPubSubListener listener) {
@ -194,9 +190,11 @@ public class PubSubConnectionEntry {
@Override
public boolean onStatus(PubSubType type, String ch) {
if (type == PubSubType.PUNSUBSCRIBE && channel.equals(ch)) {
removeListeners(channel);
listener.onStatus(type, channel);
conn.removeListener(this);
removeListeners(channel);
if (listener != null) {
listener.onStatus(type, channel);
}
return true;
}
return false;
@ -205,25 +203,8 @@ public class PubSubConnectionEntry {
conn.punsubscribe(channel);
}
public boolean tryClose() {
if (subscribedChannelsAmount.tryAcquire(subscriptionsPerConnection)) {
close();
return true;
}
return false;
}
public RedisPubSubConnection getConnection() {
return conn;
}
public void lock() {
lock.lock();
}
public void unlock() {
lock.unlock();
}
}

@ -16,6 +16,7 @@
package org.redisson.pubsub;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import org.redisson.PubSubEntry;
import org.redisson.client.BaseRedisPubSubListener;
@ -23,7 +24,6 @@ import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.PubSubConnectionEntry;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
@ -34,15 +34,16 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
private final ConcurrentMap<String, E> entries = PlatformDependent.newConcurrentHashMap();
public void unsubscribe(E entry, String entryName, String channelName, ConnectionManager connectionManager) {
synchronized (this) {
if (entry.release() == 0) {
// just an assertion
boolean removed = entries.remove(entryName) == entry;
if (removed) {
connectionManager.unsubscribe(channelName);
}
Semaphore semaphore = connectionManager.getSemaphore(channelName);
semaphore.acquireUninterruptibly();
if (entry.release() == 0) {
// just an assertion
boolean removed = entries.remove(entryName) == entry;
if (removed) {
connectionManager.unsubscribe(channelName, semaphore);
}
}
semaphore.release();
}
public E getEntry(String entryName) {
@ -50,27 +51,29 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
}
public Future<E> subscribe(String entryName, String channelName, ConnectionManager connectionManager) {
synchronized (this) {
Semaphore semaphore = connectionManager.getSemaphore(channelName);
semaphore.acquireUninterruptibly();
E entry = entries.get(entryName);
if (entry != null) {
entry.aquire();
semaphore.release();
return entry.getPromise();
}
Promise<E> newPromise = connectionManager.newPromise();
E value = createEntry(newPromise);
value.aquire();
E oldValue = entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.aquire();
semaphore.release();
return oldValue.getPromise();
}
RedisPubSubListener<Object> listener = createListener(channelName, value);
connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener);
connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener, semaphore);
return newPromise;
}
}
protected abstract E createEntry(Promise<E> newPromise);

@ -17,6 +17,7 @@ package org.redisson.reactive;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.reactivestreams.Publisher;
import org.redisson.PubSubPatternMessageListener;
@ -81,44 +82,28 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
return;
}
PubSubConnectionEntry entry = future.getNow();
entry.lock();
try {
if (entry.isActive()) {
entry.addListener(name, pubSubListener);
promise.setSuccess(pubSubListener.hashCode());
return;
}
} finally {
entry.unlock();
}
addListener(pubSubListener, promise);
promise.setSuccess(pubSubListener.hashCode());
}
});
}
@Override
public void removeListener(int listenerId) {
Semaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
semaphore.release();
return;
}
entry.lock();
try {
if (entry.isActive()) {
entry.removeListener(name, listenerId);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().punsubscribe(name);
}
return;
}
} finally {
entry.unlock();
entry.removeListener(name, listenerId);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().punsubscribe(name, semaphore);
} else {
semaphore.release();
}
// entry is inactive trying add again
removeListener(listenerId);
}
@Override

@ -17,6 +17,7 @@ package org.redisson.reactive;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.reactivestreams.Publisher;
import org.redisson.PubSubMessageListener;
@ -99,26 +100,21 @@ public class RedissonTopicReactive<M> implements RTopicReactive<M> {
@Override
public void removeListener(int listenerId) {
Semaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
semaphore.release();
return;
}
entry.lock();
try {
if (entry.isActive()) {
entry.removeListener(name, listenerId);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().unsubscribe(name);
}
return;
}
} finally {
entry.unlock();
}
// listener has been re-attached
removeListener(listenerId);
entry.removeListener(name, listenerId);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().unsubscribe(name, semaphore);
} else {
semaphore.release();
}
}

Loading…
Cancel
Save