ExecutorService task failover implemented. #1291, #1120

pull/1461/head
Nikita 7 years ago
parent c169193bc6
commit 10f8839f29

@ -39,7 +39,9 @@ import org.redisson.api.annotation.RRemoteAsync;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.misc.RPromise;
@ -718,7 +720,7 @@ public abstract class BaseRemoteService {
return;
}
RMap<String, T> canceledRequests = redisson.getMap(mapName, codec);
RMap<String, T> canceledRequests = redisson.getMap(mapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RFuture<T> future = canceledRequests.removeAsync(requestId.toString());
future.addListener(new FutureListener<T>() {
@Override
@ -744,9 +746,10 @@ public abstract class BaseRemoteService {
}
protected RequestId generateRequestId() {
byte[] id = new byte[16];
byte[] id = new byte[17];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
PlatformDependent.threadLocalRandom().nextBytes(id);
id[0] = 0;
return new RequestId(id);
}
@ -757,7 +760,7 @@ public abstract class BaseRemoteService {
private void cancelExecution(RemoteInvocationOptions optionsCopy,
boolean mayInterruptIfRunning, RemotePromise<Object> remotePromise) {
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, codec);
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
canceledRequests.putAsync(remotePromise.getRequestId().toString(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false));
canceledRequests.expireAsync(60, TimeUnit.SECONDS);

@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.BatchOptions;
import org.redisson.api.ClusterNodesGroup;
import org.redisson.api.ExecutorOptions;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.MapOptions;
import org.redisson.api.Node;
@ -375,7 +376,12 @@ public class Redisson implements RedissonClient {
@Override
public RScheduledExecutorService getExecutorService(String name) {
return new RedissonExecutorService(connectionManager.getCodec(), connectionManager.getCommandExecutor(), this, name, queueTransferService, responses);
return getExecutorService(name, connectionManager.getCodec());
}
@Override
public RScheduledExecutorService getExecutorService(String name, ExecutorOptions options) {
return getExecutorService(name, connectionManager.getCodec(), options);
}
@Override
@ -386,7 +392,12 @@ public class Redisson implements RedissonClient {
@Override
public RScheduledExecutorService getExecutorService(String name, Codec codec) {
return new RedissonExecutorService(codec, connectionManager.getCommandExecutor(), this, name, queueTransferService, responses);
return getExecutorService(name, codec, ExecutorOptions.defaults());
}
@Override
public RScheduledExecutorService getExecutorService(String name, Codec codec, ExecutorOptions options) {
return new RedissonExecutorService(codec, connectionManager.getCommandExecutor(), this, name, queueTransferService, responses, options);
}
@Override

@ -41,6 +41,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.CronSchedule;
import org.redisson.api.ExecutorOptions;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RExecutorBatchFuture;
import org.redisson.api.RExecutorFuture;
@ -60,6 +61,7 @@ import org.redisson.connection.ConnectionManager;
import org.redisson.executor.RedissonExecutorBatchFuture;
import org.redisson.executor.RedissonExecutorFuture;
import org.redisson.executor.RedissonExecutorFutureReference;
import org.redisson.executor.RedissonExecutorRemoteService;
import org.redisson.executor.RedissonScheduledFuture;
import org.redisson.executor.RemoteExecutorService;
import org.redisson.executor.RemoteExecutorServiceAsync;
@ -105,6 +107,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final String tasksName;
private final String schedulerQueueName;
private final String schedulerChannelName;
private final String tasksRetryIntervalName;
private final String workersChannelName;
private final String workersSemaphoreName;
@ -123,6 +126,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final RemoteExecutorServiceAsync asyncServiceWithoutResult;
private final ScheduledTasksService scheduledRemoteService;
private final TasksService executorRemoteService;
private final Map<Class<?>, byte[]> class2bytes = PlatformDependent.newConcurrentHashMap();
@ -136,7 +140,8 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final ReferenceQueue<RExecutorFuture<?>> referenceDueue = new ReferenceQueue<RExecutorFuture<?>>();
private final Collection<RedissonExecutorFutureReference> references = Collections.newSetFromMap(PlatformDependent.<RedissonExecutorFutureReference, Boolean>newConcurrentHashMap());
public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name, QueueTransferService queueTransferService, ConcurrentMap<String, ResponseEntry> responses) {
public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson,
String name, QueueTransferService queueTransferService, ConcurrentMap<String, ResponseEntry> responses, ExecutorOptions options) {
super();
this.codec = codec;
this.commandExecutor = commandExecutor;
@ -152,7 +157,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
this.executorId = connectionManager.getId().toString() + ":" + RemoteExecutorServiceAsync.class.getName() + ":" + name;
}
remoteService = redisson.getRemoteService(name, codec);
remoteService = new RedissonExecutorRemoteService(codec, redisson, name, connectionManager.getCommandExecutor(), executorId, responses);
requestQueueName = ((RedissonRemoteService)remoteService).getRequestQueueName(RemoteExecutorService.class);
responseQueueName = ((RedissonRemoteService)remoteService).getResponseQueueName(executorId);
String objectName = requestQueueName;
@ -161,7 +166,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
statusName = objectName + ":status";
terminationTopic = redisson.getTopic(objectName + ":termination-topic", codec);
tasksRetryIntervalName = objectName + ":retry-interval";
schedulerChannelName = objectName + ":scheduler-channel";
schedulerQueueName = objectName + ":scheduler";
@ -171,11 +176,15 @@ public class RedissonExecutorService implements RScheduledExecutorService {
workersTopic = redisson.getTopic(workersChannelName);
TasksService executorRemoteService = new TasksService(codec, redisson, name, commandExecutor, executorId, responses);
executorRemoteService = new TasksService(codec, redisson, name, commandExecutor, executorId, responses);
executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
executorRemoteService.setTasksCounterName(tasksCounterName);
executorRemoteService.setStatusName(statusName);
executorRemoteService.setTasksName(tasksName);
executorRemoteService.setSchedulerChannelName(schedulerChannelName);
executorRemoteService.setSchedulerQueueName(schedulerQueueName);
executorRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName);
executorRemoteService.setTasksRetryInterval(options.getTaskRetryInterval());
asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
@ -186,6 +195,8 @@ public class RedissonExecutorService implements RScheduledExecutorService {
scheduledRemoteService.setSchedulerQueueName(schedulerQueueName);
scheduledRemoteService.setSchedulerChannelName(schedulerChannelName);
scheduledRemoteService.setTasksName(tasksName);
scheduledRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName);
scheduledRemoteService.setTasksRetryInterval(options.getTaskRetryInterval());
asyncScheduledService = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
}
@ -234,9 +245,32 @@ public class RedissonExecutorService implements RScheduledExecutorService {
protected RFuture<Long> pushTaskAsync() {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local expiredTaskIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "local retryInterval = redis.call('get', KEYS[4]);"
+ "if #expiredTaskIds > 0 then "
+ "redis.call('zrem', KEYS[2], unpack(expiredTaskIds));"
+ "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));"
+ "if retryInterval ~= false then "
+ "local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);"
+ "for i = 1, #expiredTaskIds, 1 do "
+ "local name = expiredTaskIds[i];"
+ "local scheduledName = expiredTaskIds[i];"
+ "if string.sub(scheduledName, 1, 2) ~= 'ff' then "
+ "scheduledName = 'ff' .. scheduledName; "
+ "else "
+ "name = string.sub(name, 3, string.len(name)); "
+ "end;"
+ "redis.call('zadd', KEYS[2], startTime, scheduledName);"
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
// if new task added to queue head then publish its startTime
// to all scheduler workers
+ "if v[1] == expiredTaskIds[i] then "
+ "redis.call('publish', KEYS[3], startTime); "
+ "end;"
+ "redis.call('rpush', KEYS[1], name);"
+ "end; "
+ "else "
+ "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));"
+ "end; "
+ "end; "
// get startTime from scheduler queue head task
+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
@ -244,7 +278,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
+ "return v[2]; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(requestQueueName, schedulerQueueName),
Arrays.<Object>asList(requestQueueName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName),
System.currentTimeMillis(), 100);
}
};
@ -258,6 +292,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
service.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
service.setSchedulerChannelName(schedulerChannelName);
service.setSchedulerQueueName(schedulerQueueName);
service.setTasksRetryIntervalName(tasksRetryIntervalName);
remoteService.register(RemoteExecutorService.class, service, workers, executor);
workersGroupListenerId = workersTopic.addListener(new MessageListener<String>() {
@ -269,6 +304,8 @@ public class RedissonExecutorService implements RScheduledExecutorService {
});
}
private long repeatInterval = 5000;
@Override
public void execute(Runnable task) {
check(task);
@ -305,6 +342,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
executorRemoteService.setTasksCounterName(tasksCounterName);
executorRemoteService.setStatusName(statusName);
executorRemoteService.setTasksName(tasksName);
executorRemoteService.setSchedulerChannelName(schedulerChannelName);
executorRemoteService.setSchedulerQueueName(schedulerQueueName);
executorRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName);
return executorRemoteService;
}
@ -363,7 +403,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
+ "redis.call('set', KEYS[2], ARGV[1]);"
+ "end;"
+ "end;",
Arrays.<Object>asList(tasksCounterName, statusName, terminationTopic.getChannelNames().get(0)),
Arrays.<Object>asList(tasksCounterName, statusName, terminationTopic.getChannelNames().get(0), tasksRetryIntervalName),
SHUTDOWN_STATE, TERMINATED_STATE);
}
@ -845,8 +885,13 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public boolean cancelTask(String taskId) {
RFuture<Boolean> scheduledFuture = scheduledRemoteService.cancelExecutionAsync(new RequestId(taskId));
if (taskId.startsWith("01")) {
RFuture<Boolean> scheduledFuture = scheduledRemoteService.cancelExecutionAsync(new RequestId(taskId));
return commandExecutor.get(scheduledFuture);
}
RFuture<Boolean> scheduledFuture = executorRemoteService.cancelExecutionAsync(new RequestId(taskId));
return commandExecutor.get(scheduledFuture);
}
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
@ -910,8 +955,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
throw ee;
} finally {
for (Future<T> f : futures)
for (Future<T> f : futures) {
f.cancel(true);
}
}
}

