diff --git a/src/main/java/org/redisson/RedissonPatternTopic.java b/src/main/java/org/redisson/RedissonPatternTopic.java index b3dd2f9b5..4daf94056 100644 --- a/src/main/java/org/redisson/RedissonPatternTopic.java +++ b/src/main/java/org/redisson/RedissonPatternTopic.java @@ -17,6 +17,7 @@ package org.redisson; import java.util.Collections; import java.util.List; +import java.util.concurrent.Semaphore; import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; @@ -70,26 +71,21 @@ public class RedissonPatternTopic implements RPatternTopic { @Override public void removeListener(int listenerId) { + Semaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + semaphore.acquireUninterruptibly(); + PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); if (entry == null) { + semaphore.release(); return; } - entry.lock(); - try { - if (entry.isActive()) { - entry.removeListener(name, listenerId); - if (!entry.hasListeners(name)) { - commandExecutor.getConnectionManager().punsubscribe(name); - } - return; - } - } finally { - entry.unlock(); + entry.removeListener(name, listenerId); + if (!entry.hasListeners(name)) { + commandExecutor.getConnectionManager().punsubscribe(name, semaphore); + } else { + semaphore.release(); } - - // listener has been re-attached - removeListener(listenerId); } @Override diff --git a/src/main/java/org/redisson/RedissonTopic.java b/src/main/java/org/redisson/RedissonTopic.java index 2d5b0fbe8..d0e851fcc 100644 --- a/src/main/java/org/redisson/RedissonTopic.java +++ b/src/main/java/org/redisson/RedissonTopic.java @@ -17,11 +17,13 @@ package org.redisson; import java.util.Collections; import java.util.List; +import java.util.concurrent.Semaphore; import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.PubSubConnectionEntry; import org.redisson.core.MessageListener; import org.redisson.core.RTopic; @@ -85,26 +87,21 @@ public class RedissonTopic implements RTopic { @Override public void removeListener(int listenerId) { + Semaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + semaphore.acquireUninterruptibly(); + PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); if (entry == null) { + semaphore.release(); return; } - - entry.lock(); - try { - if (entry.isActive()) { - entry.removeListener(name, listenerId); - if (!entry.hasListeners(name)) { - commandExecutor.getConnectionManager().unsubscribe(name); - } - return; - } - } finally { - entry.unlock(); - } - // listener has been re-attached - removeListener(listenerId); + entry.removeListener(name, listenerId); + if (!entry.hasListeners(name)) { + commandExecutor.getConnectionManager().unsubscribe(name, semaphore); + } else { + semaphore.release(); + } } } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 4750e4fb6..0c24f6d43 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -18,6 +18,7 @@ package org.redisson.connection; import java.net.InetSocketAddress; import java.util.Collection; import java.util.Map; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.redisson.MasterSlaveServersConfig; @@ -45,6 +46,8 @@ public interface ConnectionManager { boolean isClusterMode(); + Semaphore getSemaphore(String channelName); + Future newSucceededFuture(R value); ConnectionEventsHub getConnectionEventsHub(); @@ -53,8 +56,10 @@ public interface ConnectionManager { boolean isShuttingDown(); - Promise subscribe(Codec codec, String channelName, RedisPubSubListener listener); + Future subscribe(Codec codec, String channelName, RedisPubSubListener listener); + Future subscribe(Codec codec, String channelName, final RedisPubSubListener listener, Semaphore semaphore); + ConnectionInitializer getConnectListener(); IdleConnectionWatcher getConnectionWatcher(); @@ -92,11 +97,17 @@ public interface ConnectionManager { PubSubConnectionEntry getPubSubEntry(String channelName); Future psubscribe(String pattern, Codec codec, RedisPubSubListener listener); + + Future psubscribe(String pattern, Codec codec, RedisPubSubListener listener, Semaphore semaphore); + Codec unsubscribe(final String channelName, Semaphore lock); + Codec unsubscribe(String channelName); Codec punsubscribe(String channelName); + Codec punsubscribe(final String channelName, Semaphore lock); + void shutdown(); void shutdown(long quietPeriod, long timeout, TimeUnit unit); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 3f0912e22..319c7a38f 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -24,9 +24,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.redisson.BaseMasterSlaveServersConfig; @@ -117,6 +119,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected Class socketChannelClass; protected final ConcurrentMap name2PubSubConnection = PlatformDependent.newConcurrentHashMap(); + + protected final Queue freePubSubConnections = new ConcurrentLinkedQueue(); protected MasterSlaveServersConfig config; @@ -137,6 +141,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) { this(config); init(cfg); + + for (int i = 0; i < locks.length; i++) { + locks[i] = new Semaphore(1); + } } public MasterSlaveConnectionManager(Config cfg) { @@ -302,108 +310,170 @@ public class MasterSlaveConnectionManager implements ConnectionManager { subscribe(codec, channelName, listener, promise, PubSubType.PSUBSCRIBE); return promise; } + + public Future psubscribe(String channelName, Codec codec, RedisPubSubListener listener, Semaphore semaphore) { + Promise promise = newPromise(); + subscribe(codec, channelName, listener, promise, PubSubType.PSUBSCRIBE, semaphore); + return promise; + } + - public Promise subscribe(Codec codec, String channelName, final RedisPubSubListener listener) { + public Future subscribe(Codec codec, String channelName, final RedisPubSubListener listener) { Promise promise = newPromise(); subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE); return promise; } + + public Future subscribe(Codec codec, String channelName, final RedisPubSubListener listener, Semaphore semaphore) { + Promise promise = newPromise(); + subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE, semaphore); + return promise; + } - private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise promise, PubSubType type) { - final PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); - if (сonnEntry != null) { - сonnEntry.lock(); - if (name2PubSubConnection.get(channelName) != сonnEntry) { - сonnEntry.unlock(); - subscribe(codec, channelName, listener, promise, type); - return; - } - if (сonnEntry.isActive()) { - сonnEntry.addListener(channelName, listener); - сonnEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - promise.setSuccess(сonnEntry); - } - }); - сonnEntry.unlock(); - return; - } - сonnEntry.unlock(); - - connect(codec, channelName, listener, promise, type); - return; - } - Set entries = new HashSet(name2PubSubConnection.values()); - for (final PubSubConnectionEntry entry : entries) { - if (entry.tryAcquire()) { - final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); - if (oldEntry != null) { - entry.release(); - - oldEntry.lock(); - if (name2PubSubConnection.get(channelName) != oldEntry) { - oldEntry.unlock(); - subscribe(codec, channelName, listener, promise, type); - return; - } - - if (oldEntry.isActive()) { - oldEntry.addListener(channelName, listener); - oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - promise.setSuccess(oldEntry); - } - }); - oldEntry.unlock(); - return; - } - oldEntry.unlock(); - - subscribe(codec, channelName, listener, promise, type); - return; - } - - entry.lock(); - if (name2PubSubConnection.get(channelName) != entry) { - entry.unlock(); - subscribe(codec, channelName, listener, promise, type); - return; + private Semaphore[] locks = new Semaphore[50]; + + private Semaphore freePubSubLock = new Semaphore(1); + + public Semaphore getSemaphore(String channelName) { + return locks[Math.abs(channelName.hashCode() % locks.length)]; + } + + private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise promise, PubSubType type, Semaphore lock) { + + final PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); + if (сonnEntry != null) { + сonnEntry.addListener(channelName, listener); + сonnEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + promise.setSuccess(сonnEntry); } - - if (!entry.isActive()) { - entry.release(); - entry.unlock(); - subscribe(codec, channelName, listener, promise, type); - return; + }); + lock.release(); + return; + } + + freePubSubLock.acquireUninterruptibly(); + PubSubConnectionEntry freeEntry = freePubSubConnections.peek(); + if (freeEntry == null) { + connect(codec, channelName, listener, promise, type, lock); + return; + } + + int remainFreeAmount = freeEntry.tryAcquire(); + if (remainFreeAmount == -1) { + throw new IllegalStateException(); + } + + final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry); + if (oldEntry != null) { + freeEntry.release(); + freePubSubLock.release(); + + oldEntry.addListener(channelName, listener); + oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + promise.setSuccess(oldEntry); } - - entry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - promise.setSuccess(entry); - } - }); + }); + lock.release(); + return; + } + + if (remainFreeAmount == 0) { + freePubSubConnections.poll(); + } + freePubSubLock.release(); + + freeEntry.addListener(channelName, listener); + freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + promise.setSuccess(freeEntry); + lock.release(); + } + }); + + + if (PubSubType.PSUBSCRIBE == type) { + freeEntry.psubscribe(codec, channelName); + } else { + freeEntry.subscribe(codec, channelName); + } + } - entry.addListener(channelName, listener); - if (PubSubType.PSUBSCRIBE == type) { - entry.psubscribe(codec, channelName); - } else { - entry.subscribe(codec, channelName); + + private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise promise, PubSubType type) { + + Semaphore lock = locks[Math.abs(channelName.hashCode() % locks.length)]; + lock.acquireUninterruptibly(); + final PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); + if (сonnEntry != null) { + сonnEntry.addListener(channelName, listener); + сonnEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + promise.setSuccess(сonnEntry); } - entry.unlock(); - + }); + lock.release(); return; } - } - connect(codec, channelName, listener, promise, type); + freePubSubLock.acquireUninterruptibly(); + PubSubConnectionEntry freeEntry = freePubSubConnections.peek(); + if (freeEntry == null) { + connect(codec, channelName, listener, promise, type, lock); + return; + } + + int remainFreeAmount = freeEntry.tryAcquire(); + if (remainFreeAmount == -1) { + throw new IllegalStateException(); + } + + final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry); + if (oldEntry != null) { + freeEntry.release(); + freePubSubLock.release(); + + oldEntry.addListener(channelName, listener); + oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + promise.setSuccess(oldEntry); + } + }); + lock.release(); + return; + } + + if (remainFreeAmount == 0) { + freePubSubConnections.poll(); + } + freePubSubLock.release(); + + freeEntry.addListener(channelName, listener); + freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + promise.setSuccess(freeEntry); + lock.release(); + } + }); + + + if (PubSubType.PSUBSCRIBE == type) { + freeEntry.psubscribe(codec, channelName); + } else { + freeEntry.subscribe(codec, channelName); + } } private void connect(final Codec codec, final String channelName, final RedisPubSubListener listener, - final Promise promise, final PubSubType type) { + final Promise promise, final PubSubType type, final Semaphore lock) { final int slot = 0; Future connFuture = nextPubSubConnection(slot); connFuture.addListener(new FutureListener() { @@ -412,144 +482,141 @@ public class MasterSlaveConnectionManager implements ConnectionManager { public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { promise.setFailure(future.cause()); + freePubSubLock.release(); + lock.release(); return; } RedisPubSubConnection conn = future.getNow(); + final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); entry.tryAcquire(); + final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); if (oldEntry != null) { releaseSubscribeConnection(slot, entry); - - oldEntry.lock(); - if (name2PubSubConnection.get(channelName) != oldEntry) { - oldEntry.unlock(); - subscribe(codec, channelName, listener, promise, type); - return; - } - - if (oldEntry.isActive()) { - oldEntry.addListener(channelName, listener); - oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - promise.setSuccess(oldEntry); - } - }); - oldEntry.unlock(); - return; - } - oldEntry.unlock(); - - subscribe(codec, channelName, listener, promise, type); - return; - } - - entry.lock(); - if (name2PubSubConnection.get(channelName) != entry) { - entry.unlock(); - subscribe(codec, channelName, listener, promise, type); - return; - } - if (!entry.isActive()) { - entry.release(); - entry.unlock(); - subscribe(codec, channelName, listener, promise, type); - return; - } + freePubSubLock.release(); - entry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { + oldEntry.addListener(channelName, listener); + oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - promise.setSuccess(entry); + promise.setSuccess(oldEntry); } }); - entry.addListener(channelName, listener); - if (PubSubType.PSUBSCRIBE == type) { - entry.psubscribe(codec, channelName); - } else { - entry.subscribe(codec, channelName); + lock.release(); + return; + } + + freePubSubConnections.add(entry); + freePubSubLock.release(); + + entry.addListener(channelName, listener); + entry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + promise.setSuccess(entry); + lock.release(); } - entry.unlock(); - return; + }); + + if (PubSubType.PSUBSCRIBE == type) { + entry.psubscribe(codec, channelName); + } else { + entry.subscribe(codec, channelName); + } + } }); } - @Override - public Codec unsubscribe(final String channelName) { - final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); - if (entry == null) { - return null; - } - - Codec entryCodec = entry.getConnection().getChannels().get(channelName); - final CountDownLatch latch = new CountDownLatch(1); - entry.unsubscribe(channelName, new BaseRedisPubSubListener() { - - @Override - public boolean onStatus(PubSubType type, String channel) { - if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) { - latch.countDown(); - return true; + public Codec unsubscribe(final String channelName, Semaphore lock) { + final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); + if (entry == null) { + lock.release(); + return null; + } + + Codec entryCodec = entry.getConnection().getChannels().get(channelName); + freePubSubLock.acquireUninterruptibly(); + entry.unsubscribe(channelName, new BaseRedisPubSubListener() { + + @Override + public boolean onStatus(PubSubType type, String channel) { + if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) { + + if (entry.release() == 1) { + freePubSubConnections.add(entry); } - return false; + freePubSubLock.release(); + + lock.release(); + return true; } - - }); - - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + return false; } - // same thread should be used - entry.lock(); - try { - if (entry.tryClose()) { - releaseSubscribeConnection(0, entry); - } - } finally { - entry.unlock(); - } - return entryCodec; + }); + + return entryCodec; } @Override - public Codec punsubscribe(final String channelName) { + public Codec unsubscribe(String channelName) { final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); if (entry == null) { return null; } - + + Codec entryCodec = entry.getConnection().getChannels().get(channelName); + entry.unsubscribe(channelName, null); + + return entryCodec; + } + + public Codec punsubscribe(final String channelName, Semaphore lock) { + final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); + if (entry == null) { + lock.release(); + return null; + } + Codec entryCodec = entry.getConnection().getPatternChannels().get(channelName); - final CountDownLatch latch = new CountDownLatch(1); + freePubSubLock.acquireUninterruptibly(); entry.punsubscribe(channelName, new BaseRedisPubSubListener() { - + @Override public boolean onStatus(PubSubType type, String channel) { if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) { - latch.countDown(); + + if (entry.release() == 1) { + freePubSubConnections.add(entry); + } + freePubSubLock.release(); + + lock.release(); return true; } return false; } - + }); - // same thread should be used - entry.lock(); - try { - if (entry.tryClose()) { - releaseSubscribeConnection(0, entry); - } - } finally { - entry.unlock(); - } + return entryCodec; + } + + @Override + public Codec punsubscribe(final String channelName) { + final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); + if (entry == null) { + return null; + } + + Codec entryCodec = entry.getConnection().getPatternChannels().get(channelName); + entry.punsubscribe(channelName, null); + return entryCodec; } diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index a17389dec..6394a745a 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Set; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.MasterSlaveServersConfig; @@ -157,41 +158,31 @@ public class MasterSlaveEntry { private void reattachPubSub(RedisPubSubConnection redisPubSubConnection) { for (String channelName : redisPubSubConnection.getChannels().keySet()) { - PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName); + Semaphore semaphore = connectionManager.getSemaphore(channelName); + semaphore.acquireUninterruptibly(); - pubSubEntry.lock(); - try { - pubSubEntry.close(); - - Collection listeners = pubSubEntry.getListeners(channelName); - reattachPubSubListeners(channelName, listeners); - } finally { - pubSubEntry.unlock(); - } + PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName); + Collection listeners = pubSubEntry.getListeners(channelName); + reattachPubSubListeners(channelName, listeners, semaphore); } for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) { - PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName); + Semaphore semaphore = connectionManager.getSemaphore(channelName); + semaphore.acquireUninterruptibly(); - pubSubEntry.lock(); - try { - pubSubEntry.close(); - - Collection listeners = pubSubEntry.getListeners(channelName); - reattachPatternPubSubListeners(channelName, listeners); - } finally { - pubSubEntry.unlock(); - } + PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName); + Collection listeners = pubSubEntry.getListeners(channelName); + reattachPatternPubSubListeners(channelName, listeners, semaphore); } } - private void reattachPubSubListeners(final String channelName, final Collection listeners) { + private void reattachPubSubListeners(final String channelName, final Collection listeners, Semaphore semaphore) { Codec subscribeCodec = connectionManager.unsubscribe(channelName); if (listeners.isEmpty()) { return; } - Future subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null); + Future subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null, semaphore); subscribeFuture.addListener(new FutureListener() { @Override @@ -211,10 +202,10 @@ public class MasterSlaveEntry { } private void reattachPatternPubSubListeners(final String channelName, - final Collection listeners) { + final Collection listeners, Semaphore semaphore) { Codec subscribeCodec = connectionManager.punsubscribe(channelName); if (!listeners.isEmpty()) { - Future future = connectionManager.psubscribe(channelName, subscribeCodec, null); + Future future = connectionManager.psubscribe(channelName, subscribeCodec, null, semaphore); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) diff --git a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index 9d83363f3..c0ff3442d 100644 --- a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -15,15 +15,13 @@ */ package org.redisson.connection; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Semaphore; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicInteger; import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.RedisPubSubConnection; @@ -38,11 +36,8 @@ public class PubSubConnectionEntry { public enum Status {ACTIVE, INACTIVE} - private ReentrantLock lock = new ReentrantLock(); - private volatile Status status = Status.ACTIVE; - private final Semaphore subscribedChannelsAmount; + private final AtomicInteger subscribedChannelsAmount; private final RedisPubSubConnection conn; - private final int subscriptionsPerConnection; private final ConcurrentMap subscribeChannelListeners = new ConcurrentHashMap(); private final ConcurrentMap> channelListeners = new ConcurrentHashMap>(); @@ -50,8 +45,7 @@ public class PubSubConnectionEntry { public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) { super(); this.conn = conn; - this.subscriptionsPerConnection = subscriptionsPerConnection; - this.subscribedChannelsAmount = new Semaphore(subscriptionsPerConnection); + this.subscribedChannelsAmount = new AtomicInteger(subscriptionsPerConnection); } public boolean hasListeners(String channelName) { @@ -92,14 +86,6 @@ public class PubSubConnectionEntry { conn.addListener(listener); } - public boolean isActive() { - return status == Status.ACTIVE; - } - - public void close() { - status = Status.INACTIVE; - } - // TODO optimize public boolean removeListener(String channelName, int listenerId) { Queue listeners = channelListeners.get(channelName); @@ -122,12 +108,21 @@ public class PubSubConnectionEntry { conn.removeListener(listener); } - public boolean tryAcquire() { - return subscribedChannelsAmount.tryAcquire(); + public int tryAcquire() { + while (true) { + int value = subscribedChannelsAmount.get(); + if (value == 0) { + return -1; + } + + if (subscribedChannelsAmount.compareAndSet(value, value - 1)) { + return value - 1; + } + } } - public void release() { - subscribedChannelsAmount.release(); + public int release() { + return subscribedChannelsAmount.incrementAndGet(); } public void subscribe(Codec codec, String channelName) { @@ -162,9 +157,11 @@ public class PubSubConnectionEntry { @Override public boolean onStatus(PubSubType type, String ch) { if (type == PubSubType.UNSUBSCRIBE && channel.equals(ch)) { - removeListeners(channel); - listener.onStatus(type, channel); conn.removeListener(this); + removeListeners(channel); + if (listener != null) { + listener.onStatus(type, channel); + } return true; } return false; @@ -186,7 +183,6 @@ public class PubSubConnectionEntry { conn.removeListener(listener); } } - subscribedChannelsAmount.release(); } public void punsubscribe(final String channel, final RedisPubSubListener listener) { @@ -194,9 +190,11 @@ public class PubSubConnectionEntry { @Override public boolean onStatus(PubSubType type, String ch) { if (type == PubSubType.PUNSUBSCRIBE && channel.equals(ch)) { - removeListeners(channel); - listener.onStatus(type, channel); conn.removeListener(this); + removeListeners(channel); + if (listener != null) { + listener.onStatus(type, channel); + } return true; } return false; @@ -205,25 +203,8 @@ public class PubSubConnectionEntry { conn.punsubscribe(channel); } - - public boolean tryClose() { - if (subscribedChannelsAmount.tryAcquire(subscriptionsPerConnection)) { - close(); - return true; - } - return false; - } - public RedisPubSubConnection getConnection() { return conn; } - - public void lock() { - lock.lock(); - } - - public void unlock() { - lock.unlock(); - } } diff --git a/src/main/java/org/redisson/pubsub/PublishSubscribe.java b/src/main/java/org/redisson/pubsub/PublishSubscribe.java index a2c18c24d..0afb99022 100644 --- a/src/main/java/org/redisson/pubsub/PublishSubscribe.java +++ b/src/main/java/org/redisson/pubsub/PublishSubscribe.java @@ -16,6 +16,7 @@ package org.redisson.pubsub; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; import org.redisson.PubSubEntry; import org.redisson.client.BaseRedisPubSubListener; @@ -23,7 +24,6 @@ import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.connection.ConnectionManager; -import org.redisson.connection.PubSubConnectionEntry; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; @@ -34,15 +34,16 @@ abstract class PublishSubscribe> { private final ConcurrentMap entries = PlatformDependent.newConcurrentHashMap(); public void unsubscribe(E entry, String entryName, String channelName, ConnectionManager connectionManager) { - synchronized (this) { - if (entry.release() == 0) { - // just an assertion - boolean removed = entries.remove(entryName) == entry; - if (removed) { - connectionManager.unsubscribe(channelName); - } + Semaphore semaphore = connectionManager.getSemaphore(channelName); + semaphore.acquireUninterruptibly(); + if (entry.release() == 0) { + // just an assertion + boolean removed = entries.remove(entryName) == entry; + if (removed) { + connectionManager.unsubscribe(channelName, semaphore); } } + semaphore.release(); } public E getEntry(String entryName) { @@ -50,27 +51,29 @@ abstract class PublishSubscribe> { } public Future subscribe(String entryName, String channelName, ConnectionManager connectionManager) { - synchronized (this) { + Semaphore semaphore = connectionManager.getSemaphore(channelName); + semaphore.acquireUninterruptibly(); E entry = entries.get(entryName); if (entry != null) { entry.aquire(); + semaphore.release(); return entry.getPromise(); } - + Promise newPromise = connectionManager.newPromise(); E value = createEntry(newPromise); value.aquire(); - + E oldValue = entries.putIfAbsent(entryName, value); if (oldValue != null) { oldValue.aquire(); + semaphore.release(); return oldValue.getPromise(); } - + RedisPubSubListener listener = createListener(channelName, value); - connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener); + connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener, semaphore); return newPromise; - } } protected abstract E createEntry(Promise newPromise); diff --git a/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java b/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java index 60d1fd054..c95493129 100644 --- a/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java @@ -17,6 +17,7 @@ package org.redisson.reactive; import java.util.Collections; import java.util.List; +import java.util.concurrent.Semaphore; import org.reactivestreams.Publisher; import org.redisson.PubSubPatternMessageListener; @@ -81,44 +82,28 @@ public class RedissonPatternTopicReactive implements RPatternTopicReactive return; } - PubSubConnectionEntry entry = future.getNow(); - entry.lock(); - try { - if (entry.isActive()) { - entry.addListener(name, pubSubListener); - promise.setSuccess(pubSubListener.hashCode()); - return; - } - } finally { - entry.unlock(); - } - addListener(pubSubListener, promise); + promise.setSuccess(pubSubListener.hashCode()); } }); } @Override public void removeListener(int listenerId) { + Semaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + semaphore.acquireUninterruptibly(); + PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); if (entry == null) { + semaphore.release(); return; } - entry.lock(); - try { - if (entry.isActive()) { - entry.removeListener(name, listenerId); - if (!entry.hasListeners(name)) { - commandExecutor.getConnectionManager().punsubscribe(name); - } - return; - } - } finally { - entry.unlock(); + entry.removeListener(name, listenerId); + if (!entry.hasListeners(name)) { + commandExecutor.getConnectionManager().punsubscribe(name, semaphore); + } else { + semaphore.release(); } - - // entry is inactive trying add again - removeListener(listenerId); } @Override diff --git a/src/main/java/org/redisson/reactive/RedissonTopicReactive.java b/src/main/java/org/redisson/reactive/RedissonTopicReactive.java index b98949336..b38dfabf0 100644 --- a/src/main/java/org/redisson/reactive/RedissonTopicReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonTopicReactive.java @@ -17,6 +17,7 @@ package org.redisson.reactive; import java.util.Collections; import java.util.List; +import java.util.concurrent.Semaphore; import org.reactivestreams.Publisher; import org.redisson.PubSubMessageListener; @@ -99,26 +100,21 @@ public class RedissonTopicReactive implements RTopicReactive { @Override public void removeListener(int listenerId) { + Semaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + semaphore.acquireUninterruptibly(); + PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); if (entry == null) { + semaphore.release(); return; } - - entry.lock(); - try { - if (entry.isActive()) { - entry.removeListener(name, listenerId); - if (!entry.hasListeners(name)) { - commandExecutor.getConnectionManager().unsubscribe(name); - } - return; - } - } finally { - entry.unlock(); - } - // listener has been re-attached - removeListener(listenerId); + entry.removeListener(name, listenerId); + if (!entry.hasListeners(name)) { + commandExecutor.getConnectionManager().unsubscribe(name, semaphore); + } else { + semaphore.release(); + } }