@ -149,7 +149,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
boolean interrupted = false ;
boolean interrupted = false ;
while ( ! future . isDone ( ) ) {
while ( ! future . isDone ( ) ) {
try {
try {
l . await ( ) ;
MasterSlaveServersConfig config = connectionManager . getConfig ( ) ;
int timeout = config . getTimeout ( ) + config . getRetryInterval ( ) * config . getRetryAttempts ( ) ;
if ( ! l . await ( timeout , TimeUnit . MILLISECONDS ) ) {
( ( RPromise < V > ) future ) . tryFailure ( new RedisTimeoutException ( "Command timeout: (" + timeout + " ms)" ) ) ;
break ;
}
} catch ( InterruptedException e ) {
} catch ( InterruptedException e ) {
interrupted = true ;
interrupted = true ;
}
}
@ -531,7 +536,10 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if ( details . getAttempt ( ) = = connectionManager . getConfig ( ) . getRetryAttempts ( ) ) {
if ( details . getAttempt ( ) = = connectionManager . getConfig ( ) . getRetryAttempts ( ) ) {
if ( details . getWriteFuture ( ) ! = null & & details . getWriteFuture ( ) . cancel ( false ) ) {
if ( details . getWriteFuture ( ) ! = null & & details . getWriteFuture ( ) . cancel ( false ) ) {
if ( details . getException ( ) = = null ) {
if ( details . getException ( ) = = null ) {
details . setException ( new RedisTimeoutException ( "Unable to send command: " + command + " with params: " + LogHelper . toString ( details . getParams ( ) ) + " after " + connectionManager . getConfig ( ) . getRetryAttempts ( ) + " retry attempts" ) ) ;
details . setException ( new RedisTimeoutException ( "Unable to send command! "
+ "Node source: " + source + ", connection: " + details . getConnectionFuture ( ) . getNow ( ) . getChannel ( )
+ ", command: " + command + ", command params: " + LogHelper . toString ( details . getParams ( ) )
+ " after " + connectionManager . getConfig ( ) . getRetryAttempts ( ) + " retry attempts" ) ) ;
}
}
details . getAttemptPromise ( ) . tryFailure ( details . getException ( ) ) ;
details . getAttemptPromise ( ) . tryFailure ( details . getException ( ) ) ;
}
}
@ -559,7 +567,9 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if ( details . getAttempt ( ) = = connectionManager . getConfig ( ) . getRetryAttempts ( ) ) {
if ( details . getAttempt ( ) = = connectionManager . getConfig ( ) . getRetryAttempts ( ) ) {
if ( details . getException ( ) = = null ) {
if ( details . getException ( ) = = null ) {
details . setException ( new RedisTimeoutException ( "Unable to send command: " + command + " with params: " + LogHelper . toString ( details . getParams ( ) ) + " after " + connectionManager . getConfig ( ) . getRetryAttempts ( ) + " retry attempts" ) ) ;
details . setException ( new RedisTimeoutException ( "Unable to send command! Node source: " + source
+ ", command: " + command + ", command params: " + LogHelper . toString ( details . getParams ( ) )
+ " after " + connectionManager . getConfig ( ) . getRetryAttempts ( ) + " retry attempts" ) ) ;
}
}
details . getAttemptPromise ( ) . tryFailure ( details . getException ( ) ) ;
details . getAttemptPromise ( ) . tryFailure ( details . getException ( ) ) ;
return ;
return ;