@ -187,7 +187,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
final String requestId = future.getNow();
RMap<String, RemoteServiceRequest> tasks = redisson.getMap(requestQueue.getName() + ":tasks", new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RFuture<RemoteServiceRequest> taskFuture = tasks.removeAsync(requestId);
RFuture<RemoteServiceRequest> taskFuture = getTask(requestId, tasks);
taskFuture.addListener(new FutureListener<RemoteServiceRequest>() {
@Override
@ -330,7 +330,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
// could be removed not from future object
if (future.getNow().isSendResponse()) {
RMap<String, RemoteServiceCancelResponse> map = redisson.getMap(cancelResponseMapName, codec);
RMap<String, RemoteServiceCancelResponse> map = redisson.getMap(cancelResponseMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
map.putAsync(request.getId(), response);
map.expireAsync(60, TimeUnit.SECONDS);
}
@ -397,4 +397,8 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
}
protected RFuture<RemoteServiceRequest> getTask(final String requestId, RMap<String, RemoteServiceRequest> tasks) {
return tasks.removeAsync(requestId);
}
}

@ -0,0 +1,56 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.TimeUnit;
/**
* Configuration for ExecutorService.
*
* @author Nikita Koksharov
*
*/
public class ExecutorOptions {
private long taskRetryInterval = 60000;
private ExecutorOptions() {
}
public static ExecutorOptions defaults() {
return new ExecutorOptions();
}
public long getTaskRetryInterval() {
return taskRetryInterval;
}
/**
* Defines task retry interval at the end of which task is executed again.
* ExecutorService worker re-schedule task execution retry every 5 seconds.
* <p>
* Default is <code>1 minute</code>
*
* @param timeout value
* @param unit value
* @return self instance
*/
public ExecutorOptions taskRetryInterval(long timeout, TimeUnit unit) {
this.taskRetryInterval = unit.toMillis(timeout);
return this;
}
}

@ -839,6 +839,15 @@ public interface RedissonClient {
*/
RScheduledExecutorService getExecutorService(String name);
/**
* Returns ScheduledExecutorService by name
*
* @param name - name of object
* @param options - options for executor
* @return ScheduledExecutorService object
*/
RScheduledExecutorService getExecutorService(String name, ExecutorOptions options);
/**
* Returns ScheduledExecutorService by name
* using provided codec for task, response and request serialization
@ -864,6 +873,17 @@ public interface RedissonClient {
* @since 2.8.2
*/
RScheduledExecutorService getExecutorService(String name, Codec codec);
/**
* Returns ScheduledExecutorService by name
* using provided codec for task, response and request serialization
*
* @param name - name of object
* @param codec - codec for task, response and request
* @param options - options for executor
* @return ScheduledExecutorService object
*/
RScheduledExecutorService getExecutorService(String name, Codec codec, ExecutorOptions options);
/**
* Returns object for remote operations prefixed with the default name (redisson_remote_service)

@ -0,0 +1,46 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor;
import java.util.concurrent.ConcurrentMap;
import org.redisson.RedissonRemoteService;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandExecutor;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.ResponseEntry;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonExecutorRemoteService extends RedissonRemoteService {
public RedissonExecutorRemoteService(Codec codec, RedissonClient redisson, String name,
CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);
}
@Override
protected RFuture<RemoteServiceRequest> getTask(String requestId, RMap<String, RemoteServiceRequest> tasks) {
return tasks.getAsync(requestId);
}
}

@ -23,12 +23,15 @@ import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
@ -37,8 +40,6 @@ import org.redisson.remote.ResponseEntry;
public class ScheduledTasksService extends TasksService {
private RequestId requestId;
private String schedulerQueueName;
private String schedulerChannelName;
public ScheduledTasksService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String redissonId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, redissonId, responses);
@ -48,14 +49,6 @@ public class ScheduledTasksService extends TasksService {
this.requestId = requestId;
}
public void setSchedulerChannelName(String schedulerChannelName) {
this.schedulerChannelName = schedulerChannelName;
}
public void setSchedulerQueueName(String scheduledQueueName) {
this.schedulerQueueName = scheduledQueueName;
}
@Override
protected RFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request) {
int requestIndex = 0;
@ -75,6 +68,16 @@ public class ScheduledTasksService extends TasksService {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// check if executor service not in shutdown state
"if redis.call('exists', KEYS[2]) == 0 then "
+ "local retryInterval = redis.call('get', KEYS[6]); "
+ "if retryInterval ~= false then "
+ "local time = tonumber(ARGV[4]) + tonumber(retryInterval);"
+ "redis.call('zadd', KEYS[3], time, 'ff' .. ARGV[2]);"
+ "elseif tonumber(ARGV[5]) > 0 then "
+ "redis.call('set', KEYS[6], ARGV[5]);"
+ "local time = tonumber(ARGV[4]) + tonumber(ARGV[5]);"
+ "redis.call('zadd', KEYS[3], time, 'ff' .. ARGV[2]);"
+ "end; "
+ "redis.call('zadd', KEYS[3], ARGV[1], ARGV[2]);"
+ "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);"
+ "redis.call('incr', KEYS[1]);"
@ -87,31 +90,29 @@ public class ScheduledTasksService extends TasksService {
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName),
startTime, request.getId(), encode(request));
Arrays.<Object>asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName, tasksRetryIntervalName),
startTime, request.getId(), encode(request), System.currentTimeMillis(), tasksRetryInterval);
}
@Override
protected RFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove from scheduler queue
"if redis.call('zrem', KEYS[2], ARGV[1]) > 0 then "
+ "redis.call('hdel', KEYS[6], ARGV[1]); "
+ "if redis.call('decr', KEYS[3]) == 0 then "
+ "redis.call('del', KEYS[3]);"
+ "if redis.call('get', KEYS[4]) == ARGV[2] then "
+ "redis.call('set', KEYS[4], ARGV[3]);"
+ "redis.call('publish', KEYS[5], ARGV[3]);"
+ "end;"
+ "end;"
"if redis.call('exists', KEYS[3]) == 0 then "
+ "return 1;"
+ "end;"
+ "local task = redis.call('hget', KEYS[6], ARGV[1]); "
// remove from executor queue
+ "if task ~= false and redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then "
+ "redis.call('hdel', KEYS[6], ARGV[1]); "
+ "redis.call('hdel', KEYS[6], ARGV[1]); "
+ "redis.call('zrem', KEYS[2], 'ff' .. ARGV[1]); "
+ "local removedScheduled = redis.call('zrem', KEYS[2], ARGV[1]); "
+ "local removed = redis.call('lrem', KEYS[1], 1, ARGV[1]); "
// remove from executor queue
+ "if task ~= false and (removed > 0 or removedScheduled > 0) then "
+ "if redis.call('decr', KEYS[3]) == 0 then "
+ "redis.call('del', KEYS[3]);"
+ "redis.call('del', KEYS[3], KEYS[7]);"
+ "if redis.call('get', KEYS[4]) == ARGV[2] then "
+ "redis.call('set', KEYS[4], ARGV[3]);"
+ "redis.call('publish', KEYS[5], ARGV[3]);"
@ -119,17 +120,22 @@ public class ScheduledTasksService extends TasksService {
+ "end;"
+ "return 1;"
+ "end;"
// delete scheduled task
+ "redis.call('hdel', KEYS[6], ARGV[1]); "
+ "if task == false then "
+ "return 1; "
+ "end;"
+ "return 0;",
Arrays.<Object>asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName),
Arrays.<Object>asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName, tasksRetryIntervalName),
taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
}
@Override
protected RequestId generateRequestId() {
if (requestId == null) {
return super.generateRequestId();
byte[] id = new byte[17];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
PlatformDependent.threadLocalRandom().nextBytes(id);
id[0] = 1;
return new RequestId(id);
}
return requestId;
}

@ -20,7 +20,9 @@ import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.Redisson;
import org.redisson.RedissonExecutorService;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RFuture;
@ -28,6 +30,8 @@ import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.Injector;
@ -36,6 +40,10 @@ import org.redisson.remote.ResponseEntry;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
* Executor service runs Callable and Runnable tasks.
@ -60,6 +68,7 @@ public class TasksRunnerService implements RemoteExecutorService {
private String tasksName;
private String schedulerQueueName;
private String schedulerChannelName;
private String tasksRetryIntervalName;
private ConcurrentMap<String, ResponseEntry> responses;
public TasksRunnerService(CommandExecutor commandExecutor, RedissonClient redisson, Codec codec, String name, ConcurrentMap<String, ResponseEntry> responses) {
@ -76,6 +85,10 @@ public class TasksRunnerService implements RemoteExecutorService {
}
}
public void setTasksRetryIntervalName(String tasksRetryInterval) {
this.tasksRetryIntervalName = tasksRetryInterval;
}
public void setSchedulerQueueName(String schedulerQueueName) {
this.schedulerQueueName = schedulerQueueName;
}
@ -105,7 +118,7 @@ public class TasksRunnerService implements RemoteExecutorService {
long newStartTime = System.currentTimeMillis() + period;
RFuture<Void> future = asyncScheduledServiceAtFixed(executorId, requestId).scheduleAtFixedRate(className, classBody, state, newStartTime, period, executorId, requestId);
try {
executeRunnable(className, classBody, state, null);
executeRunnable(className, classBody, state, requestId);
} catch (RuntimeException e) {
// cancel task if it throws an exception
future.cancel(true);
@ -118,7 +131,7 @@ public class TasksRunnerService implements RemoteExecutorService {
Date nextStartDate = new CronExpression(cronExpression).getNextValidTimeAfter(new Date());
RFuture<Void> future = asyncScheduledServiceAtFixed(executorId, requestId).schedule(className, classBody, state, nextStartDate.getTime(), cronExpression, executorId, requestId);
try {
executeRunnable(className, classBody, state, null);
executeRunnable(className, classBody, state, requestId);
} catch (RuntimeException e) {
// cancel task if it throws an exception
future.cancel(true);
@ -141,13 +154,14 @@ public class TasksRunnerService implements RemoteExecutorService {
scheduledRemoteService.setSchedulerChannelName(schedulerChannelName);
scheduledRemoteService.setTasksName(tasksName);
scheduledRemoteService.setRequestId(new RequestId(requestId));
scheduledRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName);
RemoteExecutorServiceAsync asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
return asyncScheduledServiceAtFixed;
}
@Override
public void scheduleWithFixedDelay(String className, byte[] classBody, byte[] state, long startTime, long delay, String executorId, String requestId) {
executeRunnable(className, classBody, state, null);
executeRunnable(className, classBody, state, requestId);
long newStartTime = System.currentTimeMillis() + delay;
asyncScheduledServiceAtFixed(executorId, requestId).scheduleWithFixedDelay(className, classBody, state, newStartTime, delay, executorId, requestId);
}
@ -164,6 +178,8 @@ public class TasksRunnerService implements RemoteExecutorService {
@Override
public Object executeCallable(String className, byte[] classBody, byte[] state, String requestId) {
renewRetryTime(requestId);
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(state.length);
try {
buf.writeBytes(state);
@ -187,6 +203,51 @@ public class TasksRunnerService implements RemoteExecutorService {
}
}
protected void scheduleRetryTimeRenewal(final String requestId) {
((Redisson)redisson).getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
renewRetryTime(requestId);
}
}, 5, TimeUnit.SECONDS);
}
protected void renewRetryTime(final String requestId) {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// check if executor service not in shutdown state
"local name = ARGV[2];"
+ "local scheduledName = ARGV[2];"
+ "if string.sub(scheduledName, 1, 2) ~= 'ff' then "
+ "scheduledName = 'ff' .. scheduledName; "
+ "else "
+ "name = string.sub(name, 3, string.len(name)); "
+ "end;"
+ "local retryInterval = redis.call('get', KEYS[4]);"
+ "if redis.call('exists', KEYS[1]) == 0 and retryInterval ~= false and redis.call('hexists', KEYS[5], name) == 1 then "
+ "local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);"
+ "redis.call('zadd', KEYS[2], startTime, scheduledName);"
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
// if new task added to queue head then publish its startTime
// to all scheduler workers
+ "if v[1] == ARGV[2] then "
+ "redis.call('publish', KEYS[3], startTime); "
+ "end;"
+ "return 1; "
+ "end;"
+ "return 0;",
Arrays.<Object>asList(statusName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName, tasksName),
System.currentTimeMillis(), requestId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess() || future.get()) {
scheduleRetryTimeRenewal(requestId);
}
}
});
}
@SuppressWarnings("unchecked")
private <T> T decode(ByteBuf buf) throws IOException {
@ -197,6 +258,10 @@ public class TasksRunnerService implements RemoteExecutorService {
@Override
public void executeRunnable(String className, byte[] classBody, byte[] state, String requestId) {
if (requestId != null && requestId.startsWith("00")) {
renewRetryTime(requestId);
}
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(state.length);
try {
buf.writeBytes(state);
@ -227,36 +292,26 @@ public class TasksRunnerService implements RemoteExecutorService {
* If <code>scheduledRequestId</code> is not null then
* delete scheduled task
*
* @param scheduledRequestId
* @param requestId
*/
private void finish(String scheduledRequestId) {
private void finish(String requestId) {
classLoader.clearCurrentClassLoader();
if (scheduledRequestId != null) {
commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_VOID,
"redis.call('hdel', KEYS[4], ARGV[3]); " +
"if redis.call('decr', KEYS[1]) == 0 then "
+ "redis.call('del', KEYS[1]);"
+ "if redis.call('get', KEYS[2]) == ARGV[1] then "
+ "redis.call('set', KEYS[2], ARGV[2]);"
+ "redis.call('publish', KEYS[3], ARGV[2]);"
+ "end;"
+ "end;",
Arrays.<Object>asList(tasksCounterName, statusName, terminationTopicName, tasksName),
RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, scheduledRequestId);
return;
}
commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_VOID,
"if redis.call('decr', KEYS[1]) == 0 then "
+ "redis.call('del', KEYS[1]);"
commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);"
+ "if scheduled == false then "
+ "redis.call('hdel', KEYS[4], ARGV[3]); "
+ "end;" +
"redis.call('zrem', KEYS[5], 'ff' .. ARGV[3]);" +
"if redis.call('decr', KEYS[1]) == 0 then "
+ "redis.call('del', KEYS[1], KEYS[6]);"
+ "if redis.call('get', KEYS[2]) == ARGV[1] then "
+ "redis.call('set', KEYS[2], ARGV[2]);"
+ "redis.call('publish', KEYS[3], ARGV[2]);"
+ "end;"
+ "end;",
Arrays.<Object>asList(tasksCounterName, statusName, terminationTopicName),
RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
+ "end;",
Arrays.<Object>asList(tasksCounterName, statusName, terminationTopicName, tasksName, schedulerQueueName, tasksRetryIntervalName),
RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, requestId);
}
}

