@ -15,19 +15,7 @@
* /
package org.redisson ;
import io.netty.buffer.ByteBufUtil ;
import io.netty.util.concurrent.Future ;
import io.netty.util.concurrent.FutureListener ;
import io.netty.util.internal.PlatformDependent ;
import io.netty.util.internal.ThreadLocalRandom ;
import org.redisson.client.codec.LongCodec ;
import org.redisson.core.* ;
import org.redisson.core.RScript.Mode ;
import org.redisson.remote.* ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import java.lang.annotation.Annotation ;
import java.lang.reflect.InvocationHandler ;
import java.lang.reflect.Method ;
import java.lang.reflect.Proxy ;
@ -37,6 +25,34 @@ import java.util.Map;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.atomic.AtomicReference ;
import org.redisson.client.codec.LongCodec ;
import org.redisson.core.RBatch ;
import org.redisson.core.RBlockingQueue ;
import org.redisson.core.RBlockingQueueAsync ;
import org.redisson.core.RRemoteService ;
import org.redisson.core.RScript ;
import org.redisson.core.RScript.Mode ;
import org.redisson.core.RemoteInvocationOptions ;
import org.redisson.remote.RRemoteAsync ;
import org.redisson.remote.RRemoteServiceResponse ;
import org.redisson.remote.RemoteServiceAck ;
import org.redisson.remote.RemoteServiceAckTimeoutException ;
import org.redisson.remote.RemoteServiceKey ;
import org.redisson.remote.RemoteServiceMethod ;
import org.redisson.remote.RemoteServiceRequest ;
import org.redisson.remote.RemoteServiceResponse ;
import org.redisson.remote.RemoteServiceTimeoutException ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import io.netty.buffer.ByteBufUtil ;
import io.netty.util.concurrent.Future ;
import io.netty.util.concurrent.FutureListener ;
import io.netty.util.concurrent.ImmediateEventExecutor ;
import io.netty.util.concurrent.Promise ;
import io.netty.util.internal.PlatformDependent ;
import io.netty.util.internal.ThreadLocalRandom ;
/ * *
*
* @author Nikita Koksharov
@ -197,19 +213,181 @@ public class RedissonRemoteService implements RRemoteService {
}
@Override
public < T > T get ( final Class < T > remoteInterface , final long executionTimeout , final TimeUnit executionTimeUnit ) {
public < T > T get ( Class < T > remoteInterface , long executionTimeout , TimeUnit executionTimeUnit ) {
return get ( remoteInterface , RemoteInvocationOptions . defaults ( )
. expectResultWithin ( executionTimeout , executionTimeUnit ) ) ;
}
public < T > T get ( final Class < T > remoteInterface , final long executionTimeout , final TimeUnit executionTimeUnit ,
final long ackTimeout , final TimeUnit ackTimeUnit ) {
public < T > T get ( Class < T > remoteInterface , long executionTimeout , TimeUnit executionTimeUnit ,
long ackTimeout , TimeUnit ackTimeUnit ) {
return get ( remoteInterface , RemoteInvocationOptions . defaults ( )
. expectAckWithin ( ackTimeout , ackTimeUnit )
. expectResultWithin ( executionTimeout , executionTimeUnit ) ) ;
}
public < T > T get ( final Class < T > remoteInterface , final RemoteInvocationOptions options ) {
public < T > T get ( Class < T > remoteInterface , RemoteInvocationOptions options ) {
for ( Annotation annotation : remoteInterface . getAnnotations ( ) ) {
if ( annotation . annotationType ( ) = = RRemoteAsync . class ) {
Class < T > syncInterface = ( Class < T > ) ( ( RRemoteAsync ) annotation ) . value ( ) ;
for ( Method m : remoteInterface . getMethods ( ) ) {
try {
syncInterface . getMethod ( m . getName ( ) , m . getParameterTypes ( ) ) ;
} catch ( NoSuchMethodException e ) {
throw new IllegalArgumentException ( "Method '" + m . getName ( ) + "' with params '" + Arrays . toString ( m . getParameterTypes ( ) )
+ "' isn't defined in " + syncInterface ) ;
} catch ( SecurityException e ) {
throw new IllegalArgumentException ( e ) ;
}
if ( ! m . getReturnType ( ) . getClass ( ) . isInstance ( Future . class ) ) {
throw new IllegalArgumentException ( m . getReturnType ( ) . getClass ( ) + " isn't allowed as return type" ) ;
}
}
return async ( remoteInterface , options , syncInterface . getName ( ) ) ;
}
}
return sync ( remoteInterface , options ) ;
}
private < T > T async ( Class < T > remoteInterface , final RemoteInvocationOptions options , final String interfaceName ) {
// local copy of the options, to prevent mutation
final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions ( options ) ;
final String toString = getClass ( ) . getSimpleName ( ) + "-" + remoteInterface . getSimpleName ( ) + "-proxy-" + generateRequestId ( ) ;
InvocationHandler handler = new InvocationHandler ( ) {
@Override
public Object invoke ( Object proxy , Method method , Object [ ] args ) throws Throwable {
if ( method . getName ( ) . equals ( "toString" ) ) {
return toString ;
} else if ( method . getName ( ) . equals ( "equals" ) ) {
return proxy = = args [ 0 ] ;
} else if ( method . getName ( ) . equals ( "hashCode" ) ) {
return toString . hashCode ( ) ;
}
if ( ! optionsCopy . isResultExpected ( ) & & ! ( method . getReturnType ( ) . equals ( Void . class ) | | method . getReturnType ( ) . equals ( Void . TYPE ) ) )
throw new IllegalArgumentException ( "The noResult option only supports void return value" ) ;
final String requestId = generateRequestId ( ) ;
final Promise < Object > result = ImmediateEventExecutor . INSTANCE . newPromise ( ) ;
String requestQueueName = name + ":{" + interfaceName + "}" ;
RBlockingQueue < RemoteServiceRequest > requestQueue = redisson . getBlockingQueue ( requestQueueName ) ;
final RemoteServiceRequest request = new RemoteServiceRequest ( requestId ,
method . getName ( ) , args , optionsCopy , System . currentTimeMillis ( ) ) ;
Future < Boolean > addFuture = requestQueue . addAsync ( request ) ;
addFuture . addListener ( new FutureListener < Boolean > ( ) {
@Override
public void operationComplete ( Future < Boolean > future ) throws Exception {
if ( ! future . isSuccess ( ) ) {
result . setFailure ( future . cause ( ) ) ;
return ;
}
final RBlockingQueue < ? extends RRemoteServiceResponse > responseQueue ;
if ( optionsCopy . isAckExpected ( ) | | optionsCopy . isResultExpected ( ) ) {
String responseName = name + ":{" + interfaceName + "}:" + requestId ;
responseQueue = redisson . getBlockingQueue ( responseName ) ;
} else {
responseQueue = null ;
}
// poll for the ack only if expected
if ( optionsCopy . isAckExpected ( ) ) {
final String ackName = name + ":{" + interfaceName + "}:ack" ;
Future < RemoteServiceAck > ackFuture = ( Future < RemoteServiceAck > ) responseQueue . pollAsync ( optionsCopy . getAckTimeoutInMillis ( ) , TimeUnit . MILLISECONDS ) ;
ackFuture . addListener ( new FutureListener < RemoteServiceAck > ( ) {
@Override
public void operationComplete ( Future < RemoteServiceAck > future ) throws Exception {
if ( ! future . isSuccess ( ) ) {
return ;
}
RemoteServiceAck ack = future . getNow ( ) ;
if ( ack = = null ) {
Future < RemoteServiceAck > ackFutureAttempt = tryPollAckAgainAsync ( optionsCopy , responseQueue , ackName ) ;
ackFutureAttempt . addListener ( new FutureListener < RemoteServiceAck > ( ) {
@Override
public void operationComplete ( Future < RemoteServiceAck > future ) throws Exception {
if ( ! future . isSuccess ( ) ) {
result . setFailure ( future . cause ( ) ) ;
return ;
}
if ( future . getNow ( ) = = null ) {
Exception ex = new RemoteServiceAckTimeoutException ( "No ACK response after " + optionsCopy . getAckTimeoutInMillis ( ) + "ms for request: " + request ) ;
result . setFailure ( ex ) ;
return ;
}
invokeAsync ( optionsCopy , result , request , responseQueue , ackName ) ;
}
} ) ;
} else {
invokeAsync ( optionsCopy , result , request , responseQueue , ackName ) ;
}
}
private void invokeAsync ( final RemoteInvocationOptions optionsCopy ,
final Promise < Object > result , final RemoteServiceRequest request ,
final RBlockingQueue < ? extends RRemoteServiceResponse > responseQueue ,
final String ackName ) {
Future < Boolean > deleteFuture = redisson . getBucket ( ackName ) . deleteAsync ( ) ;
deleteFuture . addListener ( new FutureListener < Boolean > ( ) {
@Override
public void operationComplete ( Future < Boolean > future ) throws Exception {
if ( ! future . isSuccess ( ) ) {
result . setFailure ( future . cause ( ) ) ;
return ;
}
// poll for the response only if expected
if ( optionsCopy . isResultExpected ( ) ) {
Future < RemoteServiceResponse > responseFuture = ( Future < RemoteServiceResponse > ) responseQueue . pollAsync ( optionsCopy . getExecutionTimeoutInMillis ( ) , TimeUnit . MILLISECONDS ) ;
responseFuture . addListener ( new FutureListener < RemoteServiceResponse > ( ) {
@Override
public void operationComplete ( Future < RemoteServiceResponse > future )
throws Exception {
if ( ! future . isSuccess ( ) ) {
result . setFailure ( future . cause ( ) ) ;
return ;
}
if ( future . getNow ( ) = = null ) {
RemoteServiceTimeoutException e = new RemoteServiceTimeoutException ( "No response after " + optionsCopy . getExecutionTimeoutInMillis ( ) + "ms for request: " + request ) ;
result . setFailure ( e ) ;
return ;
}
if ( future . getNow ( ) . getError ( ) ! = null ) {
result . setFailure ( future . getNow ( ) . getError ( ) ) ;
}
result . setSuccess ( future . getNow ( ) . getResult ( ) ) ;
}
} ) ;
}
}
} ) ;
}
} ) ;
}
}
} ) ;
return result ;
}
} ;
return ( T ) Proxy . newProxyInstance ( remoteInterface . getClassLoader ( ) , new Class [ ] { remoteInterface } , handler ) ;
}
private < T > T sync ( Class < T > remoteInterface , final RemoteInvocationOptions options ) {
final String interfaceName = remoteInterface . getName ( ) ;
// local copy of the options, to prevent mutation
final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions ( options ) ;
final String toString = getClass ( ) . getSimpleName ( ) + "-" + remoteInterface . getSimpleName ( ) + "-proxy-" + generateRequestId ( ) ;
@ -229,7 +407,7 @@ public class RedissonRemoteService implements RRemoteService {
String requestId = generateRequestId ( ) ;
String requestQueueName = name + ":{" + remoteInterface. getName ( ) + "}" ;
String requestQueueName = name + ":{" + interfaceName + "}" ;
RBlockingQueue < RemoteServiceRequest > requestQueue = redisson . getBlockingQueue ( requestQueueName ) ;
RemoteServiceRequest request = new RemoteServiceRequest ( requestId ,
method . getName ( ) , args , optionsCopy , System . currentTimeMillis ( ) ) ;
@ -237,13 +415,13 @@ public class RedissonRemoteService implements RRemoteService {
RBlockingQueue < RRemoteServiceResponse > responseQueue = null ;
if ( optionsCopy . isAckExpected ( ) | | optionsCopy . isResultExpected ( ) ) {
String responseName = name + ":{" + remoteInterface. getName ( ) + "}:" + requestId ;
String responseName = name + ":{" + interfaceName + "}:" + requestId ;
responseQueue = redisson . getBlockingQueue ( responseName ) ;
}
// poll for the ack only if expected
if ( optionsCopy . isAckExpected ( ) ) {
String ackName = name + ":{" + remoteInterface. getName ( ) + "}:ack" ;
String ackName = name + ":{" + interfaceName + "}:ack" ;
RemoteServiceAck ack = ( RemoteServiceAck ) responseQueue . poll ( optionsCopy . getAckTimeoutInMillis ( ) , TimeUnit . MILLISECONDS ) ;
if ( ack = = null ) {
ack = tryPollAckAgain ( optionsCopy , responseQueue , ackName ) ;
@ -274,7 +452,7 @@ public class RedissonRemoteService implements RRemoteService {
}
private RemoteServiceAck tryPollAckAgain ( RemoteInvocationOptions optionsCopy ,
RBlockingQueue < RRemoteServiceResponse > responseQueue , String ackName ) throws InterruptedException {
RBlockingQueue < ? extends RRemoteServiceResponse > responseQueue , String ackName ) throws InterruptedException {
Future < Boolean > ackClientsFuture = redisson . getScript ( ) . evalAsync ( ackName , Mode . READ_WRITE , LongCodec . INSTANCE ,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[1]);"
@ -289,6 +467,43 @@ public class RedissonRemoteService implements RRemoteService {
}
return null ;
}
private Future < RemoteServiceAck > tryPollAckAgainAsync ( RemoteInvocationOptions optionsCopy ,
final RBlockingQueue < ? extends RRemoteServiceResponse > responseQueue , String ackName )
throws InterruptedException {
final Promise < RemoteServiceAck > promise = ImmediateEventExecutor . INSTANCE . newPromise ( ) ;
Future < Boolean > ackClientsFuture = redisson . getScript ( ) . evalAsync ( ackName , Mode . READ_WRITE , LongCodec . INSTANCE ,
"if redis.call('setnx', KEYS[1], 1) == 1 then " + "redis.call('pexpire', KEYS[1], ARGV[1]);"
+ "return 0;" + "end;" + "redis.call('del', KEYS[1]);" + "return 1;" ,
RScript . ReturnType . BOOLEAN , Arrays . < Object > asList ( ackName ) , optionsCopy . getAckTimeoutInMillis ( ) ) ;
ackClientsFuture . addListener ( new FutureListener < Boolean > ( ) {
@Override
public void operationComplete ( Future < Boolean > future ) throws Exception {
if ( ! future . isSuccess ( ) ) {
promise . setFailure ( future . cause ( ) ) ;
return ;
}
if ( future . getNow ( ) ) {
Future < RemoteServiceAck > pollFuture = ( Future < RemoteServiceAck > ) responseQueue . pollAsync ( ) ;
pollFuture . addListener ( new FutureListener < RemoteServiceAck > ( ) {
@Override
public void operationComplete ( Future < RemoteServiceAck > future ) throws Exception {
if ( ! future . isSuccess ( ) ) {
promise . setFailure ( future . cause ( ) ) ;
return ;
}
promise . setSuccess ( future . getNow ( ) ) ;
}
} ) ;
} else {
promise . setSuccess ( null ) ;
}
}
} ) ;
return promise ;
}
private String generateRequestId ( ) {
byte [ ] id = new byte [ 16 ] ;