@ -71,6 +71,9 @@ import io.netty.util.Timeout;
import io.netty.util.TimerTask ;
import io.netty.util.concurrent.Future ;
import io.netty.util.concurrent.FutureListener ;
import java.util.AbstractMap ;
import java.util.LinkedHashMap ;
import java.util.LinkedHashSet ;
/ * *
*
@ -111,17 +114,17 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
return this ;
}
@Override
public boolean isRedissonReferenceSupportEnabled ( ) {
return redisson ! = null | | redissonReactive ! = null ;
}
@Override
public void syncSubscription ( RFuture < ? > future ) {
MasterSlaveServersConfig config = connectionManager . getConfig ( ) ;
try {
int timeout = config . getTimeout ( ) + config . getRetryInterval ( ) * config . getRetryAttempts ( ) ;
int timeout = config . getTimeout ( ) + config . getRetryInterval ( ) * config . getRetryAttempts ( ) ;
if ( ! future . await ( timeout ) ) {
throw new RedisTimeoutException ( "Subscribe timeout: (" + timeout + "ms)" ) ;
}
@ -130,7 +133,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
future . syncUninterruptibly ( ) ;
}
@Override
public < V > V get ( RFuture < V > future ) {
if ( ! future . isDone ( ) ) {
@ -141,7 +144,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
l . countDown ( ) ;
}
} ) ;
boolean interrupted = false ;
while ( ! future . isDone ( ) ) {
try {
@ -150,7 +153,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
interrupted = true ;
}
}
if ( interrupted ) {
Thread . currentThread ( ) . interrupt ( ) ;
}
@ -176,16 +179,16 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} ) ;
return l . await ( timeout , timeoutUnit ) ;
}
@Override
public < T , R > RFuture < R > readAsync ( InetSocketAddress client , MasterSlaveEntry entry , Codec codec , RedisCommand < T > command , Object . . . params ) {
public < T , R > RFuture < R > readAsync ( InetSocketAddress client , MasterSlaveEntry entry , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = connectionManager . newPromise ( ) ;
async ( true , new NodeSource ( entry , client ) , codec , command , params , mainPromise , 0 ) ;
return mainPromise ;
}
@Override
public < T , R > RFuture < R > readAsync ( InetSocketAddress client , String key , Codec codec , RedisCommand < T > command , Object . . . params ) {
public < T , R > RFuture < R > readAsync ( InetSocketAddress client , String key , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = connectionManager . newPromise ( ) ;
int slot = connectionManager . calcSlot ( key ) ;
async ( true , new NodeSource ( slot , client ) , codec , command , params , mainPromise , 0 ) ;
@ -193,7 +196,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public < T , R > RFuture < Collection < R > > readAllAsync ( RedisCommand < T > command , Object . . . params ) {
public < T , R > RFuture < Collection < R > > readAllAsync ( RedisCommand < T > command , Object . . . params ) {
final RPromise < Collection < R > > mainPromise = connectionManager . newPromise ( ) ;
final Set < MasterSlaveEntry > nodes = connectionManager . getEntrySet ( ) ;
final List < R > results = new ArrayList < R > ( ) ;
@ -205,18 +208,18 @@ public class CommandAsyncService implements CommandAsyncExecutor {
mainPromise . tryFailure ( future . cause ( ) ) ;
return ;
}
R result = future . getNow ( ) ;
if ( result instanceof Collection ) {
synchronized ( results ) {
results . addAll ( ( Collection ) result ) ;
results . addAll ( ( Collection ) result ) ;
}
} else {
synchronized ( results ) {
results . add ( result ) ;
}
}
if ( counter . decrementAndGet ( ) = = 0
& & ! mainPromise . isDone ( ) ) {
mainPromise . trySuccess ( results ) ;
@ -233,7 +236,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public < T , R > RFuture < R > readRandomAsync ( RedisCommand < T > command , Object . . . params ) {
public < T , R > RFuture < R > readRandomAsync ( RedisCommand < T > command , Object . . . params ) {
final RPromise < R > mainPromise = connectionManager . newPromise ( ) ;
final List < MasterSlaveEntry > nodes = new ArrayList < MasterSlaveEntry > ( connectionManager . getEntrySet ( ) ) ;
Collections . shuffle ( nodes ) ;
@ -269,21 +272,21 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public < T > RFuture < Void > writeAllAsync ( RedisCommand < T > command , Object . . . params ) {
public < T > RFuture < Void > writeAllAsync ( RedisCommand < T > command , Object . . . params ) {
return writeAllAsync ( command , null , params ) ;
}
@Override
public < R , T > RFuture < R > writeAllAsync ( RedisCommand < T > command , SlotCallback < T , R > callback , Object . . . params ) {
public < R , T > RFuture < R > writeAllAsync ( RedisCommand < T > command , SlotCallback < T , R > callback , Object . . . params ) {
return allAsync ( false , command , callback , params ) ;
}
@Override
public < R , T > RFuture < R > readAllAsync ( RedisCommand < T > command , SlotCallback < T , R > callback , Object . . . params ) {
public < R , T > RFuture < R > readAllAsync ( RedisCommand < T > command , SlotCallback < T , R > callback , Object . . . params ) {
return allAsync ( true , command , callback , params ) ;
}
private < T , R > RFuture < R > allAsync ( boolean readOnlyMode , RedisCommand < T > command , final SlotCallback < T , R > callback , Object . . . params ) {
private < T , R > RFuture < R > allAsync ( boolean readOnlyMode , RedisCommand < T > command , final SlotCallback < T , R > callback , Object . . . params ) {
final RPromise < R > mainPromise = connectionManager . newPromise ( ) ;
final Set < MasterSlaveEntry > nodes = connectionManager . getEntrySet ( ) ;
final AtomicInteger counter = new AtomicInteger ( nodes . size ( ) ) ;
@ -317,9 +320,9 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
public < V > RedisException convertException ( RFuture < V > future ) {
return future . cause ( ) instanceof RedisException ?
(RedisException ) future . cause ( ) :
new RedisException ( "Unexpected exception while processing command" , future . cause ( ) ) ;
return future . cause ( ) instanceof RedisException
? (RedisException ) future . cause ( )
: new RedisException ( "Unexpected exception while processing command" , future . cause ( ) ) ;
}
private NodeSource getNodeSource ( String key ) {
@ -329,88 +332,86 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public < T , R > RFuture < R > readAsync ( String key , Codec codec , RedisCommand < T > command , Object . . . params ) {
public < T , R > RFuture < R > readAsync ( String key , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = connectionManager . newPromise ( ) ;
NodeSource source = getNodeSource ( key ) ;
async ( true , source , codec , command , params , mainPromise , 0 ) ;
return mainPromise ;
}
public < T , R > RFuture < R > readAsync ( MasterSlaveEntry entry , Codec codec , RedisCommand < T > command , Object . . . params ) {
public < T , R > RFuture < R > readAsync ( MasterSlaveEntry entry , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = connectionManager . newPromise ( ) ;
async ( true , new NodeSource ( entry ) , codec , command , params , mainPromise , 0 ) ;
return mainPromise ;
}
public < T , R > RFuture < R > readAsync ( Integer slot , Codec codec , RedisCommand < T > command , Object . . . params ) {
public < T , R > RFuture < R > readAsync ( Integer slot , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = connectionManager . newPromise ( ) ;
async ( true , new NodeSource ( slot ) , codec , command , params , mainPromise , 0 ) ;
return mainPromise ;
}
@Override
public < T , R > RFuture < R > writeAsync ( MasterSlaveEntry entry , Codec codec , RedisCommand < T > command , Object . . . params ) {
public < T , R > RFuture < R > writeAsync ( MasterSlaveEntry entry , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = connectionManager . newPromise ( ) ;
async ( false , new NodeSource ( entry ) , codec , command , params , mainPromise , 0 ) ;
return mainPromise ;
}
@Override
public < T , R > RFuture < R > writeAsync ( Integer slot , Codec codec , RedisCommand < T > command , Object . . . params ) {
public < T , R > RFuture < R > writeAsync ( Integer slot , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = connectionManager . newPromise ( ) ;
async ( false , new NodeSource ( slot ) , codec , command , params , mainPromise , 0 ) ;
return mainPromise ;
}
@Override
public < T , R > RFuture < R > readAsync ( String key , RedisCommand < T > command , Object . . . params ) {
public < T , R > RFuture < R > readAsync ( String key , RedisCommand < T > command , Object . . . params ) {
return readAsync ( key , connectionManager . getCodec ( ) , command , params ) ;
}
@Override
public < T , R > RFuture < R > evalReadAsync ( String key , Codec codec , RedisCommand < T > evalCommandType , String script , List < Object > keys , Object . . . params ) {
public < T , R > RFuture < R > evalReadAsync ( String key , Codec codec , RedisCommand < T > evalCommandType , String script , List < Object > keys , Object . . . params ) {
NodeSource source = getNodeSource ( key ) ;
return evalAsync ( source , true , codec , evalCommandType , script , keys , params ) ;
}
@Override
public < T , R > RFuture < R > evalReadAsync ( MasterSlaveEntry entry , Codec codec , RedisCommand < T > evalCommandType , String script , List < Object > keys , Object . . . params ) {
public < T , R > RFuture < R > evalReadAsync ( MasterSlaveEntry entry , Codec codec , RedisCommand < T > evalCommandType , String script , List < Object > keys , Object . . . params ) {
return evalAsync ( new NodeSource ( entry ) , true , codec , evalCommandType , script , keys , params ) ;
}
@Override
public < T , R > RFuture < R > evalReadAsync ( Integer slot , Codec codec , RedisCommand < T > evalCommandType , String script , List < Object > keys , Object . . . params ) {
public < T , R > RFuture < R > evalReadAsync ( Integer slot , Codec codec , RedisCommand < T > evalCommandType , String script , List < Object > keys , Object . . . params ) {
return evalAsync ( new NodeSource ( slot ) , true , codec , evalCommandType , script , keys , params ) ;
}
@Override
public < T , R > RFuture < R > evalReadAsync ( InetSocketAddress client , String key , Codec codec , RedisCommand < T > evalCommandType , String script , List < Object > keys , Object . . . params ) {
public < T , R > RFuture < R > evalReadAsync ( InetSocketAddress client , String key , Codec codec , RedisCommand < T > evalCommandType , String script , List < Object > keys , Object . . . params ) {
int slot = connectionManager . calcSlot ( key ) ;
return evalAsync ( new NodeSource ( slot , client ) , true , codec , evalCommandType , script , keys , params ) ;
}
@Override
public < T , R > RFuture < R > evalWriteAsync ( String key , Codec codec , RedisCommand < T > evalCommandType , String script , List < Object > keys , Object . . . params ) {
public < T , R > RFuture < R > evalWriteAsync ( String key , Codec codec , RedisCommand < T > evalCommandType , String script , List < Object > keys , Object . . . params ) {
NodeSource source = getNodeSource ( key ) ;
return evalAsync ( source , false , codec , evalCommandType , script , keys , params ) ;
}
public < T , R > RFuture < R > evalWriteAsync ( MasterSlaveEntry entry , Codec codec , RedisCommand < T > evalCommandType , String script , List < Object > keys , Object . . . params ) {
public < T , R > RFuture < R > evalWriteAsync ( MasterSlaveEntry entry , Codec codec , RedisCommand < T > evalCommandType , String script , List < Object > keys , Object . . . params ) {
return evalAsync ( new NodeSource ( entry ) , false , codec , evalCommandType , script , keys , params ) ;
}
public < T , R > RFuture < R > evalWriteAsync ( Integer slot , Codec codec , RedisCommand < T > evalCommandType , String script , List < Object > keys , Object . . . params ) {
public < T , R > RFuture < R > evalWriteAsync ( Integer slot , Codec codec , RedisCommand < T > evalCommandType , String script , List < Object > keys , Object . . . params ) {
return evalAsync ( new NodeSource ( slot ) , false , codec , evalCommandType , script , keys , params ) ;
}
@Override
public < T , R > RFuture < R > evalWriteAllAsync ( RedisCommand < T > command , SlotCallback < T , R > callback , String script , List < Object > keys , Object . . . params ) {
public < T , R > RFuture < R > evalWriteAllAsync ( RedisCommand < T > command , SlotCallback < T , R > callback , String script , List < Object > keys , Object . . . params ) {
return evalAllAsync ( false , command , callback , script , keys , params ) ;
}
public < T , R > RFuture < R > evalAllAsync ( boolean readOnlyMode , RedisCommand < T > command , final SlotCallback < T , R > callback , String script , List < Object > keys , Object . . . params ) {
public < T , R > RFuture < R > evalAllAsync ( boolean readOnlyMode , RedisCommand < T > command , final SlotCallback < T , R > callback , String script , List < Object > keys , Object . . . params ) {
final RPromise < R > mainPromise = connectionManager . newPromise ( ) ;
final Set < MasterSlaveEntry > entries = connectionManager . getEntrySet ( ) ;
final AtomicInteger counter = new AtomicInteger ( entries . size ( ) ) ;
@ -425,7 +426,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
callback . onSlotResult ( future . getNow ( ) ) ;
if ( counter . decrementAndGet ( ) = = 0
& & ! mainPromise . isDone ( ) ) {
& & ! mainPromise . isDone ( ) ) {
mainPromise . trySuccess ( callback . onFinish ( ) ) ;
}
}
@ -444,7 +445,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return mainPromise ;
}
private < T , R > RFuture < R > evalAsync ( NodeSource nodeSource , boolean readOnlyMode , Codec codec , RedisCommand < T > evalCommandType , String script , List < Object > keys , Object . . . params ) {
private < T , R > RFuture < R > evalAsync ( NodeSource nodeSource , boolean readOnlyMode , Codec codec , RedisCommand < T > evalCommandType , String script , List < Object > keys , Object . . . params ) {
RPromise < R > mainPromise = connectionManager . newPromise ( ) ;
List < Object > args = new ArrayList < Object > ( 2 + keys . size ( ) + params . length ) ;
args . add ( script ) ;
@ -456,12 +457,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public < T , R > RFuture < R > writeAsync ( String key , RedisCommand < T > command , Object . . . params ) {
public < T , R > RFuture < R > writeAsync ( String key , RedisCommand < T > command , Object . . . params ) {
return writeAsync ( key , connectionManager . getCodec ( ) , command , params ) ;
}
@Override
public < T , R > RFuture < R > writeAsync ( String key , Codec codec , RedisCommand < T > command , Object . . . params ) {
public < T , R > RFuture < R > writeAsync ( String key , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = connectionManager . newPromise ( ) ;
NodeSource source = getNodeSource ( key ) ;
async ( false , source , codec , command , params , mainPromise , 0 ) ;
@ -469,7 +470,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
protected < V , R > void async ( final boolean readOnlyMode , final NodeSource source , final Codec codec ,
final RedisCommand < V > command , final Object [ ] params , final RPromise < R > mainPromise , final int attempt ) {
final RedisCommand < V > command , final Object [ ] params , final RPromise < R > mainPromise , final int attempt ) {
if ( mainPromise . isCancelled ( ) ) {
free ( params ) ;
return ;
@ -481,7 +482,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return ;
}
final AsyncDetails < V , R > details = AsyncDetails . acquire ( ) ;
if ( isRedissonReferenceSupportEnabled ( ) ) {
try {
@ -522,7 +522,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
}
} ;
final TimerTask retryTimerTask = new TimerTask ( ) {
@Override
@ -539,7 +539,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if ( details . getAttempt ( ) = = connectionManager . getConfig ( ) . getRetryAttempts ( ) ) {
if ( details . getWriteFuture ( ) . cancel ( false ) ) {
if ( details . getException ( ) = = null ) {
details . setException ( new RedisTimeoutException ( "Unable to send command: " + command + " with params: " + LogHelper . toString ( details . getParams ( ) + " after " + connectionManager . getConfig ( ) . getRetryAttempts ( ) + " retry attempts" ) ) ) ;
details . setException ( new RedisTimeoutException ( "Unable to send command: " + command + " with params: " + LogHelper . toString ( details . getParams ( ) ) + " after " + connectionManager . getConfig ( ) . getRetryAttempts ( ) + " retry attempts" ) ) ;
}
details . getAttemptPromise ( ) . tryFailure ( details . getException ( ) ) ;
}
@ -550,7 +550,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details . setTimeout ( timeout ) ;
return ;
}
if ( details . getWriteFuture ( ) . isDone ( ) & & details . getWriteFuture ( ) . isSuccess ( ) ) {
return ;
}
@ -591,30 +591,30 @@ public class CommandAsyncService implements CommandAsyncExecutor {
Timeout timeout = connectionManager . newTimeout ( retryTimerTask , connectionManager . getConfig ( ) . getRetryInterval ( ) , TimeUnit . MILLISECONDS ) ;
details . setTimeout ( timeout ) ;
details . setupMainPromiseListener ( mainPromiseListener ) ;
connectionFuture . addListener ( new FutureListener < RedisConnection > ( ) {
@Override
public void operationComplete ( Future < RedisConnection > connFuture ) throws Exception {
if ( connFuture . isCancelled ( ) ) {
return ;
}
if ( ! connFuture . isSuccess ( ) ) {
connectionManager . getShutdownLatch ( ) . release ( ) ;
details . setException ( convertException ( connectionFuture ) ) ;
return ;
}
if ( details . getAttemptPromise ( ) . isDone ( ) | | details . getMainPromise ( ) . isDone ( ) ) {
releaseConnection ( source , connectionFuture , details . isReadOnlyMode ( ) , details . getAttemptPromise ( ) , details ) ;
return ;
}
final RedisConnection connection = connFuture . getNow ( ) ;
if ( details . getSource ( ) . getRedirect ( ) = = Redirect . ASK ) {
List < CommandData < ? , ? > > list = new ArrayList < CommandData < ? , ? > > ( 2 ) ;
RPromise < Void > promise = connectionManager . newPromise ( ) ;
list . add ( new CommandData < Void , Void > ( promise , details . getCodec ( ) , RedisCommands . ASKING , new Object [ ] { } ) ) ;
list . add ( new CommandData < Void , Void > ( promise , details . getCodec ( ) , RedisCommands . ASKING , new Object [ ] { } ) ) ;
list . add ( new CommandData < V , R > ( details . getAttemptPromise ( ) , details . getCodec ( ) , details . getCommand ( ) , details . getParams ( ) ) ) ;
RPromise < Void > main = connectionManager . newPromise ( ) ;
ChannelFuture future = connection . send ( new CommandsData ( main , list ) ) ;
@ -627,7 +627,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
ChannelFuture future = connection . send ( new CommandData < V , R > ( details . getAttemptPromise ( ) , details . getCodec ( ) , details . getCommand ( ) , details . getParams ( ) ) ) ;
details . setWriteFuture ( future ) ;
}
details . getWriteFuture ( ) . addListener ( new ChannelFutureListener ( ) {
@Override
public void operationComplete ( ChannelFuture future ) throws Exception {
@ -658,26 +658,25 @@ public class CommandAsyncService implements CommandAsyncExecutor {
ReferenceCountUtil . safeRelease ( obj ) ;
}
}
private < V , R > void checkWriteFuture ( final AsyncDetails < V , R > details , final RedisConnection connection ) {
ChannelFuture future = details . getWriteFuture ( ) ;
if ( details. getAttemptPromise ( ) . isDone ( ) ) {
if ( future. isCancelled ( ) | | details. getAttemptPromise ( ) . isDone ( ) ) {
return ;
}
if ( ! future . isSuccess ( ) ) {
log . trace ( "Can't write {} to {}" , details . getCommand ( ) , connection ) ;
details . setException ( new WriteRedisConnectionException (
"Can't write command: " + details . getCommand ( ) + ", params: " + LogHelper . toString ( details . getParams ( ) ) + " to channel: " + future . channel ( ) , future . cause ( ) ) ) ;
if ( details . getAttempt ( ) = = connectionManager . getConfig ( ) . getRetryAttempts ( ) ) {
details . getAttemptPromise ( ) . tryFailure ( details . getException ( ) ) ;
free ( details ) ;
}
return ;
}
details . getTimeout ( ) . cancel ( ) ;
long timeoutTime = connectionManager . getConfig ( ) . getTimeout ( ) ;
if ( RedisCommands . BLOCKING_COMMANDS . contains ( details . getCommand ( ) . getName ( ) ) ) {
Long popTimeout = Long . valueOf ( details . getParams ( ) [ details . getParams ( ) . length - 1 ] . toString ( ) ) ;
@ -685,7 +684,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if ( popTimeout = = 0 ) {
return ;
}
timeoutTime + = popTimeout * 1000 ;
timeoutTime + = popTimeout * 1000 ;
// add 1 second due to issue https://github.com/antirez/redis/issues/874
timeoutTime + = 1000 ;
}
@ -711,7 +710,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details . getMainPromise ( ) . tryFailure ( new RedissonShutdownException ( "Redisson is shutdown" ) ) ;
}
} ;
final Timeout scheduledFuture ;
if ( popTimeout ! = 0 ) {
// to handle cases when connection has been lost
@ -721,11 +720,11 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public void run ( Timeout timeout ) throws Exception {
// re-connection hasn't been made
// and connection is still active
if ( orignalChannel = = connection . getChannel ( )
if ( orignalChannel = = connection . getChannel ( )
& & connection . isActive ( ) ) {
return ;
}
if ( details . getAttemptPromise ( ) . trySuccess ( null ) ) {
connection . forceFastReconnectAsync ( ) ;
}
@ -734,7 +733,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} else {
scheduledFuture = null ;
}
details . getMainPromise ( ) . addListener ( new FutureListener < R > ( ) {
@Override
public void operationComplete ( Future < R > future ) throws Exception {
@ -757,13 +756,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} ) ;
return ;
}
if ( future . cause ( ) instanceof RedissonShutdownException ) {
details . getAttemptPromise ( ) . tryFailure ( future . cause ( ) ) ;
}
}
} ) ;
synchronized ( listener ) {
if ( ! details . getMainPromise ( ) . isDone ( ) ) {
connectionManager . getShutdownPromise ( ) . addListener ( listener ) ;
@ -772,14 +771,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
protected < V , R > void releaseConnection ( final NodeSource source , final RFuture < RedisConnection > connectionFuture ,
final boolean isReadOnly , RPromise < R > attemptPromise , final AsyncDetails < V , R > details ) {
final boolean isReadOnly , RPromise < R > attemptPromise , final AsyncDetails < V , R > details ) {
attemptPromise . addListener ( new FutureListener < R > ( ) {
@Override
public void operationComplete ( Future < R > future ) throws Exception {
if ( ! connectionFuture . isSuccess ( ) ) {
return ;
}
RedisConnection connection = connectionFuture . getNow ( ) ;
connectionManager . getShutdownLatch ( ) . release ( ) ;
if ( isReadOnly ) {
@ -787,7 +786,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} else {
connectionManager . releaseWrite ( source , connection ) ;
}
if ( log . isDebugEnabled ( ) ) {
log . debug ( "connection released for command {} and params {} from slot {} using connection {}" ,
details . getCommand ( ) , Arrays . toString ( details . getParams ( ) ) , details . getSource ( ) , connection ) ;
@ -806,7 +805,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details . removeMainPromiseListener ( ) ;
if ( future . cause ( ) instanceof RedisMovedException ) {
RedisMovedException ex = ( RedisMovedException ) future . cause ( ) ;
RedisMovedException ex = ( RedisMovedException ) future . cause ( ) ;
async ( details . isReadOnlyMode ( ) , new NodeSource ( ex . getSlot ( ) , ex . getAddr ( ) , Redirect . MOVED ) , details . getCodec ( ) ,
details . getCommand ( ) , details . getParams ( ) , details . getMainPromise ( ) , details . getAttempt ( ) ) ;
AsyncDetails . release ( details ) ;
@ -814,7 +813,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
if ( future . cause ( ) instanceof RedisAskException ) {
RedisAskException ex = ( RedisAskException ) future . cause ( ) ;
RedisAskException ex = ( RedisAskException ) future . cause ( ) ;
async ( details . isReadOnlyMode ( ) , new NodeSource ( ex . getSlot ( ) , ex . getAddr ( ) , Redirect . ASK ) , details . getCodec ( ) ,
details . getCommand ( ) , details . getParams ( ) , details . getMainPromise ( ) , details . getAttempt ( ) ) ;
AsyncDetails . release ( details ) ;
@ -827,14 +826,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
AsyncDetails . release ( details ) ;
return ;
}
if ( future . cause ( ) instanceof RedisTryAgainException ) {
connectionManager . newTimeout ( new TimerTask ( ) {
@Override
public void run ( Timeout timeout ) throws Exception {
async ( details . isReadOnlyMode ( ) , source , details . getCodec ( ) ,
details . getCommand ( ) , details . getParams ( ) , details . getMainPromise ( ) , details . getAttempt ( ) ) ;
}
} , 1 , TimeUnit . SECONDS ) ;
AsyncDetails . release ( details ) ;
@ -842,7 +841,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
free ( details ) ;
if ( future . isSuccess ( ) ) {
R res = future . getNow ( ) ;
if ( res instanceof RedisClientResult ) {
@ -850,9 +849,9 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if ( addr = = null ) {
addr = details . getConnectionFuture ( ) . getNow ( ) . getRedisClient ( ) . getAddr ( ) ;
}
( ( RedisClientResult ) res ) . setRedisClient ( addr ) ;
( ( RedisClientResult ) res ) . setRedisClient ( addr ) ;
}
if ( isRedissonReferenceSupportEnabled ( ) ) {
handleReference ( details . getMainPromise ( ) , res ) ;
} else {
@ -867,29 +866,60 @@ public class CommandAsyncService implements CommandAsyncExecutor {
private < R , V > void handleReference ( RPromise < R > mainPromise , R res ) {
if ( res instanceof List ) {
List < Object > r = ( List < Object > ) res ;
List < Object > r = ( List < Object > ) res ;
for ( int i = 0 ; i < r . size ( ) ; i + + ) {
if ( r . get ( i ) instanceof RedissonReference ) {
try {
r . set ( i , redisson ! = null
? RedissonObjectFactory . fromReference ( redisson , ( RedissonReference ) r . get ( i ) )
: RedissonObjectFactory . fromReference ( redissonReactive , ( RedissonReference ) r . get ( i ) ) ) ;
} catch ( Exception exception ) { //skip and carry on to next one.
}
r . set ( i , fromReference ( r . get ( i ) ) ) ;
} else if ( r . get ( i ) instanceof ScoredEntry & & ( ( ScoredEntry ) r . get ( i ) ) . getValue ( ) instanceof RedissonReference ) {
try {
ScoredEntry < ? > se = ( ( ScoredEntry < ? > ) r . get ( i ) ) ;
se = new ScoredEntry ( se . getScore ( ) , redisson ! = null
? RedissonObjectFactory . < R > fromReference ( redisson , ( RedissonReference ) se . getValue ( ) )
: RedissonObjectFactory . < R > fromReference ( redissonReactive , ( RedissonReference ) se . getValue ( ) ) ) ;
r . set ( i , se ) ;
} catch ( Exception exception ) { //skip and carry on to next one.
}
ScoredEntry < ? > se = ( ( ScoredEntry < ? > ) r . get ( i ) ) ;
se = new ScoredEntry ( se . getScore ( ) , fromReference ( se . getValue ( ) ) ) ;
r . set ( i , se ) ;
}
}
mainPromise . trySuccess ( res ) ;
} else if ( res instanceof Set ) {
Set r = ( Set ) res ;
LinkedHashSet converted = new LinkedHashSet ( ) ;
for ( Object o : r ) {
if ( o instanceof RedissonReference ) {
converted . add ( fromReference ( o ) ) ;
} else if ( o instanceof ScoredEntry & & ( ( ScoredEntry ) o ) . getValue ( ) instanceof RedissonReference ) {
ScoredEntry < ? > se = ( ( ScoredEntry < ? > ) o ) ;
se = new ScoredEntry ( se . getScore ( ) , fromReference ( se . getValue ( ) ) ) ;
converted . add ( se ) ;
} else if ( o instanceof Map . Entry ) {
Map . Entry old = ( Map . Entry ) o ;
Object key = old . getKey ( ) ;
if ( key instanceof RedissonReference ) {
key = fromReference ( key ) ;
}
Object value = old . getValue ( ) ;
if ( value instanceof RedissonReference ) {
value = fromReference ( value ) ;
}
converted . add ( new AbstractMap . SimpleEntry ( key , value ) ) ;
} else {
converted . add ( o ) ;
}
}
mainPromise . trySuccess ( ( R ) converted ) ;
} else if ( res instanceof Map ) {
Map < Object , Object > map = ( Map < Object , Object > ) res ;
LinkedHashMap < Object , Object > converted = new LinkedHashMap < Object , Object > ( ) ;
for ( Map . Entry < Object , Object > e : map . entrySet ( ) ) {
Object value = e . getValue ( ) ;
if ( e . getValue ( ) instanceof RedissonReference ) {
value = fromReference ( e . getValue ( ) ) ;
}
Object key = e . getKey ( ) ;
if ( e . getKey ( ) instanceof RedissonReference ) {
key = fromReference ( e . getKey ( ) ) ;
}
converted . put ( key , value ) ;
}
mainPromise . trySuccess ( ( R ) converted ) ;
} else if ( res instanceof ListScanResult ) {
List < ScanObjectEntry > r = ( ( ListScanResult ) res ) . getValues ( ) ;
List < ScanObjectEntry > r = ( ( ListScanResult ) res ) . getValues ( ) ;
for ( int i = 0 ; i < r . size ( ) ; i + + ) {
Object obj = r . get ( i ) ;
if ( ! ( obj instanceof ScanObjectEntry ) ) {
@ -897,60 +927,42 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
ScanObjectEntry e = r . get ( i ) ;
if ( e . getObj ( ) instanceof RedissonReference ) {
try {
r . set ( i , new ScanObjectEntry ( e . getBuf ( ) , redisson ! = null
? RedissonObjectFactory . < R > fromReference ( redisson , ( RedissonReference ) e . getObj ( ) )
: RedissonObjectFactory . < R > fromReference ( redissonReactive , ( RedissonReference ) e . getObj ( ) ) ) ) ;
} catch ( Exception exception ) { //skip and carry on to next one.
}
r . set ( i , new ScanObjectEntry ( e . getBuf ( ) , fromReference ( e . getObj ( ) ) ) ) ;
} else if ( e . getObj ( ) instanceof ScoredEntry & & ( ( ScoredEntry < ? > ) e . getObj ( ) ) . getValue ( ) instanceof RedissonReference ) {
try {
ScoredEntry < ? > se = ( ( ScoredEntry < ? > ) e . getObj ( ) ) ;
se = new ScoredEntry ( se . getScore ( ) , redisson ! = null
? RedissonObjectFactory . < R > fromReference ( redisson , ( RedissonReference ) se . getValue ( ) )
: RedissonObjectFactory . < R > fromReference ( redissonReactive , ( RedissonReference ) se . getValue ( ) ) ) ;
r . set ( i , new ScanObjectEntry ( e . getBuf ( ) , se ) ) ;
} catch ( Exception exception ) { //skip and carry on to next one.
}
ScoredEntry < ? > se = ( ( ScoredEntry < ? > ) e . getObj ( ) ) ;
se = new ScoredEntry ( se . getScore ( ) , fromReference ( se . getValue ( ) ) ) ;
r . set ( i , new ScanObjectEntry ( e . getBuf ( ) , se ) ) ;
}
}
mainPromise . trySuccess ( res ) ;
} else if ( res instanceof MapScanResult ) {
Map < ScanObjectEntry , ScanObjectEntry > map = ( ( MapScanResult ) res ) . getMap ( ) ;
HashMap < ScanObjectEntry , ScanObjectEntry > toAdd = null ;
for ( Map . Entry < ScanObjectEntry , ScanObjectEntry > e : map . entrySet ( ) ) {
MapScanResult scanResult = ( MapScanResult ) res ;
Map < ScanObjectEntry , ScanObjectEntry > map = ( ( MapScanResult ) res ) . getMap ( ) ;
LinkedHashMap < ScanObjectEntry , ScanObjectEntry > converted = new LinkedHashMap < ScanObjectEntry , ScanObjectEntry > ( ) ;
boolean hasConversion = false ;
for ( Map . Entry < ScanObjectEntry , ScanObjectEntry > e : map . entrySet ( ) ) {
ScanObjectEntry value = e . getValue ( ) ;
if ( e . getValue ( ) . getObj ( ) instanceof RedissonReference ) {
try {
e . setValue ( new ScanObjectEntry ( e . getValue ( ) . getBuf ( ) , redisson ! = null
? RedissonObjectFactory . < R > fromReference ( redisson , ( RedissonReference ) e . getValue ( ) . getObj ( ) )
: RedissonObjectFactory . < R > fromReference ( redissonReactive , ( RedissonReference ) e . getValue ( ) . getObj ( ) ) ) ) ;
} catch ( Exception exception ) { //skip and carry on to next one.
}
value = new ScanObjectEntry ( e . getValue ( ) . getBuf ( ) , fromReference ( e . getValue ( ) . getObj ( ) ) ) ;
hasConversion = true ;
}
ScanObjectEntry key = e . getKey ( ) ;
if ( e . getKey ( ) . getObj ( ) instanceof RedissonReference ) {
if ( toAdd = = null ) {
toAdd = new HashMap < ScanObjectEntry , ScanObjectEntry > ( ) ;
}
toAdd . put ( e . getKey ( ) , e . getValue ( ) ) ;
key = new ScanObjectEntry ( e . getKey ( ) . getBuf ( ) , fromReference ( e . getKey ( ) . getObj ( ) ) ) ;
hasConversion = true ;
}
converted . put ( key , value ) ;
}
if ( toAdd ! = null ) {
for ( Map . Entry < ScanObjectEntry , ScanObjectEntry > e : toAdd . entrySet ( ) ) {
try {
map . put ( new ScanObjectEntry ( e . getValue ( ) . getBuf ( ) , ( redisson ! = null
? RedissonObjectFactory . < R > fromReference ( redisson , ( RedissonReference ) e . getKey ( ) . getObj ( ) )
: RedissonObjectFactory . < R > fromReference ( redissonReactive , ( RedissonReference ) e . getKey ( ) . getObj ( ) ) ) ) , map . remove ( e . getKey ( ) ) ) ;
} catch ( Exception exception ) { //skip and carry on to next one.
}
}
if ( hasConversion ) {
MapScanResult < ScanObjectEntry , ScanObjectEntry > newScanResult = new MapScanResult < ScanObjectEntry , ScanObjectEntry > ( scanResult . getPos ( ) , converted ) ;
newScanResult . setRedisClient ( scanResult . getRedisClient ( ) ) ;
mainPromise . trySuccess ( ( R ) newScanResult ) ;
} else {
mainPromise . trySuccess ( ( R ) res ) ;
}
mainPromise . trySuccess ( res ) ;
} else if ( res instanceof RedissonReference ) {
try {
mainPromise . trySuccess ( redisson ! = null
? RedissonObjectFactory . < R > fromReference ( redisson , ( RedissonReference ) res )
: RedissonObjectFactory . < R > fromReference ( redissonReactive , ( RedissonReference ) res ) ) ;
mainPromise . trySuccess ( this . < R > fromReference ( res ) ) ;
} catch ( Exception exception ) {
mainPromise . trySuccess ( res ) ; //fallback
}
@ -959,4 +971,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
}
private < R > R fromReference ( Object res ) {
try {
return redisson ! = null
? RedissonObjectFactory . < R > fromReference ( redisson , ( RedissonReference ) res )
: RedissonObjectFactory . < R > fromReference ( redissonReactive , ( RedissonReference ) res ) ;
} catch ( Exception exception ) {
return ( R ) res ;
}
}
}