@ -17,12 +17,11 @@ package org.redisson;
import java.lang.reflect.Method ;
import java.util.Arrays ;
import java.util.Collections ;
import java.util.Map ;
import java.util.Set ;
import java.util.concurrent.ConcurrentMap ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.atomic.AtomicInteger ;
import java.util.concurrent.atomic.AtomicReference ;
import org.redisson.api.RBlockingQueue ;
@ -65,10 +64,33 @@ import io.netty.util.internal.PlatformDependent;
* /
public class RedissonRemoteService extends BaseRemoteService implements RRemoteService {
public static class Entry {
RFuture < String > future ;
final AtomicInteger counter ;
public Entry ( int workers ) {
counter = new AtomicInteger ( workers ) ;
}
public void setFuture ( RFuture < String > future ) {
this . future = future ;
}
public RFuture < String > getFuture ( ) {
return future ;
}
public AtomicInteger getCounter ( ) {
return counter ;
}
}
private static final Logger log = LoggerFactory . getLogger ( RedissonRemoteService . class ) ;
private final Map < RemoteServiceKey , RemoteServiceMethod > beans = PlatformDependent . newConcurrentHashMap ( ) ;
private final Map < Class < ? > , Set < RFuture < String > > > futures = PlatformDependent . newConcurrentHashMap ( ) ;
private final Map < Class < ? > , Entry> remoteMap = PlatformDependent . newConcurrentHashMap ( ) ;
public RedissonRemoteService ( Codec codec , RedissonClient redisson , String name , CommandExecutor commandExecutor , String executorId , ConcurrentMap < String , ResponseEntry > responses ) {
super ( codec , redisson , name , commandExecutor , executorId , responses ) ;
@ -110,20 +132,19 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
beans . remove ( key ) ;
}
Set < RFuture < String > > removedFutures = futures . remove ( remoteInterface ) ;
if ( removedFutures = = null ) {
return ;
}
for ( RFuture < String > future : removedFutures ) {
future . cancel ( false ) ;
Entry entry = remoteMap . remove ( remoteInterface ) ;
if ( entry ! = null & & entry . getFuture ( ) ! = null ) {
entry . getFuture ( ) . cancel ( false ) ;
}
}
@Override
public int getFreeWorkers ( Class < ? > remoteInterface ) {
Set < RFuture < String > > futuresSet = futures . get ( remoteInterface ) ;
return futuresSet . size ( ) ;
Entry entry = remoteMap . remove ( remoteInterface ) ;
if ( entry = = null ) {
return 0 ;
}
return entry . getCounter ( ) . get ( ) ;
}
@Override
@ -144,32 +165,28 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
}
Set < RFuture < String > > values = Collections . newSetFromMap ( PlatformDependent . < RFuture < String > , Boolean > newConcurrentHashMap ( ) ) ;
futures . put ( remoteInterface , values ) ;
remoteMap . put ( remoteInterface , new Entry ( workers ) ) ;
String requestQueueName = getRequestQueueName ( remoteInterface ) ;
RBlockingQueue < String > requestQueue = redisson . getBlockingQueue ( requestQueueName , StringCodec . INSTANCE ) ;
for ( int i = 0 ; i < workers ; i + + ) {
subscribe ( remoteInterface , requestQueue , executor ) ;
}
subscribe ( remoteInterface , requestQueue , executor ) ;
}
private < T > void subscribe ( final Class < T > remoteInterface , final RBlockingQueue < String > requestQueue ,
final ExecutorService executor ) {
Set < RFuture < String > > futuresSet = futures . get ( remoteInterface ) ;
if ( futuresSet = = null ) {
final Entry entry = remoteMap . get ( remoteInterface ) ;
if ( entry = = null ) {
return ;
}
final RFuture < String > take = requestQueue . takeAsync ( ) ;
futuresSet. add ( take ) ;
entry. setFuture ( take ) ;
take . addListener ( new FutureListener < String > ( ) {
@Override
public void operationComplete ( Future < String > future ) throws Exception {
Set< RFuture < String > > futuresSet = futures . get ( remoteInterface ) ;
if ( futuresSet = = null ) {
public void operationComplete ( Future < String > future ) throws Exception {
Entry entry = remoteMap . get ( remoteInterface ) ;
if ( entry = = null ) {
return ;
}
futuresSet . remove ( take ) ;
if ( ! future . isSuccess ( ) ) {
if ( future . cause ( ) instanceof RedissonShutdownException ) {
@ -184,6 +201,14 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
// do not subscribe now, see
// https://github.com/mrniko/redisson/issues/493
// subscribe(remoteInterface, requestQueue);
if ( entry . getCounter ( ) . get ( ) = = 0 ) {
return ;
}
if ( entry . getCounter ( ) . decrementAndGet ( ) > 0 ) {
subscribe ( remoteInterface , requestQueue , executor ) ;
}
final String requestId = future . getNow ( ) ;
RMap < String , RemoteServiceRequest > tasks = redisson . getMap ( requestQueue . getName ( ) + ":tasks" , new CompositeCodec ( StringCodec . INSTANCE , codec , codec ) ) ;
@ -197,16 +222,18 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
return ;
}
log . error ( "Can't process the remote service request with id " + requestId , future . cause ( ) ) ;
// re-subscribe after a failed takeAsync
subscribe( remoteInterface , requestQueue , executor ) ;
re subscribe( remoteInterface , requestQueue , executor ) ;
return ;
}
final RemoteServiceRequest request = future . getNow ( ) ;
if ( request = = null ) {
log . debug ( "Task can't be found for request: {}" , requestId ) ;
// re-subscribe after a skipped ackTimeout
subscribe( remoteInterface , requestQueue , executor ) ;
re subscribe( remoteInterface , requestQueue , executor ) ;
return ;
}
@ -215,8 +242,9 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
if ( request . getOptions ( ) . isAckExpected ( ) & & elapsedTime > request
. getOptions ( ) . getAckTimeoutInMillis ( ) ) {
log . debug ( "request: {} has been skipped due to ackTimeout. Elapsed time: {}ms" , request . getId ( ) , elapsedTime ) ;
// re-subscribe after a skipped ackTimeout
subscribe( remoteInterface , requestQueue , executor ) ;
re subscribe( remoteInterface , requestQueue , executor ) ;
return ;
}
@ -247,13 +275,14 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
return ;
}
log . error ( "Can't send ack for request: " + request , future . cause ( ) ) ;
// re-subscribe after a failed send (ack)
subscribe( remoteInterface , requestQueue , executor ) ;
re subscribe( remoteInterface , requestQueue , executor ) ;
return ;
}
if ( ! future . getNow ( ) ) {
subscribe( remoteInterface , requestQueue , executor ) ;
re subscribe( remoteInterface , requestQueue , executor ) ;
return ;
}
@ -269,13 +298,14 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
return ;
}
log . error ( "Can't send ack for request: " + request , future . cause ( ) ) ;
// re-subscribe after a failed send (ack)
subscribe( remoteInterface , requestQueue , executor ) ;
re subscribe( remoteInterface , requestQueue , executor ) ;
return ;
}
if ( ! future . getNow ( ) ) {
subscribe( remoteInterface , requestQueue , executor ) ;
re subscribe( remoteInterface , requestQueue , executor ) ;
return ;
}
@ -374,7 +404,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
@Override
public void operationComplete ( Future < Void > future ) throws Exception {
// interface has been deregistered
if ( futures . get ( remoteInterface ) = = null ) {
if ( ! remoteMap . containsKey ( remoteInterface ) ) {
return ;
}
@ -386,12 +416,17 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
future . cause ( ) ) ;
}
// re-subscribe anyways (fail or success) after the send
// (response)
subscribe ( remoteInterface , requestQueue , executor ) ;
resubscribe ( remoteInterface , requestQueue , executor ) ;
}
} ) ;
} else {
resubscribe ( remoteInterface , requestQueue , executor ) ;
}
}
private < T > void resubscribe ( Class < T > remoteInterface , RBlockingQueue < String > requestQueue ,
ExecutorService executor ) {
if ( remoteMap . get ( remoteInterface ) . getCounter ( ) . getAndIncrement ( ) = = 0 ) {
// re-subscribe anyways after the method invocation
subscribe ( remoteInterface , requestQueue , executor ) ;
}