Ability to submit few tasks atomically (in batch) through RExecutorService interface #921

pull/709/merge
Nikita 8 years ago
parent 273cff0d17
commit 952bbb708e

@ -33,7 +33,7 @@ import org.redisson.api.annotation.RRemoteAsync;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
@ -65,21 +65,21 @@ public abstract class BaseRemoteService {
protected final Codec codec;
protected final RedissonClient redisson;
protected final String name;
protected final CommandExecutor commandExecutor;
protected final CommandAsyncExecutor commandExecutor;
public BaseRemoteService(RedissonClient redisson, CommandExecutor commandExecutor) {
public BaseRemoteService(RedissonClient redisson, CommandAsyncExecutor commandExecutor) {
this(redisson, "redisson_rs", commandExecutor);
}
public BaseRemoteService(RedissonClient redisson, String name, CommandExecutor commandExecutor) {
public BaseRemoteService(RedissonClient redisson, String name, CommandAsyncExecutor commandExecutor) {
this(null, redisson, name, commandExecutor);
}
public BaseRemoteService(Codec codec, RedissonClient redisson, CommandExecutor commandExecutor) {
public BaseRemoteService(Codec codec, RedissonClient redisson, CommandAsyncExecutor commandExecutor) {
this(codec, redisson, "redisson_rs", commandExecutor);
}
public BaseRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor) {
public BaseRemoteService(Codec codec, RedissonClient redisson, String name, CommandAsyncExecutor commandExecutor) {
this.codec = codec;
this.redisson = redisson;
this.name = name;
@ -224,7 +224,7 @@ public abstract class BaseRemoteService {
return cancel(syncInterface, requestId, request, mayInterruptIfRunning);
}
boolean removed = remove(requestQueue, request);
boolean removed = commandExecutor.get(removeAsync(requestQueue, request));
if (removed) {
super.cancel(mayInterruptIfRunning);
return true;
@ -522,8 +522,8 @@ public abstract class BaseRemoteService {
return future;
}
protected boolean remove(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request) {
return requestQueue.remove(request);
protected RFuture<Boolean> removeAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request) {
return requestQueue.removeAsync(request);
}
private void cancelExecution(RemoteInvocationOptions optionsCopy, String responseName,

@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.CronSchedule;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RExecutorBatchFuture;
import org.redisson.api.RExecutorFuture;
import org.redisson.api.RFuture;
import org.redisson.api.RRemoteService;
@ -53,14 +54,16 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.executor.ExecutorRemoteService;
import org.redisson.executor.RedissonExecutorBatchFuture;
import org.redisson.executor.RedissonExecutorFuture;
import org.redisson.executor.RedissonScheduledFuture;
import org.redisson.executor.RemoteExecutorService;
import org.redisson.executor.RemoteExecutorServiceAsync;
import org.redisson.executor.RemoteExecutorServiceImpl;
import org.redisson.executor.RemotePromise;
import org.redisson.executor.ScheduledExecutorRemoteService;
import org.redisson.executor.ScheduledTasksService;
import org.redisson.executor.TasksBatchService;
import org.redisson.executor.TasksService;
import org.redisson.misc.Injector;
import org.redisson.misc.PromiseDelegator;
import org.redisson.misc.RPromise;
@ -110,8 +113,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final RemoteExecutorServiceAsync asyncService;
private final RemoteExecutorServiceAsync asyncServiceWithoutResult;
private final ScheduledExecutorRemoteService scheduledRemoteService;
private final ExecutorRemoteService executorRemoteService;
private final ScheduledTasksService scheduledRemoteService;
private final Map<Class<?>, byte[]> class2bytes = PlatformDependent.newConcurrentHashMap();
@ -144,7 +146,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
remoteService = redisson.getRemoteService(name, codec);
workersTopic = redisson.getTopic(workersChannelName);
executorRemoteService = new ExecutorRemoteService(codec, redisson, name, commandExecutor);
TasksService executorRemoteService = new TasksService(codec, redisson, name, commandExecutor);
executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
executorRemoteService.setTasksCounterName(tasksCounterName);
executorRemoteService.setStatusName(statusName);
@ -152,7 +154,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS));
asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
scheduledRemoteService = new ScheduledExecutorRemoteService(codec, redisson, name, commandExecutor);
scheduledRemoteService = new ScheduledTasksService(codec, redisson, name, commandExecutor);
scheduledRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
scheduledRemoteService.setTasksCounterName(tasksCounterName);
scheduledRemoteService.setStatusName(statusName);
@ -248,6 +250,36 @@ public class RedissonExecutorService implements RScheduledExecutorService {
execute(promise);
}
@Override
public void execute(Runnable ...tasks) {
if (tasks.length == 0) {
throw new NullPointerException("Tasks are not defined");
}
TasksBatchService executorRemoteService = createBatchService();
RemoteExecutorServiceAsync asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
for (Runnable task : tasks) {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
asyncServiceWithoutResult.executeRunnable(task.getClass().getName(), classBody, state);
}
List<Boolean> result = (List<Boolean>) executorRemoteService.executeAdd();
if (!result.get(0)) {
throw new RejectedExecutionException("Tasks have been rejected. ExecutorService is in shutdown state");
}
}
private TasksBatchService createBatchService() {
TasksBatchService executorRemoteService = new TasksBatchService(codec, redisson, name, commandExecutor);
executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
executorRemoteService.setTasksCounterName(tasksCounterName);
executorRemoteService.setStatusName(statusName);
executorRemoteService.setTasksName(tasksName);
return executorRemoteService;
}
private byte[] encode(Object task) {
// erase RedissonClient field to avoid its serialization
Injector.inject(task, null);
@ -395,6 +427,77 @@ public class RedissonExecutorService implements RScheduledExecutorService {
addListener(result);
return new RedissonExecutorFuture<T>(result, result.getRequestId());
}
@Override
public RExecutorBatchFuture submit(Callable<?> ...tasks) {
if (tasks.length == 0) {
throw new NullPointerException("Tasks are not defined");
}
List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
TasksBatchService executorRemoteService = createBatchService();
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS));
for (Callable<?> task : tasks) {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(task.getClass().getName(), classBody, state);
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise, promise.getRequestId());
result.add(executorFuture);
}
List<Boolean> addResult = (List<Boolean>) executorRemoteService.executeAdd();
if (!addResult.get(0)) {
throw new RejectedExecutionException("Tasks have been rejected. ExecutorService is in shutdown state");
}
return new RedissonExecutorBatchFuture(result);
}
@Override
public RExecutorBatchFuture submitAsync(Callable<?> ...tasks) {
if (tasks.length == 0) {
throw new NullPointerException("Tasks are not defined");
}
TasksBatchService executorRemoteService = createBatchService();
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS));
final List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
for (Callable<?> task : tasks) {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(task.getClass().getName(), classBody, state);
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise, promise.getRequestId());
result.add(executorFuture);
}
executorRemoteService.executeAddAsync().addListener(new FutureListener<List<Boolean>>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<List<Boolean>> future) throws Exception {
if (!future.isSuccess()) {
for (RExecutorFuture<?> executorFuture : result) {
((RPromise<Void>)executorFuture).tryFailure(future.cause());
}
return;
}
for (Boolean bool : future.getNow()) {
if (!bool) {
RejectedExecutionException ex = new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state");
for (RExecutorFuture<?> executorFuture : result) {
((RPromise<Void>)executorFuture).tryFailure(ex);
}
break;
}
}
}
});
return new RedissonExecutorBatchFuture(result);
}
private <T> void addListener(final RemotePromise<T> result) {
result.getAddFuture().addListener(new FutureListener<Boolean>() {
@ -453,6 +556,77 @@ public class RedissonExecutorService implements RScheduledExecutorService {
return new RedissonExecutorFuture<T>(resultFuture, future.getRequestId());
}
@Override
public RExecutorBatchFuture submit(Runnable ...tasks) {
if (tasks.length == 0) {
throw new NullPointerException("Tasks are not defined");
}
List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
TasksBatchService executorRemoteService = createBatchService();
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS));
for (Runnable task : tasks) {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(task.getClass().getName(), classBody, state);
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise, promise.getRequestId());
result.add(executorFuture);
}
List<Boolean> addResult = (List<Boolean>) executorRemoteService.executeAdd();
if (!addResult.get(0)) {
throw new RejectedExecutionException("Tasks have been rejected. ExecutorService is in shutdown state");
}
return new RedissonExecutorBatchFuture(result);
}
@Override
public RExecutorBatchFuture submitAsync(Runnable ...tasks) {
if (tasks.length == 0) {
throw new NullPointerException("Tasks are not defined");
}
TasksBatchService executorRemoteService = createBatchService();
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS));
final List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
for (Runnable task : tasks) {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(task.getClass().getName(), classBody, state);
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise, promise.getRequestId());
result.add(executorFuture);
}
executorRemoteService.executeAddAsync().addListener(new FutureListener<List<Boolean>>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<List<Boolean>> future) throws Exception {
if (!future.isSuccess()) {
for (RExecutorFuture<?> executorFuture : result) {
((RPromise<Void>)executorFuture).tryFailure(future.cause());
}
return;
}
for (Boolean bool : future.getNow()) {
if (!bool) {
RejectedExecutionException ex = new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state");
for (RExecutorFuture<?> executorFuture : result) {
((RPromise<Void>)executorFuture).tryFailure(ex);
}
break;
}
}
}
});
return new RedissonExecutorBatchFuture(result);
}
@Override
public RExecutorFuture<?> submit(Runnable task) {
RemotePromise<Void> promise = (RemotePromise<Void>) ((PromiseDelegator<Void>) submitAsync(task)).getInnerPromise();

@ -0,0 +1,29 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.List;
/**
*
* @author Nikita Koksharov
*
*/
public interface RExecutorBatchFuture extends RFuture<Void> {
List<RExecutorFuture<?>> getTaskFutures();
}

@ -32,7 +32,7 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync
String MAPREDUCE_NAME = "redisson_mapreduce";
/**
* Submits a value-returning task for execution and returns a
* Submits a value-returning task for execution synchronously and returns a
* Future representing the pending results of the task. The
* Future's {@code get} method will return the task's result upon
* successful completion.
@ -44,6 +44,16 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync
@Override
<T> RExecutorFuture<T> submit(Callable<T> task);
/**
* Submits tasks batch for execution synchronously.
* All tasks are stored to executor request queue atomically,
* if case of any error none of tasks will be added.
*
* @param tasks - tasks to execute
* @return Future object
*/
RExecutorBatchFuture submit(Callable<?> ...tasks);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
@ -68,6 +78,16 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync
@Override
RExecutorFuture<?> submit(Runnable task);
/**
* Submits tasks batch for execution synchronously.
* All tasks are stored to executor request queue atomically,
* if case of any error none of tasks will be added.
*
* @param tasks - tasks to execute
* @return Future object
*/
RExecutorBatchFuture submit(Runnable ...tasks);
/**
* Returns executor name
*
@ -113,5 +133,14 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync
* @return <code>true</code> if task has been canceled successfully
*/
boolean cancelTask(String taskId);
/**
* Submits tasks batch for execution synchronously.
* All tasks are stored to executor request queue atomically,
* if case of any error none of tasks will be added.
*
* @param tasks - tasks to execute
*/
void execute(Runnable ...tasks);
}

