@ -269,6 +269,37 @@ public class PublishSubscribeService {
} ) ;
}
private void trySubscribe ( Codec codec , ChannelName channelName ,
CompletableFuture < PubSubConnectionEntry > promise , PubSubType type ,
AsyncSemaphore lock , AtomicInteger attempts , RedisPubSubListener < ? > . . . listeners ) {
if ( attempts . get ( ) = = config . getRetryAttempts ( ) ) {
lock . release ( ) ;
MasterSlaveEntry entry = getEntry ( channelName ) ;
if ( entry = = null ) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException ( "Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings." ) ;
promise . completeExceptionally ( ex ) ;
return ;
}
promise . completeExceptionally ( new RedisTimeoutException (
"Unable to acquire connection for subscription after " + attempts . get ( ) + " attempts. " +
"Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters." ) ) ;
return ;
}
attempts . incrementAndGet ( ) ;
MasterSlaveEntry entry = getEntry ( channelName ) ;
if ( entry = = null ) {
connectionManager . newTimeout ( tt - > {
trySubscribe ( codec , channelName , promise , type , lock , attempts , listeners ) ;
} , config . getRetryInterval ( ) , TimeUnit . MILLISECONDS ) ;
return ;
}
subscribeNoTimeout ( codec , channelName , entry , promise , type , lock , attempts , listeners ) ;
}
private void subscribeNoTimeout ( Codec codec , ChannelName channelName , MasterSlaveEntry entry ,
CompletableFuture < PubSubConnectionEntry > promise , PubSubType type ,
AsyncSemaphore lock , AtomicInteger attempts , RedisPubSubListener < ? > . . . listeners ) {
@ -285,26 +316,19 @@ public class PublishSubscribeService {
return ;
}
MasterSlaveEntry msEntry = Optional . ofNullable ( connectionManager . getEntry ( entry . getClient ( ) ) ) . orElse ( entry ) ;
PubSubEntry freePubSubConnections = entry2PubSubConnection . getOrDefault ( msEntry , new PubSubEntry ( ) ) ;
PubSubEntry freePubSubConnections = entry2PubSubConnection . getOrDefault ( entry , new PubSubEntry ( ) ) ;
PubSubConnectionEntry freeEntry = freePubSubConnections . getEntries ( ) . peek ( ) ;
if ( freeEntry = = null ) {
freePubSubLock . release ( ) ;
CompletableFuture < RedisPubSubConnection > connectFuture = connect ( codec , channelName , msE ntry, promise , type , lock , listeners ) ;
CompletableFuture < RedisPubSubConnection > connectFuture = connect ( codec , channelName , e ntry, promise , type , lock , listeners ) ;
connectionManager . newTimeout ( t - > {
if ( attempts . get ( ) = = config . getRetryAttempts ( ) ) {
connectFuture . completeExceptionally ( new RedisTimeoutException (
"Unable to acquire connection for subscription after " + attempts . get ( ) + " attempts. " +
"Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters." ) ) ;
if ( ! connectFuture . cancel ( false ) ) {
return ;
}
if ( connectFuture . cancel ( true ) ) {
subscribe ( codec , channelName , entry , promise , type , lock , attempts , listeners ) ;
attempts . incrementAndGet ( ) ;
}
trySubscribe ( codec , channelName , promise , type , lock , attempts , listeners ) ;
} , config . getRetryInterval ( ) , TimeUnit . MILLISECONDS ) ;
return ;
}
@ -314,7 +338,7 @@ public class PublishSubscribeService {
throw new IllegalStateException ( ) ;
}
PubSubKey key = new PubSubKey ( channelName , msE ntry) ;
PubSubKey key = new PubSubKey ( channelName , e ntry) ;
PubSubConnectionEntry oldEntry = name2PubSubConnection . putIfAbsent ( key , freeEntry ) ;
if ( oldEntry ! = null ) {
freeEntry . release ( ) ;
@ -379,38 +403,18 @@ public class PublishSubscribeService {
return subscribeFuture ;
}
private CompletableFuture < RedisPubSubConnection > nextPubSubConnection ( MasterSlaveEntry entry , ChannelName channelName ) {
if ( entry = = null ) {
int slot = connectionManager . calcSlot ( channelName . getName ( ) ) ;
RedisNodeNotFoundException ex = new RedisNodeNotFoundException ( "Node for slot: " + slot + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings." ) ;
CompletableFuture < RedisPubSubConnection > result = new CompletableFuture < > ( ) ;
result . completeExceptionally ( ex ) ;
return result ;
}
return entry . nextPubSubConnection ( ) ;
}
private CompletableFuture < RedisPubSubConnection > connect ( Codec codec , ChannelName channelName ,
MasterSlaveEntry msEntry , CompletableFuture < PubSubConnectionEntry > promise ,
PubSubType type , AsyncSemaphore lock , RedisPubSubListener < ? > . . . listeners ) {
CompletableFuture < RedisPubSubConnection > connFuture = nextPubSubConnection( msEntry , channelName ) ;
CompletableFuture < RedisPubSubConnection > connFuture = msEntry . nextPubSubConnection ( ) ;
promise . whenComplete ( ( res , e ) - > {
if ( e ! = null ) {
connFuture . completeExceptionally ( e ) ;
}
} ) ;
connFuture . whenComplete ( ( conn , ex ) - > {
if ( ex ! = null ) {
// freePubSubLock.release();
lock . release ( ) ;
if ( ! connFuture . isCancelled ( ) ) {
promise . completeExceptionally ( ex ) ;
}
return ;
}
connFuture . thenAccept ( conn - > {
freePubSubLock . acquire ( ) . thenAccept ( c - > {
PubSubConnectionEntry entry = new PubSubConnectionEntry ( conn , connectionManager ) ;
int remainFreeAmount = entry . tryAcquire ( ) ;