@ -91,6 +91,7 @@ public class PublishSubscribeService {
private final AsyncSemaphore freePubSubLock = new AsyncSemaphore ( 1 ) ;
private final Map < ChannelName , MasterSlaveEntry > name2entry = new ConcurrentHashMap < > ( ) ;
private final ConcurrentMap < PubSubKey , PubSubConnectionEntry > name2PubSubConnection = new ConcurrentHashMap < > ( ) ;
private final ConcurrentMap < MasterSlaveEntry , PubSubEntry > entry2PubSubConnection = new ConcurrentHashMap < > ( ) ;
@ -122,10 +123,22 @@ public class PublishSubscribeService {
return semaphorePubSub ;
}
p ublic PubSubConnectionEntry getPubSubEntry ( ChannelName channelName ) {
p rivate PubSubConnectionEntry getPubSubEntry ( ChannelName channelName ) {
return name2PubSubConnection . get ( createKey ( channelName ) ) ;
}
public int countListeners ( ChannelName channelName ) {
PubSubConnectionEntry entry = getPubSubEntry ( channelName ) ;
if ( entry ! = null ) {
return entry . countListeners ( channelName ) ;
}
return 0 ;
}
public boolean hasEntry ( ChannelName channelName ) {
return getPubSubEntry ( channelName ) ! = null ;
}
public CompletableFuture < Collection < PubSubConnectionEntry > > psubscribe ( ChannelName channelName , Codec codec , RedisPubSubListener < ? > . . . listeners ) {
if ( isMultiEntity ( channelName ) ) {
Collection < MasterSlaveEntry > entrySet = connectionManager . getEntrySet ( ) ;
@ -213,7 +226,8 @@ public class PublishSubscribeService {
return ;
}
subscribe ( codec , channelName , entry , promise , type , lock , new AtomicInteger ( ) , listeners ) ;
subscribeNoTimeout ( codec , channelName , entry , promise , type , lock , new AtomicInteger ( ) , listeners ) ;
timeout ( promise ) ;
} ) ;
return promise ;
}
@ -242,13 +256,6 @@ public class PublishSubscribeService {
return new PubSubKey ( channelName , entry ) ;
}
private void subscribe ( Codec codec , ChannelName channelName , MasterSlaveEntry entry ,
CompletableFuture < PubSubConnectionEntry > promise , PubSubType type ,
AsyncSemaphore lock , AtomicInteger attempts , RedisPubSubListener < ? > . . . listeners ) {
subscribeNoTimeout ( codec , channelName , entry , promise , type , lock , attempts , listeners ) ;
timeout ( promise ) ;
}
public void timeout ( CompletableFuture < ? > promise ) {
int timeout = config . getTimeout ( ) + config . getRetryInterval ( ) * config . getRetryAttempts ( ) ;
timeout ( promise , timeout ) ;
@ -345,6 +352,8 @@ public class PublishSubscribeService {
return ;
}
name2entry . put ( channelName , entry ) ;
if ( remainFreeAmount = = 0 ) {
freePubSubConnections . getEntries ( ) . poll ( ) ;
}
@ -386,7 +395,7 @@ public class PublishSubscribeService {
connEntry . removeListener ( channelName , listener ) ;
}
if ( ! connEntry . hasListeners ( channelName ) ) {
unsubscribe ( type , channelName )
unsubscribe Locked ( type , channelName )
. whenComplete ( ( r , ex ) - > {
lock . release ( ) ;
} ) ;
@ -427,6 +436,8 @@ public class PublishSubscribeService {
return ;
}
name2entry . put ( channelName , msEntry ) ;
if ( remainFreeAmount > 0 ) {
PubSubEntry psEntry = entry2PubSubConnection . computeIfAbsent ( msEntry , e - > new PubSubEntry ( ) ) ;
psEntry . getEntries ( ) . add ( entry ) ;
@ -445,13 +456,24 @@ public class PublishSubscribeService {
return connFuture ;
}
public CompletableFuture < Void > unsubscribe ( PubSubType topicType , ChannelName channelName ) {
PubSubConnectionEntry entry = name2PubSubConnection . remove ( createKey ( channelName ) ) ;
public CompletableFuture < Void > unsubscribeLocked ( PubSubType topicType , ChannelName channelName ) {
MasterSlaveEntry entry = name2entry . get ( channelName ) ;
if ( entry = = null ) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException ( "Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings." ) ;
CompletableFuture < Void > promise = new CompletableFuture < > ( ) ;
promise . completeExceptionally ( ex ) ;
return promise ;
}
return unsubscribeLocked ( topicType , channelName , entry ) ;
}
private CompletableFuture < Void > unsubscribeLocked ( PubSubType topicType , ChannelName channelName , MasterSlaveEntry msEntry ) {
PubSubConnectionEntry entry = name2PubSubConnection . remove ( new PubSubKey ( channelName , msEntry ) ) ;
if ( entry = = null | | connectionManager . getServiceManager ( ) . isShuttingDown ( ) ) {
return CompletableFuture . completedFuture ( null ) ;
}
MasterSlaveEntry msEntry = getEntry ( channelName ) ;
CompletableFuture < Void > result = new CompletableFuture < > ( ) ;
BaseRedisPubSubListener listener = new BaseRedisPubSubListener ( ) {
@ -500,10 +522,11 @@ public class PublishSubscribeService {
public void remove ( MasterSlaveEntry entry ) {
entry2PubSubConnection . remove ( entry ) ;
name2entry . values ( ) . remove ( entry ) ;
}
public CompletableFuture < Codec > unsubscribe ( ChannelName channelName , PubSubType topicType ) {
MasterSlaveEntry entry = getEntry ( channelName ) ;
MasterSlaveEntry entry = name2entry. get ( channelName ) ;
if ( entry = = null ) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException ( "Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings." ) ;
CompletableFuture < Codec > promise = new CompletableFuture < > ( ) ;
@ -740,7 +763,7 @@ public class PublishSubscribeService {
CompletableFuture < Void > f ;
if ( ! entry . hasListeners ( channelName ) ) {
f = unsubscribe ( type , channelNam e)
f = unsubscribe Locked ( type , channelNam e, e)
. exceptionally ( ex - > null ) ;
} else {
f = CompletableFuture . completedFuture ( null ) ;
@ -770,7 +793,7 @@ public class PublishSubscribeService {
}
if ( entry . hasListeners ( channelName ) ) {
CompletableFuture < Void > ff = unsubscribe ( type , channelName ) ;
CompletableFuture < Void > ff = unsubscribe Locked ( type , channelName ) ;
return ff . whenComplete ( ( r1 , e1 ) - > {
semaphore . release ( ) ;
} ) ;