diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index 1eb3125f1..52326e519 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -475,8 +475,8 @@ public class PublishSubscribeService { } public CompletableFuture unsubscribeLocked(PubSubType topicType, ChannelName channelName) { - Collection coll = name2entry.getOrDefault(channelName, Collections.emptySet()); - if (coll.isEmpty()) { + Collection coll = name2entry.remove(channelName); + if (coll == null || coll.isEmpty()) { RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."); CompletableFuture promise = new CompletableFuture<>(); promise.completeExceptionally(ex); @@ -544,8 +544,8 @@ public class PublishSubscribeService { } public CompletableFuture unsubscribe(ChannelName channelName, PubSubType topicType) { - Collection coll = name2entry.getOrDefault(channelName, Collections.emptySet()); - if (coll.isEmpty()) { + Collection coll = name2entry.remove(channelName); + if (coll == null || coll.isEmpty()) { RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."); CompletableFuture promise = new CompletableFuture<>(); promise.completeExceptionally(ex); @@ -762,8 +762,8 @@ public class PublishSubscribeService { }, timeout, TimeUnit.MILLISECONDS); return sf.thenCompose(res -> { - Collection entries = name2entry.getOrDefault(channelName, Collections.emptySet()); - if (entries.isEmpty()) { + Collection entries = name2entry.remove(channelName); + if (entries == null || entries.isEmpty()) { semaphore.release(); return CompletableFuture.completedFuture(null); }