@ -91,7 +91,7 @@ public class PublishSubscribeService {
private final AsyncSemaphore freePubSubLock = new AsyncSemaphore ( 1 ) ;
private final Map < ChannelName , MasterSlaveEntry> name2entry = new ConcurrentHashMap < > ( ) ;
private final Map < ChannelName , Collection< MasterSlaveEntry> > name2entry = new ConcurrentHashMap < > ( ) ;
private final ConcurrentMap < PubSubKey , PubSubConnectionEntry > name2PubSubConnection = new ConcurrentHashMap < > ( ) ;
private final ConcurrentMap < MasterSlaveEntry , PubSubEntry > entry2PubSubConnection = new ConcurrentHashMap < > ( ) ;
@ -352,7 +352,8 @@ public class PublishSubscribeService {
return ;
}
name2entry . put ( channelName , entry ) ;
Collection < MasterSlaveEntry > coll = name2entry . computeIfAbsent ( channelName , k - > Collections . newSetFromMap ( new ConcurrentHashMap < > ( ) ) ) ;
coll . add ( entry ) ;
if ( remainFreeAmount = = 0 ) {
freePubSubConnections . getEntries ( ) . poll ( ) ;
@ -436,7 +437,8 @@ public class PublishSubscribeService {
return ;
}
name2entry . put ( channelName , msEntry ) ;
Collection < MasterSlaveEntry > coll = name2entry . computeIfAbsent ( channelName , k - > Collections . newSetFromMap ( new ConcurrentHashMap < > ( ) ) ) ;
coll . add ( msEntry ) ;
if ( remainFreeAmount > 0 ) {
PubSubEntry psEntry = entry2PubSubConnection . computeIfAbsent ( msEntry , e - > new PubSubEntry ( ) ) ;
@ -457,15 +459,15 @@ public class PublishSubscribeService {
}
public CompletableFuture < Void > unsubscribeLocked ( PubSubType topicType , ChannelName channelName ) {
MasterSlaveEntry entry = name2entry . get ( channelName ) ;
if ( entry = = null ) {
Collection< MasterSlaveEntry > coll = name2entry . getOrDefault ( channelName , Collections . emptySet ( ) ) ;
if ( coll. isEmpty ( ) ) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException ( "Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings." ) ;
CompletableFuture < Void > promise = new CompletableFuture < > ( ) ;
promise . completeExceptionally ( ex ) ;
return promise ;
}
return unsubscribeLocked ( topicType , channelName , entry ) ;
return unsubscribeLocked ( topicType , channelName , coll. iterator ( ) . next ( ) ) ;
}
private CompletableFuture < Void > unsubscribeLocked ( PubSubType topicType , ChannelName channelName , MasterSlaveEntry msEntry ) {
@ -526,15 +528,15 @@ public class PublishSubscribeService {
}
public CompletableFuture < Codec > unsubscribe ( ChannelName channelName , PubSubType topicType ) {
MasterSlaveEntry entry = name2entry . get ( channelName ) ;
if ( entry = = null ) {
Collection< MasterSlaveEntry > coll = name2entry . getOrDefault ( channelName , Collections . emptySet ( ) ) ;
if ( coll. isEmpty ( ) ) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException ( "Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings." ) ;
CompletableFuture < Codec > promise = new CompletableFuture < > ( ) ;
promise . completeExceptionally ( ex ) ;
return promise ;
}
return unsubscribe ( channelName , entry , topicType ) ;
return unsubscribe ( channelName , coll. iterator ( ) . next ( ) , topicType ) ;
}
private CompletableFuture < Codec > unsubscribe ( ChannelName channelName , MasterSlaveEntry e , PubSubType topicType ) {
@ -737,18 +739,12 @@ public class PublishSubscribeService {
} , timeout , TimeUnit . MILLISECONDS ) ;
return sf . thenCompose ( res - > {
Collection < MasterSlaveEntry > entries ;
if ( isMultiEntity ( channelName ) ) {
entries = connectionManager . getEntrySet ( ) ;
} else {
MasterSlaveEntry entry = getEntry ( channelName ) ;
if ( entry = = null ) {
semaphore . release ( ) ;
CompletableFuture < Void > f = new CompletableFuture < > ( ) ;
f . completeExceptionally ( new IllegalStateException ( "Unable to find entry for channel: " + channelName ) ) ;
return f ;
}
entries = Collections . singletonList ( entry ) ;
Collection < MasterSlaveEntry > entries = name2entry . getOrDefault ( channelName , Collections . emptySet ( ) ) ;
if ( entries . isEmpty ( ) ) {
semaphore . release ( ) ;
CompletableFuture < Void > f = new CompletableFuture < > ( ) ;
f . completeExceptionally ( new IllegalStateException ( "Unable to find entry for channel: " + channelName ) ) ;
return f ;
}
List < CompletableFuture < ? > > futures = new ArrayList < > ( entries . size ( ) ) ;