diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 166362af9..526eb554d 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -30,7 +30,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -106,7 +106,7 @@ public class RedissonExecutorService implements RExecutorService { remoteService.setTasksCounterName(tasksCounter.getName()); remoteService.setStatusName(status.getName()); - asyncService = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck()); + asyncService = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(Integer.MAX_VALUE * 2)); asyncServiceWithoutResult = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); } @@ -116,7 +116,7 @@ public class RedissonExecutorService implements RExecutorService { } @Override - public void registerWorkers(int executors, Executor executor) { + public void registerWorkers(int executors, ExecutorService executor) { RemoteExecutorServiceImpl service = new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, requestQueueName); service.setStatusName(status.getName()); service.setTasksCounterName(tasksCounter.getName()); diff --git a/redisson/src/main/java/org/redisson/RedissonQueueSemaphore.java b/redisson/src/main/java/org/redisson/RedissonQueueSemaphore.java index 1dea579c9..af820b386 100644 --- a/redisson/src/main/java/org/redisson/RedissonQueueSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonQueueSemaphore.java @@ -20,13 +20,17 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; -import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; import org.redisson.pubsub.SemaphorePubSub; import io.netty.util.concurrent.Future; +/** + * + * @author Nikita Koksharov + * + */ public class RedissonQueueSemaphore extends RedissonSemaphore { private String queueName; diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index 17cc2a543..83f109d9c 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -23,7 +23,7 @@ import java.lang.reflect.Proxy; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -31,17 +31,18 @@ import org.redisson.api.RBatch; import org.redisson.api.RBlockingQueue; import org.redisson.api.RBlockingQueueAsync; import org.redisson.api.RRemoteService; -import org.redisson.api.RScript; -import org.redisson.api.RScript.Mode; import org.redisson.api.RemoteInvocationOptions; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; +import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; import org.redisson.executor.RemotePromise; import org.redisson.remote.RRemoteAsync; import org.redisson.remote.RRemoteServiceResponse; import org.redisson.remote.RemoteServiceAck; import org.redisson.remote.RemoteServiceAckTimeoutException; +import org.redisson.remote.RemoteServiceCancelRequest; +import org.redisson.remote.RemoteServiceCancelResponse; import org.redisson.remote.RemoteServiceKey; import org.redisson.remote.RemoteServiceMethod; import org.redisson.remote.RemoteServiceRequest; @@ -65,25 +66,25 @@ import io.netty.util.internal.ThreadLocalRandom; */ public class RedissonRemoteService implements RRemoteService { - private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class); - + private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class); + private final Map beans = PlatformDependent.newConcurrentHashMap(); - + protected final Codec codec; protected final Redisson redisson; protected final String name; protected final CommandExecutor commandExecutor; - + public RedissonRemoteService(Redisson redisson, CommandExecutor commandExecutor) { - this(redisson, "redisson_remote_service", commandExecutor); + this(redisson, "redisson_rs", commandExecutor); } public RedissonRemoteService(Redisson redisson, String name, CommandExecutor commandExecutor) { this(null, redisson, name, commandExecutor); } - + public RedissonRemoteService(Codec codec, Redisson redisson, CommandExecutor commandExecutor) { - this(codec, redisson, "redisson_remote_service", commandExecutor); + this(codec, redisson, "redisson_rs", commandExecutor); } public RedissonRemoteService(Codec codec, Redisson redisson, String name, CommandExecutor commandExecutor) { @@ -97,13 +98,14 @@ public class RedissonRemoteService implements RRemoteService { public void register(Class remoteInterface, T object) { register(remoteInterface, object, 1); } - + @Override public void register(Class remoteInterface, T object, int workersAmount) { register(remoteInterface, object, workersAmount, null); } - - public void register(Class remoteInterface, T object, int workersAmount, Executor executor) { + + @Override + public void register(Class remoteInterface, T object, int workersAmount, ExecutorService executor) { if (workersAmount < 1) { throw new IllegalArgumentException("executorsAmount can't be lower than 1"); } @@ -114,7 +116,7 @@ public class RedissonRemoteService implements RRemoteService { return; } } - + for (int i = 0; i < workersAmount; i++) { String requestQueueName = getRequestQueueName(remoteInterface); RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec()); @@ -122,6 +124,10 @@ public class RedissonRemoteService implements RRemoteService { } } + private String getCancelRequestQueueName(Class remoteInterface, String requestId) { + return "{" + name + ":" + remoteInterface.getName() + "}:" + requestId + ":cancel"; + } + private String getAckName(Class remoteInterface, String requestId) { return "{" + name + ":" + remoteInterface.getName() + "}:" + requestId + ":ack"; } @@ -133,7 +139,7 @@ public class RedissonRemoteService implements RRemoteService { private String getRequestQueueName(Class remoteInterface) { return "{" + name + ":" + remoteInterface.getName() + "}"; } - + private Codec getCodec() { if (codec != null) { return codec; @@ -149,7 +155,8 @@ public class RedissonRemoteService implements RRemoteService { } } - private void subscribe(final Class remoteInterface, final RBlockingQueue requestQueue, final Executor executor) { + private void subscribe(final Class remoteInterface, final RBlockingQueue requestQueue, + final ExecutorService executor) { Future take = requestQueue.takeAsync(); take.addListener(new FutureListener() { @Override @@ -163,35 +170,39 @@ public class RedissonRemoteService implements RRemoteService { return; } - // do not subscribe now, see https://github.com/mrniko/redisson/issues/493 + // do not subscribe now, see + // https://github.com/mrniko/redisson/issues/493 // subscribe(remoteInterface, requestQueue); final RemoteServiceRequest request = future.getNow(); // check the ack only if expected - if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request.getOptions().getAckTimeoutInMillis()) { + if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request + .getOptions().getAckTimeoutInMillis()) { log.debug("request: {} has been skipped due to ackTimeout"); // re-subscribe after a skipped ackTimeout subscribe(remoteInterface, requestQueue, executor); return; } - - final RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName())); + + final RemoteServiceMethod method = beans + .get(new RemoteServiceKey(remoteInterface, request.getMethodName())); final String responseName = getResponseQueueName(remoteInterface, request.getRequestId()); // send the ack only if expected if (request.getOptions().isAckExpected()) { String ackName = getAckName(remoteInterface, request.getRequestId()); - Future ackClientsFuture = redisson.getScript().evalAsync(responseName, Mode.READ_WRITE, LongCodec.INSTANCE, - "if redis.call('setnx', KEYS[1], 1) == 1 then " - + "redis.call('pexpire', KEYS[1], ARGV[2]);" - + "redis.call('rpush', KEYS[2], ARGV[1]);" - + "redis.call('pexpire', KEYS[2], ARGV[2]);" - + "return 1;" - + "end;" - + "return 0;", RScript.ReturnType.BOOLEAN, Arrays.asList(ackName, responseName), - encode(new RemoteServiceAck()), - request.getOptions().getAckTimeoutInMillis()); - + Future ackClientsFuture = commandExecutor.evalWriteAsync(responseName, + LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "if redis.call('setnx', KEYS[1], 1) == 1 then " + + "redis.call('pexpire', KEYS[1], ARGV[2]);" + + "redis.call('rpush', KEYS[2], ARGV[1]);" + + "redis.call('pexpire', KEYS[2], ARGV[2]);" + + "return 1;" + + "end;" + + "return 0;", + Arrays. asList(ackName, responseName), + encode(new RemoteServiceAck()), request.getOptions().getAckTimeoutInMillis()); + ackClientsFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -210,61 +221,90 @@ public class RedissonRemoteService implements RRemoteService { return; } - if (executor != null) { - executor.execute(new Runnable() { - @Override - public void run() { - invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor); - } - }); - } else { - invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor); - } + runMethod(remoteInterface, requestQueue, executor, request, method, responseName); } }); } else { - if (executor != null) { - executor.execute(new Runnable() { - @Override - public void run() { - invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor); - } - }); - } else { - invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor); - } + runMethod(remoteInterface, requestQueue, executor, request, method, responseName); } } }); } - private void invokeMethod(final Class remoteInterface, final RBlockingQueue requestQueue, - final RemoteServiceRequest request, RemoteServiceMethod method, String responseName, final Executor executor) { - final AtomicReference responseHolder = new AtomicReference(); + private void runMethod(final Class remoteInterface, final RBlockingQueue requestQueue, + final ExecutorService executor, final RemoteServiceRequest request, final RemoteServiceMethod method, + final String responseName) { + + if (executor != null) { + RBlockingQueue cancelRequestQueue = + redisson.getBlockingQueue(getCancelRequestQueueName(remoteInterface, request.getRequestId()), getCodec()); + final Future cancelRequestFuture = cancelRequestQueue.takeAsync(); + + final AtomicReference responseHolder = new AtomicReference(); + + final java.util.concurrent.Future submitFuture = executor.submit(new Runnable() { + @Override + public void run() { + invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, + cancelRequestFuture, responseHolder); + } + }); + + cancelRequestFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } + + boolean res = submitFuture.cancel(future.getNow().isMayInterruptIfRunning()); + if (res) { + responseHolder.compareAndSet(null, new RemoteServiceCancelResponse()); + } + } + }); + } else { + final AtomicReference responseHolder = new AtomicReference(); + invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, null, responseHolder); + } + } + + private void invokeMethod(final Class remoteInterface, + final RBlockingQueue requestQueue, final RemoteServiceRequest request, + RemoteServiceMethod method, String responseName, final ExecutorService executor, + Future cancelRequestFuture, AtomicReference responseHolder) { try { Object result = method.getMethod().invoke(method.getBean(), request.getArgs()); + RemoteServiceResponse response = new RemoteServiceResponse(result); - responseHolder.set(response); + responseHolder.compareAndSet(null, response); } catch (Exception e) { RemoteServiceResponse response = new RemoteServiceResponse(e.getCause()); - responseHolder.set(response); + responseHolder.compareAndSet(null, response); log.error("Can't execute: " + request, e); } + if (cancelRequestFuture != null) { + cancelRequestFuture.cancel(false); + } + // send the response only if expected if (request.getOptions().isResultExpected()) { - Future> clientsFuture = send(request.getOptions().getExecutionTimeoutInMillis(), responseName, responseHolder.get()); + Future> clientsFuture = send(request.getOptions().getExecutionTimeoutInMillis(), responseName, + responseHolder.get()); clientsFuture.addListener(new FutureListener>() { @Override public void operationComplete(Future> future) throws Exception { if (!future.isSuccess()) { - log.error("Can't send response: " + responseHolder.get() + " for request: " + request, future.cause()); + log.error("Can't send response: " + responseHolder.get() + " for request: " + request, + future.cause()); if (future.cause() instanceof RedissonShutdownException) { return; } } - // re-subscribe anyways (fail or success) after the send (response) + // re-subscribe anyways (fail or success) after the send + // (response) subscribe(remoteInterface, requestQueue, executor); } }); @@ -281,15 +321,14 @@ public class RedissonRemoteService implements RRemoteService { @Override public T get(Class remoteInterface, long executionTimeout, TimeUnit executionTimeUnit) { - return get(remoteInterface, RemoteInvocationOptions.defaults() - .expectResultWithin(executionTimeout, executionTimeUnit)); + return get(remoteInterface, + RemoteInvocationOptions.defaults().expectResultWithin(executionTimeout, executionTimeUnit)); } @Override - public T get(Class remoteInterface, long executionTimeout, TimeUnit executionTimeUnit, - long ackTimeout, TimeUnit ackTimeUnit) { - return get(remoteInterface, RemoteInvocationOptions.defaults() - .expectAckWithin(ackTimeout, ackTimeUnit) + public T get(Class remoteInterface, long executionTimeout, TimeUnit executionTimeUnit, long ackTimeout, + TimeUnit ackTimeUnit) { + return get(remoteInterface, RemoteInvocationOptions.defaults().expectAckWithin(ackTimeout, ackTimeUnit) .expectResultWithin(executionTimeout, executionTimeUnit)); } @@ -297,31 +336,34 @@ public class RedissonRemoteService implements RRemoteService { public T get(Class remoteInterface, RemoteInvocationOptions options) { for (Annotation annotation : remoteInterface.getAnnotations()) { if (annotation.annotationType() == RRemoteAsync.class) { - Class syncInterface = (Class) ((RRemoteAsync)annotation).value(); + Class syncInterface = (Class) ((RRemoteAsync) annotation).value(); for (Method m : remoteInterface.getMethods()) { try { syncInterface.getMethod(m.getName(), m.getParameterTypes()); } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("Method '" + m.getName() + "' with params '" + Arrays.toString(m.getParameterTypes()) - + "' isn't defined in " + syncInterface); + throw new IllegalArgumentException("Method '" + m.getName() + "' with params '" + + Arrays.toString(m.getParameterTypes()) + "' isn't defined in " + syncInterface); } catch (SecurityException e) { throw new IllegalArgumentException(e); } if (!m.getReturnType().getClass().isInstance(Future.class)) { - throw new IllegalArgumentException(m.getReturnType().getClass() + " isn't allowed as return type"); + throw new IllegalArgumentException( + m.getReturnType().getClass() + " isn't allowed as return type"); } } return async(remoteInterface, options, syncInterface); } } - + return sync(remoteInterface, options); } - - private T async(final Class remoteInterface, final RemoteInvocationOptions options, final Class syncInterface) { + + private T async(final Class remoteInterface, final RemoteInvocationOptions options, + final Class syncInterface) { // local copy of the options, to prevent mutation final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options); - final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + generateRequestId(); + final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + + generateRequestId(); InvocationHandler handler = new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { @@ -333,10 +375,8 @@ public class RedissonRemoteService implements RRemoteService { return toString.hashCode(); } - if (!optionsCopy.isResultExpected() - && !(method.getReturnType().equals(Void.class) - || method.getReturnType().equals(Void.TYPE) - || method.getReturnType().equals(Future.class))) { + if (!optionsCopy.isResultExpected() && !(method.getReturnType().equals(Void.class) + || method.getReturnType().equals(Void.TYPE) || method.getReturnType().equals(Future.class))) { throw new IllegalArgumentException("The noResult option only supports void return value"); } @@ -345,33 +385,64 @@ public class RedissonRemoteService implements RRemoteService { final String requestQueueName = getRequestQueueName(syncInterface); final String responseName = getResponseQueueName(syncInterface, requestId); final String ackName = getAckName(syncInterface, requestId); - - final RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec()); - final RemoteServiceRequest request = new RemoteServiceRequest(requestId, - method.getName(), args, optionsCopy, System.currentTimeMillis()); - final RemotePromise result = new RemotePromise(ImmediateEventExecutor.INSTANCE.newPromise()) { + final RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, + getCodec()); + final RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args, + optionsCopy, System.currentTimeMillis()); + + final RemotePromise result = new RemotePromise(commandExecutor.getConnectionManager().newPromise()) { + @Override public boolean cancel(boolean mayInterruptIfRunning) { + if (isCancelled()) { + return true; + } + + if (isDone()) { + return false; + } + if (optionsCopy.isAckExpected()) { - Future future = redisson.getScript().evalAsync(responseName, Mode.READ_WRITE, LongCodec.INSTANCE, + Future future = commandExecutor.evalWriteAsync(responseName, LongCodec.INSTANCE, + RedisCommands.EVAL_BOOLEAN, "if redis.call('setnx', KEYS[1], 1) == 1 then " - + "redis.call('pexpire', KEYS[1], ARGV[2]);" - + "redis.call('lrem', KEYS[3], 1, ARGV[1]);" - + "redis.call('pexpire', KEYS[2], ARGV[2]);" - + "return 1;" - + "end;" - + "return 0;", RScript.ReturnType.BOOLEAN, Arrays.asList(ackName, responseName, requestQueueName), - encode(request), - request.getOptions().getAckTimeoutInMillis()); - - return commandExecutor.get(future); + + "redis.call('pexpire', KEYS[1], ARGV[2]);" + + "redis.call('lrem', KEYS[3], 1, ARGV[1]);" + + "redis.call('pexpire', KEYS[2], ARGV[2]);" + + "return 1;" + + "end;" + + "return 0;", + Arrays. asList(ackName, responseName, requestQueueName), + encode(request), request.getOptions().getAckTimeoutInMillis()); + + boolean ackNotSent = commandExecutor.get(future); + if (ackNotSent) { + return true; + } + + return cancel(syncInterface, requestId, request, mayInterruptIfRunning); } - - return requestQueue.remove(request); + + boolean removed = requestQueue.remove(request); + if (removed) { + return true; + } + + return cancel(syncInterface, requestId, request, mayInterruptIfRunning); + } + + private boolean cancel(Class remoteInterface, String requestId, RemoteServiceRequest request, + boolean mayInterruptIfRunning) { + RBlockingQueueAsync cancelRequestQueue = redisson.getBlockingQueue(getCancelRequestQueueName(remoteInterface, requestId), getCodec()); + cancelRequestQueue.putAsync(new RemoteServiceCancelRequest(mayInterruptIfRunning)); + cancelRequestQueue.expireAsync(60, TimeUnit.SECONDS); + + awaitUninterruptibly(); + return isCancelled(); } }; - + Future addFuture = addAsync(requestQueue, request, result); addFuture.addListener(new FutureListener() { @@ -381,16 +452,10 @@ public class RedissonRemoteService implements RRemoteService { result.tryFailure(future.cause()); return; } - - final RBlockingQueue responseQueue; - if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) { - responseQueue = redisson.getBlockingQueue(responseName, getCodec()); - } else { - responseQueue = null; - } - + if (optionsCopy.isAckExpected()) { - Future ackFuture = (Future) responseQueue.pollAsync(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS); + RBlockingQueue responseQueue = redisson.getBlockingQueue(responseName, getCodec()); + Future ackFuture = responseQueue.pollAsync(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS); ackFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -398,51 +463,54 @@ public class RedissonRemoteService implements RRemoteService { result.tryFailure(future.cause()); return; } - + RemoteServiceAck ack = future.getNow(); if (ack == null) { - Future ackFutureAttempt = tryPollAckAgainAsync(optionsCopy, responseQueue, ackName); + Future ackFutureAttempt = + tryPollAckAgainAsync(optionsCopy, responseQueue, ackName); ackFutureAttempt.addListener(new FutureListener() { - + @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future future) + throws Exception { if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } - + if (future.getNow() == null) { - Exception ex = new RemoteServiceAckTimeoutException("No ACK response after " + optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request); + Exception ex = new RemoteServiceAckTimeoutException( + "No ACK response after " + + optionsCopy.getAckTimeoutInMillis() + + "ms for request: " + request); result.tryFailure(ex); return; } - - invokeAsync(optionsCopy, result, request, responseQueue, ackName); + + invokeAsync(optionsCopy, result, request, responseName, ackName); } }); } else { - invokeAsync(optionsCopy, result, request, responseQueue); + invokeAsync(optionsCopy, result, request, responseName); } } }); } else { - invokeAsync(optionsCopy, result, request, responseQueue); - } + invokeAsync(optionsCopy, result, request, responseName); } + } }); return result; } }; - return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler); + return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] { remoteInterface }, handler); } - private void invokeAsync(final RemoteInvocationOptions optionsCopy, - final Promise result, final RemoteServiceRequest request, - final RBlockingQueue responseQueue, - final String ackName) { + private void invokeAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise result, + final RemoteServiceRequest request, final String responseName, final String ackName) { Future deleteFuture = redisson.getBucket(ackName).deleteAsync(); deleteFuture.addListener(new FutureListener() { @Override @@ -451,50 +519,57 @@ public class RedissonRemoteService implements RRemoteService { result.tryFailure(future.cause()); return; } - - invokeAsync(optionsCopy, result, request, responseQueue); + + invokeAsync(optionsCopy, result, request, responseName); } }); } - private void invokeAsync(final RemoteInvocationOptions optionsCopy, final Promise result, - final RemoteServiceRequest request, - final RBlockingQueue responseQueue) { + private void invokeAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise result, + final RemoteServiceRequest request, final String responseName) { // poll for the response only if expected if (optionsCopy.isResultExpected()) { - Future responseFuture = (Future) responseQueue.pollAsync(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS); - responseFuture.addListener(new FutureListener() { - + RBlockingQueue responseQueue = redisson.getBlockingQueue(responseName, getCodec()); + Future responseFuture = responseQueue + .pollAsync(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS); + responseFuture.addListener(new FutureListener() { + @Override - public void operationComplete(Future future) - throws Exception { + public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } - + if (future.getNow() == null) { - RemoteServiceTimeoutException e = new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request); + RemoteServiceTimeoutException e = new RemoteServiceTimeoutException("No response after " + + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request); result.tryFailure(e); return; } - - if (future.getNow().getError() != null) { - result.tryFailure(future.getNow().getError()); + + if (future.getNow() instanceof RemoteServiceCancelResponse) { + result.doCancel(); return; } - result.trySuccess(future.getNow().getResult()); + RemoteServiceResponse response = (RemoteServiceResponse) future.getNow(); + if (response.getError() != null) { + result.tryFailure(response.getError()); + return; + } + + result.trySuccess(response.getResult()); } }); } } private T sync(final Class remoteInterface, final RemoteInvocationOptions options) { - final String interfaceName = remoteInterface.getName(); // local copy of the options, to prevent mutation final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options); - final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + generateRequestId(); + final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + + generateRequestId(); InvocationHandler handler = new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { @@ -506,15 +581,17 @@ public class RedissonRemoteService implements RRemoteService { return toString.hashCode(); } - if (!optionsCopy.isResultExpected() && !(method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE))) + if (!optionsCopy.isResultExpected() + && !(method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE))) throw new IllegalArgumentException("The noResult option only supports void return value"); String requestId = generateRequestId(); String requestQueueName = getRequestQueueName(remoteInterface); - RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec()); - RemoteServiceRequest request = new RemoteServiceRequest(requestId, - method.getName(), args, optionsCopy, System.currentTimeMillis()); + RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, + getCodec()); + RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args, optionsCopy, + System.currentTimeMillis()); requestQueue.add(request); RBlockingQueue responseQueue = null; @@ -526,11 +603,13 @@ public class RedissonRemoteService implements RRemoteService { // poll for the ack only if expected if (optionsCopy.isAckExpected()) { String ackName = getAckName(remoteInterface, requestId); - RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS); + RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(), + TimeUnit.MILLISECONDS); if (ack == null) { ack = tryPollAckAgain(optionsCopy, responseQueue, ackName); if (ack == null) { - throw new RemoteServiceAckTimeoutException("No ACK response after " + optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request); + throw new RemoteServiceAckTimeoutException("No ACK response after " + + optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request); } } redisson.getBucket(ackName).delete(); @@ -538,9 +617,11 @@ public class RedissonRemoteService implements RRemoteService { // poll for the response only if expected if (optionsCopy.isResultExpected()) { - RemoteServiceResponse response = (RemoteServiceResponse) responseQueue.poll(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS); + RemoteServiceResponse response = (RemoteServiceResponse) responseQueue + .poll(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS); if (response == null) { - throw new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request); + throw new RemoteServiceTimeoutException("No response1 after " + + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request); } if (response.getError() != null) { throw response.getError(); @@ -552,39 +633,40 @@ public class RedissonRemoteService implements RRemoteService { } }; - return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler); + return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] { remoteInterface }, handler); } private RemoteServiceAck tryPollAckAgain(RemoteInvocationOptions optionsCopy, - RBlockingQueue responseQueue, String ackName) throws InterruptedException { - Future ackClientsFuture = redisson.getScript().evalAsync(ackName, Mode.READ_WRITE, LongCodec.INSTANCE, - "if redis.call('setnx', KEYS[1], 1) == 1 then " - + "redis.call('pexpire', KEYS[1], ARGV[1]);" - + "return 0;" - + "end;" - + "redis.call('del', KEYS[1]);" - + "return 1;", - RScript.ReturnType.BOOLEAN, Arrays.asList(ackName), optionsCopy.getAckTimeoutInMillis()); - + RBlockingQueue responseQueue, String ackName) + throws InterruptedException { + Future ackClientsFuture = commandExecutor.evalWriteAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "if redis.call('setnx', KEYS[1], 1) == 1 then " + + "redis.call('pexpire', KEYS[1], ARGV[1]);" + + "return 0;" + + "end;" + + "redis.call('del', KEYS[1]);" + + "return 1;", + Arrays. asList(ackName), optionsCopy.getAckTimeoutInMillis()); + ackClientsFuture.sync(); if (ackClientsFuture.getNow()) { return (RemoteServiceAck) responseQueue.poll(); } return null; } - + private Future tryPollAckAgainAsync(RemoteInvocationOptions optionsCopy, - final RBlockingQueue responseQueue, String ackName) + final RBlockingQueue responseQueue, String ackName) throws InterruptedException { final Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); - Future ackClientsFuture = redisson.getScript().evalAsync(ackName, Mode.READ_WRITE, LongCodec.INSTANCE, - "if redis.call('setnx', KEYS[1], 1) == 1 then " + Future ackClientsFuture = commandExecutor.evalWriteAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "if redis.call('setnx', KEYS[1], 1) == 1 then " + "redis.call('pexpire', KEYS[1], ARGV[1]);" + "return 0;" - + "end;" - + "redis.call('del', KEYS[1]);" - + "return 1;", - RScript.ReturnType.BOOLEAN, Arrays. asList(ackName), optionsCopy.getAckTimeoutInMillis()); + + "end;" + + "redis.call('del', KEYS[1]);" + + "return 1;", + Arrays. asList(ackName), optionsCopy.getAckTimeoutInMillis()); ackClientsFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -592,9 +674,9 @@ public class RedissonRemoteService implements RRemoteService { promise.setFailure(future.cause()); return; } - + if (future.getNow()) { - Future pollFuture = (Future) responseQueue.pollAsync(); + Future pollFuture = responseQueue.pollAsync(); pollFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -602,7 +684,7 @@ public class RedissonRemoteService implements RRemoteService { promise.setFailure(future.cause()); return; } - + promise.setSuccess(future.getNow()); } }); @@ -629,9 +711,11 @@ public class RedissonRemoteService implements RRemoteService { return batch.executeAsync(); } - protected Future addAsync(RBlockingQueue requestQueue, RemoteServiceRequest request, RemotePromise result) { + protected Future addAsync(RBlockingQueue requestQueue, RemoteServiceRequest request, + RemotePromise result) { Future future = requestQueue.addAsync(request); result.setAddFuture(future); return future; } + } diff --git a/redisson/src/main/java/org/redisson/api/RExecutorService.java b/redisson/src/main/java/org/redisson/api/RExecutorService.java index 71a4a52cd..88c63e5c7 100644 --- a/redisson/src/main/java/org/redisson/api/RExecutorService.java +++ b/redisson/src/main/java/org/redisson/api/RExecutorService.java @@ -51,6 +51,6 @@ public interface RExecutorService extends ExecutorService { * * @param workers - workers amount */ - void registerWorkers(int workers, Executor executor); + void registerWorkers(int workers, ExecutorService executor); } diff --git a/redisson/src/main/java/org/redisson/api/RRemoteService.java b/redisson/src/main/java/org/redisson/api/RRemoteService.java index 06b17356a..3685e9663 100644 --- a/redisson/src/main/java/org/redisson/api/RRemoteService.java +++ b/redisson/src/main/java/org/redisson/api/RRemoteService.java @@ -16,6 +16,7 @@ package org.redisson.api; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; /** @@ -83,7 +84,7 @@ public interface RRemoteService { * @param object * @param workersAmount */ - void register(Class remoteInterface, T object, int workersAmount, Executor executor); + void register(Class remoteInterface, T object, int workersAmount, ExecutorService executor); /** * Get remote service object for remote invocations. diff --git a/redisson/src/main/java/org/redisson/api/RemoteInvocationOptions.java b/redisson/src/main/java/org/redisson/api/RemoteInvocationOptions.java index 0a90921c5..6fa3e48c1 100644 --- a/redisson/src/main/java/org/redisson/api/RemoteInvocationOptions.java +++ b/redisson/src/main/java/org/redisson/api/RemoteInvocationOptions.java @@ -82,7 +82,7 @@ public class RemoteInvocationOptions implements Serializable { public static RemoteInvocationOptions defaults() { return new RemoteInvocationOptions() .expectAckWithin(1, TimeUnit.SECONDS) - .expectResultWithin(20, TimeUnit.SECONDS); + .expectResultWithin(30, TimeUnit.SECONDS); } public Long getAckTimeoutInMillis() { diff --git a/redisson/src/main/java/org/redisson/client/RedisConnection.java b/redisson/src/main/java/org/redisson/client/RedisConnection.java index 567729ca8..1fc08df97 100644 --- a/redisson/src/main/java/org/redisson/client/RedisConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisConnection.java @@ -62,10 +62,6 @@ public class RedisConnection implements RedisCommands { return (C) channel.attr(RedisConnection.CONNECTION).get(); } - public void removeCurrentCommand() { - channel.attr(CommandsQueue.CURRENT_COMMAND).remove(); - } - public CommandData getCurrentCommand() { QueueCommand command = channel.attr(CommandsQueue.CURRENT_COMMAND).get(); if (command instanceof CommandData) { diff --git a/redisson/src/main/java/org/redisson/client/RedisException.java b/redisson/src/main/java/org/redisson/client/RedisException.java index 7aedefbcd..c78529374 100644 --- a/redisson/src/main/java/org/redisson/client/RedisException.java +++ b/redisson/src/main/java/org/redisson/client/RedisException.java @@ -15,6 +15,11 @@ */ package org.redisson.client; +/** + * + * @author Nikita Koksharov + * + */ public class RedisException extends RuntimeException { private static final long serialVersionUID = 3389820652701696154L; diff --git a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index da78bea92..f08c70024 100644 --- a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -160,7 +160,8 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { private void reattachBlockingQueue(RedisConnection connection, final CommandData commandData) { if (commandData == null - || !commandData.isBlockingCommand()) { + || !commandData.isBlockingCommand() + || commandData.getPromise().isDone()) { return; } diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 6a08cbec3..6a16706c4 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -89,16 +89,26 @@ public class CommandAsyncService implements CommandAsyncExecutor { l.countDown(); } }); - try { - l.await(); - } catch (InterruptedException e) { + + boolean interrupted = false; + while (!future.isDone()) { + try { + l.await(); + } catch (InterruptedException e) { + interrupted = true; + } + } + + if (interrupted) { Thread.currentThread().interrupt(); } + // commented out due to blocking issues up to 200 ms per minute for each thread // future.awaitUninterruptibly(); if (future.isSuccess()) { return future.getNow(); } + throw convertException(future); } @@ -509,9 +519,9 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.getTimeout().cancel(); - int timeoutTime = connectionManager.getConfig().getTimeout(); + long timeoutTime = connectionManager.getConfig().getTimeout(); if (QueueCommand.TIMEOUTLESS_COMMANDS.contains(details.getCommand().getName())) { - Integer popTimeout = Integer.valueOf(details.getParams()[details.getParams().length - 1].toString()); + Long popTimeout = Long.valueOf(details.getParams()[details.getParams().length - 1].toString()); handleBlockingOperations(details, connection, popTimeout); if (popTimeout == 0) { return; @@ -521,7 +531,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { timeoutTime += 1000; } - final int timeoutAmount = timeoutTime; + final long timeoutAmount = timeoutTime; TimerTask timeoutTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { @@ -535,7 +545,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.setTimeout(timeout); } - private void handleBlockingOperations(final AsyncDetails details, final RedisConnection connection, Integer popTimeout) { + private void handleBlockingOperations(final AsyncDetails details, final RedisConnection connection, Long popTimeout) { final FutureListener listener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -551,7 +561,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { scheduledFuture = connectionManager.getGroup().schedule(new Runnable() { @Override public void run() { - // there is no re-connection was made + // re-connection wasn't made // and connection is still active if (orignalChannel == connection.getChannel() && connection.isActive()) { @@ -590,17 +600,6 @@ public class CommandAsyncService implements CommandAsyncExecutor { } }); - details.getAttemptPromise().addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.isCancelled()) { - // command should be removed due to - // ConnectionWatchdog blockingQueue reconnection logic - connection.removeCurrentCommand(); - } - } - }); - synchronized (listener) { if (!details.getMainPromise().isDone()) { connectionManager.getShutdownPromise().addListener(listener); @@ -717,7 +716,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } ((RedisClientResult)res).setRedisClient(addr); } - details.getMainPromise().setSuccess(res); + details.getMainPromise().trySuccess(res); } else { details.getMainPromise().tryFailure(future.cause()); } diff --git a/redisson/src/main/java/org/redisson/executor/RemotePromise.java b/redisson/src/main/java/org/redisson/executor/RemotePromise.java index fb0fc6e83..881d6419a 100644 --- a/redisson/src/main/java/org/redisson/executor/RemotePromise.java +++ b/redisson/src/main/java/org/redisson/executor/RemotePromise.java @@ -40,5 +40,9 @@ public class RemotePromise extends PromiseDelegator { public Future getAddFuture() { return addFuture; } + + public void doCancel() { + super.cancel(true); + } } diff --git a/redisson/src/main/java/org/redisson/remote/RRemoteServiceResponse.java b/redisson/src/main/java/org/redisson/remote/RRemoteServiceResponse.java index 44a72d231..006122146 100644 --- a/redisson/src/main/java/org/redisson/remote/RRemoteServiceResponse.java +++ b/redisson/src/main/java/org/redisson/remote/RRemoteServiceResponse.java @@ -17,6 +17,11 @@ package org.redisson.remote; import java.io.Serializable; +/** + * + * @author Nikita Koksharov + * + */ public interface RRemoteServiceResponse extends Serializable { } diff --git a/redisson/src/main/java/org/redisson/remote/RemoteServiceAck.java b/redisson/src/main/java/org/redisson/remote/RemoteServiceAck.java index da496efe6..0a3d1f518 100644 --- a/redisson/src/main/java/org/redisson/remote/RemoteServiceAck.java +++ b/redisson/src/main/java/org/redisson/remote/RemoteServiceAck.java @@ -25,4 +25,6 @@ import java.io.Serializable; */ public class RemoteServiceAck implements RRemoteServiceResponse, Serializable { + private static final long serialVersionUID = -6332680404562746984L; + } diff --git a/redisson/src/main/java/org/redisson/remote/RemoteServiceCancelRequest.java b/redisson/src/main/java/org/redisson/remote/RemoteServiceCancelRequest.java new file mode 100644 index 000000000..1b7d2cf15 --- /dev/null +++ b/redisson/src/main/java/org/redisson/remote/RemoteServiceCancelRequest.java @@ -0,0 +1,43 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.remote; + +import java.io.Serializable; + +/** + * + * @author Nikita Koksharov + * + */ +public class RemoteServiceCancelRequest implements Serializable { + + private static final long serialVersionUID = -4800574267648904260L; + + private boolean mayInterruptIfRunning; + + public RemoteServiceCancelRequest() { + } + + public RemoteServiceCancelRequest(boolean mayInterruptIfRunning) { + super(); + this.mayInterruptIfRunning = mayInterruptIfRunning; + } + + public boolean isMayInterruptIfRunning() { + return mayInterruptIfRunning; + } + +} diff --git a/redisson/src/main/java/org/redisson/remote/RemoteServiceCancelResponse.java b/redisson/src/main/java/org/redisson/remote/RemoteServiceCancelResponse.java new file mode 100644 index 000000000..b861204bb --- /dev/null +++ b/redisson/src/main/java/org/redisson/remote/RemoteServiceCancelResponse.java @@ -0,0 +1,29 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.remote; + +import java.io.Serializable; + +/** + * + * @author Nikita Koksharov + * + */ +public class RemoteServiceCancelResponse implements RRemoteServiceResponse, Serializable { + + private static final long serialVersionUID = -4356901222132702182L; + +} diff --git a/redisson/src/main/java/org/redisson/remote/RemoteServiceKey.java b/redisson/src/main/java/org/redisson/remote/RemoteServiceKey.java index 995180076..9b1c4b83f 100644 --- a/redisson/src/main/java/org/redisson/remote/RemoteServiceKey.java +++ b/redisson/src/main/java/org/redisson/remote/RemoteServiceKey.java @@ -15,6 +15,11 @@ */ package org.redisson.remote; +/** + * + * @author Nikita Koksharov + * + */ public class RemoteServiceKey { private final Class serviceInterface; diff --git a/redisson/src/main/java/org/redisson/remote/RemoteServiceMethod.java b/redisson/src/main/java/org/redisson/remote/RemoteServiceMethod.java index 3769990fa..4041aa772 100644 --- a/redisson/src/main/java/org/redisson/remote/RemoteServiceMethod.java +++ b/redisson/src/main/java/org/redisson/remote/RemoteServiceMethod.java @@ -17,6 +17,11 @@ package org.redisson.remote; import java.lang.reflect.Method; +/** + * + * @author Nikita Koksharov + * + */ public class RemoteServiceMethod { private final Object bean; diff --git a/redisson/src/main/java/org/redisson/remote/RemoteServiceRequest.java b/redisson/src/main/java/org/redisson/remote/RemoteServiceRequest.java index ceb672e4c..f9e2dc435 100644 --- a/redisson/src/main/java/org/redisson/remote/RemoteServiceRequest.java +++ b/redisson/src/main/java/org/redisson/remote/RemoteServiceRequest.java @@ -20,8 +20,15 @@ import java.util.Arrays; import org.redisson.api.RemoteInvocationOptions; +/** + * + * @author Nikita Koksharov + * + */ public class RemoteServiceRequest implements Serializable { + private static final long serialVersionUID = -1711385312384040075L; + private String requestId; private String methodName; private Object[] args; diff --git a/redisson/src/main/java/org/redisson/remote/RemoteServiceResponse.java b/redisson/src/main/java/org/redisson/remote/RemoteServiceResponse.java index fb9aff160..9cdccb7df 100644 --- a/redisson/src/main/java/org/redisson/remote/RemoteServiceResponse.java +++ b/redisson/src/main/java/org/redisson/remote/RemoteServiceResponse.java @@ -17,8 +17,15 @@ package org.redisson.remote; import java.io.Serializable; +/** + * + * @author Nikita Koksharov + * + */ public class RemoteServiceResponse implements RRemoteServiceResponse, Serializable { + private static final long serialVersionUID = -1958922748139674253L; + private Object result; private Throwable error; diff --git a/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java index 4c8b08b51..ceb83c3aa 100644 --- a/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -69,6 +69,8 @@ public class RedissonRemoteServiceTest extends BaseTest { @RRemoteAsync(RemoteInterface.class) public interface RemoteInterfaceAsync { + Future cancelMethod(); + Future voidMethod(String name, Long param); Future resultMethod(Long value); @@ -102,6 +104,8 @@ public class RedissonRemoteServiceTest extends BaseTest { public interface RemoteInterface { + void cancelMethod() throws InterruptedException; + void voidMethod(String name, Long param); Long resultMethod(Long value); @@ -120,6 +124,27 @@ public class RedissonRemoteServiceTest extends BaseTest { public class RemoteImpl implements RemoteInterface { + private AtomicInteger iterations; + + public RemoteImpl() { + } + + public RemoteImpl(AtomicInteger iterations) { + super(); + this.iterations = iterations; + } + + @Override + public void cancelMethod() throws InterruptedException { + for (long i = 0; i < Long.MAX_VALUE; i++) { + iterations.incrementAndGet(); + if (Thread.interrupted()) { + System.out.println("interrupted! " + i); + return; + } + } + } + @Override public void voidMethod(String name, Long param) { System.out.println(name + " " + param); @@ -160,6 +185,31 @@ public class RedissonRemoteServiceTest extends BaseTest { } } + @Test + public void testCancelAsync() throws InterruptedException { + RedissonClient r1 = createInstance(); + AtomicInteger iterations = new AtomicInteger(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + r1.getKeys().flushall(); + r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl(iterations), 1, executor); + + RedissonClient r2 = createInstance(); + RemoteInterfaceAsync ri = r2.getRemoteSerivce().get(RemoteInterfaceAsync.class); + + Future f = ri.cancelMethod(); + Thread.sleep(500); + assertThat(f.cancel(true)).isTrue(); + + executor.shutdown(); + r1.shutdown(); + r2.shutdown(); + + assertThat(iterations.get()).isLessThan(Integer.MAX_VALUE / 2); + + assertThat(executor.awaitTermination(1, TimeUnit.SECONDS)).isTrue(); + } + + @Test(expected = IllegalArgumentException.class) public void testWrongMethodAsync() throws InterruptedException { redisson.getRemoteSerivce().get(RemoteInterfaceWrongMethodAsync.class); diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index 0587d5561..f7a958985 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -59,7 +59,7 @@ public class RedissonExecutorServiceTest extends BaseTest { String invokeResult = e.invokeAny(Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask())); assertThat(invokeResult).isEqualTo(CallableTask.RESULT); - String a = e.invokeAny(Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask()), 1, TimeUnit.SECONDS); + String a = e.invokeAny(Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask()), 5, TimeUnit.SECONDS); assertThat(a).isEqualTo(CallableTask.RESULT); List invokeAllParams = Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask()); @@ -70,7 +70,7 @@ public class RedissonExecutorServiceTest extends BaseTest { } List invokeAllParams1 = Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask()); - List> allResult1 = e.invokeAll(invokeAllParams1, 1, TimeUnit.SECONDS); + List> allResult1 = e.invokeAll(invokeAllParams1, 5, TimeUnit.SECONDS); assertThat(allResult1).hasSize(invokeAllParams.size()); for (Future future : allResult1) { assertThat(future.get()).isEqualTo(CallableTask.RESULT); @@ -161,13 +161,13 @@ public class RedissonExecutorServiceTest extends BaseTest { @Test public void testResetShutdownState() throws InterruptedException, ExecutionException { - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 10; i++) { RExecutorService e = redisson.getExecutorService("test"); e.execute(new RunnableTask()); assertThat(e.isShutdown()).isFalse(); e.shutdown(); assertThat(e.isShutdown()).isTrue(); - assertThat(e.awaitTermination(5, TimeUnit.SECONDS)).isTrue(); + assertThat(e.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); assertThat(e.isTerminated()).isTrue(); assertThat(e.delete()).isTrue(); assertThat(e.isShutdown()).isFalse();