@ -26,7 +26,9 @@ import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
@ -50,11 +52,23 @@ public class TasksService extends BaseRemoteService {
protected String tasksCounterName;
protected String statusName;
protected String tasksName;
protected String schedulerQueueName;
protected String schedulerChannelName;
protected String tasksRetryIntervalName;
protected long tasksRetryInterval;
public TasksService(Codec codec, RedissonClient redisson, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);
}
public void setTasksRetryIntervalName(String tasksRetryIntervalName) {
this.tasksRetryIntervalName = tasksRetryIntervalName;
}
public void setTasksRetryInterval(long tasksRetryInterval) {
this.tasksRetryInterval = tasksRetryInterval;
}
public void setTerminationTopicName(String terminationTopicName) {
this.terminationTopicName = terminationTopicName;
}
@ -70,6 +84,14 @@ public class TasksService extends BaseRemoteService {
public void setTasksName(String tasksName) {
this.tasksName = tasksName;
}
public void setSchedulerChannelName(String schedulerChannelName) {
this.schedulerChannelName = schedulerChannelName;
}
public void setSchedulerQueueName(String scheduledQueueName) {
this.schedulerQueueName = scheduledQueueName;
}
@Override
protected final RFuture<Boolean> addAsync(String requestQueueName,
@ -104,37 +126,58 @@ public class TasksService extends BaseRemoteService {
protected RFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request) {
request.getArgs()[3] = request.getId();
long retryStartTime = 0;
if (tasksRetryInterval > 0) {
retryStartTime = System.currentTimeMillis() + tasksRetryInterval;
}
return getAddCommandExecutor().evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return getAddCommandExecutor().evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// check if executor service not in shutdown state
"if redis.call('exists', KEYS[2]) == 0 then "
+ "redis.call('hset', KEYS[4], ARGV[1], ARGV[2]);"
+ "redis.call('rpush', KEYS[3], ARGV[1]); "
+ "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);"
+ "redis.call('rpush', KEYS[6], ARGV[2]); "
+ "redis.call('incr', KEYS[1]);"
+ "if tonumber(ARGV[1]) > 0 then "
+ "redis.call('set', KEYS[7], ARGV[4]);"
+ "redis.call('zadd', KEYS[3], ARGV[1], 'ff' .. ARGV[2]);"
+ "local v = redis.call('zrange', KEYS[3], 0, 0); "
// if new task added to queue head then publish its startTime
// to all scheduler workers
+ "if v[1] == ARGV[2] then "
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end "
+ "end;"
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(tasksCounterName, statusName, requestQueueName, tasksName),
request.getId(), encode(request));
Arrays.<Object>asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName, requestQueueName, tasksRetryIntervalName),
retryStartTime, request.getId(), encode(request), tasksRetryInterval);
}
@Override
protected RFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local task = redis.call('hget', KEYS[5], ARGV[1]); " +
"if task ~= false and redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then "
+ "redis.call('hdel', KEYS[5], ARGV[1]); "
+ "if redis.call('decr', KEYS[2]) == 0 then "
+ "redis.call('del', KEYS[2]);"
+ "if redis.call('get', KEYS[3]) == ARGV[2] then "
+ "redis.call('set', KEYS[3], ARGV[3]);"
+ "redis.call('publish', KEYS[4], ARGV[3]);"
+ "end;"
+ "end;"
+ "return 1;"
"redis.call('zrem', KEYS[2], 'ff' .. ARGV[1]); "
+ "local task = redis.call('hget', KEYS[6], ARGV[1]); "
+ "redis.call('hdel', KEYS[6], ARGV[1]); "
// remove from executor queue
+ "if task ~= false and redis.call('exists', KEYS[3]) == 1 and redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then "
+ "if redis.call('decr', KEYS[3]) == 0 then "
+ "redis.call('del', KEYS[3], KEYS[7]);"
+ "if redis.call('get', KEYS[4]) == ARGV[2] then "
+ "redis.call('set', KEYS[4], ARGV[3]);"
+ "redis.call('publish', KEYS[5], ARGV[3]);"
+ "end;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(requestQueueName, tasksCounterName, statusName, terminationTopicName, tasksName),
taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
+ "return 1;"
+ "end;"
+ "if task == false then "
+ "return 1; "
+ "end;"
+ "return 0;",
Arrays.<Object>asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName, tasksRetryIntervalName),
taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
}
public RFuture<Boolean> cancelExecutionAsync(final RequestId requestId) {
@ -153,7 +196,7 @@ public class TasksService extends BaseRemoteService {
if (future.getNow()) {
result.trySuccess(true);
} else {
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, codec);
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
canceledRequests.putAsync(requestId.toString(), new RemoteServiceCancelRequest(true, true));
canceledRequests.expireAsync(60, TimeUnit.SECONDS);

@ -15,9 +15,9 @@
*/
package org.redisson.remote;
import io.netty.buffer.ByteBuf;
import java.util.Arrays;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
/**
*
@ -26,41 +26,26 @@ import io.netty.buffer.Unpooled;
*/
public class RequestId {
private final long id0;
private final long id1;
private final byte[] id;
public RequestId(String id) {
this(ByteBufUtil.decodeHexDump(id));
}
public RequestId(byte[] buf) {
ByteBuf b = Unpooled.wrappedBuffer(buf);
try {
id0 = b.readLong();
id1 = b.readLong();
} finally {
b.release();
}
id = buf;
}
@Override
public String toString() {
ByteBuf id = Unpooled.buffer(16);
try {
id.writeLong(id0);
id.writeLong(id1);
return ByteBufUtil.hexDump(id);
} finally {
id.release();
}
return ByteBufUtil.hexDump(id);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (id0 ^ (id0 >>> 32));
result = prime * result + (int) (id1 ^ (id1 >>> 32));
result = prime * result + Arrays.hashCode(id);
return result;
}
@ -73,12 +58,9 @@ public class RequestId {
if (getClass() != obj.getClass())
return false;
RequestId other = (RequestId) obj;
if (id0 != other.id0)
return false;
if (id1 != other.id1)
if (!Arrays.equals(id, other.id))
return false;
return true;
}
}

@ -0,0 +1,27 @@
package org.redisson.executor;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RInject;
public class FailoverTask implements Runnable {
@RInject
private RedissonClient redisson;
private String objectName;
public FailoverTask() {
}
public FailoverTask(String objectName) {
super();
this.objectName = objectName;
}
@Override
public void run() {
for (long i = 0; i < 20_000_000_000L; i++) {
}
redisson.getBucket(objectName).set(true);
}
}

@ -14,6 +14,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Duration;
import org.junit.After;
@ -21,12 +22,17 @@ import org.junit.Before;
import org.junit.Test;
import org.redisson.BaseTest;
import org.redisson.RedissonNode;
import org.redisson.api.ExecutorOptions;
import org.redisson.api.RExecutorBatchFuture;
import org.redisson.api.RExecutorFuture;
import org.redisson.api.RExecutorService;
import org.redisson.config.Config;
import org.redisson.config.RedissonNodeConfig;
import mockit.Invocation;
import mockit.Mock;
import mockit.MockUp;
public class RedissonExecutorServiceTest extends BaseTest {
private static RedissonNode node;
@ -68,6 +74,9 @@ public class RedissonExecutorServiceTest extends BaseTest {
future.get(5, TimeUnit.SECONDS);
future.getTaskFutures().stream().forEach(x -> x.syncUninterruptibly());
redisson.getKeys().delete("myCounter");
assertThat(redisson.getKeys().count()).isZero();
}
@Test
@ -78,6 +87,9 @@ public class RedissonExecutorServiceTest extends BaseTest {
future.get(5, TimeUnit.SECONDS);
future.getTaskFutures().stream().forEach(x -> assertThat(x.getNow()).isEqualTo("1234"));
redisson.getKeys().delete("myCounter");
assertThat(redisson.getKeys().count()).isZero();
}
@ -86,6 +98,72 @@ public class RedissonExecutorServiceTest extends BaseTest {
RExecutorService e = redisson.getExecutorService("test");
e.execute();
}
@Test
public void testTaskFinishing() throws Exception {
AtomicInteger counter = new AtomicInteger();
new MockUp<TasksRunnerService>() {
@Mock
private void finish(Invocation invocation, String requestId) {
if (counter.incrementAndGet() > 1) {
invocation.proceed();
}
}
};
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1));
node = RedissonNode.create(nodeConfig);
node.start();
RExecutorService executor = redisson.getExecutorService("test2");
RExecutorFuture<?> f = executor.submit(new FailoverTask("finished"));
Thread.sleep(2000);
node.shutdown();
f.get();
assertThat(redisson.<Boolean>getBucket("finished").get()).isTrue();
}
@Test
public void testTaskFailover() throws Exception {
AtomicInteger counter = new AtomicInteger();
new MockUp<TasksRunnerService>() {
@Mock
private void finish(Invocation invocation, String requestId) {
if (counter.incrementAndGet() > 1) {
invocation.proceed();
}
}
};
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1));
node = RedissonNode.create(nodeConfig);
node.start();
RExecutorService executor = redisson.getExecutorService("test2", ExecutorOptions.defaults().taskRetryInterval(10, TimeUnit.SECONDS));
RExecutorFuture<?> f = executor.submit(new IncrementRunnableTask("counter"));
f.get();
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(1);
Thread.sleep(2000);
node.shutdown();
node = RedissonNode.create(nodeConfig);
node.start();
Thread.sleep(8500);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2);
Thread.sleep(16000);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2);
redisson.getKeys().delete("counter");
assertThat(redisson.getKeys().count()).isEqualTo(1);
}
@Test
public void testBatchExecute() {
@ -94,6 +172,8 @@ public class RedissonExecutorServiceTest extends BaseTest {
new IncrementRunnableTask("myCounter"), new IncrementRunnableTask("myCounter"));
await().atMost(Duration.FIVE_SECONDS).until(() -> redisson.getAtomicLong("myCounter").get() == 4);
redisson.getKeys().delete("myCounter");
assertThat(redisson.getKeys().count()).isZero();
}
@Test
@ -103,11 +183,13 @@ public class RedissonExecutorServiceTest extends BaseTest {
Thread.sleep(2000);
cancel(future);
assertThat(redisson.<Long>getBucket("executed1").get()).isBetween(1000L, Long.MAX_VALUE);
RExecutorFuture<?> futureAsync = executor.submitAsync(new ScheduledLongRunnableTask("executed2"));
Thread.sleep(2000);
assertThat(executor.cancelTask(futureAsync.getTaskId())).isTrue();
assertThat(redisson.<Long>getBucket("executed2").get()).isBetween(1000L, Long.MAX_VALUE);
redisson.getKeys().delete("executed1", "executed2");
assertThat(redisson.getKeys().count()).isZero();
}
@Test
@ -134,14 +216,13 @@ public class RedissonExecutorServiceTest extends BaseTest {
for (Future<String> future : allResult) {
assertThat(future.get()).isEqualTo(CallableTask.RESULT);
}
List<CallableTask> invokeAllParams1 = Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask());
List<Future<String>> allResult1 = e.invokeAll(invokeAllParams1, 5, TimeUnit.SECONDS);
assertThat(allResult1).hasSize(invokeAllParams.size());
for (Future<String> future : allResult1) {
assertThat(future.get()).isEqualTo(CallableTask.RESULT);
}
}
@Test(expected = RejectedExecutionException.class)
@ -158,6 +239,8 @@ public class RedissonExecutorServiceTest extends BaseTest {
assertThat(e.isShutdown()).isTrue();
e.execute(new RunnableTask());
assertThat(redisson.getKeys().count()).isZero();
}
@Test(expected = RejectedExecutionException.class)
@ -174,6 +257,8 @@ public class RedissonExecutorServiceTest extends BaseTest {
assertThat(e.isShutdown()).isTrue();
e.submit(new RunnableTask2());
assertThat(redisson.getKeys().count()).isZero();
}
@Test(expected = RejectedExecutionException.class)
@ -190,6 +275,8 @@ public class RedissonExecutorServiceTest extends BaseTest {
assertThat(e.isShutdown()).isTrue();
e.submit(new CallableTask());
assertThat(redisson.getKeys().count()).isZero();
}
@Test(expected = RejectedExecutionException.class)
@ -199,6 +286,8 @@ public class RedissonExecutorServiceTest extends BaseTest {
assertThat(e.isShutdown()).isTrue();
e.submit(new RunnableTask2());
assertThat(redisson.getKeys().count()).isZero();
}
@ -255,6 +344,9 @@ public class RedissonExecutorServiceTest extends BaseTest {
s4.get();
assertThat(redisson.getAtomicLong("runnableCounter").get()).isEqualTo(100L);
redisson.getKeys().delete("runnableCounter", "counter");
assertThat(redisson.getKeys().count()).isZero();
}
@Test

