diff --git a/redisson/src/main/java/org/redisson/BaseRemoteService.java b/redisson/src/main/java/org/redisson/BaseRemoteService.java index 113aaa6e6..b60857b64 100644 --- a/redisson/src/main/java/org/redisson/BaseRemoteService.java +++ b/redisson/src/main/java/org/redisson/BaseRemoteService.java @@ -39,7 +39,9 @@ import org.redisson.api.annotation.RRemoteAsync; import org.redisson.client.RedisException; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; +import org.redisson.codec.CompositeCodec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.executor.RemotePromise; import org.redisson.misc.RPromise; @@ -718,7 +720,7 @@ public abstract class BaseRemoteService { return; } - RMap canceledRequests = redisson.getMap(mapName, codec); + RMap canceledRequests = redisson.getMap(mapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec)); RFuture future = canceledRequests.removeAsync(requestId.toString()); future.addListener(new FutureListener() { @Override @@ -744,9 +746,10 @@ public abstract class BaseRemoteService { } protected RequestId generateRequestId() { - byte[] id = new byte[16]; + byte[] id = new byte[17]; // TODO JDK UPGRADE replace to native ThreadLocalRandom PlatformDependent.threadLocalRandom().nextBytes(id); + id[0] = 0; return new RequestId(id); } @@ -757,7 +760,7 @@ public abstract class BaseRemoteService { private void cancelExecution(RemoteInvocationOptions optionsCopy, boolean mayInterruptIfRunning, RemotePromise remotePromise) { - RMap canceledRequests = redisson.getMap(cancelRequestMapName, codec); + RMap canceledRequests = redisson.getMap(cancelRequestMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec)); canceledRequests.putAsync(remotePromise.getRequestId().toString(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false)); canceledRequests.expireAsync(60, TimeUnit.SECONDS); diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index 1d7bbd1a7..1cf745faa 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit; import org.redisson.api.BatchOptions; import org.redisson.api.ClusterNodesGroup; +import org.redisson.api.ExecutorOptions; import org.redisson.api.LocalCachedMapOptions; import org.redisson.api.MapOptions; import org.redisson.api.Node; @@ -375,7 +376,12 @@ public class Redisson implements RedissonClient { @Override public RScheduledExecutorService getExecutorService(String name) { - return new RedissonExecutorService(connectionManager.getCodec(), connectionManager.getCommandExecutor(), this, name, queueTransferService, responses); + return getExecutorService(name, connectionManager.getCodec()); + } + + @Override + public RScheduledExecutorService getExecutorService(String name, ExecutorOptions options) { + return getExecutorService(name, connectionManager.getCodec(), options); } @Override @@ -386,7 +392,12 @@ public class Redisson implements RedissonClient { @Override public RScheduledExecutorService getExecutorService(String name, Codec codec) { - return new RedissonExecutorService(codec, connectionManager.getCommandExecutor(), this, name, queueTransferService, responses); + return getExecutorService(name, codec, ExecutorOptions.defaults()); + } + + @Override + public RScheduledExecutorService getExecutorService(String name, Codec codec, ExecutorOptions options) { + return new RedissonExecutorService(codec, connectionManager.getCommandExecutor(), this, name, queueTransferService, responses, options); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 26320f4d9..6d43a5471 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -41,6 +41,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import org.redisson.api.CronSchedule; +import org.redisson.api.ExecutorOptions; import org.redisson.api.RAtomicLong; import org.redisson.api.RExecutorBatchFuture; import org.redisson.api.RExecutorFuture; @@ -60,6 +61,7 @@ import org.redisson.connection.ConnectionManager; import org.redisson.executor.RedissonExecutorBatchFuture; import org.redisson.executor.RedissonExecutorFuture; import org.redisson.executor.RedissonExecutorFutureReference; +import org.redisson.executor.RedissonExecutorRemoteService; import org.redisson.executor.RedissonScheduledFuture; import org.redisson.executor.RemoteExecutorService; import org.redisson.executor.RemoteExecutorServiceAsync; @@ -105,6 +107,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { private final String tasksName; private final String schedulerQueueName; private final String schedulerChannelName; + private final String tasksRetryIntervalName; private final String workersChannelName; private final String workersSemaphoreName; @@ -123,6 +126,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { private final RemoteExecutorServiceAsync asyncServiceWithoutResult; private final ScheduledTasksService scheduledRemoteService; + private final TasksService executorRemoteService; private final Map, byte[]> class2bytes = PlatformDependent.newConcurrentHashMap(); @@ -136,7 +140,8 @@ public class RedissonExecutorService implements RScheduledExecutorService { private final ReferenceQueue> referenceDueue = new ReferenceQueue>(); private final Collection references = Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); - public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name, QueueTransferService queueTransferService, ConcurrentMap responses) { + public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, + String name, QueueTransferService queueTransferService, ConcurrentMap responses, ExecutorOptions options) { super(); this.codec = codec; this.commandExecutor = commandExecutor; @@ -152,7 +157,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { this.executorId = connectionManager.getId().toString() + ":" + RemoteExecutorServiceAsync.class.getName() + ":" + name; } - remoteService = redisson.getRemoteService(name, codec); + remoteService = new RedissonExecutorRemoteService(codec, redisson, name, connectionManager.getCommandExecutor(), executorId, responses); requestQueueName = ((RedissonRemoteService)remoteService).getRequestQueueName(RemoteExecutorService.class); responseQueueName = ((RedissonRemoteService)remoteService).getResponseQueueName(executorId); String objectName = requestQueueName; @@ -161,7 +166,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { statusName = objectName + ":status"; terminationTopic = redisson.getTopic(objectName + ":termination-topic", codec); - + tasksRetryIntervalName = objectName + ":retry-interval"; schedulerChannelName = objectName + ":scheduler-channel"; schedulerQueueName = objectName + ":scheduler"; @@ -171,11 +176,15 @@ public class RedissonExecutorService implements RScheduledExecutorService { workersTopic = redisson.getTopic(workersChannelName); - TasksService executorRemoteService = new TasksService(codec, redisson, name, commandExecutor, executorId, responses); + executorRemoteService = new TasksService(codec, redisson, name, commandExecutor, executorId, responses); executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); executorRemoteService.setTasksCounterName(tasksCounterName); executorRemoteService.setStatusName(statusName); executorRemoteService.setTasksName(tasksName); + executorRemoteService.setSchedulerChannelName(schedulerChannelName); + executorRemoteService.setSchedulerQueueName(schedulerQueueName); + executorRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName); + executorRemoteService.setTasksRetryInterval(options.getTaskRetryInterval()); asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); @@ -186,6 +195,8 @@ public class RedissonExecutorService implements RScheduledExecutorService { scheduledRemoteService.setSchedulerQueueName(schedulerQueueName); scheduledRemoteService.setSchedulerChannelName(schedulerChannelName); scheduledRemoteService.setTasksName(tasksName); + scheduledRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName); + scheduledRemoteService.setTasksRetryInterval(options.getTaskRetryInterval()); asyncScheduledService = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); } @@ -234,9 +245,32 @@ public class RedissonExecutorService implements RScheduledExecutorService { protected RFuture pushTaskAsync() { return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local expiredTaskIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " + + "local retryInterval = redis.call('get', KEYS[4]);" + "if #expiredTaskIds > 0 then " + "redis.call('zrem', KEYS[2], unpack(expiredTaskIds));" - + "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));" + + "if retryInterval ~= false then " + + "local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);" + + "for i = 1, #expiredTaskIds, 1 do " + + "local name = expiredTaskIds[i];" + + "local scheduledName = expiredTaskIds[i];" + + "if string.sub(scheduledName, 1, 2) ~= 'ff' then " + + "scheduledName = 'ff' .. scheduledName; " + + "else " + + "name = string.sub(name, 3, string.len(name)); " + + "end;" + + + "redis.call('zadd', KEYS[2], startTime, scheduledName);" + + "local v = redis.call('zrange', KEYS[2], 0, 0); " + // if new task added to queue head then publish its startTime + // to all scheduler workers + + "if v[1] == expiredTaskIds[i] then " + + "redis.call('publish', KEYS[3], startTime); " + + "end;" + + "redis.call('rpush', KEYS[1], name);" + + "end; " + + "else " + + "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));" + + "end; " + "end; " // get startTime from scheduler queue head task + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); " @@ -244,7 +278,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { + "return v[2]; " + "end " + "return nil;", - Arrays.asList(requestQueueName, schedulerQueueName), + Arrays.asList(requestQueueName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName), System.currentTimeMillis(), 100); } }; @@ -258,6 +292,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { service.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); service.setSchedulerChannelName(schedulerChannelName); service.setSchedulerQueueName(schedulerQueueName); + service.setTasksRetryIntervalName(tasksRetryIntervalName); remoteService.register(RemoteExecutorService.class, service, workers, executor); workersGroupListenerId = workersTopic.addListener(new MessageListener() { @@ -269,6 +304,8 @@ public class RedissonExecutorService implements RScheduledExecutorService { }); } + private long repeatInterval = 5000; + @Override public void execute(Runnable task) { check(task); @@ -305,6 +342,9 @@ public class RedissonExecutorService implements RScheduledExecutorService { executorRemoteService.setTasksCounterName(tasksCounterName); executorRemoteService.setStatusName(statusName); executorRemoteService.setTasksName(tasksName); + executorRemoteService.setSchedulerChannelName(schedulerChannelName); + executorRemoteService.setSchedulerQueueName(schedulerQueueName); + executorRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName); return executorRemoteService; } @@ -363,7 +403,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { + "redis.call('set', KEYS[2], ARGV[1]);" + "end;" + "end;", - Arrays.asList(tasksCounterName, statusName, terminationTopic.getChannelNames().get(0)), + Arrays.asList(tasksCounterName, statusName, terminationTopic.getChannelNames().get(0), tasksRetryIntervalName), SHUTDOWN_STATE, TERMINATED_STATE); } @@ -845,8 +885,13 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public boolean cancelTask(String taskId) { - RFuture scheduledFuture = scheduledRemoteService.cancelExecutionAsync(new RequestId(taskId)); + if (taskId.startsWith("01")) { + RFuture scheduledFuture = scheduledRemoteService.cancelExecutionAsync(new RequestId(taskId)); + return commandExecutor.get(scheduledFuture); + } + RFuture scheduledFuture = executorRemoteService.cancelExecutionAsync(new RequestId(taskId)); return commandExecutor.get(scheduledFuture); + } private T doInvokeAny(Collection> tasks, @@ -910,8 +955,9 @@ public class RedissonExecutorService implements RScheduledExecutorService { throw ee; } finally { - for (Future f : futures) + for (Future f : futures) { f.cancel(true); + } } } diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index 18c19e97b..391a470ec 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -187,7 +187,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS final String requestId = future.getNow(); RMap tasks = redisson.getMap(requestQueue.getName() + ":tasks", new CompositeCodec(StringCodec.INSTANCE, codec, codec)); - RFuture taskFuture = tasks.removeAsync(requestId); + RFuture taskFuture = getTask(requestId, tasks); taskFuture.addListener(new FutureListener() { @Override @@ -330,7 +330,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS // could be removed not from future object if (future.getNow().isSendResponse()) { - RMap map = redisson.getMap(cancelResponseMapName, codec); + RMap map = redisson.getMap(cancelResponseMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec)); map.putAsync(request.getId(), response); map.expireAsync(60, TimeUnit.SECONDS); } @@ -397,4 +397,8 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS } } + protected RFuture getTask(final String requestId, RMap tasks) { + return tasks.removeAsync(requestId); + } + } diff --git a/redisson/src/main/java/org/redisson/api/ExecutorOptions.java b/redisson/src/main/java/org/redisson/api/ExecutorOptions.java new file mode 100644 index 000000000..cc349afc1 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/ExecutorOptions.java @@ -0,0 +1,56 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.concurrent.TimeUnit; + +/** + * Configuration for ExecutorService. + * + * @author Nikita Koksharov + * + */ +public class ExecutorOptions { + + private long taskRetryInterval = 60000; + + private ExecutorOptions() { + } + + public static ExecutorOptions defaults() { + return new ExecutorOptions(); + } + + public long getTaskRetryInterval() { + return taskRetryInterval; + } + + /** + * Defines task retry interval at the end of which task is executed again. + * ExecutorService worker re-schedule task execution retry every 5 seconds. + *

+ * Default is 1 minute + * + * @param timeout value + * @param unit value + * @return self instance + */ + public ExecutorOptions taskRetryInterval(long timeout, TimeUnit unit) { + this.taskRetryInterval = unit.toMillis(timeout); + return this; + } + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonClient.java b/redisson/src/main/java/org/redisson/api/RedissonClient.java index 53eba5182..a084c47de 100755 --- a/redisson/src/main/java/org/redisson/api/RedissonClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonClient.java @@ -839,6 +839,15 @@ public interface RedissonClient { */ RScheduledExecutorService getExecutorService(String name); + /** + * Returns ScheduledExecutorService by name + * + * @param name - name of object + * @param options - options for executor + * @return ScheduledExecutorService object + */ + RScheduledExecutorService getExecutorService(String name, ExecutorOptions options); + /** * Returns ScheduledExecutorService by name * using provided codec for task, response and request serialization @@ -864,6 +873,17 @@ public interface RedissonClient { * @since 2.8.2 */ RScheduledExecutorService getExecutorService(String name, Codec codec); + + /** + * Returns ScheduledExecutorService by name + * using provided codec for task, response and request serialization + * + * @param name - name of object + * @param codec - codec for task, response and request + * @param options - options for executor + * @return ScheduledExecutorService object + */ + RScheduledExecutorService getExecutorService(String name, Codec codec, ExecutorOptions options); /** * Returns object for remote operations prefixed with the default name (redisson_remote_service) diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java new file mode 100644 index 000000000..f3d709f65 --- /dev/null +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java @@ -0,0 +1,46 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.executor; + +import java.util.concurrent.ConcurrentMap; + +import org.redisson.RedissonRemoteService; +import org.redisson.api.RFuture; +import org.redisson.api.RMap; +import org.redisson.api.RedissonClient; +import org.redisson.client.codec.Codec; +import org.redisson.command.CommandExecutor; +import org.redisson.remote.RemoteServiceRequest; +import org.redisson.remote.ResponseEntry; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonExecutorRemoteService extends RedissonRemoteService { + + public RedissonExecutorRemoteService(Codec codec, RedissonClient redisson, String name, + CommandExecutor commandExecutor, String executorId, ConcurrentMap responses) { + super(codec, redisson, name, commandExecutor, executorId, responses); + } + + @Override + protected RFuture getTask(String requestId, RMap tasks) { + return tasks.getAsync(requestId); + } + +} diff --git a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java index b35845d66..e6184a87c 100644 --- a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java +++ b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java @@ -23,12 +23,15 @@ import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; import org.redisson.remote.RemoteServiceRequest; import org.redisson.remote.RequestId; import org.redisson.remote.ResponseEntry; +import io.netty.util.internal.PlatformDependent; + /** * * @author Nikita Koksharov @@ -37,8 +40,6 @@ import org.redisson.remote.ResponseEntry; public class ScheduledTasksService extends TasksService { private RequestId requestId; - private String schedulerQueueName; - private String schedulerChannelName; public ScheduledTasksService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String redissonId, ConcurrentMap responses) { super(codec, redisson, name, commandExecutor, redissonId, responses); @@ -48,14 +49,6 @@ public class ScheduledTasksService extends TasksService { this.requestId = requestId; } - public void setSchedulerChannelName(String schedulerChannelName) { - this.schedulerChannelName = schedulerChannelName; - } - - public void setSchedulerQueueName(String scheduledQueueName) { - this.schedulerQueueName = scheduledQueueName; - } - @Override protected RFuture addAsync(String requestQueueName, RemoteServiceRequest request) { int requestIndex = 0; @@ -75,6 +68,16 @@ public class ScheduledTasksService extends TasksService { return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // check if executor service not in shutdown state "if redis.call('exists', KEYS[2]) == 0 then " + + "local retryInterval = redis.call('get', KEYS[6]); " + + "if retryInterval ~= false then " + + "local time = tonumber(ARGV[4]) + tonumber(retryInterval);" + + "redis.call('zadd', KEYS[3], time, 'ff' .. ARGV[2]);" + + "elseif tonumber(ARGV[5]) > 0 then " + + "redis.call('set', KEYS[6], ARGV[5]);" + + "local time = tonumber(ARGV[4]) + tonumber(ARGV[5]);" + + "redis.call('zadd', KEYS[3], time, 'ff' .. ARGV[2]);" + + "end; " + + "redis.call('zadd', KEYS[3], ARGV[1], ARGV[2]);" + "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);" + "redis.call('incr', KEYS[1]);" @@ -87,31 +90,29 @@ public class ScheduledTasksService extends TasksService { + "return 1;" + "end;" + "return 0;", - Arrays.asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName), - startTime, request.getId(), encode(request)); + Arrays.asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName, tasksRetryIntervalName), + startTime, request.getId(), encode(request), System.currentTimeMillis(), tasksRetryInterval); } @Override protected RFuture removeAsync(String requestQueueName, RequestId taskId) { - return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // remove from scheduler queue - "if redis.call('zrem', KEYS[2], ARGV[1]) > 0 then " - + "redis.call('hdel', KEYS[6], ARGV[1]); " - + "if redis.call('decr', KEYS[3]) == 0 then " - + "redis.call('del', KEYS[3]);" - + "if redis.call('get', KEYS[4]) == ARGV[2] then " - + "redis.call('set', KEYS[4], ARGV[3]);" - + "redis.call('publish', KEYS[5], ARGV[3]);" - + "end;" - + "end;" + "if redis.call('exists', KEYS[3]) == 0 then " + "return 1;" + "end;" + + "local task = redis.call('hget', KEYS[6], ARGV[1]); " - // remove from executor queue - + "if task ~= false and redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then " - + "redis.call('hdel', KEYS[6], ARGV[1]); " + + "redis.call('hdel', KEYS[6], ARGV[1]); " + + + "redis.call('zrem', KEYS[2], 'ff' .. ARGV[1]); " + + "local removedScheduled = redis.call('zrem', KEYS[2], ARGV[1]); " + + "local removed = redis.call('lrem', KEYS[1], 1, ARGV[1]); " + + // remove from executor queue + + "if task ~= false and (removed > 0 or removedScheduled > 0) then " + "if redis.call('decr', KEYS[3]) == 0 then " - + "redis.call('del', KEYS[3]);" + + "redis.call('del', KEYS[3], KEYS[7]);" + "if redis.call('get', KEYS[4]) == ARGV[2] then " + "redis.call('set', KEYS[4], ARGV[3]);" + "redis.call('publish', KEYS[5], ARGV[3]);" @@ -119,17 +120,22 @@ public class ScheduledTasksService extends TasksService { + "end;" + "return 1;" + "end;" - // delete scheduled task - + "redis.call('hdel', KEYS[6], ARGV[1]); " + + "if task == false then " + + "return 1; " + + "end;" + "return 0;", - Arrays.asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName), + Arrays.asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName, tasksRetryIntervalName), taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); } @Override protected RequestId generateRequestId() { if (requestId == null) { - return super.generateRequestId(); + byte[] id = new byte[17]; + // TODO JDK UPGRADE replace to native ThreadLocalRandom + PlatformDependent.threadLocalRandom().nextBytes(id); + id[0] = 1; + return new RequestId(id); } return requestId; } diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index 0d3427918..e53396e8b 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -20,7 +20,9 @@ import java.util.Arrays; import java.util.Date; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import org.redisson.Redisson; import org.redisson.RedissonExecutorService; import org.redisson.RedissonShutdownException; import org.redisson.api.RFuture; @@ -28,6 +30,8 @@ import org.redisson.api.RedissonClient; import org.redisson.api.RemoteInvocationOptions; import org.redisson.client.RedisException; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; import org.redisson.misc.Injector; @@ -36,6 +40,10 @@ import org.redisson.remote.ResponseEntry; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; /** * Executor service runs Callable and Runnable tasks. @@ -60,6 +68,7 @@ public class TasksRunnerService implements RemoteExecutorService { private String tasksName; private String schedulerQueueName; private String schedulerChannelName; + private String tasksRetryIntervalName; private ConcurrentMap responses; public TasksRunnerService(CommandExecutor commandExecutor, RedissonClient redisson, Codec codec, String name, ConcurrentMap responses) { @@ -76,6 +85,10 @@ public class TasksRunnerService implements RemoteExecutorService { } } + public void setTasksRetryIntervalName(String tasksRetryInterval) { + this.tasksRetryIntervalName = tasksRetryInterval; + } + public void setSchedulerQueueName(String schedulerQueueName) { this.schedulerQueueName = schedulerQueueName; } @@ -105,7 +118,7 @@ public class TasksRunnerService implements RemoteExecutorService { long newStartTime = System.currentTimeMillis() + period; RFuture future = asyncScheduledServiceAtFixed(executorId, requestId).scheduleAtFixedRate(className, classBody, state, newStartTime, period, executorId, requestId); try { - executeRunnable(className, classBody, state, null); + executeRunnable(className, classBody, state, requestId); } catch (RuntimeException e) { // cancel task if it throws an exception future.cancel(true); @@ -118,7 +131,7 @@ public class TasksRunnerService implements RemoteExecutorService { Date nextStartDate = new CronExpression(cronExpression).getNextValidTimeAfter(new Date()); RFuture future = asyncScheduledServiceAtFixed(executorId, requestId).schedule(className, classBody, state, nextStartDate.getTime(), cronExpression, executorId, requestId); try { - executeRunnable(className, classBody, state, null); + executeRunnable(className, classBody, state, requestId); } catch (RuntimeException e) { // cancel task if it throws an exception future.cancel(true); @@ -141,13 +154,14 @@ public class TasksRunnerService implements RemoteExecutorService { scheduledRemoteService.setSchedulerChannelName(schedulerChannelName); scheduledRemoteService.setTasksName(tasksName); scheduledRemoteService.setRequestId(new RequestId(requestId)); + scheduledRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName); RemoteExecutorServiceAsync asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); return asyncScheduledServiceAtFixed; } @Override public void scheduleWithFixedDelay(String className, byte[] classBody, byte[] state, long startTime, long delay, String executorId, String requestId) { - executeRunnable(className, classBody, state, null); + executeRunnable(className, classBody, state, requestId); long newStartTime = System.currentTimeMillis() + delay; asyncScheduledServiceAtFixed(executorId, requestId).scheduleWithFixedDelay(className, classBody, state, newStartTime, delay, executorId, requestId); } @@ -164,6 +178,8 @@ public class TasksRunnerService implements RemoteExecutorService { @Override public Object executeCallable(String className, byte[] classBody, byte[] state, String requestId) { + renewRetryTime(requestId); + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(state.length); try { buf.writeBytes(state); @@ -187,6 +203,51 @@ public class TasksRunnerService implements RemoteExecutorService { } } + protected void scheduleRetryTimeRenewal(final String requestId) { + ((Redisson)redisson).getConnectionManager().newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + renewRetryTime(requestId); + } + }, 5, TimeUnit.SECONDS); + } + + protected void renewRetryTime(final String requestId) { + RFuture future = commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + // check if executor service not in shutdown state + "local name = ARGV[2];" + + "local scheduledName = ARGV[2];" + + "if string.sub(scheduledName, 1, 2) ~= 'ff' then " + + "scheduledName = 'ff' .. scheduledName; " + + "else " + + "name = string.sub(name, 3, string.len(name)); " + + "end;" + + "local retryInterval = redis.call('get', KEYS[4]);" + + + "if redis.call('exists', KEYS[1]) == 0 and retryInterval ~= false and redis.call('hexists', KEYS[5], name) == 1 then " + + "local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);" + + "redis.call('zadd', KEYS[2], startTime, scheduledName);" + + "local v = redis.call('zrange', KEYS[2], 0, 0); " + // if new task added to queue head then publish its startTime + // to all scheduler workers + + "if v[1] == ARGV[2] then " + + "redis.call('publish', KEYS[3], startTime); " + + "end;" + + "return 1; " + + "end;" + + "return 0;", + Arrays.asList(statusName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName, tasksName), + System.currentTimeMillis(), requestId); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess() || future.get()) { + scheduleRetryTimeRenewal(requestId); + } + } + }); + } + @SuppressWarnings("unchecked") private T decode(ByteBuf buf) throws IOException { @@ -197,6 +258,10 @@ public class TasksRunnerService implements RemoteExecutorService { @Override public void executeRunnable(String className, byte[] classBody, byte[] state, String requestId) { + if (requestId != null && requestId.startsWith("00")) { + renewRetryTime(requestId); + } + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(state.length); try { buf.writeBytes(state); @@ -227,36 +292,26 @@ public class TasksRunnerService implements RemoteExecutorService { * If scheduledRequestId is not null then * delete scheduled task * - * @param scheduledRequestId + * @param requestId */ - private void finish(String scheduledRequestId) { + private void finish(String requestId) { classLoader.clearCurrentClassLoader(); - if (scheduledRequestId != null) { - commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_VOID, - "redis.call('hdel', KEYS[4], ARGV[3]); " + - "if redis.call('decr', KEYS[1]) == 0 then " - + "redis.call('del', KEYS[1]);" - + "if redis.call('get', KEYS[2]) == ARGV[1] then " - + "redis.call('set', KEYS[2], ARGV[2]);" - + "redis.call('publish', KEYS[3], ARGV[2]);" - + "end;" - + "end;", - Arrays.asList(tasksCounterName, statusName, terminationTopicName, tasksName), - RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, scheduledRequestId); - return; - } - - commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_VOID, - "if redis.call('decr', KEYS[1]) == 0 then " - + "redis.call('del', KEYS[1]);" + commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID, + "local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);" + + "if scheduled == false then " + + "redis.call('hdel', KEYS[4], ARGV[3]); " + + "end;" + + "redis.call('zrem', KEYS[5], 'ff' .. ARGV[3]);" + + "if redis.call('decr', KEYS[1]) == 0 then " + + "redis.call('del', KEYS[1], KEYS[6]);" + "if redis.call('get', KEYS[2]) == ARGV[1] then " + "redis.call('set', KEYS[2], ARGV[2]);" + "redis.call('publish', KEYS[3], ARGV[2]);" + "end;" - + "end;", - Arrays.asList(tasksCounterName, statusName, terminationTopicName), - RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); + + "end;", + Arrays.asList(tasksCounterName, statusName, terminationTopicName, tasksName, schedulerQueueName, tasksRetryIntervalName), + RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, requestId); } } diff --git a/redisson/src/main/java/org/redisson/executor/TasksService.java b/redisson/src/main/java/org/redisson/executor/TasksService.java index c10157c6b..cd32ecf4f 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksService.java @@ -26,7 +26,9 @@ import org.redisson.api.RMap; import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; +import org.redisson.codec.CompositeCodec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; @@ -50,11 +52,23 @@ public class TasksService extends BaseRemoteService { protected String tasksCounterName; protected String statusName; protected String tasksName; + protected String schedulerQueueName; + protected String schedulerChannelName; + protected String tasksRetryIntervalName; + protected long tasksRetryInterval; public TasksService(Codec codec, RedissonClient redisson, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap responses) { super(codec, redisson, name, commandExecutor, executorId, responses); } + public void setTasksRetryIntervalName(String tasksRetryIntervalName) { + this.tasksRetryIntervalName = tasksRetryIntervalName; + } + + public void setTasksRetryInterval(long tasksRetryInterval) { + this.tasksRetryInterval = tasksRetryInterval; + } + public void setTerminationTopicName(String terminationTopicName) { this.terminationTopicName = terminationTopicName; } @@ -70,6 +84,14 @@ public class TasksService extends BaseRemoteService { public void setTasksName(String tasksName) { this.tasksName = tasksName; } + + public void setSchedulerChannelName(String schedulerChannelName) { + this.schedulerChannelName = schedulerChannelName; + } + + public void setSchedulerQueueName(String scheduledQueueName) { + this.schedulerQueueName = scheduledQueueName; + } @Override protected final RFuture addAsync(String requestQueueName, @@ -104,37 +126,58 @@ public class TasksService extends BaseRemoteService { protected RFuture addAsync(String requestQueueName, RemoteServiceRequest request) { request.getArgs()[3] = request.getId(); + long retryStartTime = 0; + if (tasksRetryInterval > 0) { + retryStartTime = System.currentTimeMillis() + tasksRetryInterval; + } - return getAddCommandExecutor().evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return getAddCommandExecutor().evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + // check if executor service not in shutdown state "if redis.call('exists', KEYS[2]) == 0 then " - + "redis.call('hset', KEYS[4], ARGV[1], ARGV[2]);" - + "redis.call('rpush', KEYS[3], ARGV[1]); " + + "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);" + + "redis.call('rpush', KEYS[6], ARGV[2]); " + "redis.call('incr', KEYS[1]);" + + + "if tonumber(ARGV[1]) > 0 then " + + "redis.call('set', KEYS[7], ARGV[4]);" + + "redis.call('zadd', KEYS[3], ARGV[1], 'ff' .. ARGV[2]);" + + "local v = redis.call('zrange', KEYS[3], 0, 0); " + // if new task added to queue head then publish its startTime + // to all scheduler workers + + "if v[1] == ARGV[2] then " + + "redis.call('publish', KEYS[4], ARGV[1]); " + + "end " + + "end;" + "return 1;" + "end;" + "return 0;", - Arrays.asList(tasksCounterName, statusName, requestQueueName, tasksName), - request.getId(), encode(request)); + Arrays.asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName, requestQueueName, tasksRetryIntervalName), + retryStartTime, request.getId(), encode(request), tasksRetryInterval); } @Override protected RFuture removeAsync(String requestQueueName, RequestId taskId) { return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "local task = redis.call('hget', KEYS[5], ARGV[1]); " + - "if task ~= false and redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then " - + "redis.call('hdel', KEYS[5], ARGV[1]); " - + "if redis.call('decr', KEYS[2]) == 0 then " - + "redis.call('del', KEYS[2]);" - + "if redis.call('get', KEYS[3]) == ARGV[2] then " - + "redis.call('set', KEYS[3], ARGV[3]);" - + "redis.call('publish', KEYS[4], ARGV[3]);" - + "end;" - + "end;" - + "return 1;" + "redis.call('zrem', KEYS[2], 'ff' .. ARGV[1]); " + + "local task = redis.call('hget', KEYS[6], ARGV[1]); " + + "redis.call('hdel', KEYS[6], ARGV[1]); " + // remove from executor queue + + "if task ~= false and redis.call('exists', KEYS[3]) == 1 and redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then " + + "if redis.call('decr', KEYS[3]) == 0 then " + + "redis.call('del', KEYS[3], KEYS[7]);" + + "if redis.call('get', KEYS[4]) == ARGV[2] then " + + "redis.call('set', KEYS[4], ARGV[3]);" + + "redis.call('publish', KEYS[5], ARGV[3]);" + + "end;" + "end;" - + "return 0;", - Arrays.asList(requestQueueName, tasksCounterName, statusName, terminationTopicName, tasksName), - taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); + + "return 1;" + + "end;" + + "if task == false then " + + "return 1; " + + "end;" + + "return 0;", + Arrays.asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName, tasksRetryIntervalName), + taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); } public RFuture cancelExecutionAsync(final RequestId requestId) { @@ -153,7 +196,7 @@ public class TasksService extends BaseRemoteService { if (future.getNow()) { result.trySuccess(true); } else { - RMap canceledRequests = redisson.getMap(cancelRequestMapName, codec); + RMap canceledRequests = redisson.getMap(cancelRequestMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec)); canceledRequests.putAsync(requestId.toString(), new RemoteServiceCancelRequest(true, true)); canceledRequests.expireAsync(60, TimeUnit.SECONDS); diff --git a/redisson/src/main/java/org/redisson/remote/RequestId.java b/redisson/src/main/java/org/redisson/remote/RequestId.java index 8f8757dd9..5646e259e 100644 --- a/redisson/src/main/java/org/redisson/remote/RequestId.java +++ b/redisson/src/main/java/org/redisson/remote/RequestId.java @@ -15,9 +15,9 @@ */ package org.redisson.remote; -import io.netty.buffer.ByteBuf; +import java.util.Arrays; + import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.Unpooled; /** * @@ -26,41 +26,26 @@ import io.netty.buffer.Unpooled; */ public class RequestId { - private final long id0; - private final long id1; + private final byte[] id; public RequestId(String id) { this(ByteBufUtil.decodeHexDump(id)); } public RequestId(byte[] buf) { - ByteBuf b = Unpooled.wrappedBuffer(buf); - try { - id0 = b.readLong(); - id1 = b.readLong(); - } finally { - b.release(); - } + id = buf; } @Override public String toString() { - ByteBuf id = Unpooled.buffer(16); - try { - id.writeLong(id0); - id.writeLong(id1); - return ByteBufUtil.hexDump(id); - } finally { - id.release(); - } + return ByteBufUtil.hexDump(id); } @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + (int) (id0 ^ (id0 >>> 32)); - result = prime * result + (int) (id1 ^ (id1 >>> 32)); + result = prime * result + Arrays.hashCode(id); return result; } @@ -73,12 +58,9 @@ public class RequestId { if (getClass() != obj.getClass()) return false; RequestId other = (RequestId) obj; - if (id0 != other.id0) - return false; - if (id1 != other.id1) + if (!Arrays.equals(id, other.id)) return false; return true; } - } diff --git a/redisson/src/test/java/org/redisson/executor/FailoverTask.java b/redisson/src/test/java/org/redisson/executor/FailoverTask.java new file mode 100644 index 000000000..e196c632c --- /dev/null +++ b/redisson/src/test/java/org/redisson/executor/FailoverTask.java @@ -0,0 +1,27 @@ +package org.redisson.executor; + +import org.redisson.api.RedissonClient; +import org.redisson.api.annotation.RInject; + +public class FailoverTask implements Runnable { + + @RInject + private RedissonClient redisson; + private String objectName; + + public FailoverTask() { + } + + public FailoverTask(String objectName) { + super(); + this.objectName = objectName; + } + + @Override + public void run() { + for (long i = 0; i < 20_000_000_000L; i++) { + } + redisson.getBucket(objectName).set(true); + } + +} diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index 8cf9b862e..5c0254228 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -14,6 +14,7 @@ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.awaitility.Duration; import org.junit.After; @@ -21,12 +22,17 @@ import org.junit.Before; import org.junit.Test; import org.redisson.BaseTest; import org.redisson.RedissonNode; +import org.redisson.api.ExecutorOptions; import org.redisson.api.RExecutorBatchFuture; import org.redisson.api.RExecutorFuture; import org.redisson.api.RExecutorService; import org.redisson.config.Config; import org.redisson.config.RedissonNodeConfig; +import mockit.Invocation; +import mockit.Mock; +import mockit.MockUp; + public class RedissonExecutorServiceTest extends BaseTest { private static RedissonNode node; @@ -68,6 +74,9 @@ public class RedissonExecutorServiceTest extends BaseTest { future.get(5, TimeUnit.SECONDS); future.getTaskFutures().stream().forEach(x -> x.syncUninterruptibly()); + + redisson.getKeys().delete("myCounter"); + assertThat(redisson.getKeys().count()).isZero(); } @Test @@ -78,6 +87,9 @@ public class RedissonExecutorServiceTest extends BaseTest { future.get(5, TimeUnit.SECONDS); future.getTaskFutures().stream().forEach(x -> assertThat(x.getNow()).isEqualTo("1234")); + + redisson.getKeys().delete("myCounter"); + assertThat(redisson.getKeys().count()).isZero(); } @@ -86,6 +98,72 @@ public class RedissonExecutorServiceTest extends BaseTest { RExecutorService e = redisson.getExecutorService("test"); e.execute(); } + + @Test + public void testTaskFinishing() throws Exception { + AtomicInteger counter = new AtomicInteger(); + new MockUp() { + @Mock + private void finish(Invocation invocation, String requestId) { + if (counter.incrementAndGet() > 1) { + invocation.proceed(); + } + } + }; + + Config config = createConfig(); + RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); + nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1)); + node = RedissonNode.create(nodeConfig); + node.start(); + + RExecutorService executor = redisson.getExecutorService("test2"); + RExecutorFuture f = executor.submit(new FailoverTask("finished")); + Thread.sleep(2000); + node.shutdown(); + + f.get(); + assertThat(redisson.getBucket("finished").get()).isTrue(); + } + + @Test + public void testTaskFailover() throws Exception { + AtomicInteger counter = new AtomicInteger(); + new MockUp() { + @Mock + private void finish(Invocation invocation, String requestId) { + if (counter.incrementAndGet() > 1) { + invocation.proceed(); + } + } + }; + + Config config = createConfig(); + RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); + nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1)); + node = RedissonNode.create(nodeConfig); + node.start(); + + + RExecutorService executor = redisson.getExecutorService("test2", ExecutorOptions.defaults().taskRetryInterval(10, TimeUnit.SECONDS)); + RExecutorFuture f = executor.submit(new IncrementRunnableTask("counter")); + f.get(); + assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(1); + Thread.sleep(2000); + node.shutdown(); + + node = RedissonNode.create(nodeConfig); + node.start(); + + Thread.sleep(8500); + assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2); + + Thread.sleep(16000); + assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2); + + redisson.getKeys().delete("counter"); + assertThat(redisson.getKeys().count()).isEqualTo(1); + } @Test public void testBatchExecute() { @@ -94,6 +172,8 @@ public class RedissonExecutorServiceTest extends BaseTest { new IncrementRunnableTask("myCounter"), new IncrementRunnableTask("myCounter")); await().atMost(Duration.FIVE_SECONDS).until(() -> redisson.getAtomicLong("myCounter").get() == 4); + redisson.getKeys().delete("myCounter"); + assertThat(redisson.getKeys().count()).isZero(); } @Test @@ -103,11 +183,13 @@ public class RedissonExecutorServiceTest extends BaseTest { Thread.sleep(2000); cancel(future); assertThat(redisson.getBucket("executed1").get()).isBetween(1000L, Long.MAX_VALUE); - RExecutorFuture futureAsync = executor.submitAsync(new ScheduledLongRunnableTask("executed2")); Thread.sleep(2000); assertThat(executor.cancelTask(futureAsync.getTaskId())).isTrue(); assertThat(redisson.getBucket("executed2").get()).isBetween(1000L, Long.MAX_VALUE); + + redisson.getKeys().delete("executed1", "executed2"); + assertThat(redisson.getKeys().count()).isZero(); } @Test @@ -134,14 +216,13 @@ public class RedissonExecutorServiceTest extends BaseTest { for (Future future : allResult) { assertThat(future.get()).isEqualTo(CallableTask.RESULT); } - + List invokeAllParams1 = Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask()); List> allResult1 = e.invokeAll(invokeAllParams1, 5, TimeUnit.SECONDS); assertThat(allResult1).hasSize(invokeAllParams.size()); for (Future future : allResult1) { assertThat(future.get()).isEqualTo(CallableTask.RESULT); } - } @Test(expected = RejectedExecutionException.class) @@ -158,6 +239,8 @@ public class RedissonExecutorServiceTest extends BaseTest { assertThat(e.isShutdown()).isTrue(); e.execute(new RunnableTask()); + + assertThat(redisson.getKeys().count()).isZero(); } @Test(expected = RejectedExecutionException.class) @@ -174,6 +257,8 @@ public class RedissonExecutorServiceTest extends BaseTest { assertThat(e.isShutdown()).isTrue(); e.submit(new RunnableTask2()); + + assertThat(redisson.getKeys().count()).isZero(); } @Test(expected = RejectedExecutionException.class) @@ -190,6 +275,8 @@ public class RedissonExecutorServiceTest extends BaseTest { assertThat(e.isShutdown()).isTrue(); e.submit(new CallableTask()); + + assertThat(redisson.getKeys().count()).isZero(); } @Test(expected = RejectedExecutionException.class) @@ -199,6 +286,8 @@ public class RedissonExecutorServiceTest extends BaseTest { assertThat(e.isShutdown()).isTrue(); e.submit(new RunnableTask2()); + + assertThat(redisson.getKeys().count()).isZero(); } @@ -255,6 +344,9 @@ public class RedissonExecutorServiceTest extends BaseTest { s4.get(); assertThat(redisson.getAtomicLong("runnableCounter").get()).isEqualTo(100L); + + redisson.getKeys().delete("runnableCounter", "counter"); + assertThat(redisson.getKeys().count()).isZero(); } @Test diff --git a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java index 98090be02..81ce04869 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java @@ -10,6 +10,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; import org.junit.Assert; @@ -18,11 +19,17 @@ import org.junit.Test; import org.redisson.BaseTest; import org.redisson.RedissonNode; import org.redisson.api.CronSchedule; +import org.redisson.api.ExecutorOptions; +import org.redisson.api.RExecutorFuture; import org.redisson.api.RScheduledExecutorService; import org.redisson.api.RScheduledFuture; import org.redisson.config.Config; import org.redisson.config.RedissonNodeConfig; +import mockit.Invocation; +import mockit.Mock; +import mockit.MockUp; + public class RedissonScheduledExecutorServiceTest extends BaseTest { private static RedissonNode node; @@ -44,6 +51,44 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { super.after(); node.shutdown(); } + + @Test + public void testTaskFailover() throws Exception { + AtomicInteger counter = new AtomicInteger(); + new MockUp() { + @Mock + private void finish(Invocation invocation, String requestId) { + if (counter.incrementAndGet() > 1) { + invocation.proceed(); + } + } + }; + + Config config = createConfig(); + RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); + nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1)); + node = RedissonNode.create(nodeConfig); + node.start(); + + RScheduledExecutorService executor = redisson.getExecutorService("test2", ExecutorOptions.defaults().taskRetryInterval(10, TimeUnit.SECONDS)); + RExecutorFuture f = executor.schedule(new IncrementRunnableTask("counter"), 1, TimeUnit.SECONDS); + f.get(); + assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(1); + Thread.sleep(2000); + node.shutdown(); + + node = RedissonNode.create(nodeConfig); + node.start(); + + Thread.sleep(8500); + assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2); + + Thread.sleep(16000); + assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2); + + redisson.getKeys().delete("counter"); + assertThat(redisson.getKeys().count()).isEqualTo(1); + } @Test(timeout = 7000) public void testTaskResume() throws InterruptedException, ExecutionException { @@ -136,7 +181,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { Thread.sleep(2000); cancel(future); assertThat(redisson.getBucket("executed1").get()).isBetween(1000L, Long.MAX_VALUE); - + RScheduledFuture futureAsync = executor.scheduleAsync(new ScheduledLongRunnableTask("executed2"), 1, TimeUnit.SECONDS); Thread.sleep(2000); assertThat(executor.cancelTask(futureAsync.getTaskId())).isTrue(); @@ -201,7 +246,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { assertThat(future1.cancel(true)).isTrue(); try { future1.get(); - Assert.fail("CancellationException should be arise"); + Assert.fail("CancellationException should arise"); } catch (CancellationException e) { // skip } diff --git a/redisson/src/test/java/org/redisson/executor/ScheduledLongRunnableTask.java b/redisson/src/test/java/org/redisson/executor/ScheduledLongRunnableTask.java index 1a598310b..1f98f14c0 100644 --- a/redisson/src/test/java/org/redisson/executor/ScheduledLongRunnableTask.java +++ b/redisson/src/test/java/org/redisson/executor/ScheduledLongRunnableTask.java @@ -27,5 +27,5 @@ public class ScheduledLongRunnableTask implements Runnable { } } } - + }