diff --git a/pom.xml b/pom.xml index 5c8ff7b7a..67d9053b0 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ org.redisson redisson - 2.2.22-SNAPSHOT + 2.2.23-SNAPSHOT bundle Redisson diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index 1d3e3b67c..07cf37a60 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -150,7 +150,7 @@ public class RedissonRemoteService implements RRemoteService { // send the ack only if expected if (request.getOptions().isAckExpected()) { - String ackName = name + ":{" + remoteInterface.getName() + "}:ack"; + String ackName = getAckName(remoteInterface.getName(), request.getRequestId()); Future ackClientsFuture = redisson.getScript().evalAsync(responseName, Mode.READ_WRITE, LongCodec.INSTANCE, "if redis.call('setnx', KEYS[1], 1) == 1 then " + "redis.call('pexpire', KEYS[1], ARGV[2]);" @@ -159,9 +159,9 @@ public class RedissonRemoteService implements RRemoteService { + "return 1;" + "end;" + "return 0;", RScript.ReturnType.BOOLEAN, Arrays.asList(ackName, responseName), - getCodec().getValueEncoder().encode(new RemoteServiceAck()), request.getOptions().getAckTimeoutInMillis()); -// Future> ackClientsFuture = send(request.getOptions().getAckTimeoutInMillis(), responseName, new RemoteServiceAck()); -// ackClientsFuture.addListener(new FutureListener>() { + getCodec().getValueEncoder().encode(new RemoteServiceAck()), + request.getOptions().getAckTimeoutInMillis()); + ackClientsFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -267,7 +267,7 @@ public class RedissonRemoteService implements RRemoteService { return sync(remoteInterface, options); } - private T async(Class remoteInterface, final RemoteInvocationOptions options, final String interfaceName) { + private T async(final Class remoteInterface, final RemoteInvocationOptions options, final String interfaceName) { // local copy of the options, to prevent mutation final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options); final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + generateRequestId(); @@ -282,8 +282,12 @@ public class RedissonRemoteService implements RRemoteService { return toString.hashCode(); } - if (!optionsCopy.isResultExpected() && !(method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE))) + if (!optionsCopy.isResultExpected() + && !(method.getReturnType().equals(Void.class) + || method.getReturnType().equals(Void.TYPE) + || method.getReturnType().equals(Future.class))) { throw new IllegalArgumentException("The noResult option only supports void return value"); + } final String requestId = generateRequestId(); final Promise result = ImmediateEventExecutor.INSTANCE.newPromise(); @@ -310,19 +314,19 @@ public class RedissonRemoteService implements RRemoteService { responseQueue = null; } - // poll for the ack only if expected if (optionsCopy.isAckExpected()) { - final String ackName = name + ":{" + interfaceName + "}:ack"; Future ackFuture = (Future) responseQueue.pollAsync(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS); ackFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { + result.setFailure(future.cause()); return; } RemoteServiceAck ack = future.getNow(); if (ack == null) { + final String ackName = getAckName(remoteInterface.getName(), request.getRequestId()); Future ackFutureAttempt = tryPollAckAgainAsync(optionsCopy, responseQueue, ackName); ackFutureAttempt.addListener(new FutureListener() { @@ -343,55 +347,15 @@ public class RedissonRemoteService implements RRemoteService { } }); } else { - invokeAsync(optionsCopy, result, request, responseQueue, ackName); + invokeAsync(optionsCopy, result, request, responseQueue); } } - private void invokeAsync(final RemoteInvocationOptions optionsCopy, - final Promise result, final RemoteServiceRequest request, - final RBlockingQueue responseQueue, - final String ackName) { - Future deleteFuture = redisson.getBucket(ackName).deleteAsync(); - deleteFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - result.setFailure(future.cause()); - return; - } - - // poll for the response only if expected - if (optionsCopy.isResultExpected()) { - Future responseFuture = (Future) responseQueue.pollAsync(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS); - responseFuture.addListener(new FutureListener() { - - @Override - public void operationComplete(Future future) - throws Exception { - if (!future.isSuccess()) { - result.setFailure(future.cause()); - return; - } - - if (future.getNow() == null) { - RemoteServiceTimeoutException e = new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request); - result.setFailure(e); - return; - } - - if (future.getNow().getError() != null) { - result.setFailure(future.getNow().getError()); - } - - result.setSuccess(future.getNow().getResult()); - } - }); - } - - } - }); - } }); + } else { + if (optionsCopy.isResultExpected()) { + invokeAsync(optionsCopy, result, request, responseQueue); + } } } }); @@ -403,6 +367,55 @@ public class RedissonRemoteService implements RRemoteService { return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler); } + private void invokeAsync(final RemoteInvocationOptions optionsCopy, + final Promise result, final RemoteServiceRequest request, + final RBlockingQueue responseQueue, + final String ackName) { + Future deleteFuture = redisson.getBucket(ackName).deleteAsync(); + deleteFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.setFailure(future.cause()); + return; + } + + invokeAsync(optionsCopy, result, request, responseQueue); + } + }); + } + + private void invokeAsync(final RemoteInvocationOptions optionsCopy, final Promise result, + final RemoteServiceRequest request, + final RBlockingQueue responseQueue) { + // poll for the response only if expected + if (optionsCopy.isResultExpected()) { + Future responseFuture = (Future) responseQueue.pollAsync(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS); + responseFuture.addListener(new FutureListener() { + + @Override + public void operationComplete(Future future) + throws Exception { + if (!future.isSuccess()) { + result.setFailure(future.cause()); + return; + } + + if (future.getNow() == null) { + RemoteServiceTimeoutException e = new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request); + result.setFailure(e); + return; + } + + if (future.getNow().getError() != null) { + result.setFailure(future.getNow().getError()); + } + + result.setSuccess(future.getNow().getResult()); + } + }); + } + } private T sync(Class remoteInterface, final RemoteInvocationOptions options) { final String interfaceName = remoteInterface.getName(); @@ -439,7 +452,7 @@ public class RedissonRemoteService implements RRemoteService { // poll for the ack only if expected if (optionsCopy.isAckExpected()) { - String ackName = name + ":{" + interfaceName + "}:ack"; + String ackName = getAckName(interfaceName, requestId); RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS); if (ack == null) { ack = tryPollAckAgain(optionsCopy, responseQueue, ackName); @@ -469,6 +482,10 @@ public class RedissonRemoteService implements RRemoteService { return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler); } + private String getAckName(String interfaceName, String requestId) { + return name + ":{" + interfaceName + "}:" + requestId + ":ack"; + } + private RemoteServiceAck tryPollAckAgain(RemoteInvocationOptions optionsCopy, RBlockingQueue responseQueue, String ackName) throws InterruptedException { Future ackClientsFuture = redisson.getScript().evalAsync(ackName, Mode.READ_WRITE, LongCodec.INSTANCE, @@ -477,7 +494,8 @@ public class RedissonRemoteService implements RRemoteService { + "return 0;" + "end;" + "redis.call('del', KEYS[1]);" - + "return 1;", RScript.ReturnType.BOOLEAN, Arrays.asList(ackName), optionsCopy.getAckTimeoutInMillis()); + + "return 1;", + RScript.ReturnType.BOOLEAN, Arrays.asList(ackName), optionsCopy.getAckTimeoutInMillis()); ackClientsFuture.sync(); if (ackClientsFuture.getNow()) { @@ -491,8 +509,12 @@ public class RedissonRemoteService implements RRemoteService { throws InterruptedException { final Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); Future ackClientsFuture = redisson.getScript().evalAsync(ackName, Mode.READ_WRITE, LongCodec.INSTANCE, - "if redis.call('setnx', KEYS[1], 1) == 1 then " + "redis.call('pexpire', KEYS[1], ARGV[1]);" - + "return 0;" + "end;" + "redis.call('del', KEYS[1]);" + "return 1;", + "if redis.call('setnx', KEYS[1], 1) == 1 then " + + "redis.call('pexpire', KEYS[1], ARGV[1]);" + + "return 0;" + + "end;" + + "redis.call('del', KEYS[1]);" + + "return 1;", RScript.ReturnType.BOOLEAN, Arrays. asList(ackName), optionsCopy.getAckTimeoutInMillis()); ackClientsFuture.addListener(new FutureListener() { @Override diff --git a/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/src/test/java/org/redisson/RedissonRemoteServiceTest.java index 86d9a693a..8d672716c 100644 --- a/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -69,6 +69,12 @@ public class RedissonRemoteServiceTest extends BaseTest { Future resultMethod(Long value); + Future errorMethod(); + + Future errorMethodWithCause(); + + Future timeoutMethod(); + } @RRemoteAsync(RemoteInterface.class) @@ -474,6 +480,47 @@ public class RedissonRemoteServiceTest extends BaseTest { } } + @Test + public void testNoAckWithResultInvocationsAsync() throws InterruptedException, ExecutionException { + RedissonClient server = createInstance(); + RedissonClient client = createInstance(); + try { + server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); + + // no ack but an execution timeout of 1 second + RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.SECONDS); + RemoteInterfaceAsync service = client.getRemoteSerivce().get(RemoteInterfaceAsync.class, options); + + service.voidMethod("noAck", 100L).get(); + assertThat(service.resultMethod(21L).get()).isEqualTo(42); + + try { + service.errorMethod().get(); + Assert.fail(); + } catch (Exception e) { + assertThat(e.getCause().getMessage()).isEqualTo("Checking error throw"); + } + + try { + service.errorMethodWithCause().get(); + Assert.fail(); + } catch (Exception e) { + assertThat(e.getCause().getCause()).isInstanceOf(ArithmeticException.class); + assertThat(e.getCause().getCause().getMessage()).isEqualTo("/ by zero"); + } + + try { + service.timeoutMethod().get(); + Assert.fail("noAck option should still wait for the server to return a response and throw if the execution timeout is exceeded"); + } catch (Exception e) { + assertThat(e.getCause()).isInstanceOf(RemoteServiceTimeoutException.class); + } + } finally { + client.shutdown(); + server.shutdown(); + } + } + @Test public void testAckWithoutResultInvocations() throws InterruptedException { RedissonClient server = createInstance();