@ -10,6 +10,7 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Assert;
@ -18,11 +19,17 @@ import org.junit.Test;
import org.redisson.BaseTest;
import org.redisson.RedissonNode;
import org.redisson.api.CronSchedule;
import org.redisson.api.ExecutorOptions;
import org.redisson.api.RExecutorFuture;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RScheduledFuture;
import org.redisson.config.Config;
import org.redisson.config.RedissonNodeConfig;
import mockit.Invocation;
import mockit.Mock;
import mockit.MockUp;
public class RedissonScheduledExecutorServiceTest extends BaseTest {
private static RedissonNode node;
@ -44,6 +51,44 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
super.after();
node.shutdown();
}
@Test
public void testTaskFailover() throws Exception {
AtomicInteger counter = new AtomicInteger();
new MockUp<TasksRunnerService>() {
@Mock
private void finish(Invocation invocation, String requestId) {
if (counter.incrementAndGet() > 1) {
invocation.proceed();
}
}
};
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1));
node = RedissonNode.create(nodeConfig);
node.start();
RScheduledExecutorService executor = redisson.getExecutorService("test2", ExecutorOptions.defaults().taskRetryInterval(10, TimeUnit.SECONDS));
RExecutorFuture<?> f = executor.schedule(new IncrementRunnableTask("counter"), 1, TimeUnit.SECONDS);
f.get();
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(1);
Thread.sleep(2000);
node.shutdown();
node = RedissonNode.create(nodeConfig);
node.start();
Thread.sleep(8500);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2);
Thread.sleep(16000);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2);
redisson.getKeys().delete("counter");
assertThat(redisson.getKeys().count()).isEqualTo(1);
}
@Test(timeout = 7000)
public void testTaskResume() throws InterruptedException, ExecutionException {
@ -136,7 +181,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
Thread.sleep(2000);
cancel(future);
assertThat(redisson.<Long>getBucket("executed1").get()).isBetween(1000L, Long.MAX_VALUE);
RScheduledFuture<?> futureAsync = executor.scheduleAsync(new ScheduledLongRunnableTask("executed2"), 1, TimeUnit.SECONDS);
Thread.sleep(2000);
assertThat(executor.cancelTask(futureAsync.getTaskId())).isTrue();
@ -201,7 +246,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
assertThat(future1.cancel(true)).isTrue();
try {
future1.get();
Assert.fail("CancellationException should be arise");
Assert.fail("CancellationException should arise");
} catch (CancellationException e) {
// skip
}

@ -27,5 +27,5 @@ public class ScheduledLongRunnableTask implements Runnable {
}
}
}
}

Loading…
Cancel
Save