ExecutorService polishing. #208

pull/574/merge
Nikita 9 years ago
parent a2e074b7dd
commit d6a397593d

@ -144,7 +144,7 @@ public class RedissonExecutorService implements RExecutorService {
public void shutdown() { public void shutdown() {
commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"if redis.call('exists', KEYS[2]) == 0 then " "if redis.call('exists', KEYS[2]) == 0 then "
+ "if redis.call('get', KEYS[1]) == 0 or redis.call('exists', KEYS[1]) == 0 then " + "if redis.call('get', KEYS[1]) == '0' or redis.call('exists', KEYS[1]) == 0 then "
+ "redis.call('set', KEYS[2], ARGV[2]);" + "redis.call('set', KEYS[2], ARGV[2]);"
+ "redis.call('publish', KEYS[3], ARGV[2]);" + "redis.call('publish', KEYS[3], ARGV[2]);"
+ "else " + "else "
@ -220,7 +220,8 @@ public class RedissonExecutorService implements RExecutorService {
private <T> void check(RemotePromise<T> promise) { private <T> void check(RemotePromise<T> promise) {
io.netty.util.concurrent.Future<Boolean> addFuture = promise.getAddFuture(); io.netty.util.concurrent.Future<Boolean> addFuture = promise.getAddFuture();
Boolean res = addFuture.awaitUninterruptibly().getNow(); addFuture.syncUninterruptibly();
Boolean res = addFuture.getNow();
if (!res) { if (!res) {
throw new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state"); throw new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state");
} }

@ -336,8 +336,7 @@ public class RedissonRemoteService implements RRemoteService {
} }
}; };
Future<Boolean> addFuture = addAsync(requestQueue, request); Future<Boolean> addFuture = addAsync(requestQueue, request, result);
result.setAddFuture(addFuture);
addFuture.addListener(new FutureListener<Boolean>() { addFuture.addListener(new FutureListener<Boolean>() {
@Override @Override
@ -598,7 +597,9 @@ public class RedissonRemoteService implements RRemoteService {
return batch.executeAsync(); return batch.executeAsync();
} }
protected Future<Boolean> addAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request) { protected Future<Boolean> addAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request, RemotePromise<Object> result) {
return requestQueue.addAsync(request); Future<Boolean> future = requestQueue.addAsync(request);
result.setAddFuture(future);
return future;
} }
} }

@ -28,6 +28,8 @@ import org.redisson.command.CommandExecutor;
import org.redisson.remote.RemoteServiceRequest; import org.redisson.remote.RemoteServiceRequest;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
public class ExecutorRemoteService extends RedissonRemoteService { public class ExecutorRemoteService extends RedissonRemoteService {
@ -44,8 +46,9 @@ public class ExecutorRemoteService extends RedissonRemoteService {
@Override @Override
protected Future<Boolean> addAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, protected Future<Boolean> addAsync(RBlockingQueue<RemoteServiceRequest> requestQueue,
RemoteServiceRequest request) { RemoteServiceRequest request, RemotePromise<Object> result) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, final Promise<Boolean> promise = commandExecutor.getConnectionManager().newPromise();
Future<Boolean> future = commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('exists', KEYS[2]) == 0 then " "if redis.call('exists', KEYS[2]) == 0 then "
+ "redis.call('rpush', KEYS[3], ARGV[1]); " + "redis.call('rpush', KEYS[3], ARGV[1]); "
+ "redis.call('incr', KEYS[1]);" + "redis.call('incr', KEYS[1]);"
@ -54,6 +57,27 @@ public class ExecutorRemoteService extends RedissonRemoteService {
+ "return 0;", + "return 0;",
Arrays.<Object>asList(tasksCounter.getName(), status.getName(), requestQueue.getName()), Arrays.<Object>asList(tasksCounter.getName(), status.getName(), requestQueue.getName()),
encode(request)); encode(request));
result.setAddFuture(future);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
if (!future.getNow()) {
promise.cancel(true);
return;
}
promise.setSuccess(true);
}
});
return promise;
} }
} }

Loading…
Cancel
Save