@ -36,6 +36,7 @@ import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry ;
import org.redisson.connection.NodeSource ;
import org.redisson.liveobject.core.RedissonObjectBuilder ;
import org.redisson.misc.CompletableFutureWrapper ;
import org.redisson.misc.RPromise ;
import org.redisson.misc.RedissonPromise ;
import org.slf4j.Logger ;
@ -81,11 +82,11 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public void syncSubscription ( R Future< ? > future ) {
public void syncSubscription ( Completable Future< ? > future ) {
MasterSlaveServersConfig config = connectionManager . getConfig ( ) ;
int timeout = config . getTimeout ( ) + config . getRetryInterval ( ) * config . getRetryAttempts ( ) ;
try {
future . toCompletableFuture( ) . get( timeout , TimeUnit . MILLISECONDS ) ;
future . get( timeout , TimeUnit . MILLISECONDS ) ;
} catch ( InterruptedException e ) {
Thread . currentThread ( ) . interrupt ( ) ;
} catch ( CancellationException e ) {
@ -93,25 +94,46 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} catch ( ExecutionException e ) {
throw ( RuntimeException ) e . getCause ( ) ;
} catch ( TimeoutException e ) {
( ( RPromise < ? > ) future ) . tryFailure ( new RedisTimeoutException ( "Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters." ) ) ;
future . completeExceptionally ( new RedisTimeoutException ( "Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters." ) ) ;
}
}
@Override
public void syncSubscriptionInterrupted ( R Future< ? > future ) throws InterruptedException {
public void syncSubscriptionInterrupted ( Completable Future< ? > future ) throws InterruptedException {
MasterSlaveServersConfig config = connectionManager . getConfig ( ) ;
int timeout = config . getTimeout ( ) + config . getRetryInterval ( ) * config . getRetryAttempts ( ) ;
try {
future . toCompletableFuture( ) . get( timeout , TimeUnit . MILLISECONDS ) ;
future . get( timeout , TimeUnit . MILLISECONDS ) ;
} catch ( CancellationException e ) {
// skip
} catch ( ExecutionException e ) {
throw ( RuntimeException ) e . getCause ( ) ;
} catch ( TimeoutException e ) {
( ( RPromise < ? > ) future ) . tryFailure ( new RedisTimeoutException ( "Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters." ) ) ;
future . completeExceptionally ( new RedisTimeoutException ( "Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters." ) ) ;
}
}
@Override
public < V > V getNow ( CompletableFuture < V > future ) {
try {
return future . getNow ( null ) ;
} catch ( Exception e ) {
return null ;
}
}
@Override
public < V > void transfer ( CompletableFuture < V > future1 , CompletableFuture < V > future2 ) {
future1 . whenComplete ( ( res , e ) - > {
if ( e ! = null ) {
future2 . completeExceptionally ( e ) ;
return ;
}
future2 . complete ( res ) ;
} ) ;
}
@Override
public < V > V get ( RFuture < V > future ) {
if ( Thread . currentThread ( ) . getName ( ) . startsWith ( "redisson-netty" ) ) {
@ -134,44 +156,36 @@ public class CommandAsyncService implements CommandAsyncExecutor {
try {
return future . toCompletableFuture ( ) . get ( ) ;
} catch ( InterruptedException e ) {
( ( RPromise ) future ) . tryFailure ( e ) ;
future . toCompletableFuture ( ) . completeExceptionally ( e ) ;
throw e ;
} catch ( ExecutionException e ) {
throw convertException ( e ) ;
}
}
protected < R > RPromis e< R > createPromise ( ) {
return new RedissonPromis e< R > ( ) ;
protected < R > CompletableFutur e< R > createPromise ( ) {
return new CompletableFutur e< R > ( ) ;
}
@Override
public < T , R > RFuture < R > readAsync ( RedisClient client , MasterSlaveEntry entry , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = createPromise ( ) ;
async ( true , new NodeSource ( entry , client ) , codec , command , params , mainPromise , false , false ) ;
return mainPromise ;
return async ( true , new NodeSource ( entry , client ) , codec , command , params , false , false ) ;
}
@Override
public < T , R > RFuture < R > readAsync ( RedisClient client , String name , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = createPromise ( ) ;
int slot = connectionManager . calcSlot ( name ) ;
async ( true , new NodeSource ( slot , client ) , codec , command , params , mainPromise , false , false ) ;
return mainPromise ;
return async ( true , new NodeSource ( slot , client ) , codec , command , params , false , false ) ;
}
public < T , R > RFuture < R > readAsync ( RedisClient client , byte [ ] key , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = createPromise ( ) ;
int slot = connectionManager . calcSlot ( key ) ;
async ( true , new NodeSource ( slot , client ) , codec , command , params , mainPromise , false , false ) ;
return mainPromise ;
return async ( true , new NodeSource ( slot , client ) , codec , command , params , false , false ) ;
}
@Override
public < T , R > RFuture < R > readAsync ( RedisClient client , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = createPromise ( ) ;
async ( true , new NodeSource ( client ) , codec , command , params , mainPromise , false , false ) ;
return mainPromise ;
return async ( true , new NodeSource ( client ) , codec , command , params , false , false ) ;
}
@Override
@ -188,14 +202,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public < T , R > RFuture < Collection < R > > readAllAsync ( Collection < R > results , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromis e< Collection < R > > mainPromise = createPromise ( ) ;
CompletableFutur e< Collection < R > > mainPromise = createPromise ( ) ;
Collection < MasterSlaveEntry > nodes = connectionManager . getEntrySet ( ) ;
AtomicInteger counter = new AtomicInteger ( nodes . size ( ) ) ;
BiConsumer < Object , Throwable > listener = new BiConsumer < Object , Throwable > ( ) {
@Override
public void accept ( Object result , Throwable u ) {
if ( u ! = null & & ! ( u instanceof RedisRedirectException ) ) {
mainPromise . tryFailure ( u ) ;
mainPromise . completeExceptionally ( u ) ;
return ;
}
@ -211,57 +225,54 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if ( counter . decrementAndGet ( ) = = 0
& & ! mainPromise . isDone ( ) ) {
mainPromise . trySuccess ( results ) ;
mainPromise . complete ( results ) ;
}
}
} ;
for ( MasterSlaveEntry entry : nodes ) {
RPromise < R > promise = new RedissonPromise < R > ( ) ;
promise . onComplete ( listener ) ;
async ( true , new NodeSource ( entry ) , codec , command , params , promise , true , false ) ;
RFuture < Object > f = async ( true , new NodeSource ( entry ) , codec , command , params , true , false ) ;
f . whenComplete ( listener ) ;
}
return mainPromise ;
return new CompletableFutureWrapper < > ( mainPromise ) ;
}
@Override
public < T , R > RFuture < R > readRandomAsync ( Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromis e< R > mainPromise = createPromise ( ) ;
CompletableFutur e< R > mainPromise = createPromise ( ) ;
List < MasterSlaveEntry > nodes = new ArrayList < MasterSlaveEntry > ( connectionManager . getEntrySet ( ) ) ;
Collections . shuffle ( nodes ) ;
retryReadRandomAsync ( codec , command , mainPromise , nodes , params ) ;
return mainPromise ;
return new CompletableFutureWrapper < > ( mainPromise ) ;
}
@Override
public < T , R > RFuture < R > readRandomAsync ( MasterSlaveEntry entry , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromis e< R > mainPromise = createPromise ( ) ;
CompletableFutur e< R > mainPromise = createPromise ( ) ;
retryReadRandomAsync ( codec , command , mainPromise , Collections . singletonList ( entry ) , params ) ;
return mainPromise ;
return new CompletableFutureWrapper < > ( mainPromise ) ;
}
private < R , T > void retryReadRandomAsync ( Codec codec , RedisCommand < T > command , RPromis e< R > mainPromise ,
private < R , T > void retryReadRandomAsync ( Codec codec , RedisCommand < T > command , CompletableFutur e< R > mainPromise ,
List < MasterSlaveEntry > nodes , Object . . . params ) {
RPromise < R > attemptPromise = new RedissonPromise < R > ( ) ;
attemptPromise . onComplete ( ( res , e ) - > {
MasterSlaveEntry entry = nodes . remove ( 0 ) ;
RFuture < R > attemptPromise = async ( true , new NodeSource ( entry ) , codec , command , params , false , false ) ;
attemptPromise . whenComplete ( ( res , e ) - > {
if ( e = = null ) {
if ( res = = null ) {
if ( nodes . isEmpty ( ) ) {
mainPromise . trySuccess ( null ) ;
mainPromise . complete ( null ) ;
} else {
retryReadRandomAsync ( codec , command , mainPromise , nodes , params ) ;
}
} else {
mainPromise . trySuccess ( res ) ;
mainPromise . complete ( res ) ;
}
} else {
mainPromise . tryFailure ( e ) ;
mainPromise . completeExceptionally ( e ) ;
}
} ) ;
MasterSlaveEntry entry = nodes . remove ( 0 ) ;
async ( true , new NodeSource ( entry ) , codec , command , params , attemptPromise , false , false ) ;
}
@Override
@ -314,9 +325,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} ;
for ( MasterSlaveEntry entry : nodes ) {
RPromise < T > promise = new RedissonPromise < T > ( ) ;
promise . onComplete ( listener ) ;
async ( readOnlyMode , new NodeSource ( entry ) , codec , command , params , promise , true , false ) ;
RFuture < T > promise = async ( readOnlyMode , new NodeSource ( entry ) , codec , command , params , true , false ) ;
promise . whenComplete ( listener ) ;
}
return mainPromise ;
}
@ -340,24 +350,18 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public < T , R > RFuture < R > readAsync ( String key , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = createPromise ( ) ;
NodeSource source = getNodeSource ( key ) ;
async ( true , source , codec , command , params , mainPromise , false , false ) ;
return mainPromise ;
return async ( true , source , codec , command , params , false , false ) ;
}
@Override
public < T , R > RFuture < R > readAsync ( byte [ ] key , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = createPromise ( ) ;
NodeSource source = getNodeSource ( key ) ;
async ( true , source , codec , command , params , mainPromise , false , false ) ;
return mainPromise ;
return async ( true , source , codec , command , params , false , false ) ;
}
public < T , R > RFuture < R > readAsync ( MasterSlaveEntry entry , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = createPromise ( ) ;
async ( true , new NodeSource ( entry ) , codec , command , params , mainPromise , false , false ) ;
return mainPromise ;
return async ( true , new NodeSource ( entry ) , codec , command , params , false , false ) ;
}
@Override
@ -368,9 +372,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public < T , R > RFuture < R > writeAsync ( MasterSlaveEntry entry , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = createPromise ( ) ;
async ( false , new NodeSource ( entry ) , codec , command , params , mainPromise , false , false ) ;
return mainPromise ;
return async ( false , new NodeSource ( entry ) , codec , command , params , false , false ) ;
}
@Override
@ -442,9 +444,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
args . addAll ( keys ) ;
args . addAll ( Arrays . asList ( params ) ) ;
for ( MasterSlaveEntry entry : entries ) {
RPromise < T > promise = new RedissonPromise < T > ( ) ;
promise . onComplete ( listener ) ;
async ( readOnlyMode , new NodeSource ( entry ) , connectionManager . getCodec ( ) , command , args . toArray ( ) , promise , true , false ) ;
RFuture < T > promise = async ( readOnlyMode , new NodeSource ( entry ) , connectionManager . getCodec ( ) , command , args . toArray ( ) , true , false ) ;
promise . whenComplete ( listener ) ;
}
return mainPromise ;
}
@ -498,11 +499,11 @@ public class CommandAsyncService implements CommandAsyncExecutor {
private < T , R > RFuture < R > evalAsync ( NodeSource nodeSource , boolean readOnlyMode , Codec codec , RedisCommand < T > evalCommandType ,
String script , List < Object > keys , boolean noRetry , Object . . . params ) {
if ( isEvalCacheActive ( ) & & evalCommandType . getName ( ) . equals ( "EVAL" ) ) {
RPromise< R > mainPromise = new RedissonPromise < R > ( ) ;
CompletableFuture< R > mainPromise = new CompletableFuture < > ( ) ;
Object [ ] pps = copy ( params ) ;
RPromise< R > promise = new RedissonPromise < R > ( ) ;
CompletableFuture< R > promise = new CompletableFuture < > ( ) ;
String sha1 = calcSHA ( script ) ;
RedisCommand cmd = new RedisCommand ( evalCommandType , "EVALSHA" ) ;
List < Object > args = new ArrayList < Object > ( 2 + keys . size ( ) + params . length ) ;
@ -516,14 +517,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
connectionManager , objectBuilder , referenceType , noRetry ) ;
executor . execute ( ) ;
promise . o nComplete( ( res , e ) - > {
promise . whe nComplete( ( res , e ) - > {
if ( e ! = null ) {
if ( e . getMessage ( ) . startsWith ( "NOSCRIPT" ) ) {
RFuture < String > loadFuture = loadScript ( executor . getRedisClient ( ) , script ) ;
loadFuture . onComplete ( ( r , ex ) - > {
if ( ex ! = null ) {
free ( pps ) ;
mainPromise . tryFailure ( ex ) ;
mainPromise . completeExceptionally ( ex ) ;
return ;
}
@ -539,28 +540,33 @@ public class CommandAsyncService implements CommandAsyncExecutor {
ns = new NodeSource ( nodeSource , executor . getRedisClient ( ) ) ;
}
async ( readOnlyMode , ns , codec , command , newargs . toArray ( ) , mainPromise , false , noRetry ) ;
RFuture < R > future = async ( readOnlyMode , ns , codec , command , newargs . toArray ( ) , false , noRetry ) ;
future . whenComplete ( ( re , ex с ) - > {
if ( ex с ! = null ) {
mainPromise . completeExceptionally ( ex с ) ;
} else {
mainPromise . complete ( re ) ;
}
} ) ;
} ) ;
} else {
free ( pps ) ;
mainPromise . tryFailure ( e ) ;
mainPromise . completeExceptionally ( e ) ;
}
return ;
}
free ( pps ) ;
mainPromise . trySuccess ( res ) ;
mainPromise . complete ( res ) ;
} ) ;
return mainPromise ;
return new CompletableFutureWrapper < > ( mainPromise ) ;
}
RPromise < R > mainPromise = createPromise ( ) ;
List < Object > args = new ArrayList < Object > ( 2 + keys . size ( ) + params . length ) ;
args . add ( script ) ;
args . add ( keys . size ( ) ) ;
args . addAll ( keys ) ;
args . addAll ( Arrays . asList ( params ) ) ;
async ( readOnlyMode , nodeSource , codec , evalCommandType , args . toArray ( ) , mainPromise , false , noRetry ) ;
return mainPromise ;
return async ( readOnlyMode , nodeSource , codec , evalCommandType , args . toArray ( ) , false , noRetry ) ;
}
@Override
@ -570,25 +576,22 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public < T , R > RFuture < R > writeAsync ( String key , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = createPromise ( ) ;
NodeSource source = getNodeSource ( key ) ;
async ( false , source , codec , command , params , mainPromise , false , false ) ;
return mainPromise ;
return async ( false , source , codec , command , params , false , false ) ;
}
public < T , R > RFuture < R > writeAsync ( byte [ ] key , Codec codec , RedisCommand < T > command , Object . . . params ) {
RPromise < R > mainPromise = createPromise ( ) ;
NodeSource source = getNodeSource ( key ) ;
async ( false , source , codec , command , params , mainPromise , false , false ) ;
return mainPromise ;
return async ( false , source , codec , command , params , false , false ) ;
}
public < V , R > void async ( boolean readOnlyMode , NodeSource source , Codec codec ,
RedisCommand < V > command , Object [ ] params , RPromise < R > mainPromise ,
boolean ignoreRedirect , boolean noRetry ) {
public < V , R > RFuture < R > async ( boolean readOnlyMode , NodeSource source , Codec codec ,
RedisCommand < V > command , Object [ ] params , boolean ignoreRedirect , boolean noRetry ) {
CompletableFuture < R > mainPromise = createPromise ( ) ;
RedisExecutor < V , R > executor = new RedisExecutor < > ( readOnlyMode , source , codec , command , params , mainPromise ,
ignoreRedirect , connectionManager , objectBuilder , referenceType , noRetry ) ;
executor . execute ( ) ;
return new CompletableFutureWrapper < > ( mainPromise ) ;
}
private void free ( Object [ ] params ) {