diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 2bbdfb313..baa7ab551 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -245,7 +245,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } @Override - protected MasterSlaveEntry getEntry(int slot) { + public MasterSlaveEntry getEntry(int slot) { lazyConnect(); return slot2entry.get(slot); diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index 03044e53a..1b10875c0 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -50,6 +50,8 @@ public interface ConnectionManager { MasterSlaveEntry getEntry(String name); + MasterSlaveEntry getEntry(int slot); + MasterSlaveEntry getWriteEntry(int slot); MasterSlaveEntry getReadEntry(int slot); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index ef9fd37d5..700f6f5cc 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -458,7 +458,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return getEntry(slot); } - protected MasterSlaveEntry getEntry(int slot) { + public MasterSlaveEntry getEntry(int slot) { lazyConnect(); return masterSlaveEntry; diff --git a/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java b/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java index 10d2831c2..522b4f090 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java +++ b/redisson/src/main/java/org/redisson/pubsub/PubSubConnectionEntry.java @@ -24,6 +24,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.connection.ConnectionManager; +import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.ServiceManager; import org.redisson.misc.AsyncSemaphore; import org.redisson.misc.WrappedLock; @@ -54,6 +55,7 @@ public class PubSubConnectionEntry { private final ServiceManager serviceManager; private final PublishSubscribeService subscribeService; + private final MasterSlaveEntry entry; private static final Map SUBSCRIBE2UNSUBSCRIBE = new HashMap<>(); @@ -63,14 +65,19 @@ public class PubSubConnectionEntry { SUBSCRIBE2UNSUBSCRIBE.put(PubSubType.PSUBSCRIBE, PubSubType.PUNSUBSCRIBE); } - public PubSubConnectionEntry(RedisPubSubConnection conn, ConnectionManager connectionManager) { + public PubSubConnectionEntry(RedisPubSubConnection conn, ConnectionManager connectionManager, MasterSlaveEntry entry) { super(); this.conn = conn; + this.entry = entry; this.serviceManager = connectionManager.getServiceManager(); this.subscribeService = connectionManager.getSubscribeService(); this.subscribedChannelsAmount = new AtomicInteger(serviceManager.getConfig().getSubscriptionsPerConnection()); } + public MasterSlaveEntry getEntry() { + return entry; + } + public int countListeners(ChannelName channelName) { return channelListeners.getOrDefault(channelName, EMPTY_QUEUE).size(); } @@ -127,7 +134,7 @@ public class PubSubConnectionEntry { } public boolean removeListener(ChannelName channelName, int listenerId) { - Queue> listeners = channelListeners.get(channelName); + Queue> listeners = channelListeners.getOrDefault(channelName, EMPTY_QUEUE); for (RedisPubSubListener listener : listeners) { if (System.identityHashCode(listener) == listenerId) { removeListener(channelName, listener); @@ -176,7 +183,7 @@ public class PubSubConnectionEntry { pp.whenComplete((r, e) -> { if (e != null) { PubSubType unsubscribeType = SUBSCRIBE2UNSUBSCRIBE.get(type); - CompletableFuture f = subscribeService.unsubscribe(channelName, unsubscribeType); + CompletableFuture f = subscribeService.unsubscribe(channelName, entry, unsubscribeType); f.whenComplete((rr, ee) -> { pm.completeExceptionally(e); }); @@ -304,7 +311,7 @@ public class PubSubConnectionEntry { removeListener(channelName, listener); } if (!hasListeners(channelName)) { - subscribeService.unsubscribeLocked(type, channelName) + subscribeService.unsubscribeLocked(type, channelName, entry) .whenComplete((r, ex) -> { lock.release(); }); diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index ac6c13673..9dcae6b56 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -440,7 +440,7 @@ public class PublishSubscribeService { connFuture.thenAccept(conn -> { freePubSubLock.acquire().thenAccept(c -> { - PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, connectionManager); + PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, connectionManager, msEntry); int remainFreeAmount = entry.tryAcquire(); PubSubKey key = new PubSubKey(channelName, msEntry); @@ -468,7 +468,7 @@ public class PublishSubscribeService { }); } - public CompletableFuture unsubscribeLocked(ChannelName channelName) { + CompletableFuture unsubscribeLocked(ChannelName channelName) { PubSubType type = PubSubType.UNSUBSCRIBE; if (shardingSupported) { type = PubSubType.SUNSUBSCRIBE; @@ -477,7 +477,7 @@ public class PublishSubscribeService { return unsubscribeLocked(type, channelName); } - public CompletableFuture unsubscribeLocked(PubSubType topicType, ChannelName channelName) { + private CompletableFuture unsubscribeLocked(PubSubType topicType, ChannelName channelName) { Collection coll = name2entry.get(channelName); if (coll == null || coll.isEmpty()) { RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."); @@ -489,7 +489,7 @@ public class PublishSubscribeService { return unsubscribeLocked(topicType, channelName, coll.iterator().next()); } - private CompletableFuture unsubscribeLocked(PubSubType topicType, ChannelName channelName, MasterSlaveEntry msEntry) { + CompletableFuture unsubscribeLocked(PubSubType topicType, ChannelName channelName, MasterSlaveEntry msEntry) { PubSubConnectionEntry entry = name2PubSubConnection.remove(new PubSubKey(channelName, msEntry)); if (entry == null || connectionManager.getServiceManager().isShuttingDown()) { return CompletableFuture.completedFuture(null); @@ -555,22 +555,13 @@ public class PublishSubscribeService { public void remove(MasterSlaveEntry entry) { entry2PubSubConnection.remove(entry); - name2entry.values().forEach(v -> v.remove(entry)); - } - - public CompletableFuture unsubscribe(ChannelName channelName, PubSubType topicType) { - Collection coll = name2entry.get(channelName); - if (coll == null || coll.isEmpty()) { - RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."); - CompletableFuture promise = new CompletableFuture<>(); - promise.completeExceptionally(ex); - return promise; - } - - return unsubscribe(channelName, coll.iterator().next(), topicType); + name2entry.values().removeIf(v -> { + v.remove(entry); + return v.isEmpty(); + }); } - private CompletableFuture unsubscribe(ChannelName channelName, MasterSlaveEntry e, PubSubType topicType) { + public CompletableFuture unsubscribe(ChannelName channelName, MasterSlaveEntry e, PubSubType topicType) { if (connectionManager.getServiceManager().isShuttingDown()) { return CompletableFuture.completedFuture(null); } @@ -620,7 +611,7 @@ public class PublishSubscribeService { public void reattachPubSub(int slot) { name2PubSubConnection.entrySet().stream() - .filter(e -> connectionManager.calcSlot(e.getKey().getChannelName().getName()) == slot) + .filter(e -> e.getValue().getEntry().equals(connectionManager.getEntry(slot))) .forEach(entry -> { PubSubConnectionEntry pubSubEntry = entry.getValue(); MasterSlaveEntry ee = entry.getKey().getEntry(); @@ -842,7 +833,7 @@ public class PublishSubscribeService { } if (entry.hasListeners(channelName)) { - CompletableFuture ff = unsubscribeLocked(type, channelName); + CompletableFuture ff = unsubscribeLocked(type, channelName, entry.getEntry()); return ff.whenComplete((r1, e1) -> { semaphore.release(); });