diff --git a/redisson/src/main/java/org/redisson/BaseRemoteService.java b/redisson/src/main/java/org/redisson/BaseRemoteService.java index 8b2853250..781cbe28f 100644 --- a/redisson/src/main/java/org/redisson/BaseRemoteService.java +++ b/redisson/src/main/java/org/redisson/BaseRemoteService.java @@ -33,7 +33,7 @@ import org.redisson.api.annotation.RRemoteAsync; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; -import org.redisson.command.CommandExecutor; +import org.redisson.command.CommandAsyncExecutor; import org.redisson.executor.RemotePromise; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; @@ -65,21 +65,21 @@ public abstract class BaseRemoteService { protected final Codec codec; protected final RedissonClient redisson; protected final String name; - protected final CommandExecutor commandExecutor; + protected final CommandAsyncExecutor commandExecutor; - public BaseRemoteService(RedissonClient redisson, CommandExecutor commandExecutor) { + public BaseRemoteService(RedissonClient redisson, CommandAsyncExecutor commandExecutor) { this(redisson, "redisson_rs", commandExecutor); } - public BaseRemoteService(RedissonClient redisson, String name, CommandExecutor commandExecutor) { + public BaseRemoteService(RedissonClient redisson, String name, CommandAsyncExecutor commandExecutor) { this(null, redisson, name, commandExecutor); } - public BaseRemoteService(Codec codec, RedissonClient redisson, CommandExecutor commandExecutor) { + public BaseRemoteService(Codec codec, RedissonClient redisson, CommandAsyncExecutor commandExecutor) { this(codec, redisson, "redisson_rs", commandExecutor); } - public BaseRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor) { + public BaseRemoteService(Codec codec, RedissonClient redisson, String name, CommandAsyncExecutor commandExecutor) { this.codec = codec; this.redisson = redisson; this.name = name; @@ -224,7 +224,7 @@ public abstract class BaseRemoteService { return cancel(syncInterface, requestId, request, mayInterruptIfRunning); } - boolean removed = remove(requestQueue, request); + boolean removed = commandExecutor.get(removeAsync(requestQueue, request)); if (removed) { super.cancel(mayInterruptIfRunning); return true; @@ -522,8 +522,8 @@ public abstract class BaseRemoteService { return future; } - protected boolean remove(RBlockingQueue requestQueue, RemoteServiceRequest request) { - return requestQueue.remove(request); + protected RFuture removeAsync(RBlockingQueue requestQueue, RemoteServiceRequest request) { + return requestQueue.removeAsync(request); } private void cancelExecution(RemoteInvocationOptions optionsCopy, String responseName, diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 07599064f..d58906cb0 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.redisson.api.CronSchedule; import org.redisson.api.RAtomicLong; +import org.redisson.api.RExecutorBatchFuture; import org.redisson.api.RExecutorFuture; import org.redisson.api.RFuture; import org.redisson.api.RRemoteService; @@ -53,14 +54,16 @@ import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; import org.redisson.connection.ConnectionManager; -import org.redisson.executor.ExecutorRemoteService; +import org.redisson.executor.RedissonExecutorBatchFuture; import org.redisson.executor.RedissonExecutorFuture; import org.redisson.executor.RedissonScheduledFuture; import org.redisson.executor.RemoteExecutorService; import org.redisson.executor.RemoteExecutorServiceAsync; import org.redisson.executor.RemoteExecutorServiceImpl; import org.redisson.executor.RemotePromise; -import org.redisson.executor.ScheduledExecutorRemoteService; +import org.redisson.executor.ScheduledTasksService; +import org.redisson.executor.TasksBatchService; +import org.redisson.executor.TasksService; import org.redisson.misc.Injector; import org.redisson.misc.PromiseDelegator; import org.redisson.misc.RPromise; @@ -110,8 +113,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { private final RemoteExecutorServiceAsync asyncService; private final RemoteExecutorServiceAsync asyncServiceWithoutResult; - private final ScheduledExecutorRemoteService scheduledRemoteService; - private final ExecutorRemoteService executorRemoteService; + private final ScheduledTasksService scheduledRemoteService; private final Map, byte[]> class2bytes = PlatformDependent.newConcurrentHashMap(); @@ -144,7 +146,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { remoteService = redisson.getRemoteService(name, codec); workersTopic = redisson.getTopic(workersChannelName); - executorRemoteService = new ExecutorRemoteService(codec, redisson, name, commandExecutor); + TasksService executorRemoteService = new TasksService(codec, redisson, name, commandExecutor); executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); executorRemoteService.setTasksCounterName(tasksCounterName); executorRemoteService.setStatusName(statusName); @@ -152,7 +154,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS)); asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); - scheduledRemoteService = new ScheduledExecutorRemoteService(codec, redisson, name, commandExecutor); + scheduledRemoteService = new ScheduledTasksService(codec, redisson, name, commandExecutor); scheduledRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); scheduledRemoteService.setTasksCounterName(tasksCounterName); scheduledRemoteService.setStatusName(statusName); @@ -248,6 +250,36 @@ public class RedissonExecutorService implements RScheduledExecutorService { execute(promise); } + @Override + public void execute(Runnable ...tasks) { + if (tasks.length == 0) { + throw new NullPointerException("Tasks are not defined"); + } + + TasksBatchService executorRemoteService = createBatchService(); + RemoteExecutorServiceAsync asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); + for (Runnable task : tasks) { + check(task); + byte[] classBody = getClassBody(task); + byte[] state = encode(task); + asyncServiceWithoutResult.executeRunnable(task.getClass().getName(), classBody, state); + } + + List result = (List) executorRemoteService.executeAdd(); + if (!result.get(0)) { + throw new RejectedExecutionException("Tasks have been rejected. ExecutorService is in shutdown state"); + } + } + + private TasksBatchService createBatchService() { + TasksBatchService executorRemoteService = new TasksBatchService(codec, redisson, name, commandExecutor); + executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); + executorRemoteService.setTasksCounterName(tasksCounterName); + executorRemoteService.setStatusName(statusName); + executorRemoteService.setTasksName(tasksName); + return executorRemoteService; + } + private byte[] encode(Object task) { // erase RedissonClient field to avoid its serialization Injector.inject(task, null); @@ -395,6 +427,77 @@ public class RedissonExecutorService implements RScheduledExecutorService { addListener(result); return new RedissonExecutorFuture(result, result.getRequestId()); } + + @Override + public RExecutorBatchFuture submit(Callable ...tasks) { + if (tasks.length == 0) { + throw new NullPointerException("Tasks are not defined"); + } + + List> result = new ArrayList>(); + TasksBatchService executorRemoteService = createBatchService(); + RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS)); + for (Callable task : tasks) { + check(task); + byte[] classBody = getClassBody(task); + byte[] state = encode(task); + RemotePromise promise = (RemotePromise)asyncService.executeCallable(task.getClass().getName(), classBody, state); + RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise, promise.getRequestId()); + result.add(executorFuture); + } + + List addResult = (List) executorRemoteService.executeAdd(); + if (!addResult.get(0)) { + throw new RejectedExecutionException("Tasks have been rejected. ExecutorService is in shutdown state"); + } + + return new RedissonExecutorBatchFuture(result); + } + + @Override + public RExecutorBatchFuture submitAsync(Callable ...tasks) { + if (tasks.length == 0) { + throw new NullPointerException("Tasks are not defined"); + } + + TasksBatchService executorRemoteService = createBatchService(); + RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS)); + final List> result = new ArrayList>(); + for (Callable task : tasks) { + check(task); + byte[] classBody = getClassBody(task); + byte[] state = encode(task); + RemotePromise promise = (RemotePromise)asyncService.executeCallable(task.getClass().getName(), classBody, state); + RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise, promise.getRequestId()); + result.add(executorFuture); + } + + executorRemoteService.executeAddAsync().addListener(new FutureListener>() { + + @Override + public void operationComplete(io.netty.util.concurrent.Future> future) throws Exception { + if (!future.isSuccess()) { + for (RExecutorFuture executorFuture : result) { + ((RPromise)executorFuture).tryFailure(future.cause()); + } + return; + } + + for (Boolean bool : future.getNow()) { + if (!bool) { + RejectedExecutionException ex = new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state"); + for (RExecutorFuture executorFuture : result) { + ((RPromise)executorFuture).tryFailure(ex); + } + break; + } + } + } + }); + + return new RedissonExecutorBatchFuture(result); + } + private void addListener(final RemotePromise result) { result.getAddFuture().addListener(new FutureListener() { @@ -453,6 +556,77 @@ public class RedissonExecutorService implements RScheduledExecutorService { return new RedissonExecutorFuture(resultFuture, future.getRequestId()); } + @Override + public RExecutorBatchFuture submit(Runnable ...tasks) { + if (tasks.length == 0) { + throw new NullPointerException("Tasks are not defined"); + } + + List> result = new ArrayList>(); + TasksBatchService executorRemoteService = createBatchService(); + RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS)); + for (Runnable task : tasks) { + check(task); + byte[] classBody = getClassBody(task); + byte[] state = encode(task); + RemotePromise promise = (RemotePromise)asyncService.executeRunnable(task.getClass().getName(), classBody, state); + RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise, promise.getRequestId()); + result.add(executorFuture); + } + + List addResult = (List) executorRemoteService.executeAdd(); + if (!addResult.get(0)) { + throw new RejectedExecutionException("Tasks have been rejected. ExecutorService is in shutdown state"); + } + + return new RedissonExecutorBatchFuture(result); + } + + @Override + public RExecutorBatchFuture submitAsync(Runnable ...tasks) { + if (tasks.length == 0) { + throw new NullPointerException("Tasks are not defined"); + } + + TasksBatchService executorRemoteService = createBatchService(); + RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS)); + final List> result = new ArrayList>(); + for (Runnable task : tasks) { + check(task); + byte[] classBody = getClassBody(task); + byte[] state = encode(task); + RemotePromise promise = (RemotePromise)asyncService.executeRunnable(task.getClass().getName(), classBody, state); + RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise, promise.getRequestId()); + result.add(executorFuture); + } + + executorRemoteService.executeAddAsync().addListener(new FutureListener>() { + + @Override + public void operationComplete(io.netty.util.concurrent.Future> future) throws Exception { + if (!future.isSuccess()) { + for (RExecutorFuture executorFuture : result) { + ((RPromise)executorFuture).tryFailure(future.cause()); + } + return; + } + + for (Boolean bool : future.getNow()) { + if (!bool) { + RejectedExecutionException ex = new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state"); + for (RExecutorFuture executorFuture : result) { + ((RPromise)executorFuture).tryFailure(ex); + } + break; + } + } + } + }); + + return new RedissonExecutorBatchFuture(result); + } + + @Override public RExecutorFuture submit(Runnable task) { RemotePromise promise = (RemotePromise) ((PromiseDelegator) submitAsync(task)).getInnerPromise(); diff --git a/redisson/src/main/java/org/redisson/api/RExecutorBatchFuture.java b/redisson/src/main/java/org/redisson/api/RExecutorBatchFuture.java new file mode 100644 index 000000000..6543033ff --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RExecutorBatchFuture.java @@ -0,0 +1,29 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.List; + +/** + * + * @author Nikita Koksharov + * + */ +public interface RExecutorBatchFuture extends RFuture { + + List> getTaskFutures(); + +} diff --git a/redisson/src/main/java/org/redisson/api/RExecutorService.java b/redisson/src/main/java/org/redisson/api/RExecutorService.java index cc437e756..dae7388bb 100644 --- a/redisson/src/main/java/org/redisson/api/RExecutorService.java +++ b/redisson/src/main/java/org/redisson/api/RExecutorService.java @@ -32,7 +32,7 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync String MAPREDUCE_NAME = "redisson_mapreduce"; /** - * Submits a value-returning task for execution and returns a + * Submits a value-returning task for execution synchronously and returns a * Future representing the pending results of the task. The * Future's {@code get} method will return the task's result upon * successful completion. @@ -44,6 +44,16 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync @Override RExecutorFuture submit(Callable task); + /** + * Submits tasks batch for execution synchronously. + * All tasks are stored to executor request queue atomically, + * if case of any error none of tasks will be added. + * + * @param tasks - tasks to execute + * @return Future object + */ + RExecutorBatchFuture submit(Callable ...tasks); + /** * Submits a Runnable task for execution and returns a Future * representing that task. The Future's {@code get} method will @@ -68,6 +78,16 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync @Override RExecutorFuture submit(Runnable task); + /** + * Submits tasks batch for execution synchronously. + * All tasks are stored to executor request queue atomically, + * if case of any error none of tasks will be added. + * + * @param tasks - tasks to execute + * @return Future object + */ + RExecutorBatchFuture submit(Runnable ...tasks); + /** * Returns executor name * @@ -113,5 +133,14 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync * @return true if task has been canceled successfully */ boolean cancelTask(String taskId); + + /** + * Submits tasks batch for execution synchronously. + * All tasks are stored to executor request queue atomically, + * if case of any error none of tasks will be added. + * + * @param tasks - tasks to execute + */ + void execute(Runnable ...tasks); } diff --git a/redisson/src/main/java/org/redisson/api/RExecutorServiceAsync.java b/redisson/src/main/java/org/redisson/api/RExecutorServiceAsync.java index 10c55edd0..7e6853a4a 100644 --- a/redisson/src/main/java/org/redisson/api/RExecutorServiceAsync.java +++ b/redisson/src/main/java/org/redisson/api/RExecutorServiceAsync.java @@ -33,7 +33,7 @@ public interface RExecutorServiceAsync { RFuture deleteAsync(); /** - * Use {@link RExecutorService#submit(Callable)} + * Submits task for execution asynchronously * * @param type of return value * @param task - task to execute @@ -42,11 +42,29 @@ public interface RExecutorServiceAsync { RExecutorFuture submitAsync(Callable task); /** - * Use {@link RExecutorService#submit(Runnable)} + * Submits tasks batch for execution asynchronously. All tasks are stored to executor request queue atomically, + * if case of any error none of tasks will be added. + * + * @param tasks - tasks to execute + * @return Future object + */ + RExecutorBatchFuture submitAsync(Callable ...tasks); + + /** + * Submits task for execution asynchronously * * @param task - task to execute * @return Future object */ RExecutorFuture submitAsync(Runnable task); + /** + * Submits tasks batch for execution asynchronously. All tasks are stored to executor request queue atomically, + * if case of any error none of tasks will be added. + * + * @param tasks - tasks to execute + * @return Future object + */ + RExecutorBatchFuture submitAsync(Runnable ...tasks); + } diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorBatchFuture.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorBatchFuture.java new file mode 100644 index 000000000..b3f04bc4a --- /dev/null +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorBatchFuture.java @@ -0,0 +1,63 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.executor; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.redisson.api.RExecutorBatchFuture; +import org.redisson.api.RExecutorFuture; +import org.redisson.misc.RedissonPromise; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonExecutorBatchFuture extends RedissonPromise implements RExecutorBatchFuture { + + private List> futures; + + public RedissonExecutorBatchFuture(List> futures) { + this.futures = futures; + + final AtomicInteger counter = new AtomicInteger(futures.size()); + for (RExecutorFuture future : futures) { + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + RedissonExecutorBatchFuture.this.tryFailure(future.cause()); + return; + } + + if (counter.decrementAndGet() == 0) { + RedissonExecutorBatchFuture.this.trySuccess(null); + } + } + }); + } + } + + @Override + public List> getTaskFutures() { + return futures; + } + +} diff --git a/redisson/src/main/java/org/redisson/executor/RemoteExecutorServiceImpl.java b/redisson/src/main/java/org/redisson/executor/RemoteExecutorServiceImpl.java index 9199e6809..def8af89b 100644 --- a/redisson/src/main/java/org/redisson/executor/RemoteExecutorServiceImpl.java +++ b/redisson/src/main/java/org/redisson/executor/RemoteExecutorServiceImpl.java @@ -128,7 +128,7 @@ public class RemoteExecutorServiceImpl implements RemoteExecutorService, RemoteP * @return */ private RemoteExecutorServiceAsync asyncScheduledServiceAtFixed() { - ScheduledExecutorRemoteService scheduledRemoteService = new ScheduledExecutorRemoteService(codec, redisson, name, commandExecutor); + ScheduledTasksService scheduledRemoteService = new ScheduledTasksService(codec, redisson, name, commandExecutor); scheduledRemoteService.setTerminationTopicName(terminationTopicName); scheduledRemoteService.setTasksCounterName(tasksCounterName); scheduledRemoteService.setStatusName(statusName); diff --git a/redisson/src/main/java/org/redisson/executor/ScheduledExecutorRemoteService.java b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java similarity index 93% rename from redisson/src/main/java/org/redisson/executor/ScheduledExecutorRemoteService.java rename to redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java index 2dadd653d..591cc8fa6 100644 --- a/redisson/src/main/java/org/redisson/executor/ScheduledExecutorRemoteService.java +++ b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java @@ -37,13 +37,13 @@ import io.netty.util.TimerTask; * @author Nikita Koksharov * */ -public class ScheduledExecutorRemoteService extends ExecutorRemoteService { +public class ScheduledTasksService extends TasksService { private String requestId; private String schedulerQueueName; private String schedulerChannelName; - public ScheduledExecutorRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor) { + public ScheduledTasksService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor) { super(codec, redisson, name, commandExecutor); } @@ -119,7 +119,7 @@ public class ScheduledExecutorRemoteService extends ExecutorRemoteService { commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - ScheduledExecutorRemoteService.super.awaitResultAsync(optionsCopy, result, request, responseName); + ScheduledTasksService.super.awaitResultAsync(optionsCopy, result, request, responseName); } }, delay, TimeUnit.MILLISECONDS); } else { @@ -128,8 +128,8 @@ public class ScheduledExecutorRemoteService extends ExecutorRemoteService { } @Override - protected boolean remove(RBlockingQueue requestQueue, RemoteServiceRequest request) { - return commandExecutor.evalWrite(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + protected RFuture removeAsync(RBlockingQueue requestQueue, RemoteServiceRequest request) { + return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // remove from scheduler queue "if redis.call('zrem', KEYS[2], ARGV[1]) > 0 then " + "redis.call('hdel', KEYS[6], ARGV[1]); " diff --git a/redisson/src/main/java/org/redisson/executor/TasksBatchService.java b/redisson/src/main/java/org/redisson/executor/TasksBatchService.java new file mode 100644 index 000000000..806b9e3a7 --- /dev/null +++ b/redisson/src/main/java/org/redisson/executor/TasksBatchService.java @@ -0,0 +1,55 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.executor; + +import java.util.List; + +import org.redisson.api.RFuture; +import org.redisson.api.RedissonClient; +import org.redisson.client.codec.Codec; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.command.CommandBatchService; +import org.redisson.command.CommandExecutor; + +/** + * + * @author Nikita Koksharov + * + */ +public class TasksBatchService extends TasksService { + + private CommandBatchService batchCommandService; + + public TasksBatchService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor) { + super(codec, redisson, name, commandExecutor); + batchCommandService = new CommandBatchService(commandExecutor.getConnectionManager()); + } + + @Override + protected CommandAsyncExecutor getAddCommandExecutor() { + return batchCommandService; + } + + public List executeAdd() { + return (List) batchCommandService.execute(); + } + + public RFuture> executeAddAsync() { + return (RFuture>)(Object)batchCommandService.executeAsync(); + } + + +} diff --git a/redisson/src/main/java/org/redisson/executor/ExecutorRemoteService.java b/redisson/src/main/java/org/redisson/executor/TasksService.java similarity index 67% rename from redisson/src/main/java/org/redisson/executor/ExecutorRemoteService.java rename to redisson/src/main/java/org/redisson/executor/TasksService.java index 9c6d02458..e076f42b6 100644 --- a/redisson/src/main/java/org/redisson/executor/ExecutorRemoteService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksService.java @@ -26,7 +26,7 @@ import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; -import org.redisson.command.CommandExecutor; +import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.remote.RemoteServiceCancelRequest; @@ -41,14 +41,14 @@ import io.netty.util.concurrent.FutureListener; * @author Nikita Koksharov * */ -public class ExecutorRemoteService extends BaseRemoteService { +public class TasksService extends BaseRemoteService { protected String terminationTopicName; protected String tasksCounterName; protected String statusName; protected String tasksName; - public ExecutorRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor) { + public TasksService(Codec codec, RedissonClient redisson, String name, CommandAsyncExecutor commandExecutor) { super(codec, redisson, name, commandExecutor); } @@ -95,8 +95,12 @@ public class ExecutorRemoteService extends BaseRemoteService { return promise; } + protected CommandAsyncExecutor getAddCommandExecutor() { + return commandExecutor; + } + protected RFuture addAsync(RBlockingQueue requestQueue, RemoteServiceRequest request) { - return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return getAddCommandExecutor().evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('exists', KEYS[2]) == 0 then " + "redis.call('rpush', KEYS[3], ARGV[2]); " + "redis.call('hset', KEYS[4], ARGV[1], ARGV[2]);" @@ -109,8 +113,8 @@ public class ExecutorRemoteService extends BaseRemoteService { } @Override - protected boolean remove(RBlockingQueue requestQueue, RemoteServiceRequest request) { - return commandExecutor.evalWrite(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + protected RFuture removeAsync(RBlockingQueue requestQueue, RemoteServiceRequest request) { + return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local task = redis.call('hget', KEYS[5], ARGV[1]); " + "if task ~= false and redis.call('lrem', KEYS[1], 1, task) > 0 then " + "redis.call('hdel', KEYS[5], ARGV[1]); " @@ -129,45 +133,59 @@ public class ExecutorRemoteService extends BaseRemoteService { request.getRequestId(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); } - public RFuture cancelExecutionAsync(String requestId) { - Class syncInterface = RemoteExecutorService.class; + public RFuture cancelExecutionAsync(final String requestId) { + final Class syncInterface = RemoteExecutorService.class; String requestQueueName = getRequestQueueName(syncInterface); - String cancelRequestName = getCancelRequestQueueName(syncInterface, requestId); if (!redisson.getMap(tasksName, LongCodec.INSTANCE).containsKey(requestId)) { return RedissonPromise.newSucceededFuture(false); } + + final RPromise result = new RedissonPromise(); RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec()); RemoteServiceRequest request = new RemoteServiceRequest(requestId); - if (remove(requestQueue, request)) { - return RedissonPromise.newSucceededFuture(true); - } - - RBlockingQueue cancelRequestQueue = redisson.getBlockingQueue(cancelRequestName, getCodec()); - cancelRequestQueue.putAsync(new RemoteServiceCancelRequest(true, requestId + ":cancel-response")); - cancelRequestQueue.expireAsync(60, TimeUnit.SECONDS); - - final RPromise result = new RedissonPromise(); - String responseQueueName = getResponseQueueName(syncInterface, requestId + ":cancel-response"); - RBlockingQueue responseQueue = redisson.getBlockingQueue(responseQueueName, getCodec()); - final RFuture response = responseQueue.pollAsync(60, TimeUnit.SECONDS); - response.addListener(new FutureListener() { + RFuture removeFuture = removeAsync(requestQueue, request); + removeFuture.addListener(new FutureListener() { @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } - if (response.getNow() == null) { - result.trySuccess(false); - return; + if (future.getNow()) { + result.trySuccess(true); + } else { + String cancelRequestName = getCancelRequestQueueName(syncInterface, requestId); + + RBlockingQueue cancelRequestQueue = redisson.getBlockingQueue(cancelRequestName, getCodec()); + cancelRequestQueue.putAsync(new RemoteServiceCancelRequest(true, requestId + ":cancel-response")); + cancelRequestQueue.expireAsync(60, TimeUnit.SECONDS); + + String responseQueueName = getResponseQueueName(syncInterface, requestId + ":cancel-response"); + RBlockingQueue responseQueue = redisson.getBlockingQueue(responseQueueName, getCodec()); + final RFuture response = responseQueue.pollAsync(60, TimeUnit.SECONDS); + response.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + if (response.getNow() == null) { + result.trySuccess(false); + return; + } + result.trySuccess(response.getNow().isCanceled()); + } + }); } - result.trySuccess(response.getNow().isCanceled()); } }); + return result; } diff --git a/redisson/src/test/java/org/redisson/executor/IncrementCallableTask.java b/redisson/src/test/java/org/redisson/executor/IncrementCallableTask.java new file mode 100644 index 000000000..268c14eae --- /dev/null +++ b/redisson/src/test/java/org/redisson/executor/IncrementCallableTask.java @@ -0,0 +1,29 @@ +package org.redisson.executor; + +import java.util.concurrent.Callable; + +import org.redisson.api.RedissonClient; +import org.redisson.api.annotation.RInject; + +public class IncrementCallableTask implements Callable { + + private String counterName; + + @RInject + private RedissonClient redisson; + + public IncrementCallableTask() { + } + + public IncrementCallableTask(String counterName) { + super(); + this.counterName = counterName; + } + + @Override + public String call() throws Exception { + redisson.getAtomicLong(counterName).incrementAndGet(); + return "1234"; + } + +} diff --git a/redisson/src/test/java/org/redisson/executor/IncrementRunnableTask.java b/redisson/src/test/java/org/redisson/executor/IncrementRunnableTask.java new file mode 100644 index 000000000..2469b068e --- /dev/null +++ b/redisson/src/test/java/org/redisson/executor/IncrementRunnableTask.java @@ -0,0 +1,26 @@ +package org.redisson.executor; + +import org.redisson.api.RedissonClient; +import org.redisson.api.annotation.RInject; + +public class IncrementRunnableTask implements Runnable { + + private String counterName; + + @RInject + private RedissonClient redisson; + + public IncrementRunnableTask() { + } + + public IncrementRunnableTask(String counterName) { + super(); + this.counterName = counterName; + } + + @Override + public void run() { + redisson.getAtomicLong(counterName).incrementAndGet(); + } + +}