@ -440,7 +440,7 @@ public class PublishSubscribeService {
connFuture . thenAccept ( conn - > {
freePubSubLock . acquire ( ) . thenAccept ( c - > {
PubSubConnectionEntry entry = new PubSubConnectionEntry ( conn , connectionManager );
PubSubConnectionEntry entry = new PubSubConnectionEntry ( conn , connectionManager , msEntry );
int remainFreeAmount = entry . tryAcquire ( ) ;
PubSubKey key = new PubSubKey ( channelName , msEntry ) ;
@ -468,7 +468,7 @@ public class PublishSubscribeService {
} ) ;
}
public CompletableFuture < Void > unsubscribeLocked ( ChannelName channelName ) {
CompletableFuture < Void > unsubscribeLocked ( ChannelName channelName ) {
PubSubType type = PubSubType . UNSUBSCRIBE ;
if ( shardingSupported ) {
type = PubSubType . SUNSUBSCRIBE ;
@ -477,7 +477,7 @@ public class PublishSubscribeService {
return unsubscribeLocked ( type , channelName ) ;
}
p ublic CompletableFuture < Void > unsubscribeLocked ( PubSubType topicType , ChannelName channelName ) {
p rivate CompletableFuture < Void > unsubscribeLocked ( PubSubType topicType , ChannelName channelName ) {
Collection < MasterSlaveEntry > coll = name2entry . get ( channelName ) ;
if ( coll = = null | | coll . isEmpty ( ) ) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException ( "Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings." ) ;
@ -489,7 +489,7 @@ public class PublishSubscribeService {
return unsubscribeLocked ( topicType , channelName , coll . iterator ( ) . next ( ) ) ;
}
private CompletableFuture < Void > unsubscribeLocked ( PubSubType topicType , ChannelName channelName , MasterSlaveEntry msEntry ) {
CompletableFuture < Void > unsubscribeLocked ( PubSubType topicType , ChannelName channelName , MasterSlaveEntry msEntry ) {
PubSubConnectionEntry entry = name2PubSubConnection . remove ( new PubSubKey ( channelName , msEntry ) ) ;
if ( entry = = null | | connectionManager . getServiceManager ( ) . isShuttingDown ( ) ) {
return CompletableFuture . completedFuture ( null ) ;
@ -555,22 +555,13 @@ public class PublishSubscribeService {
public void remove ( MasterSlaveEntry entry ) {
entry2PubSubConnection . remove ( entry ) ;
name2entry . values ( ) . forEach ( v - > v . remove ( entry ) ) ;
}
public CompletableFuture < Codec > unsubscribe ( ChannelName channelName , PubSubType topicType ) {
Collection < MasterSlaveEntry > coll = name2entry . get ( channelName ) ;
if ( coll = = null | | coll . isEmpty ( ) ) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException ( "Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings." ) ;
CompletableFuture < Codec > promise = new CompletableFuture < > ( ) ;
promise . completeExceptionally ( ex ) ;
return promise ;
}
return unsubscribe ( channelName , coll . iterator ( ) . next ( ) , topicType ) ;
name2entry . values ( ) . removeIf ( v - > {
v . remove ( entry ) ;
return v . isEmpty ( ) ;
} ) ;
}
p rivate CompletableFuture < Codec > unsubscribe ( ChannelName channelName , MasterSlaveEntry e , PubSubType topicType ) {
public CompletableFuture < Codec > unsubscribe ( ChannelName channelName , MasterSlaveEntry e , PubSubType topicType ) {
if ( connectionManager . getServiceManager ( ) . isShuttingDown ( ) ) {
return CompletableFuture . completedFuture ( null ) ;
}
@ -620,7 +611,7 @@ public class PublishSubscribeService {
public void reattachPubSub ( int slot ) {
name2PubSubConnection . entrySet ( ) . stream ( )
. filter ( e - > connectionManager. calcSlot ( e . getKey ( ) . getChannelName ( ) . getName ( ) ) = = slot )
. filter ( e - > e. getValue ( ) . getEntry ( ) . equals ( connectionManager . getEntry ( slot ) ) )
. forEach ( entry - > {
PubSubConnectionEntry pubSubEntry = entry . getValue ( ) ;
MasterSlaveEntry ee = entry . getKey ( ) . getEntry ( ) ;
@ -842,7 +833,7 @@ public class PublishSubscribeService {
}
if ( entry . hasListeners ( channelName ) ) {
CompletableFuture < Void > ff = unsubscribeLocked ( type , channelName );
CompletableFuture < Void > ff = unsubscribeLocked ( type , channelName , entry . getEntry ( ) );
return ff . whenComplete ( ( r1 , e1 ) - > {
semaphore . release ( ) ;
} ) ;