diff --git a/redisson/pom.xml b/redisson/pom.xml index 06d2fdaef..28e820e98 100644 --- a/redisson/pom.xml +++ b/redisson/pom.xml @@ -81,7 +81,7 @@ org.assertj assertj-core - 3.9.1 + 3.10.0 test @@ -90,6 +90,12 @@ 3.1.0 test + + org.jmockit + jmockit + 1.39 + test + junit junit @@ -102,12 +108,6 @@ 1.7.25 test - - org.jmockit - jmockit - 1.33 - test - org.apache.tomcat.embed diff --git a/redisson/src/main/java/org/redisson/BaseRemoteService.java b/redisson/src/main/java/org/redisson/BaseRemoteService.java index 692e670e8..b60857b64 100644 --- a/redisson/src/main/java/org/redisson/BaseRemoteService.java +++ b/redisson/src/main/java/org/redisson/BaseRemoteService.java @@ -39,7 +39,9 @@ import org.redisson.api.annotation.RRemoteAsync; import org.redisson.client.RedisException; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; +import org.redisson.codec.CompositeCodec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.executor.RemotePromise; import org.redisson.misc.RPromise; @@ -718,8 +720,8 @@ public abstract class BaseRemoteService { return; } - RMap canceledRequests = redisson.getMap(mapName, codec); - RFuture future = canceledRequests.getAsync(requestId.toString()); + RMap canceledRequests = redisson.getMap(mapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec)); + RFuture future = canceledRequests.removeAsync(requestId.toString()); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -744,9 +746,10 @@ public abstract class BaseRemoteService { } protected RequestId generateRequestId() { - byte[] id = new byte[16]; + byte[] id = new byte[17]; // TODO JDK UPGRADE replace to native ThreadLocalRandom PlatformDependent.threadLocalRandom().nextBytes(id); + id[0] = 0; return new RequestId(id); } @@ -757,7 +760,7 @@ public abstract class BaseRemoteService { private void cancelExecution(RemoteInvocationOptions optionsCopy, boolean mayInterruptIfRunning, RemotePromise remotePromise) { - RMap canceledRequests = redisson.getMap(cancelRequestMapName, codec); + RMap canceledRequests = redisson.getMap(cancelRequestMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec)); canceledRequests.putAsync(remotePromise.getRequestId().toString(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false)); canceledRequests.expireAsync(60, TimeUnit.SECONDS); diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index 1d7bbd1a7..1cf745faa 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit; import org.redisson.api.BatchOptions; import org.redisson.api.ClusterNodesGroup; +import org.redisson.api.ExecutorOptions; import org.redisson.api.LocalCachedMapOptions; import org.redisson.api.MapOptions; import org.redisson.api.Node; @@ -375,7 +376,12 @@ public class Redisson implements RedissonClient { @Override public RScheduledExecutorService getExecutorService(String name) { - return new RedissonExecutorService(connectionManager.getCodec(), connectionManager.getCommandExecutor(), this, name, queueTransferService, responses); + return getExecutorService(name, connectionManager.getCodec()); + } + + @Override + public RScheduledExecutorService getExecutorService(String name, ExecutorOptions options) { + return getExecutorService(name, connectionManager.getCodec(), options); } @Override @@ -386,7 +392,12 @@ public class Redisson implements RedissonClient { @Override public RScheduledExecutorService getExecutorService(String name, Codec codec) { - return new RedissonExecutorService(codec, connectionManager.getCommandExecutor(), this, name, queueTransferService, responses); + return getExecutorService(name, codec, ExecutorOptions.defaults()); + } + + @Override + public RScheduledExecutorService getExecutorService(String name, Codec codec, ExecutorOptions options) { + return new RedissonExecutorService(codec, connectionManager.getCommandExecutor(), this, name, queueTransferService, responses, options); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index aed77a9cf..6d43a5471 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -41,6 +41,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import org.redisson.api.CronSchedule; +import org.redisson.api.ExecutorOptions; import org.redisson.api.RAtomicLong; import org.redisson.api.RExecutorBatchFuture; import org.redisson.api.RExecutorFuture; @@ -60,6 +61,7 @@ import org.redisson.connection.ConnectionManager; import org.redisson.executor.RedissonExecutorBatchFuture; import org.redisson.executor.RedissonExecutorFuture; import org.redisson.executor.RedissonExecutorFutureReference; +import org.redisson.executor.RedissonExecutorRemoteService; import org.redisson.executor.RedissonScheduledFuture; import org.redisson.executor.RemoteExecutorService; import org.redisson.executor.RemoteExecutorServiceAsync; @@ -92,6 +94,8 @@ public class RedissonExecutorService implements RScheduledExecutorService { private static final Logger log = LoggerFactory.getLogger(RedissonExecutorService.class); + private static final RemoteInvocationOptions RESULT_OPTIONS = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.HOURS); + public static final int SHUTDOWN_STATE = 1; public static final int TERMINATED_STATE = 2; @@ -103,6 +107,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { private final String tasksName; private final String schedulerQueueName; private final String schedulerChannelName; + private final String tasksRetryIntervalName; private final String workersChannelName; private final String workersSemaphoreName; @@ -121,6 +126,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { private final RemoteExecutorServiceAsync asyncServiceWithoutResult; private final ScheduledTasksService scheduledRemoteService; + private final TasksService executorRemoteService; private final Map, byte[]> class2bytes = PlatformDependent.newConcurrentHashMap(); @@ -134,7 +140,8 @@ public class RedissonExecutorService implements RScheduledExecutorService { private final ReferenceQueue> referenceDueue = new ReferenceQueue>(); private final Collection references = Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); - public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name, QueueTransferService queueTransferService, ConcurrentMap responses) { + public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, + String name, QueueTransferService queueTransferService, ConcurrentMap responses, ExecutorOptions options) { super(); this.codec = codec; this.commandExecutor = commandExecutor; @@ -150,7 +157,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { this.executorId = connectionManager.getId().toString() + ":" + RemoteExecutorServiceAsync.class.getName() + ":" + name; } - remoteService = redisson.getRemoteService(name, codec); + remoteService = new RedissonExecutorRemoteService(codec, redisson, name, connectionManager.getCommandExecutor(), executorId, responses); requestQueueName = ((RedissonRemoteService)remoteService).getRequestQueueName(RemoteExecutorService.class); responseQueueName = ((RedissonRemoteService)remoteService).getResponseQueueName(executorId); String objectName = requestQueueName; @@ -159,7 +166,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { statusName = objectName + ":status"; terminationTopic = redisson.getTopic(objectName + ":termination-topic", codec); - + tasksRetryIntervalName = objectName + ":retry-interval"; schedulerChannelName = objectName + ":scheduler-channel"; schedulerQueueName = objectName + ":scheduler"; @@ -169,12 +176,16 @@ public class RedissonExecutorService implements RScheduledExecutorService { workersTopic = redisson.getTopic(workersChannelName); - TasksService executorRemoteService = new TasksService(codec, redisson, name, commandExecutor, executorId, responses); + executorRemoteService = new TasksService(codec, redisson, name, commandExecutor, executorId, responses); executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); executorRemoteService.setTasksCounterName(tasksCounterName); executorRemoteService.setStatusName(statusName); executorRemoteService.setTasksName(tasksName); - asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS)); + executorRemoteService.setSchedulerChannelName(schedulerChannelName); + executorRemoteService.setSchedulerQueueName(schedulerQueueName); + executorRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName); + executorRemoteService.setTasksRetryInterval(options.getTaskRetryInterval()); + asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); scheduledRemoteService = new ScheduledTasksService(codec, redisson, name, commandExecutor, executorId, responses); @@ -184,7 +195,9 @@ public class RedissonExecutorService implements RScheduledExecutorService { scheduledRemoteService.setSchedulerQueueName(schedulerQueueName); scheduledRemoteService.setSchedulerChannelName(schedulerChannelName); scheduledRemoteService.setTasksName(tasksName); - asyncScheduledService = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS)); + scheduledRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName); + scheduledRemoteService.setTasksRetryInterval(options.getTaskRetryInterval()); + asyncScheduledService = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); } @@ -232,9 +245,32 @@ public class RedissonExecutorService implements RScheduledExecutorService { protected RFuture pushTaskAsync() { return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local expiredTaskIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " + + "local retryInterval = redis.call('get', KEYS[4]);" + "if #expiredTaskIds > 0 then " + "redis.call('zrem', KEYS[2], unpack(expiredTaskIds));" - + "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));" + + "if retryInterval ~= false then " + + "local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);" + + "for i = 1, #expiredTaskIds, 1 do " + + "local name = expiredTaskIds[i];" + + "local scheduledName = expiredTaskIds[i];" + + "if string.sub(scheduledName, 1, 2) ~= 'ff' then " + + "scheduledName = 'ff' .. scheduledName; " + + "else " + + "name = string.sub(name, 3, string.len(name)); " + + "end;" + + + "redis.call('zadd', KEYS[2], startTime, scheduledName);" + + "local v = redis.call('zrange', KEYS[2], 0, 0); " + // if new task added to queue head then publish its startTime + // to all scheduler workers + + "if v[1] == expiredTaskIds[i] then " + + "redis.call('publish', KEYS[3], startTime); " + + "end;" + + "redis.call('rpush', KEYS[1], name);" + + "end; " + + "else " + + "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));" + + "end; " + "end; " // get startTime from scheduler queue head task + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); " @@ -242,7 +278,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { + "return v[2]; " + "end " + "return nil;", - Arrays.asList(requestQueueName, schedulerQueueName), + Arrays.asList(requestQueueName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName), System.currentTimeMillis(), 100); } }; @@ -256,6 +292,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { service.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); service.setSchedulerChannelName(schedulerChannelName); service.setSchedulerQueueName(schedulerQueueName); + service.setTasksRetryIntervalName(tasksRetryIntervalName); remoteService.register(RemoteExecutorService.class, service, workers, executor); workersGroupListenerId = workersTopic.addListener(new MessageListener() { @@ -267,6 +304,8 @@ public class RedissonExecutorService implements RScheduledExecutorService { }); } + private long repeatInterval = 5000; + @Override public void execute(Runnable task) { check(task); @@ -303,6 +342,9 @@ public class RedissonExecutorService implements RScheduledExecutorService { executorRemoteService.setTasksCounterName(tasksCounterName); executorRemoteService.setStatusName(statusName); executorRemoteService.setTasksName(tasksName); + executorRemoteService.setSchedulerChannelName(schedulerChannelName); + executorRemoteService.setSchedulerQueueName(schedulerQueueName); + executorRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName); return executorRemoteService; } @@ -361,7 +403,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { + "redis.call('set', KEYS[2], ARGV[1]);" + "end;" + "end;", - Arrays.asList(tasksCounterName, statusName, terminationTopic.getChannelNames().get(0)), + Arrays.asList(tasksCounterName, statusName, terminationTopic.getChannelNames().get(0), tasksRetryIntervalName), SHUTDOWN_STATE, TERMINATED_STATE); } @@ -471,7 +513,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { List> result = new ArrayList>(); TasksBatchService executorRemoteService = createBatchService(); - RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS)); + RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); for (Callable task : tasks) { check(task); byte[] classBody = getClassBody(task); @@ -496,7 +538,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { } TasksBatchService executorRemoteService = createBatchService(); - RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS)); + RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); final List> result = new ArrayList>(); for (Callable task : tasks) { check(task); @@ -599,7 +641,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { List> result = new ArrayList>(); TasksBatchService executorRemoteService = createBatchService(); - RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS)); + RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); for (Runnable task : tasks) { check(task); byte[] classBody = getClassBody(task); @@ -624,7 +666,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { } TasksBatchService executorRemoteService = createBatchService(); - RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS)); + RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); final List> result = new ArrayList>(); for (Runnable task : tasks) { check(task); @@ -843,8 +885,13 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public boolean cancelTask(String taskId) { - RFuture scheduledFuture = scheduledRemoteService.cancelExecutionAsync(new RequestId(taskId)); + if (taskId.startsWith("01")) { + RFuture scheduledFuture = scheduledRemoteService.cancelExecutionAsync(new RequestId(taskId)); + return commandExecutor.get(scheduledFuture); + } + RFuture scheduledFuture = executorRemoteService.cancelExecutionAsync(new RequestId(taskId)); return commandExecutor.get(scheduledFuture); + } private T doInvokeAny(Collection> tasks, @@ -908,8 +955,9 @@ public class RedissonExecutorService implements RScheduledExecutorService { throw ee; } finally { - for (Future f : futures) + for (Future f : futures) { f.cancel(true); + } } } diff --git a/redisson/src/main/java/org/redisson/RedissonNode.java b/redisson/src/main/java/org/redisson/RedissonNode.java index 5b90838f5..5106eecfe 100644 --- a/redisson/src/main/java/org/redisson/RedissonNode.java +++ b/redisson/src/main/java/org/redisson/RedissonNode.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import org.redisson.api.RExecutorService; import org.redisson.api.RFuture; @@ -116,7 +117,7 @@ public class RedissonNode { */ public void shutdown() { if (hasRedissonInstance) { - redisson.shutdown(); + redisson.shutdown(0, 15, TimeUnit.MINUTES); log.info("Redisson node has been shutdown successfully"); } } diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index 807d0999b..391a470ec 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -187,7 +187,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS final String requestId = future.getNow(); RMap tasks = redisson.getMap(requestQueue.getName() + ":tasks", new CompositeCodec(StringCodec.INSTANCE, codec, codec)); - RFuture taskFuture = tasks.getAsync(requestId); + RFuture taskFuture = getTask(requestId, tasks); taskFuture.addListener(new FutureListener() { @Override @@ -220,10 +220,10 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS return; } - final String responseName = getResponseQueueName(request.getExecutorId()); // send the ack only if expected if (request.getOptions().isAckExpected()) { + final String responseName = getResponseQueueName(request.getExecutorId()); String ackName = getAckName(request.getId()); RFuture ackClientsFuture = commandExecutor.evalWriteAsync(responseName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, @@ -330,7 +330,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS // could be removed not from future object if (future.getNow().isSendResponse()) { - RMap map = redisson.getMap(cancelResponseMapName, codec); + RMap map = redisson.getMap(cancelResponseMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec)); map.putAsync(request.getId(), response); map.expireAsync(60, TimeUnit.SECONDS); } @@ -397,4 +397,8 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS } } + protected RFuture getTask(final String requestId, RMap tasks) { + return tasks.removeAsync(requestId); + } + } diff --git a/redisson/src/main/java/org/redisson/api/ExecutorOptions.java b/redisson/src/main/java/org/redisson/api/ExecutorOptions.java new file mode 100644 index 000000000..cc349afc1 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/ExecutorOptions.java @@ -0,0 +1,56 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.concurrent.TimeUnit; + +/** + * Configuration for ExecutorService. + * + * @author Nikita Koksharov + * + */ +public class ExecutorOptions { + + private long taskRetryInterval = 60000; + + private ExecutorOptions() { + } + + public static ExecutorOptions defaults() { + return new ExecutorOptions(); + } + + public long getTaskRetryInterval() { + return taskRetryInterval; + } + + /** + * Defines task retry interval at the end of which task is executed again. + * ExecutorService worker re-schedule task execution retry every 5 seconds. + *

+ * Default is 1 minute + * + * @param timeout value + * @param unit value + * @return self instance + */ + public ExecutorOptions taskRetryInterval(long timeout, TimeUnit unit) { + this.taskRetryInterval = unit.toMillis(timeout); + return this; + } + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonClient.java b/redisson/src/main/java/org/redisson/api/RedissonClient.java index 53eba5182..a084c47de 100755 --- a/redisson/src/main/java/org/redisson/api/RedissonClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonClient.java @@ -839,6 +839,15 @@ public interface RedissonClient { */ RScheduledExecutorService getExecutorService(String name); + /** + * Returns ScheduledExecutorService by name + * + * @param name - name of object + * @param options - options for executor + * @return ScheduledExecutorService object + */ + RScheduledExecutorService getExecutorService(String name, ExecutorOptions options); + /** * Returns ScheduledExecutorService by name * using provided codec for task, response and request serialization @@ -864,6 +873,17 @@ public interface RedissonClient { * @since 2.8.2 */ RScheduledExecutorService getExecutorService(String name, Codec codec); + + /** + * Returns ScheduledExecutorService by name + * using provided codec for task, response and request serialization + * + * @param name - name of object + * @param codec - codec for task, response and request + * @param options - options for executor + * @return ScheduledExecutorService object + */ + RScheduledExecutorService getExecutorService(String name, Codec codec, ExecutorOptions options); /** * Returns object for remote operations prefixed with the default name (redisson_remote_service) diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 4ed2cba4d..20d1f28be 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -582,7 +582,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { for (Integer slot : removedSlots) { MasterSlaveEntry entry = removeEntry(slot); if (entry.getSlotRanges().isEmpty()) { - entry.shutdownMasterAsync(); + entry.shutdownAsync(); log.info("{} master and slaves for it removed", entry.getClient().getAddr()); } } diff --git a/redisson/src/main/java/org/redisson/codec/DefenceModule.java b/redisson/src/main/java/org/redisson/codec/DefenceModule.java deleted file mode 100644 index 6c0e54a15..000000000 --- a/redisson/src/main/java/org/redisson/codec/DefenceModule.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.codec; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -import com.fasterxml.jackson.databind.BeanDescription; -import com.fasterxml.jackson.databind.DeserializationConfig; -import com.fasterxml.jackson.databind.deser.ValueInstantiator; -import com.fasterxml.jackson.databind.deser.ValueInstantiators.Base; -import com.fasterxml.jackson.databind.module.SimpleModule; - -/** - * Fix for https://github.com/FasterXML/jackson-databind/issues/1599 - * - * @author Nikita Koksharov - * - * TODO remove after update to latest version of Jackson - * - */ -public class DefenceModule extends SimpleModule { - - private static final long serialVersionUID = -429891510707420220L; - - public static class DefenceValueInstantiator extends Base { - - protected final static Set DEFAULT_NO_DESER_CLASS_NAMES; - static { - Set s = new HashSet(); - // Courtesy of [https://github.com/kantega/notsoserial]: - // (and wrt [databind#1599] - s.add("org.apache.commons.collections.functors.InvokerTransformer"); - s.add("org.apache.commons.collections.functors.InstantiateTransformer"); - s.add("org.apache.commons.collections4.functors.InvokerTransformer"); - s.add("org.apache.commons.collections4.functors.InstantiateTransformer"); - s.add("org.codehaus.groovy.runtime.ConvertedClosure"); - s.add("org.codehaus.groovy.runtime.MethodClosure"); - s.add("org.springframework.beans.factory.ObjectFactory"); - s.add("com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl"); - DEFAULT_NO_DESER_CLASS_NAMES = Collections.unmodifiableSet(s); - } - - @Override - public ValueInstantiator findValueInstantiator(DeserializationConfig config, BeanDescription beanDesc, - ValueInstantiator defaultInstantiator) { - if (DEFAULT_NO_DESER_CLASS_NAMES.contains(beanDesc.getClassInfo().getRawType().getName())) { - throw new IllegalArgumentException("Illegal type " + beanDesc.getClassInfo().getRawType().getName() + " to deserialize: prevented for security reasons"); - } - - return super.findValueInstantiator(config, beanDesc, defaultInstantiator); - } - - } - - @Override - public void setupModule(SetupContext context) { - context.addValueInstantiators(new DefenceValueInstantiator()); - } - -} diff --git a/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java b/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java index cb00e6472..f8e5145f3 100755 --- a/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java +++ b/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java @@ -153,8 +153,6 @@ public class JsonJacksonCodec extends BaseCodec { } protected void init(ObjectMapper objectMapper) { - objectMapper.registerModule(new DefenceModule()); - objectMapper.setSerializationInclusion(Include.NON_NULL); objectMapper.setVisibility(objectMapper.getSerializationConfig() .getDefaultVisibilityChecker() diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index 4de8b004d..21e5e47fc 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -67,8 +67,6 @@ public interface ConnectionManager { IdleConnectionWatcher getConnectionWatcher(); - void shutdownAsync(RedisClient client); - int calcSlot(String key); MasterSlaveServersConfig getConfig(); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 622744c9a..3c6c1965f 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -46,11 +46,11 @@ import org.redisson.config.BaseMasterSlaveServersConfig; import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.TransportMode; +import org.redisson.misc.CountableListener; import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.misc.URIBuilder; -import org.redisson.pubsub.AsyncSemaphore; import org.redisson.pubsub.PublishSubscribeService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -435,11 +435,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return client; } - @Override - public void shutdownAsync(RedisClient client) { - client.shutdownAsync(); - } - @Override public RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout, String sslHostname) { RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname); @@ -633,16 +628,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { dnsMonitor.stop(); } - timer.stop(); - - shutdownLatch.close(); - shutdownPromise.trySuccess(true); - shutdownLatch.awaitUninterruptibly(); - - for (MasterSlaveEntry entry : getEntrySet()) { - entry.shutdown(); - } - if (cfg.getExecutor() == null) { executor.shutdown(); try { @@ -651,7 +636,20 @@ public class MasterSlaveConnectionManager implements ConnectionManager { Thread.currentThread().interrupt(); } } + + timer.stop(); + + shutdownLatch.close(); + shutdownPromise.trySuccess(true); + shutdownLatch.awaitUninterruptibly(); + + RPromise result = new RedissonPromise(); + CountableListener listener = new CountableListener(result, null, getEntrySet().size()); + for (MasterSlaveEntry entry : getEntrySet()) { + entry.shutdownAsync().addListener(listener); + } + result.awaitUninterruptibly(timeout, unit); resolverGroup.close(); if (cfg.getEventLoopGroup() == null) { diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 304294bec..c3ddcff21 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -139,25 +139,28 @@ public class MasterSlaveEntry { return; } - masterEntry = new ClientConnectionsEntry( - client, - config.getMasterConnectionMinimumIdleSize(), - config.getMasterConnectionPoolSize(), - config.getSubscriptionConnectionMinimumIdleSize(), - config.getSubscriptionConnectionPoolSize(), - connectionManager, - NodeType.MASTER); - - CountableListener listener = new CountableListener(result, client); - RFuture writeFuture = writeConnectionPool.add(masterEntry); - listener.incCounter(); - writeFuture.addListener(listener); - - if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { - RFuture pubSubFuture = pubSubConnectionPool.add(masterEntry); - listener.incCounter(); - pubSubFuture.addListener(listener); - } + masterEntry = new ClientConnectionsEntry( + client, + config.getMasterConnectionMinimumIdleSize(), + config.getMasterConnectionPoolSize(), + config.getSubscriptionConnectionMinimumIdleSize(), + config.getSubscriptionConnectionPoolSize(), + connectionManager, + NodeType.MASTER); + + int counter = 1; + if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { + counter++; + } + + CountableListener listener = new CountableListener(result, client, counter); + RFuture writeFuture = writeConnectionPool.add(masterEntry); + writeFuture.addListener(listener); + + if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { + RFuture pubSubFuture = pubSubConnectionPool.add(masterEntry); + pubSubFuture.addListener(listener); + } } }); @@ -465,19 +468,22 @@ public class MasterSlaveEntry { && slaveBalancer.getAvailableClients() > 1) { slaveDown(newMasterClient.getAddr(), FreezeReason.SYSTEM); } - connectionManager.shutdownAsync(oldMaster.getClient()); + oldMaster.getClient().shutdownAsync(); log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), masterEntry.getClient().getAddr()); } }); } - public void shutdownMasterAsync() { + public RFuture shutdownAsync() { if (!active.compareAndSet(true, false)) { - return; + return RedissonPromise.newSucceededFuture(null); } - connectionManager.shutdownAsync(masterEntry.getClient()); - slaveBalancer.shutdownAsync(); + RPromise result = new RedissonPromise(); + CountableListener listener = new CountableListener(result, null, 2); + masterEntry.getClient().shutdownAsync().addListener(listener); + slaveBalancer.shutdownAsync().addListener(listener); + return result; } public RFuture connectionWriteOp(RedisCommand command) { @@ -526,15 +532,6 @@ public class MasterSlaveEntry { slaveBalancer.returnConnection(connection); } - public void shutdown() { - if (!active.compareAndSet(true, false)) { - return; - } - - masterEntry.getClient().shutdown(); - slaveBalancer.shutdown(); - } - public void addSlotRange(Integer range) { slots.add(range); } diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index af5f51597..a83b7cd99 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -83,19 +83,17 @@ public class LoadBalancerManager { public RFuture add(final ClientConnectionsEntry entry) { RPromise result = new RedissonPromise(); - CountableListener listener = new CountableListener(result, null) { + CountableListener listener = new CountableListener(result, null, 2) { @Override protected void onSuccess(Void value) { - client2Entry.put(entry.getClient(), entry); - } + client2Entry.put(entry.getClient(), entry); + } }; RFuture slaveFuture = slaveConnectionPool.add(entry); - listener.incCounter(); slaveFuture.addListener(listener); RFuture pubSubFuture = pubSubConnectionPool.add(entry); - listener.incCounter(); pubSubFuture.addListener(listener); return result; } @@ -249,16 +247,16 @@ public class LoadBalancerManager { slaveConnectionPool.returnConnection(entry, connection); } - public void shutdown() { - for (ClientConnectionsEntry entry : client2Entry.values()) { - entry.getClient().shutdown(); + public RFuture shutdownAsync() { + if (client2Entry.values().isEmpty()) { + return RedissonPromise.newSucceededFuture(null); } - } - - public void shutdownAsync() { + RPromise result = new RedissonPromise(); + CountableListener listener = new CountableListener(result, null, client2Entry.values().size()); for (ClientConnectionsEntry entry : client2Entry.values()) { - connectionManager.shutdownAsync(entry.getClient()); + entry.getClient().shutdownAsync().addListener(listener); } + return result; } } diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java new file mode 100644 index 000000000..f3d709f65 --- /dev/null +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java @@ -0,0 +1,46 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.executor; + +import java.util.concurrent.ConcurrentMap; + +import org.redisson.RedissonRemoteService; +import org.redisson.api.RFuture; +import org.redisson.api.RMap; +import org.redisson.api.RedissonClient; +import org.redisson.client.codec.Codec; +import org.redisson.command.CommandExecutor; +import org.redisson.remote.RemoteServiceRequest; +import org.redisson.remote.ResponseEntry; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonExecutorRemoteService extends RedissonRemoteService { + + public RedissonExecutorRemoteService(Codec codec, RedissonClient redisson, String name, + CommandExecutor commandExecutor, String executorId, ConcurrentMap responses) { + super(codec, redisson, name, commandExecutor, executorId, responses); + } + + @Override + protected RFuture getTask(String requestId, RMap tasks) { + return tasks.getAsync(requestId); + } + +} diff --git a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java index f1a7c55ad..e6184a87c 100644 --- a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java +++ b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java @@ -23,12 +23,15 @@ import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; import org.redisson.remote.RemoteServiceRequest; import org.redisson.remote.RequestId; import org.redisson.remote.ResponseEntry; +import io.netty.util.internal.PlatformDependent; + /** * * @author Nikita Koksharov @@ -37,8 +40,6 @@ import org.redisson.remote.ResponseEntry; public class ScheduledTasksService extends TasksService { private RequestId requestId; - private String schedulerQueueName; - private String schedulerChannelName; public ScheduledTasksService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String redissonId, ConcurrentMap responses) { super(codec, redisson, name, commandExecutor, redissonId, responses); @@ -48,14 +49,6 @@ public class ScheduledTasksService extends TasksService { this.requestId = requestId; } - public void setSchedulerChannelName(String schedulerChannelName) { - this.schedulerChannelName = schedulerChannelName; - } - - public void setSchedulerQueueName(String scheduledQueueName) { - this.schedulerQueueName = scheduledQueueName; - } - @Override protected RFuture addAsync(String requestQueueName, RemoteServiceRequest request) { int requestIndex = 0; @@ -72,29 +65,19 @@ public class ScheduledTasksService extends TasksService { request.getArgs()[requestIndex] = request.getId(); Long startTime = (Long)request.getArgs()[3]; - if (requestId != null) { - return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - // check if executor service not in shutdown state and previous task exists - "if redis.call('exists', KEYS[2]) == 0 and redis.call('hexists', KEYS[5], ARGV[2]) == 1 then " - + "redis.call('zadd', KEYS[3], ARGV[1], ARGV[2]);" - + "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);" - + "redis.call('incr', KEYS[1]);" - // if new task added to queue head then publish its startTime - // to all scheduler workers - + "local v = redis.call('zrange', KEYS[3], 0, 0); " - + "if v[1] == ARGV[2] then " - + "redis.call('publish', KEYS[4], ARGV[1]); " - + "end " - + "return 1;" - + "end;" - + "return 0;", - Arrays.asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName), - startTime, request.getId(), encode(request)); - } - return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // check if executor service not in shutdown state "if redis.call('exists', KEYS[2]) == 0 then " + + "local retryInterval = redis.call('get', KEYS[6]); " + + "if retryInterval ~= false then " + + "local time = tonumber(ARGV[4]) + tonumber(retryInterval);" + + "redis.call('zadd', KEYS[3], time, 'ff' .. ARGV[2]);" + + "elseif tonumber(ARGV[5]) > 0 then " + + "redis.call('set', KEYS[6], ARGV[5]);" + + "local time = tonumber(ARGV[4]) + tonumber(ARGV[5]);" + + "redis.call('zadd', KEYS[3], time, 'ff' .. ARGV[2]);" + + "end; " + + "redis.call('zadd', KEYS[3], ARGV[1], ARGV[2]);" + "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);" + "redis.call('incr', KEYS[1]);" @@ -107,31 +90,29 @@ public class ScheduledTasksService extends TasksService { + "return 1;" + "end;" + "return 0;", - Arrays.asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName), - startTime, request.getId(), encode(request)); + Arrays.asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName, tasksRetryIntervalName), + startTime, request.getId(), encode(request), System.currentTimeMillis(), tasksRetryInterval); } @Override protected RFuture removeAsync(String requestQueueName, RequestId taskId) { - return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // remove from scheduler queue - "if redis.call('zrem', KEYS[2], ARGV[1]) > 0 then " - + "redis.call('hdel', KEYS[6], ARGV[1]); " - + "if redis.call('decr', KEYS[3]) == 0 then " - + "redis.call('del', KEYS[3]);" - + "if redis.call('get', KEYS[4]) == ARGV[2] then " - + "redis.call('set', KEYS[4], ARGV[3]);" - + "redis.call('publish', KEYS[5], ARGV[3]);" - + "end;" - + "end;" + "if redis.call('exists', KEYS[3]) == 0 then " + "return 1;" + "end;" + + "local task = redis.call('hget', KEYS[6], ARGV[1]); " - // remove from executor queue - + "if task ~= false and redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then " - + "redis.call('hdel', KEYS[6], ARGV[1]); " + + "redis.call('hdel', KEYS[6], ARGV[1]); " + + + "redis.call('zrem', KEYS[2], 'ff' .. ARGV[1]); " + + "local removedScheduled = redis.call('zrem', KEYS[2], ARGV[1]); " + + "local removed = redis.call('lrem', KEYS[1], 1, ARGV[1]); " + + // remove from executor queue + + "if task ~= false and (removed > 0 or removedScheduled > 0) then " + "if redis.call('decr', KEYS[3]) == 0 then " - + "redis.call('del', KEYS[3]);" + + "redis.call('del', KEYS[3], KEYS[7]);" + "if redis.call('get', KEYS[4]) == ARGV[2] then " + "redis.call('set', KEYS[4], ARGV[3]);" + "redis.call('publish', KEYS[5], ARGV[3]);" @@ -139,17 +120,22 @@ public class ScheduledTasksService extends TasksService { + "end;" + "return 1;" + "end;" - // delete scheduled task - + "redis.call('hdel', KEYS[6], ARGV[1]); " + + "if task == false then " + + "return 1; " + + "end;" + "return 0;", - Arrays.asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName), + Arrays.asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName, tasksRetryIntervalName), taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); } @Override protected RequestId generateRequestId() { if (requestId == null) { - return super.generateRequestId(); + byte[] id = new byte[17]; + // TODO JDK UPGRADE replace to native ThreadLocalRandom + PlatformDependent.threadLocalRandom().nextBytes(id); + id[0] = 1; + return new RequestId(id); } return requestId; } diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index 0d3427918..e53396e8b 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -20,7 +20,9 @@ import java.util.Arrays; import java.util.Date; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import org.redisson.Redisson; import org.redisson.RedissonExecutorService; import org.redisson.RedissonShutdownException; import org.redisson.api.RFuture; @@ -28,6 +30,8 @@ import org.redisson.api.RedissonClient; import org.redisson.api.RemoteInvocationOptions; import org.redisson.client.RedisException; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; import org.redisson.misc.Injector; @@ -36,6 +40,10 @@ import org.redisson.remote.ResponseEntry; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; /** * Executor service runs Callable and Runnable tasks. @@ -60,6 +68,7 @@ public class TasksRunnerService implements RemoteExecutorService { private String tasksName; private String schedulerQueueName; private String schedulerChannelName; + private String tasksRetryIntervalName; private ConcurrentMap responses; public TasksRunnerService(CommandExecutor commandExecutor, RedissonClient redisson, Codec codec, String name, ConcurrentMap responses) { @@ -76,6 +85,10 @@ public class TasksRunnerService implements RemoteExecutorService { } } + public void setTasksRetryIntervalName(String tasksRetryInterval) { + this.tasksRetryIntervalName = tasksRetryInterval; + } + public void setSchedulerQueueName(String schedulerQueueName) { this.schedulerQueueName = schedulerQueueName; } @@ -105,7 +118,7 @@ public class TasksRunnerService implements RemoteExecutorService { long newStartTime = System.currentTimeMillis() + period; RFuture future = asyncScheduledServiceAtFixed(executorId, requestId).scheduleAtFixedRate(className, classBody, state, newStartTime, period, executorId, requestId); try { - executeRunnable(className, classBody, state, null); + executeRunnable(className, classBody, state, requestId); } catch (RuntimeException e) { // cancel task if it throws an exception future.cancel(true); @@ -118,7 +131,7 @@ public class TasksRunnerService implements RemoteExecutorService { Date nextStartDate = new CronExpression(cronExpression).getNextValidTimeAfter(new Date()); RFuture future = asyncScheduledServiceAtFixed(executorId, requestId).schedule(className, classBody, state, nextStartDate.getTime(), cronExpression, executorId, requestId); try { - executeRunnable(className, classBody, state, null); + executeRunnable(className, classBody, state, requestId); } catch (RuntimeException e) { // cancel task if it throws an exception future.cancel(true); @@ -141,13 +154,14 @@ public class TasksRunnerService implements RemoteExecutorService { scheduledRemoteService.setSchedulerChannelName(schedulerChannelName); scheduledRemoteService.setTasksName(tasksName); scheduledRemoteService.setRequestId(new RequestId(requestId)); + scheduledRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName); RemoteExecutorServiceAsync asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); return asyncScheduledServiceAtFixed; } @Override public void scheduleWithFixedDelay(String className, byte[] classBody, byte[] state, long startTime, long delay, String executorId, String requestId) { - executeRunnable(className, classBody, state, null); + executeRunnable(className, classBody, state, requestId); long newStartTime = System.currentTimeMillis() + delay; asyncScheduledServiceAtFixed(executorId, requestId).scheduleWithFixedDelay(className, classBody, state, newStartTime, delay, executorId, requestId); } @@ -164,6 +178,8 @@ public class TasksRunnerService implements RemoteExecutorService { @Override public Object executeCallable(String className, byte[] classBody, byte[] state, String requestId) { + renewRetryTime(requestId); + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(state.length); try { buf.writeBytes(state); @@ -187,6 +203,51 @@ public class TasksRunnerService implements RemoteExecutorService { } } + protected void scheduleRetryTimeRenewal(final String requestId) { + ((Redisson)redisson).getConnectionManager().newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + renewRetryTime(requestId); + } + }, 5, TimeUnit.SECONDS); + } + + protected void renewRetryTime(final String requestId) { + RFuture future = commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + // check if executor service not in shutdown state + "local name = ARGV[2];" + + "local scheduledName = ARGV[2];" + + "if string.sub(scheduledName, 1, 2) ~= 'ff' then " + + "scheduledName = 'ff' .. scheduledName; " + + "else " + + "name = string.sub(name, 3, string.len(name)); " + + "end;" + + "local retryInterval = redis.call('get', KEYS[4]);" + + + "if redis.call('exists', KEYS[1]) == 0 and retryInterval ~= false and redis.call('hexists', KEYS[5], name) == 1 then " + + "local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);" + + "redis.call('zadd', KEYS[2], startTime, scheduledName);" + + "local v = redis.call('zrange', KEYS[2], 0, 0); " + // if new task added to queue head then publish its startTime + // to all scheduler workers + + "if v[1] == ARGV[2] then " + + "redis.call('publish', KEYS[3], startTime); " + + "end;" + + "return 1; " + + "end;" + + "return 0;", + Arrays.asList(statusName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName, tasksName), + System.currentTimeMillis(), requestId); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess() || future.get()) { + scheduleRetryTimeRenewal(requestId); + } + } + }); + } + @SuppressWarnings("unchecked") private T decode(ByteBuf buf) throws IOException { @@ -197,6 +258,10 @@ public class TasksRunnerService implements RemoteExecutorService { @Override public void executeRunnable(String className, byte[] classBody, byte[] state, String requestId) { + if (requestId != null && requestId.startsWith("00")) { + renewRetryTime(requestId); + } + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(state.length); try { buf.writeBytes(state); @@ -227,36 +292,26 @@ public class TasksRunnerService implements RemoteExecutorService { * If scheduledRequestId is not null then * delete scheduled task * - * @param scheduledRequestId + * @param requestId */ - private void finish(String scheduledRequestId) { + private void finish(String requestId) { classLoader.clearCurrentClassLoader(); - if (scheduledRequestId != null) { - commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_VOID, - "redis.call('hdel', KEYS[4], ARGV[3]); " + - "if redis.call('decr', KEYS[1]) == 0 then " - + "redis.call('del', KEYS[1]);" - + "if redis.call('get', KEYS[2]) == ARGV[1] then " - + "redis.call('set', KEYS[2], ARGV[2]);" - + "redis.call('publish', KEYS[3], ARGV[2]);" - + "end;" - + "end;", - Arrays.asList(tasksCounterName, statusName, terminationTopicName, tasksName), - RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, scheduledRequestId); - return; - } - - commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_VOID, - "if redis.call('decr', KEYS[1]) == 0 then " - + "redis.call('del', KEYS[1]);" + commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID, + "local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);" + + "if scheduled == false then " + + "redis.call('hdel', KEYS[4], ARGV[3]); " + + "end;" + + "redis.call('zrem', KEYS[5], 'ff' .. ARGV[3]);" + + "if redis.call('decr', KEYS[1]) == 0 then " + + "redis.call('del', KEYS[1], KEYS[6]);" + "if redis.call('get', KEYS[2]) == ARGV[1] then " + "redis.call('set', KEYS[2], ARGV[2]);" + "redis.call('publish', KEYS[3], ARGV[2]);" + "end;" - + "end;", - Arrays.asList(tasksCounterName, statusName, terminationTopicName), - RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); + + "end;", + Arrays.asList(tasksCounterName, statusName, terminationTopicName, tasksName, schedulerQueueName, tasksRetryIntervalName), + RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, requestId); } } diff --git a/redisson/src/main/java/org/redisson/executor/TasksService.java b/redisson/src/main/java/org/redisson/executor/TasksService.java index 5572e4688..cd32ecf4f 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksService.java @@ -26,7 +26,9 @@ import org.redisson.api.RMap; import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; +import org.redisson.codec.CompositeCodec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; @@ -50,11 +52,23 @@ public class TasksService extends BaseRemoteService { protected String tasksCounterName; protected String statusName; protected String tasksName; + protected String schedulerQueueName; + protected String schedulerChannelName; + protected String tasksRetryIntervalName; + protected long tasksRetryInterval; public TasksService(Codec codec, RedissonClient redisson, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap responses) { super(codec, redisson, name, commandExecutor, executorId, responses); } + public void setTasksRetryIntervalName(String tasksRetryIntervalName) { + this.tasksRetryIntervalName = tasksRetryIntervalName; + } + + public void setTasksRetryInterval(long tasksRetryInterval) { + this.tasksRetryInterval = tasksRetryInterval; + } + public void setTerminationTopicName(String terminationTopicName) { this.terminationTopicName = terminationTopicName; } @@ -70,6 +84,14 @@ public class TasksService extends BaseRemoteService { public void setTasksName(String tasksName) { this.tasksName = tasksName; } + + public void setSchedulerChannelName(String schedulerChannelName) { + this.schedulerChannelName = schedulerChannelName; + } + + public void setSchedulerQueueName(String scheduledQueueName) { + this.schedulerQueueName = scheduledQueueName; + } @Override protected final RFuture addAsync(String requestQueueName, @@ -104,49 +126,64 @@ public class TasksService extends BaseRemoteService { protected RFuture addAsync(String requestQueueName, RemoteServiceRequest request) { request.getArgs()[3] = request.getId(); + long retryStartTime = 0; + if (tasksRetryInterval > 0) { + retryStartTime = System.currentTimeMillis() + tasksRetryInterval; + } - return getAddCommandExecutor().evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return getAddCommandExecutor().evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + // check if executor service not in shutdown state "if redis.call('exists', KEYS[2]) == 0 then " - + "redis.call('hset', KEYS[4], ARGV[1], ARGV[2]);" - + "redis.call('rpush', KEYS[3], ARGV[1]); " + + "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);" + + "redis.call('rpush', KEYS[6], ARGV[2]); " + "redis.call('incr', KEYS[1]);" + + + "if tonumber(ARGV[1]) > 0 then " + + "redis.call('set', KEYS[7], ARGV[4]);" + + "redis.call('zadd', KEYS[3], ARGV[1], 'ff' .. ARGV[2]);" + + "local v = redis.call('zrange', KEYS[3], 0, 0); " + // if new task added to queue head then publish its startTime + // to all scheduler workers + + "if v[1] == ARGV[2] then " + + "redis.call('publish', KEYS[4], ARGV[1]); " + + "end " + + "end;" + "return 1;" + "end;" + "return 0;", - Arrays.asList(tasksCounterName, statusName, requestQueueName, tasksName), - request.getId(), encode(request)); + Arrays.asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName, requestQueueName, tasksRetryIntervalName), + retryStartTime, request.getId(), encode(request), tasksRetryInterval); } @Override protected RFuture removeAsync(String requestQueueName, RequestId taskId) { return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "local task = redis.call('hget', KEYS[5], ARGV[1]); " + - "if task ~= false and redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then " - + "redis.call('hdel', KEYS[5], ARGV[1]); " - + "if redis.call('decr', KEYS[2]) == 0 then " - + "redis.call('del', KEYS[2]);" - + "if redis.call('get', KEYS[3]) == ARGV[2] then " - + "redis.call('set', KEYS[3], ARGV[3]);" - + "redis.call('publish', KEYS[4], ARGV[3]);" - + "end;" - + "end;" - + "return 1;" + "redis.call('zrem', KEYS[2], 'ff' .. ARGV[1]); " + + "local task = redis.call('hget', KEYS[6], ARGV[1]); " + + "redis.call('hdel', KEYS[6], ARGV[1]); " + // remove from executor queue + + "if task ~= false and redis.call('exists', KEYS[3]) == 1 and redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then " + + "if redis.call('decr', KEYS[3]) == 0 then " + + "redis.call('del', KEYS[3], KEYS[7]);" + + "if redis.call('get', KEYS[4]) == ARGV[2] then " + + "redis.call('set', KEYS[4], ARGV[3]);" + + "redis.call('publish', KEYS[5], ARGV[3]);" + + "end;" + "end;" - + "return 0;", - Arrays.asList(requestQueueName, tasksCounterName, statusName, terminationTopicName, tasksName), - taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); + + "return 1;" + + "end;" + + "if task == false then " + + "return 1; " + + "end;" + + "return 0;", + Arrays.asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName, tasksRetryIntervalName), + taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); } public RFuture cancelExecutionAsync(final RequestId requestId) { - final Class syncInterface = RemoteExecutorService.class; - - if (!redisson.getMap(tasksName, LongCodec.INSTANCE).containsKey(requestId)) { - return RedissonPromise.newSucceededFuture(false); - } - final RPromise result = new RedissonPromise(); - String requestQueueName = getRequestQueueName(syncInterface); + String requestQueueName = getRequestQueueName(RemoteExecutorService.class); RFuture removeFuture = removeAsync(requestQueueName, requestId); removeFuture.addListener(new FutureListener() { @Override @@ -159,7 +196,7 @@ public class TasksService extends BaseRemoteService { if (future.getNow()) { result.trySuccess(true); } else { - RMap canceledRequests = redisson.getMap(cancelRequestMapName, codec); + RMap canceledRequests = redisson.getMap(cancelRequestMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec)); canceledRequests.putAsync(requestId.toString(), new RemoteServiceCancelRequest(true, true)); canceledRequests.expireAsync(60, TimeUnit.SECONDS); diff --git a/redisson/src/main/java/org/redisson/misc/CountableListener.java b/redisson/src/main/java/org/redisson/misc/CountableListener.java index ade47d5ac..81a4695d9 100644 --- a/redisson/src/main/java/org/redisson/misc/CountableListener.java +++ b/redisson/src/main/java/org/redisson/misc/CountableListener.java @@ -36,19 +36,19 @@ public class CountableListener implements FutureListener { } public CountableListener(RPromise result, T value) { - super(); + this(null, null, 0); + } + + public CountableListener(RPromise result, T value, int count) { this.result = result; this.value = value; + this.counter.set(count); } public void setCounter(int newValue) { counter.set(newValue); } - public void incCounter() { - counter.incrementAndGet(); - } - public void decCounter() { if (counter.decrementAndGet() == 0) { onSuccess(value); diff --git a/redisson/src/main/java/org/redisson/remote/RequestId.java b/redisson/src/main/java/org/redisson/remote/RequestId.java index 8f8757dd9..5646e259e 100644 --- a/redisson/src/main/java/org/redisson/remote/RequestId.java +++ b/redisson/src/main/java/org/redisson/remote/RequestId.java @@ -15,9 +15,9 @@ */ package org.redisson.remote; -import io.netty.buffer.ByteBuf; +import java.util.Arrays; + import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.Unpooled; /** * @@ -26,41 +26,26 @@ import io.netty.buffer.Unpooled; */ public class RequestId { - private final long id0; - private final long id1; + private final byte[] id; public RequestId(String id) { this(ByteBufUtil.decodeHexDump(id)); } public RequestId(byte[] buf) { - ByteBuf b = Unpooled.wrappedBuffer(buf); - try { - id0 = b.readLong(); - id1 = b.readLong(); - } finally { - b.release(); - } + id = buf; } @Override public String toString() { - ByteBuf id = Unpooled.buffer(16); - try { - id.writeLong(id0); - id.writeLong(id1); - return ByteBufUtil.hexDump(id); - } finally { - id.release(); - } + return ByteBufUtil.hexDump(id); } @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + (int) (id0 ^ (id0 >>> 32)); - result = prime * result + (int) (id1 ^ (id1 >>> 32)); + result = prime * result + Arrays.hashCode(id); return result; } @@ -73,12 +58,9 @@ public class RequestId { if (getClass() != obj.getClass()) return false; RequestId other = (RequestId) obj; - if (id0 != other.id0) - return false; - if (id1 != other.id1) + if (!Arrays.equals(id, other.id)) return false; return true; } - } diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java index 31b50c692..34b55a997 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java @@ -470,11 +470,9 @@ public class RedissonTransaction implements RTransaction { } final CountableListener> listener = - new CountableListener>(result, hashes); - listener.setCounter(hashes.size()); + new CountableListener>(result, hashes, hashes.size()); RPromise subscriptionFuture = new RedissonPromise(); - final CountableListener subscribedFutures = new CountableListener(subscriptionFuture, null); - subscribedFutures.setCounter(hashes.size()); + final CountableListener subscribedFutures = new CountableListener(subscriptionFuture, null, hashes.size()); final List> topics = new ArrayList>(); for (final Entry entry : hashes.entrySet()) { diff --git a/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java index bfa1e4d42..9e6ca368a 100644 --- a/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -424,7 +424,7 @@ public class RedissonRemoteServiceTest extends BaseTest { r2.shutdown(); } } - + @Test public void testInvocations() { RedissonClient r1 = createInstance(); @@ -451,6 +451,7 @@ public class RedissonRemoteServiceTest extends BaseTest { assertThat(e.getCause().getMessage()).isEqualTo("/ by zero"); } + assertThat(r1.getKeys().count()).isZero(); r1.shutdown(); r2.shutdown(); } diff --git a/redisson/src/test/java/org/redisson/executor/FailoverTask.java b/redisson/src/test/java/org/redisson/executor/FailoverTask.java new file mode 100644 index 000000000..e196c632c --- /dev/null +++ b/redisson/src/test/java/org/redisson/executor/FailoverTask.java @@ -0,0 +1,27 @@ +package org.redisson.executor; + +import org.redisson.api.RedissonClient; +import org.redisson.api.annotation.RInject; + +public class FailoverTask implements Runnable { + + @RInject + private RedissonClient redisson; + private String objectName; + + public FailoverTask() { + } + + public FailoverTask(String objectName) { + super(); + this.objectName = objectName; + } + + @Override + public void run() { + for (long i = 0; i < 20_000_000_000L; i++) { + } + redisson.getBucket(objectName).set(true); + } + +} diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index 8cf9b862e..5c0254228 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -14,6 +14,7 @@ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.awaitility.Duration; import org.junit.After; @@ -21,12 +22,17 @@ import org.junit.Before; import org.junit.Test; import org.redisson.BaseTest; import org.redisson.RedissonNode; +import org.redisson.api.ExecutorOptions; import org.redisson.api.RExecutorBatchFuture; import org.redisson.api.RExecutorFuture; import org.redisson.api.RExecutorService; import org.redisson.config.Config; import org.redisson.config.RedissonNodeConfig; +import mockit.Invocation; +import mockit.Mock; +import mockit.MockUp; + public class RedissonExecutorServiceTest extends BaseTest { private static RedissonNode node; @@ -68,6 +74,9 @@ public class RedissonExecutorServiceTest extends BaseTest { future.get(5, TimeUnit.SECONDS); future.getTaskFutures().stream().forEach(x -> x.syncUninterruptibly()); + + redisson.getKeys().delete("myCounter"); + assertThat(redisson.getKeys().count()).isZero(); } @Test @@ -78,6 +87,9 @@ public class RedissonExecutorServiceTest extends BaseTest { future.get(5, TimeUnit.SECONDS); future.getTaskFutures().stream().forEach(x -> assertThat(x.getNow()).isEqualTo("1234")); + + redisson.getKeys().delete("myCounter"); + assertThat(redisson.getKeys().count()).isZero(); } @@ -86,6 +98,72 @@ public class RedissonExecutorServiceTest extends BaseTest { RExecutorService e = redisson.getExecutorService("test"); e.execute(); } + + @Test + public void testTaskFinishing() throws Exception { + AtomicInteger counter = new AtomicInteger(); + new MockUp() { + @Mock + private void finish(Invocation invocation, String requestId) { + if (counter.incrementAndGet() > 1) { + invocation.proceed(); + } + } + }; + + Config config = createConfig(); + RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); + nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1)); + node = RedissonNode.create(nodeConfig); + node.start(); + + RExecutorService executor = redisson.getExecutorService("test2"); + RExecutorFuture f = executor.submit(new FailoverTask("finished")); + Thread.sleep(2000); + node.shutdown(); + + f.get(); + assertThat(redisson.getBucket("finished").get()).isTrue(); + } + + @Test + public void testTaskFailover() throws Exception { + AtomicInteger counter = new AtomicInteger(); + new MockUp() { + @Mock + private void finish(Invocation invocation, String requestId) { + if (counter.incrementAndGet() > 1) { + invocation.proceed(); + } + } + }; + + Config config = createConfig(); + RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); + nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1)); + node = RedissonNode.create(nodeConfig); + node.start(); + + + RExecutorService executor = redisson.getExecutorService("test2", ExecutorOptions.defaults().taskRetryInterval(10, TimeUnit.SECONDS)); + RExecutorFuture f = executor.submit(new IncrementRunnableTask("counter")); + f.get(); + assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(1); + Thread.sleep(2000); + node.shutdown(); + + node = RedissonNode.create(nodeConfig); + node.start(); + + Thread.sleep(8500); + assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2); + + Thread.sleep(16000); + assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2); + + redisson.getKeys().delete("counter"); + assertThat(redisson.getKeys().count()).isEqualTo(1); + } @Test public void testBatchExecute() { @@ -94,6 +172,8 @@ public class RedissonExecutorServiceTest extends BaseTest { new IncrementRunnableTask("myCounter"), new IncrementRunnableTask("myCounter")); await().atMost(Duration.FIVE_SECONDS).until(() -> redisson.getAtomicLong("myCounter").get() == 4); + redisson.getKeys().delete("myCounter"); + assertThat(redisson.getKeys().count()).isZero(); } @Test @@ -103,11 +183,13 @@ public class RedissonExecutorServiceTest extends BaseTest { Thread.sleep(2000); cancel(future); assertThat(redisson.getBucket("executed1").get()).isBetween(1000L, Long.MAX_VALUE); - RExecutorFuture futureAsync = executor.submitAsync(new ScheduledLongRunnableTask("executed2")); Thread.sleep(2000); assertThat(executor.cancelTask(futureAsync.getTaskId())).isTrue(); assertThat(redisson.getBucket("executed2").get()).isBetween(1000L, Long.MAX_VALUE); + + redisson.getKeys().delete("executed1", "executed2"); + assertThat(redisson.getKeys().count()).isZero(); } @Test @@ -134,14 +216,13 @@ public class RedissonExecutorServiceTest extends BaseTest { for (Future future : allResult) { assertThat(future.get()).isEqualTo(CallableTask.RESULT); } - + List invokeAllParams1 = Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask()); List> allResult1 = e.invokeAll(invokeAllParams1, 5, TimeUnit.SECONDS); assertThat(allResult1).hasSize(invokeAllParams.size()); for (Future future : allResult1) { assertThat(future.get()).isEqualTo(CallableTask.RESULT); } - } @Test(expected = RejectedExecutionException.class) @@ -158,6 +239,8 @@ public class RedissonExecutorServiceTest extends BaseTest { assertThat(e.isShutdown()).isTrue(); e.execute(new RunnableTask()); + + assertThat(redisson.getKeys().count()).isZero(); } @Test(expected = RejectedExecutionException.class) @@ -174,6 +257,8 @@ public class RedissonExecutorServiceTest extends BaseTest { assertThat(e.isShutdown()).isTrue(); e.submit(new RunnableTask2()); + + assertThat(redisson.getKeys().count()).isZero(); } @Test(expected = RejectedExecutionException.class) @@ -190,6 +275,8 @@ public class RedissonExecutorServiceTest extends BaseTest { assertThat(e.isShutdown()).isTrue(); e.submit(new CallableTask()); + + assertThat(redisson.getKeys().count()).isZero(); } @Test(expected = RejectedExecutionException.class) @@ -199,6 +286,8 @@ public class RedissonExecutorServiceTest extends BaseTest { assertThat(e.isShutdown()).isTrue(); e.submit(new RunnableTask2()); + + assertThat(redisson.getKeys().count()).isZero(); } @@ -255,6 +344,9 @@ public class RedissonExecutorServiceTest extends BaseTest { s4.get(); assertThat(redisson.getAtomicLong("runnableCounter").get()).isEqualTo(100L); + + redisson.getKeys().delete("runnableCounter", "counter"); + assertThat(redisson.getKeys().count()).isZero(); } @Test diff --git a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java index 36c920b65..81ce04869 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java @@ -10,6 +10,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; import org.junit.Assert; @@ -18,11 +19,17 @@ import org.junit.Test; import org.redisson.BaseTest; import org.redisson.RedissonNode; import org.redisson.api.CronSchedule; +import org.redisson.api.ExecutorOptions; +import org.redisson.api.RExecutorFuture; import org.redisson.api.RScheduledExecutorService; import org.redisson.api.RScheduledFuture; import org.redisson.config.Config; import org.redisson.config.RedissonNodeConfig; +import mockit.Invocation; +import mockit.Mock; +import mockit.MockUp; + public class RedissonScheduledExecutorServiceTest extends BaseTest { private static RedissonNode node; @@ -44,6 +51,44 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { super.after(); node.shutdown(); } + + @Test + public void testTaskFailover() throws Exception { + AtomicInteger counter = new AtomicInteger(); + new MockUp() { + @Mock + private void finish(Invocation invocation, String requestId) { + if (counter.incrementAndGet() > 1) { + invocation.proceed(); + } + } + }; + + Config config = createConfig(); + RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); + nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1)); + node = RedissonNode.create(nodeConfig); + node.start(); + + RScheduledExecutorService executor = redisson.getExecutorService("test2", ExecutorOptions.defaults().taskRetryInterval(10, TimeUnit.SECONDS)); + RExecutorFuture f = executor.schedule(new IncrementRunnableTask("counter"), 1, TimeUnit.SECONDS); + f.get(); + assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(1); + Thread.sleep(2000); + node.shutdown(); + + node = RedissonNode.create(nodeConfig); + node.start(); + + Thread.sleep(8500); + assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2); + + Thread.sleep(16000); + assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2); + + redisson.getKeys().delete("counter"); + assertThat(redisson.getKeys().count()).isEqualTo(1); + } @Test(timeout = 7000) public void testTaskResume() throws InterruptedException, ExecutionException { @@ -79,7 +124,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { } for (RScheduledFuture future : futures) { - assertThat(future.awaitUninterruptibly(5000)).isTrue(); + assertThat(future.awaitUninterruptibly(5100)).isTrue(); } node.shutdown(); @@ -111,6 +156,9 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { cancel(future1); Thread.sleep(2000); assertThat(redisson.getAtomicLong("executed1").isExists()).isFalse(); + + redisson.getKeys().delete("executed1"); + assertThat(redisson.getKeys().count()).isZero(); } @Test @@ -121,6 +169,9 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { Thread.sleep(2000); assertThat(redisson.getAtomicLong("executed1").isExists()).isFalse(); assertThat(executor.delete()).isFalse(); + + redisson.getKeys().delete("executed1"); + assertThat(redisson.getKeys().count()).isZero(); } @Test @@ -130,11 +181,14 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { Thread.sleep(2000); cancel(future); assertThat(redisson.getBucket("executed1").get()).isBetween(1000L, Long.MAX_VALUE); - + RScheduledFuture futureAsync = executor.scheduleAsync(new ScheduledLongRunnableTask("executed2"), 1, TimeUnit.SECONDS); Thread.sleep(2000); assertThat(executor.cancelTask(futureAsync.getTaskId())).isTrue(); assertThat(redisson.getBucket("executed2").get()).isBetween(1000L, Long.MAX_VALUE); + + redisson.getKeys().delete("executed1", "executed2"); + assertThat(redisson.getKeys().count()).isZero(); } @Test @@ -155,6 +209,9 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { assertThat(executor.cancelTask(futureAsync.getTaskId())).isTrue(); Thread.sleep(3000); assertThat(redisson.getAtomicLong("executed2").get()).isEqualTo(2); + + redisson.getKeys().delete("executed1", "executed2"); + assertThat(redisson.getKeys().count()).isZero(); } @Test @@ -180,13 +237,16 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { Thread.sleep(3000); assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3); + + redisson.getKeys().delete("counter", "executed1", "executed2"); + assertThat(redisson.getKeys().count()).isZero(); } private void cancel(ScheduledFuture future1) throws InterruptedException, ExecutionException { assertThat(future1.cancel(true)).isTrue(); try { future1.get(); - Assert.fail("CancellationException should be arise"); + Assert.fail("CancellationException should arise"); } catch (CancellationException e) { // skip } @@ -204,6 +264,9 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { Thread.sleep(3000); assertThat(redisson.getAtomicLong("executed1").get()).isEqualTo(5); + + redisson.getKeys().delete("executed1"); + assertThat(redisson.getKeys().count()).isZero(); } @@ -223,6 +286,9 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { assertThat(redisson.getAtomicLong("executed1").get()).isEqualTo(1); assertThat(redisson.getAtomicLong("executed2").get()).isEqualTo(1); assertThat(redisson.getAtomicLong("executed3").get()).isEqualTo(1); + + redisson.getKeys().delete("executed1", "executed2", "executed3"); + assertThat(redisson.getKeys().count()).isZero(); } @Test @@ -239,6 +305,9 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { assertThat(redisson.getAtomicLong("executed1").get()).isEqualTo(1); assertThat(redisson.getAtomicLong("executed2").get()).isEqualTo(1); assertThat(redisson.getAtomicLong("executed3").get()).isEqualTo(1); + + redisson.getKeys().delete("executed1", "executed2", "executed3"); + assertThat(redisson.getKeys().count()).isZero(); } @Test @@ -249,6 +318,9 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { future.get(); assertThat(System.currentTimeMillis() - startTime).isBetween(5000L, 5200L); assertThat(redisson.getAtomicLong("executed").get()).isEqualTo(1); + + redisson.getKeys().delete("executed"); + assertThat(redisson.getKeys().count()).isZero(); } @Test diff --git a/redisson/src/test/java/org/redisson/executor/ScheduledLongRunnableTask.java b/redisson/src/test/java/org/redisson/executor/ScheduledLongRunnableTask.java index 1a598310b..1f98f14c0 100644 --- a/redisson/src/test/java/org/redisson/executor/ScheduledLongRunnableTask.java +++ b/redisson/src/test/java/org/redisson/executor/ScheduledLongRunnableTask.java @@ -27,5 +27,5 @@ public class ScheduledLongRunnableTask implements Runnable { } } } - + }