@ -475,7 +475,7 @@ public class PublishSubscribeService {
}
public CompletableFuture < Void > unsubscribeLocked ( PubSubType topicType , ChannelName channelName ) {
Collection < MasterSlaveEntry > coll = name2entry . remove ( channelName ) ;
Collection < MasterSlaveEntry > coll = name2entry . get ( channelName ) ;
if ( coll = = null | | coll . isEmpty ( ) ) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException ( "Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings." ) ;
CompletableFuture < Void > promise = new CompletableFuture < > ( ) ;
@ -487,6 +487,7 @@ public class PublishSubscribeService {
}
private CompletableFuture < Void > unsubscribeLocked ( PubSubType topicType , ChannelName channelName , MasterSlaveEntry msEntry ) {
name2entry . remove ( channelName ) ;
PubSubConnectionEntry entry = name2PubSubConnection . remove ( new PubSubKey ( channelName , msEntry ) ) ;
if ( entry = = null | | connectionManager . getServiceManager ( ) . isShuttingDown ( ) ) {
return CompletableFuture . completedFuture ( null ) ;
@ -763,7 +764,7 @@ public class PublishSubscribeService {
} , timeout , TimeUnit . MILLISECONDS ) ;
return sf . thenCompose ( res - > {
Collection < MasterSlaveEntry > entries = name2entry . remove ( channelName ) ;
Collection < MasterSlaveEntry > entries = name2entry . get ( channelName ) ;
if ( entries = = null | | entries . isEmpty ( ) ) {
semaphore . release ( ) ;
return CompletableFuture . completedFuture ( null ) ;