From d6a397593dbf5cfed9b14d1641afd759eee2639f Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 2 Aug 2016 18:54:17 +0300 Subject: [PATCH] ExecutorService polishing. #208 --- .../org/redisson/RedissonExecutorService.java | 5 ++-- .../org/redisson/RedissonRemoteService.java | 9 +++--- .../executor/ExecutorRemoteService.java | 28 +++++++++++++++++-- 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/redisson/RedissonExecutorService.java b/src/main/java/org/redisson/RedissonExecutorService.java index 33b1e32ea..d7dacc75e 100644 --- a/src/main/java/org/redisson/RedissonExecutorService.java +++ b/src/main/java/org/redisson/RedissonExecutorService.java @@ -144,7 +144,7 @@ public class RedissonExecutorService implements RExecutorService { public void shutdown() { commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, "if redis.call('exists', KEYS[2]) == 0 then " - + "if redis.call('get', KEYS[1]) == 0 or redis.call('exists', KEYS[1]) == 0 then " + + "if redis.call('get', KEYS[1]) == '0' or redis.call('exists', KEYS[1]) == 0 then " + "redis.call('set', KEYS[2], ARGV[2]);" + "redis.call('publish', KEYS[3], ARGV[2]);" + "else " @@ -220,7 +220,8 @@ public class RedissonExecutorService implements RExecutorService { private void check(RemotePromise promise) { io.netty.util.concurrent.Future addFuture = promise.getAddFuture(); - Boolean res = addFuture.awaitUninterruptibly().getNow(); + addFuture.syncUninterruptibly(); + Boolean res = addFuture.getNow(); if (!res) { throw new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state"); } diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index 9d41566a3..e372ecfd2 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -336,8 +336,7 @@ public class RedissonRemoteService implements RRemoteService { } }; - Future addFuture = addAsync(requestQueue, request); - result.setAddFuture(addFuture); + Future addFuture = addAsync(requestQueue, request, result); addFuture.addListener(new FutureListener() { @Override @@ -598,7 +597,9 @@ public class RedissonRemoteService implements RRemoteService { return batch.executeAsync(); } - protected Future addAsync(RBlockingQueue requestQueue, RemoteServiceRequest request) { - return requestQueue.addAsync(request); + protected Future addAsync(RBlockingQueue requestQueue, RemoteServiceRequest request, RemotePromise result) { + Future future = requestQueue.addAsync(request); + result.setAddFuture(future); + return future; } } diff --git a/src/main/java/org/redisson/executor/ExecutorRemoteService.java b/src/main/java/org/redisson/executor/ExecutorRemoteService.java index 92c117586..1afe6e32a 100644 --- a/src/main/java/org/redisson/executor/ExecutorRemoteService.java +++ b/src/main/java/org/redisson/executor/ExecutorRemoteService.java @@ -28,6 +28,8 @@ import org.redisson.command.CommandExecutor; import org.redisson.remote.RemoteServiceRequest; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; public class ExecutorRemoteService extends RedissonRemoteService { @@ -44,8 +46,9 @@ public class ExecutorRemoteService extends RedissonRemoteService { @Override protected Future addAsync(RBlockingQueue requestQueue, - RemoteServiceRequest request) { - return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + RemoteServiceRequest request, RemotePromise result) { + final Promise promise = commandExecutor.getConnectionManager().newPromise(); + Future future = commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('exists', KEYS[2]) == 0 then " + "redis.call('rpush', KEYS[3], ARGV[1]); " + "redis.call('incr', KEYS[1]);" @@ -54,6 +57,27 @@ public class ExecutorRemoteService extends RedissonRemoteService { + "return 0;", Arrays.asList(tasksCounter.getName(), status.getName(), requestQueue.getName()), encode(request)); + + result.setAddFuture(future); + + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.setFailure(future.cause()); + return; + } + + if (!future.getNow()) { + promise.cancel(true); + return; + } + + promise.setSuccess(true); + } + }); + + return promise; } }