diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index d4d7fabbe..625b83798 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -91,7 +91,7 @@ public class PublishSubscribeService { private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1); - private final Map name2entry = new ConcurrentHashMap<>(); + private final Map> name2entry = new ConcurrentHashMap<>(); private final ConcurrentMap name2PubSubConnection = new ConcurrentHashMap<>(); private final ConcurrentMap entry2PubSubConnection = new ConcurrentHashMap<>(); @@ -352,7 +352,8 @@ public class PublishSubscribeService { return; } - name2entry.put(channelName, entry); + Collection coll = name2entry.computeIfAbsent(channelName, k -> Collections.newSetFromMap(new ConcurrentHashMap<>())); + coll.add(entry); if (remainFreeAmount == 0) { freePubSubConnections.getEntries().poll(); @@ -436,7 +437,8 @@ public class PublishSubscribeService { return; } - name2entry.put(channelName, msEntry); + Collection coll = name2entry.computeIfAbsent(channelName, k -> Collections.newSetFromMap(new ConcurrentHashMap<>())); + coll.add(msEntry); if (remainFreeAmount > 0) { PubSubEntry psEntry = entry2PubSubConnection.computeIfAbsent(msEntry, e -> new PubSubEntry()); @@ -457,15 +459,15 @@ public class PublishSubscribeService { } public CompletableFuture unsubscribeLocked(PubSubType topicType, ChannelName channelName) { - MasterSlaveEntry entry = name2entry.get(channelName); - if (entry == null) { + Collection coll = name2entry.getOrDefault(channelName, Collections.emptySet()); + if (coll.isEmpty()) { RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."); CompletableFuture promise = new CompletableFuture<>(); promise.completeExceptionally(ex); return promise; } - return unsubscribeLocked(topicType, channelName, entry); + return unsubscribeLocked(topicType, channelName, coll.iterator().next()); } private CompletableFuture unsubscribeLocked(PubSubType topicType, ChannelName channelName, MasterSlaveEntry msEntry) { @@ -526,15 +528,15 @@ public class PublishSubscribeService { } public CompletableFuture unsubscribe(ChannelName channelName, PubSubType topicType) { - MasterSlaveEntry entry = name2entry.get(channelName); - if (entry == null) { + Collection coll = name2entry.getOrDefault(channelName, Collections.emptySet()); + if (coll.isEmpty()) { RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."); CompletableFuture promise = new CompletableFuture<>(); promise.completeExceptionally(ex); return promise; } - return unsubscribe(channelName, entry, topicType); + return unsubscribe(channelName, coll.iterator().next(), topicType); } private CompletableFuture unsubscribe(ChannelName channelName, MasterSlaveEntry e, PubSubType topicType) { @@ -737,18 +739,12 @@ public class PublishSubscribeService { }, timeout, TimeUnit.MILLISECONDS); return sf.thenCompose(res -> { - Collection entries; - if (isMultiEntity(channelName)) { - entries = connectionManager.getEntrySet(); - } else { - MasterSlaveEntry entry = getEntry(channelName); - if (entry == null) { - semaphore.release(); - CompletableFuture f = new CompletableFuture<>(); - f.completeExceptionally(new IllegalStateException("Unable to find entry for channel: " + channelName)); - return f; - } - entries = Collections.singletonList(entry); + Collection entries = name2entry.getOrDefault(channelName, Collections.emptySet()); + if (entries.isEmpty()) { + semaphore.release(); + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(new IllegalStateException("Unable to find entry for channel: " + channelName)); + return f; } List> futures = new ArrayList<>(entries.size());