@ -544,7 +544,7 @@ public class PublishSubscribeService {
}
public CompletableFuture < Codec > unsubscribe ( ChannelName channelName , PubSubType topicType ) {
Collection < MasterSlaveEntry > coll = name2entry . remove ( channelName ) ;
Collection < MasterSlaveEntry > coll = name2entry . get ( channelName ) ;
if ( coll = = null | | coll . isEmpty ( ) ) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException ( "Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings." ) ;
CompletableFuture < Codec > promise = new CompletableFuture < > ( ) ;
@ -563,6 +563,7 @@ public class PublishSubscribeService {
AsyncSemaphore lock = getSemaphore ( channelName ) ;
CompletableFuture < Void > f = lock . acquire ( ) ;
return f . thenCompose ( v - > {
name2entry . remove ( channelName ) ;
PubSubConnectionEntry entry = name2PubSubConnection . remove ( new PubSubKey ( channelName , e ) ) ;
if ( entry = = null ) {
lock . release ( ) ;