@ -33,7 +33,7 @@ public interface RExecutorServiceAsync {
RFuture<Boolean> deleteAsync();
/**
* Use {@link RExecutorService#submit(Callable)}
* Submits task for execution asynchronously
*
* @param <T> type of return value
* @param task - task to execute
@ -42,11 +42,29 @@ public interface RExecutorServiceAsync {
<T> RExecutorFuture<T> submitAsync(Callable<T> task);
/**
* Use {@link RExecutorService#submit(Runnable)}
* Submits tasks batch for execution asynchronously. All tasks are stored to executor request queue atomically,
* if case of any error none of tasks will be added.
*
* @param tasks - tasks to execute
* @return Future object
*/
RExecutorBatchFuture submitAsync(Callable<?> ...tasks);
/**
* Submits task for execution asynchronously
*
* @param task - task to execute
* @return Future object
*/
RExecutorFuture<?> submitAsync(Runnable task);
/**
* Submits tasks batch for execution asynchronously. All tasks are stored to executor request queue atomically,
* if case of any error none of tasks will be added.
*
* @param tasks - tasks to execute
* @return Future object
*/
RExecutorBatchFuture submitAsync(Runnable ...tasks);
}

@ -0,0 +1,63 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.RExecutorBatchFuture;
import org.redisson.api.RExecutorFuture;
import org.redisson.misc.RedissonPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonExecutorBatchFuture extends RedissonPromise<Void> implements RExecutorBatchFuture {
private List<RExecutorFuture<?>> futures;
public RedissonExecutorBatchFuture(List<RExecutorFuture<?>> futures) {
this.futures = futures;
final AtomicInteger counter = new AtomicInteger(futures.size());
for (RExecutorFuture<?> future : futures) {
future.<Object>addListener(new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (!future.isSuccess()) {
RedissonExecutorBatchFuture.this.tryFailure(future.cause());
return;
}
if (counter.decrementAndGet() == 0) {
RedissonExecutorBatchFuture.this.trySuccess(null);
}
}
});
}
}
@Override
public List<RExecutorFuture<?>> getTaskFutures() {
return futures;
}
}

