ExecutorService code cleanup

pull/578/head^2
Nikita 9 years ago
parent e6c0a409e6
commit 72afcc431a

@ -18,7 +18,6 @@ package org.redisson;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
@ -37,11 +36,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RBucket;
import org.redisson.api.RExecutorService;
import org.redisson.api.RFuture;
import org.redisson.api.RKeys;
import org.redisson.api.RTopic;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.annotation.RInject;
@ -76,10 +72,9 @@ public class RedissonExecutorService implements RExecutorService {
private final Codec codec;
private final Redisson redisson;
private final RAtomicLong tasksCounter;
private final RBucket<Integer> status;
private final String tasksCounterName;
private final String statusName;
private final RTopic<Integer> topic;
private final RKeys keys;
private final RemoteExecutorServiceAsync asyncService;
private final RemoteExecutorServiceAsync asyncServiceWithoutResult;
@ -99,14 +94,13 @@ public class RedissonExecutorService implements RExecutorService {
requestQueueName = "{" + name + ":"+ RemoteExecutorService.class.getName() + "}";
String objectName = requestQueueName;
tasksCounter = redisson.getAtomicLong(objectName + ":counter");
status = redisson.getBucket(objectName + ":status", codec);
tasksCounterName = objectName + ":counter";
statusName = objectName + ":status";
topic = redisson.getTopic(objectName + ":topic", codec);
keys = redisson.getKeys();
ExecutorRemoteService remoteService = new ExecutorRemoteService(codec, redisson, name, commandExecutor);
remoteService.setTasksCounterName(tasksCounter.getName());
remoteService.setStatusName(status.getName());
remoteService.setTasksCounterName(tasksCounterName);
remoteService.setStatusName(statusName);
asyncService = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(Integer.MAX_VALUE * 2));
asyncServiceWithoutResult = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
@ -120,8 +114,8 @@ public class RedissonExecutorService implements RExecutorService {
@Override
public void registerWorkers(int executors, ExecutorService executor) {
RemoteExecutorServiceImpl service = new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, requestQueueName);
service.setStatusName(status.getName());
service.setTasksCounterName(tasksCounter.getName());
service.setStatusName(statusName);
service.setTasksCounterName(tasksCounterName);
service.setTopicName(topic.getChannelNames().get(0));
redisson.getRemoteSerivce(name, codec).register(RemoteExecutorService.class, service, executors, executor);
@ -190,7 +184,7 @@ public class RedissonExecutorService implements RExecutorService {
+ "redis.call('set', KEYS[2], ARGV[1]);"
+ "end;"
+ "end;",
Arrays.<Object>asList(tasksCounter.getName(), status.getName(), topic.getChannelNames().get(0)),
Arrays.<Object>asList(tasksCounterName, statusName, topic.getChannelNames().get(0)),
SHUTDOWN_STATE, TERMINATED_STATE);
}
@ -207,7 +201,7 @@ public class RedissonExecutorService implements RExecutorService {
@Override
public RFuture<Boolean> deleteAsync() {
final RPromise<Boolean> result = connectionManager.newPromise();
RFuture<Long> deleteFuture = keys.deleteAsync(requestQueueName, status.getName(), tasksCounter.getName());
RFuture<Long> deleteFuture = redisson.getKeys().deleteAsync(requestQueueName, statusName, tasksCounterName);
deleteFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<Long> future) throws Exception {
@ -229,12 +223,22 @@ public class RedissonExecutorService implements RExecutorService {
@Override
public boolean isShutdown() {
return status.isExists() && status.get() >= SHUTDOWN_STATE;
return checkState(SHUTDOWN_STATE);
}
private boolean checkState(int state) {
return commandExecutor.evalWrite(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"if redis.call('exists', KEYS[1]) == 1 and tonumber(redis.call('get', KEYS[1])) >= tonumber(ARGV[1]) then "
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(statusName),
SHUTDOWN_STATE);
}
@Override
public boolean isTerminated() {
return status.isExists() && status.get() == TERMINATED_STATE;
return checkState(TERMINATED_STATE);
}
@Override
@ -302,9 +306,6 @@ public class RedissonExecutorService implements RExecutorService {
if (task.getClass().isAnonymousClass()) {
throw new IllegalArgumentException("Task can't be created using anonymous class");
}
if (!Serializable.class.isAssignableFrom(task.getClass())) {
throw new IllegalArgumentException("Task class should implement Serializable interface");
}
}
private <T> void execute(RemotePromise<T> promise) {

@ -3,7 +3,7 @@ package org.redisson.executor;
import java.io.Serializable;
import java.util.concurrent.Callable;
public class CallableTask implements Callable<String>, Serializable {
public class CallableTask implements Callable<String> {
public static final String RESULT = "callable";

@ -2,7 +2,7 @@ package org.redisson.executor;
import java.io.Serializable;
public class RunnableTask implements Runnable, Serializable {
public class RunnableTask implements Runnable {
private static final long serialVersionUID = 2105094575950438867L;

@ -2,7 +2,7 @@ package org.redisson.executor;
import java.io.Serializable;
public class RunnableTask2 implements Runnable, Serializable {
public class RunnableTask2 implements Runnable {
private static final long serialVersionUID = 2105094575950438867L;

Loading…
Cancel
Save