diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 6a4f95e72..4812f1dbe 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -18,7 +18,6 @@ package org.redisson; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; -import java.io.Serializable; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; @@ -37,11 +36,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import org.redisson.api.RAtomicLong; -import org.redisson.api.RBucket; import org.redisson.api.RExecutorService; import org.redisson.api.RFuture; -import org.redisson.api.RKeys; import org.redisson.api.RTopic; import org.redisson.api.RemoteInvocationOptions; import org.redisson.api.annotation.RInject; @@ -76,10 +72,9 @@ public class RedissonExecutorService implements RExecutorService { private final Codec codec; private final Redisson redisson; - private final RAtomicLong tasksCounter; - private final RBucket status; + private final String tasksCounterName; + private final String statusName; private final RTopic topic; - private final RKeys keys; private final RemoteExecutorServiceAsync asyncService; private final RemoteExecutorServiceAsync asyncServiceWithoutResult; @@ -99,14 +94,13 @@ public class RedissonExecutorService implements RExecutorService { requestQueueName = "{" + name + ":"+ RemoteExecutorService.class.getName() + "}"; String objectName = requestQueueName; - tasksCounter = redisson.getAtomicLong(objectName + ":counter"); - status = redisson.getBucket(objectName + ":status", codec); + tasksCounterName = objectName + ":counter"; + statusName = objectName + ":status"; topic = redisson.getTopic(objectName + ":topic", codec); - keys = redisson.getKeys(); ExecutorRemoteService remoteService = new ExecutorRemoteService(codec, redisson, name, commandExecutor); - remoteService.setTasksCounterName(tasksCounter.getName()); - remoteService.setStatusName(status.getName()); + remoteService.setTasksCounterName(tasksCounterName); + remoteService.setStatusName(statusName); asyncService = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(Integer.MAX_VALUE * 2)); asyncServiceWithoutResult = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); @@ -120,8 +114,8 @@ public class RedissonExecutorService implements RExecutorService { @Override public void registerWorkers(int executors, ExecutorService executor) { RemoteExecutorServiceImpl service = new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, requestQueueName); - service.setStatusName(status.getName()); - service.setTasksCounterName(tasksCounter.getName()); + service.setStatusName(statusName); + service.setTasksCounterName(tasksCounterName); service.setTopicName(topic.getChannelNames().get(0)); redisson.getRemoteSerivce(name, codec).register(RemoteExecutorService.class, service, executors, executor); @@ -190,7 +184,7 @@ public class RedissonExecutorService implements RExecutorService { + "redis.call('set', KEYS[2], ARGV[1]);" + "end;" + "end;", - Arrays.asList(tasksCounter.getName(), status.getName(), topic.getChannelNames().get(0)), + Arrays.asList(tasksCounterName, statusName, topic.getChannelNames().get(0)), SHUTDOWN_STATE, TERMINATED_STATE); } @@ -207,7 +201,7 @@ public class RedissonExecutorService implements RExecutorService { @Override public RFuture deleteAsync() { final RPromise result = connectionManager.newPromise(); - RFuture deleteFuture = keys.deleteAsync(requestQueueName, status.getName(), tasksCounter.getName()); + RFuture deleteFuture = redisson.getKeys().deleteAsync(requestQueueName, statusName, tasksCounterName); deleteFuture.addListener(new FutureListener() { @Override public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { @@ -229,12 +223,22 @@ public class RedissonExecutorService implements RExecutorService { @Override public boolean isShutdown() { - return status.isExists() && status.get() >= SHUTDOWN_STATE; + return checkState(SHUTDOWN_STATE); + } + + private boolean checkState(int state) { + return commandExecutor.evalWrite(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, + "if redis.call('exists', KEYS[1]) == 1 and tonumber(redis.call('get', KEYS[1])) >= tonumber(ARGV[1]) then " + + "return 1;" + + "end;" + + "return 0;", + Arrays.asList(statusName), + SHUTDOWN_STATE); } @Override public boolean isTerminated() { - return status.isExists() && status.get() == TERMINATED_STATE; + return checkState(TERMINATED_STATE); } @Override @@ -302,9 +306,6 @@ public class RedissonExecutorService implements RExecutorService { if (task.getClass().isAnonymousClass()) { throw new IllegalArgumentException("Task can't be created using anonymous class"); } - if (!Serializable.class.isAssignableFrom(task.getClass())) { - throw new IllegalArgumentException("Task class should implement Serializable interface"); - } } private void execute(RemotePromise promise) { diff --git a/redisson/src/test/java/org/redisson/executor/CallableTask.java b/redisson/src/test/java/org/redisson/executor/CallableTask.java index e230d4edd..d0c25ce42 100644 --- a/redisson/src/test/java/org/redisson/executor/CallableTask.java +++ b/redisson/src/test/java/org/redisson/executor/CallableTask.java @@ -3,7 +3,7 @@ package org.redisson.executor; import java.io.Serializable; import java.util.concurrent.Callable; -public class CallableTask implements Callable, Serializable { +public class CallableTask implements Callable { public static final String RESULT = "callable"; diff --git a/redisson/src/test/java/org/redisson/executor/RunnableTask.java b/redisson/src/test/java/org/redisson/executor/RunnableTask.java index 773da2372..5ebbf0a49 100644 --- a/redisson/src/test/java/org/redisson/executor/RunnableTask.java +++ b/redisson/src/test/java/org/redisson/executor/RunnableTask.java @@ -2,7 +2,7 @@ package org.redisson.executor; import java.io.Serializable; -public class RunnableTask implements Runnable, Serializable { +public class RunnableTask implements Runnable { private static final long serialVersionUID = 2105094575950438867L; diff --git a/redisson/src/test/java/org/redisson/executor/RunnableTask2.java b/redisson/src/test/java/org/redisson/executor/RunnableTask2.java index 16a692161..ceb0eb444 100644 --- a/redisson/src/test/java/org/redisson/executor/RunnableTask2.java +++ b/redisson/src/test/java/org/redisson/executor/RunnableTask2.java @@ -2,7 +2,7 @@ package org.redisson.executor; import java.io.Serializable; -public class RunnableTask2 implements Runnable, Serializable { +public class RunnableTask2 implements Runnable { private static final long serialVersionUID = 2105094575950438867L;