@ -128,7 +128,7 @@ public class RemoteExecutorServiceImpl implements RemoteExecutorService, RemoteP
* @return
*/
private RemoteExecutorServiceAsync asyncScheduledServiceAtFixed() {
ScheduledExecutorRemoteService scheduledRemoteService = new ScheduledExecutorRemoteService(codec, redisson, name, commandExecutor);
ScheduledTasksService scheduledRemoteService = new ScheduledTasksService(codec, redisson, name, commandExecutor);
scheduledRemoteService.setTerminationTopicName(terminationTopicName);
scheduledRemoteService.setTasksCounterName(tasksCounterName);
scheduledRemoteService.setStatusName(statusName);

@ -37,13 +37,13 @@ import io.netty.util.TimerTask;
* @author Nikita Koksharov
*
*/
public class ScheduledExecutorRemoteService extends ExecutorRemoteService {
public class ScheduledTasksService extends TasksService {
private String requestId;
private String schedulerQueueName;
private String schedulerChannelName;
public ScheduledExecutorRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor) {
public ScheduledTasksService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor) {
super(codec, redisson, name, commandExecutor);
}
@ -119,7 +119,7 @@ public class ScheduledExecutorRemoteService extends ExecutorRemoteService {
commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ScheduledExecutorRemoteService.super.awaitResultAsync(optionsCopy, result, request, responseName);
ScheduledTasksService.super.awaitResultAsync(optionsCopy, result, request, responseName);
}
}, delay, TimeUnit.MILLISECONDS);
} else {
@ -128,8 +128,8 @@ public class ScheduledExecutorRemoteService extends ExecutorRemoteService {
}
@Override
protected boolean remove(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request) {
return commandExecutor.evalWrite(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
protected RFuture<Boolean> removeAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove from scheduler queue
"if redis.call('zrem', KEYS[2], ARGV[1]) > 0 then "
+ "redis.call('hdel', KEYS[6], ARGV[1]); "

@ -0,0 +1,55 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor;
import java.util.List;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.command.CommandExecutor;
/**
*
* @author Nikita Koksharov
*
*/
public class TasksBatchService extends TasksService {
private CommandBatchService batchCommandService;
public TasksBatchService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor) {
super(codec, redisson, name, commandExecutor);
batchCommandService = new CommandBatchService(commandExecutor.getConnectionManager());
}
@Override
protected CommandAsyncExecutor getAddCommandExecutor() {
return batchCommandService;
}
public List<Boolean> executeAdd() {
return (List<Boolean>) batchCommandService.execute();
}
public RFuture<List<Boolean>> executeAddAsync() {
return (RFuture<List<Boolean>>)(Object)batchCommandService.executeAsync();
}
}

@ -26,7 +26,7 @@ import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RemoteServiceCancelRequest;
@ -41,14 +41,14 @@ import io.netty.util.concurrent.FutureListener;
* @author Nikita Koksharov
*
*/
public class ExecutorRemoteService extends BaseRemoteService {
public class TasksService extends BaseRemoteService {
protected String terminationTopicName;
protected String tasksCounterName;
protected String statusName;
protected String tasksName;
public ExecutorRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor) {
public TasksService(Codec codec, RedissonClient redisson, String name, CommandAsyncExecutor commandExecutor) {
super(codec, redisson, name, commandExecutor);
}
@ -95,8 +95,12 @@ public class ExecutorRemoteService extends BaseRemoteService {
return promise;
}
protected CommandAsyncExecutor getAddCommandExecutor() {
return commandExecutor;
}
protected RFuture<Boolean> addAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return getAddCommandExecutor().evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('exists', KEYS[2]) == 0 then "
+ "redis.call('rpush', KEYS[3], ARGV[2]); "
+ "redis.call('hset', KEYS[4], ARGV[1], ARGV[2]);"
@ -109,8 +113,8 @@ public class ExecutorRemoteService extends BaseRemoteService {
}
@Override
protected boolean remove(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request) {
return commandExecutor.evalWrite(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
protected RFuture<Boolean> removeAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local task = redis.call('hget', KEYS[5], ARGV[1]); " +
"if task ~= false and redis.call('lrem', KEYS[1], 1, task) > 0 then "
+ "redis.call('hdel', KEYS[5], ARGV[1]); "
@ -129,45 +133,59 @@ public class ExecutorRemoteService extends BaseRemoteService {
request.getRequestId(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
}
public RFuture<Boolean> cancelExecutionAsync(String requestId) {
Class<?> syncInterface = RemoteExecutorService.class;
public RFuture<Boolean> cancelExecutionAsync(final String requestId) {
final Class<?> syncInterface = RemoteExecutorService.class;
String requestQueueName = getRequestQueueName(syncInterface);
String cancelRequestName = getCancelRequestQueueName(syncInterface, requestId);
if (!redisson.getMap(tasksName, LongCodec.INSTANCE).containsKey(requestId)) {
return RedissonPromise.newSucceededFuture(false);
}
final RPromise<Boolean> result = new RedissonPromise<Boolean>();
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec());
RemoteServiceRequest request = new RemoteServiceRequest(requestId);
if (remove(requestQueue, request)) {
return RedissonPromise.newSucceededFuture(true);
}
RBlockingQueue<RemoteServiceCancelRequest> cancelRequestQueue = redisson.getBlockingQueue(cancelRequestName, getCodec());
cancelRequestQueue.putAsync(new RemoteServiceCancelRequest(true, requestId + ":cancel-response"));
cancelRequestQueue.expireAsync(60, TimeUnit.SECONDS);
final RPromise<Boolean> result = new RedissonPromise<Boolean>();
String responseQueueName = getResponseQueueName(syncInterface, requestId + ":cancel-response");
RBlockingQueue<RemoteServiceCancelResponse> responseQueue = redisson.getBlockingQueue(responseQueueName, getCodec());
final RFuture<RemoteServiceCancelResponse> response = responseQueue.pollAsync(60, TimeUnit.SECONDS);
response.addListener(new FutureListener<RemoteServiceCancelResponse>() {
RFuture<Boolean> removeFuture = removeAsync(requestQueue, request);
removeFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<RemoteServiceCancelResponse> future) throws Exception {
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
if (response.getNow() == null) {
result.trySuccess(false);
return;
if (future.getNow()) {
result.trySuccess(true);
} else {
String cancelRequestName = getCancelRequestQueueName(syncInterface, requestId);
RBlockingQueue<RemoteServiceCancelRequest> cancelRequestQueue = redisson.getBlockingQueue(cancelRequestName, getCodec());
cancelRequestQueue.putAsync(new RemoteServiceCancelRequest(true, requestId + ":cancel-response"));
cancelRequestQueue.expireAsync(60, TimeUnit.SECONDS);
String responseQueueName = getResponseQueueName(syncInterface, requestId + ":cancel-response");
RBlockingQueue<RemoteServiceCancelResponse> responseQueue = redisson.getBlockingQueue(responseQueueName, getCodec());
final RFuture<RemoteServiceCancelResponse> response = responseQueue.pollAsync(60, TimeUnit.SECONDS);
response.addListener(new FutureListener<RemoteServiceCancelResponse>() {
@Override
public void operationComplete(Future<RemoteServiceCancelResponse> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
if (response.getNow() == null) {
result.trySuccess(false);
return;
}
result.trySuccess(response.getNow().isCanceled());
}
});
}
result.trySuccess(response.getNow().isCanceled());
}
});
return result;
}

@ -0,0 +1,29 @@
package org.redisson.executor;
import java.util.concurrent.Callable;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RInject;
public class IncrementCallableTask implements Callable<String> {
private String counterName;
@RInject
private RedissonClient redisson;
public IncrementCallableTask() {
}
public IncrementCallableTask(String counterName) {
super();
this.counterName = counterName;
}
@Override
public String call() throws Exception {
redisson.getAtomicLong(counterName).incrementAndGet();
return "1234";
}
}

@ -0,0 +1,26 @@
package org.redisson.executor;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RInject;
public class IncrementRunnableTask implements Runnable {
private String counterName;
@RInject
private RedissonClient redisson;
public IncrementRunnableTask() {
}
public IncrementRunnableTask(String counterName) {
super();
this.counterName = counterName;
}
@Override
public void run() {
redisson.getAtomicLong(counterName).incrementAndGet();
}
}
Loading…
Cancel
Save