Merge branch 'master' into 3.0.0

pull/1821/head
Nikita 7 years ago
commit c2f675d3be

@ -81,7 +81,7 @@
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.9.1</version>
<version>3.10.0</version>
<scope>test</scope>
</dependency>
<dependency>
@ -90,6 +90,12 @@
<version>3.1.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jmockit</groupId>
<artifactId>jmockit</artifactId>
<version>1.39</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@ -102,12 +108,6 @@
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jmockit</groupId>
<artifactId>jmockit</artifactId>
<version>1.33</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>

@ -39,7 +39,9 @@ import org.redisson.api.annotation.RRemoteAsync;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.misc.RPromise;
@ -718,8 +720,8 @@ public abstract class BaseRemoteService {
return;
}
RMap<String, T> canceledRequests = redisson.getMap(mapName, codec);
RFuture<T> future = canceledRequests.getAsync(requestId.toString());
RMap<String, T> canceledRequests = redisson.getMap(mapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RFuture<T> future = canceledRequests.removeAsync(requestId.toString());
future.addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
@ -744,9 +746,10 @@ public abstract class BaseRemoteService {
}
protected RequestId generateRequestId() {
byte[] id = new byte[16];
byte[] id = new byte[17];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
PlatformDependent.threadLocalRandom().nextBytes(id);
id[0] = 0;
return new RequestId(id);
}
@ -757,7 +760,7 @@ public abstract class BaseRemoteService {
private void cancelExecution(RemoteInvocationOptions optionsCopy,
boolean mayInterruptIfRunning, RemotePromise<Object> remotePromise) {
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, codec);
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
canceledRequests.putAsync(remotePromise.getRequestId().toString(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false));
canceledRequests.expireAsync(60, TimeUnit.SECONDS);

@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.BatchOptions;
import org.redisson.api.ClusterNodesGroup;
import org.redisson.api.ExecutorOptions;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.MapOptions;
import org.redisson.api.Node;
@ -375,7 +376,12 @@ public class Redisson implements RedissonClient {
@Override
public RScheduledExecutorService getExecutorService(String name) {
return new RedissonExecutorService(connectionManager.getCodec(), connectionManager.getCommandExecutor(), this, name, queueTransferService, responses);
return getExecutorService(name, connectionManager.getCodec());
}
@Override
public RScheduledExecutorService getExecutorService(String name, ExecutorOptions options) {
return getExecutorService(name, connectionManager.getCodec(), options);
}
@Override
@ -386,7 +392,12 @@ public class Redisson implements RedissonClient {
@Override
public RScheduledExecutorService getExecutorService(String name, Codec codec) {
return new RedissonExecutorService(codec, connectionManager.getCommandExecutor(), this, name, queueTransferService, responses);
return getExecutorService(name, codec, ExecutorOptions.defaults());
}
@Override
public RScheduledExecutorService getExecutorService(String name, Codec codec, ExecutorOptions options) {
return new RedissonExecutorService(codec, connectionManager.getCommandExecutor(), this, name, queueTransferService, responses, options);
}
@Override

@ -41,6 +41,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.CronSchedule;
import org.redisson.api.ExecutorOptions;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RExecutorBatchFuture;
import org.redisson.api.RExecutorFuture;
@ -60,6 +61,7 @@ import org.redisson.connection.ConnectionManager;
import org.redisson.executor.RedissonExecutorBatchFuture;
import org.redisson.executor.RedissonExecutorFuture;
import org.redisson.executor.RedissonExecutorFutureReference;
import org.redisson.executor.RedissonExecutorRemoteService;
import org.redisson.executor.RedissonScheduledFuture;
import org.redisson.executor.RemoteExecutorService;
import org.redisson.executor.RemoteExecutorServiceAsync;
@ -92,6 +94,8 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private static final Logger log = LoggerFactory.getLogger(RedissonExecutorService.class);
private static final RemoteInvocationOptions RESULT_OPTIONS = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.HOURS);
public static final int SHUTDOWN_STATE = 1;
public static final int TERMINATED_STATE = 2;
@ -103,6 +107,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final String tasksName;
private final String schedulerQueueName;
private final String schedulerChannelName;
private final String tasksRetryIntervalName;
private final String workersChannelName;
private final String workersSemaphoreName;
@ -121,6 +126,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final RemoteExecutorServiceAsync asyncServiceWithoutResult;
private final ScheduledTasksService scheduledRemoteService;
private final TasksService executorRemoteService;
private final Map<Class<?>, byte[]> class2bytes = PlatformDependent.newConcurrentHashMap();
@ -134,7 +140,8 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final ReferenceQueue<RExecutorFuture<?>> referenceDueue = new ReferenceQueue<RExecutorFuture<?>>();
private final Collection<RedissonExecutorFutureReference> references = Collections.newSetFromMap(PlatformDependent.<RedissonExecutorFutureReference, Boolean>newConcurrentHashMap());
public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name, QueueTransferService queueTransferService, ConcurrentMap<String, ResponseEntry> responses) {
public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson,
String name, QueueTransferService queueTransferService, ConcurrentMap<String, ResponseEntry> responses, ExecutorOptions options) {
super();
this.codec = codec;
this.commandExecutor = commandExecutor;
@ -150,7 +157,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
this.executorId = connectionManager.getId().toString() + ":" + RemoteExecutorServiceAsync.class.getName() + ":" + name;
}
remoteService = redisson.getRemoteService(name, codec);
remoteService = new RedissonExecutorRemoteService(codec, redisson, name, connectionManager.getCommandExecutor(), executorId, responses);
requestQueueName = ((RedissonRemoteService)remoteService).getRequestQueueName(RemoteExecutorService.class);
responseQueueName = ((RedissonRemoteService)remoteService).getResponseQueueName(executorId);
String objectName = requestQueueName;
@ -159,7 +166,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
statusName = objectName + ":status";
terminationTopic = redisson.getTopic(objectName + ":termination-topic", codec);
tasksRetryIntervalName = objectName + ":retry-interval";
schedulerChannelName = objectName + ":scheduler-channel";
schedulerQueueName = objectName + ":scheduler";
@ -169,12 +176,16 @@ public class RedissonExecutorService implements RScheduledExecutorService {
workersTopic = redisson.getTopic(workersChannelName);
TasksService executorRemoteService = new TasksService(codec, redisson, name, commandExecutor, executorId, responses);
executorRemoteService = new TasksService(codec, redisson, name, commandExecutor, executorId, responses);
executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
executorRemoteService.setTasksCounterName(tasksCounterName);
executorRemoteService.setStatusName(statusName);
executorRemoteService.setTasksName(tasksName);
asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS));
executorRemoteService.setSchedulerChannelName(schedulerChannelName);
executorRemoteService.setSchedulerQueueName(schedulerQueueName);
executorRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName);
executorRemoteService.setTasksRetryInterval(options.getTaskRetryInterval());
asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
scheduledRemoteService = new ScheduledTasksService(codec, redisson, name, commandExecutor, executorId, responses);
@ -184,7 +195,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
scheduledRemoteService.setSchedulerQueueName(schedulerQueueName);
scheduledRemoteService.setSchedulerChannelName(schedulerChannelName);
scheduledRemoteService.setTasksName(tasksName);
asyncScheduledService = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS));
scheduledRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName);
scheduledRemoteService.setTasksRetryInterval(options.getTaskRetryInterval());
asyncScheduledService = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
}
@ -232,9 +245,32 @@ public class RedissonExecutorService implements RScheduledExecutorService {
protected RFuture<Long> pushTaskAsync() {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local expiredTaskIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "local retryInterval = redis.call('get', KEYS[4]);"
+ "if #expiredTaskIds > 0 then "
+ "redis.call('zrem', KEYS[2], unpack(expiredTaskIds));"
+ "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));"
+ "if retryInterval ~= false then "
+ "local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);"
+ "for i = 1, #expiredTaskIds, 1 do "
+ "local name = expiredTaskIds[i];"
+ "local scheduledName = expiredTaskIds[i];"
+ "if string.sub(scheduledName, 1, 2) ~= 'ff' then "
+ "scheduledName = 'ff' .. scheduledName; "
+ "else "
+ "name = string.sub(name, 3, string.len(name)); "
+ "end;"
+ "redis.call('zadd', KEYS[2], startTime, scheduledName);"
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
// if new task added to queue head then publish its startTime
// to all scheduler workers
+ "if v[1] == expiredTaskIds[i] then "
+ "redis.call('publish', KEYS[3], startTime); "
+ "end;"
+ "redis.call('rpush', KEYS[1], name);"
+ "end; "
+ "else "
+ "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));"
+ "end; "
+ "end; "
// get startTime from scheduler queue head task
+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
@ -242,7 +278,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
+ "return v[2]; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(requestQueueName, schedulerQueueName),
Arrays.<Object>asList(requestQueueName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName),
System.currentTimeMillis(), 100);
}
};
@ -256,6 +292,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
service.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
service.setSchedulerChannelName(schedulerChannelName);
service.setSchedulerQueueName(schedulerQueueName);
service.setTasksRetryIntervalName(tasksRetryIntervalName);
remoteService.register(RemoteExecutorService.class, service, workers, executor);
workersGroupListenerId = workersTopic.addListener(new MessageListener<String>() {
@ -267,6 +304,8 @@ public class RedissonExecutorService implements RScheduledExecutorService {
});
}
private long repeatInterval = 5000;
@Override
public void execute(Runnable task) {
check(task);
@ -303,6 +342,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
executorRemoteService.setTasksCounterName(tasksCounterName);
executorRemoteService.setStatusName(statusName);
executorRemoteService.setTasksName(tasksName);
executorRemoteService.setSchedulerChannelName(schedulerChannelName);
executorRemoteService.setSchedulerQueueName(schedulerQueueName);
executorRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName);
return executorRemoteService;
}
@ -361,7 +403,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
+ "redis.call('set', KEYS[2], ARGV[1]);"
+ "end;"
+ "end;",
Arrays.<Object>asList(tasksCounterName, statusName, terminationTopic.getChannelNames().get(0)),
Arrays.<Object>asList(tasksCounterName, statusName, terminationTopic.getChannelNames().get(0), tasksRetryIntervalName),
SHUTDOWN_STATE, TERMINATED_STATE);
}
@ -471,7 +513,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
TasksBatchService executorRemoteService = createBatchService();
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS));
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
for (Callable<?> task : tasks) {
check(task);
byte[] classBody = getClassBody(task);
@ -496,7 +538,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
}
TasksBatchService executorRemoteService = createBatchService();
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS));
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
final List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
for (Callable<?> task : tasks) {
check(task);
@ -599,7 +641,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
TasksBatchService executorRemoteService = createBatchService();
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS));
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
for (Runnable task : tasks) {
check(task);
byte[] classBody = getClassBody(task);
@ -624,7 +666,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
}
TasksBatchService executorRemoteService = createBatchService();
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS));
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
final List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
for (Runnable task : tasks) {
check(task);
@ -843,8 +885,13 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public boolean cancelTask(String taskId) {
RFuture<Boolean> scheduledFuture = scheduledRemoteService.cancelExecutionAsync(new RequestId(taskId));
if (taskId.startsWith("01")) {
RFuture<Boolean> scheduledFuture = scheduledRemoteService.cancelExecutionAsync(new RequestId(taskId));
return commandExecutor.get(scheduledFuture);
}
RFuture<Boolean> scheduledFuture = executorRemoteService.cancelExecutionAsync(new RequestId(taskId));
return commandExecutor.get(scheduledFuture);
}
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
@ -908,8 +955,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
throw ee;
} finally {
for (Future<T> f : futures)
for (Future<T> f : futures) {
f.cancel(true);
}
}
}

@ -19,6 +19,7 @@ import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RExecutorService;
import org.redisson.api.RFuture;
@ -116,7 +117,7 @@ public class RedissonNode {
*/
public void shutdown() {
if (hasRedissonInstance) {
redisson.shutdown();
redisson.shutdown(0, 15, TimeUnit.MINUTES);
log.info("Redisson node has been shutdown successfully");
}
}

@ -187,7 +187,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
final String requestId = future.getNow();
RMap<String, RemoteServiceRequest> tasks = redisson.getMap(requestQueue.getName() + ":tasks", new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RFuture<RemoteServiceRequest> taskFuture = tasks.getAsync(requestId);
RFuture<RemoteServiceRequest> taskFuture = getTask(requestId, tasks);
taskFuture.addListener(new FutureListener<RemoteServiceRequest>() {
@Override
@ -220,10 +220,10 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
return;
}
final String responseName = getResponseQueueName(request.getExecutorId());
// send the ack only if expected
if (request.getOptions().isAckExpected()) {
final String responseName = getResponseQueueName(request.getExecutorId());
String ackName = getAckName(request.getId());
RFuture<Boolean> ackClientsFuture = commandExecutor.evalWriteAsync(responseName,
LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
@ -330,7 +330,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
// could be removed not from future object
if (future.getNow().isSendResponse()) {
RMap<String, RemoteServiceCancelResponse> map = redisson.getMap(cancelResponseMapName, codec);
RMap<String, RemoteServiceCancelResponse> map = redisson.getMap(cancelResponseMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
map.putAsync(request.getId(), response);
map.expireAsync(60, TimeUnit.SECONDS);
}
@ -397,4 +397,8 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
}
protected RFuture<RemoteServiceRequest> getTask(final String requestId, RMap<String, RemoteServiceRequest> tasks) {
return tasks.removeAsync(requestId);
}
}

@ -0,0 +1,56 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.TimeUnit;
/**
* Configuration for ExecutorService.
*
* @author Nikita Koksharov
*
*/
public class ExecutorOptions {
private long taskRetryInterval = 60000;
private ExecutorOptions() {
}
public static ExecutorOptions defaults() {
return new ExecutorOptions();
}
public long getTaskRetryInterval() {
return taskRetryInterval;
}
/**
* Defines task retry interval at the end of which task is executed again.
* ExecutorService worker re-schedule task execution retry every 5 seconds.
* <p>
* Default is <code>1 minute</code>
*
* @param timeout value
* @param unit value
* @return self instance
*/
public ExecutorOptions taskRetryInterval(long timeout, TimeUnit unit) {
this.taskRetryInterval = unit.toMillis(timeout);
return this;
}
}

@ -839,6 +839,15 @@ public interface RedissonClient {
*/
RScheduledExecutorService getExecutorService(String name);
/**
* Returns ScheduledExecutorService by name
*
* @param name - name of object
* @param options - options for executor
* @return ScheduledExecutorService object
*/
RScheduledExecutorService getExecutorService(String name, ExecutorOptions options);
/**
* Returns ScheduledExecutorService by name
* using provided codec for task, response and request serialization
@ -864,6 +873,17 @@ public interface RedissonClient {
* @since 2.8.2
*/
RScheduledExecutorService getExecutorService(String name, Codec codec);
/**
* Returns ScheduledExecutorService by name
* using provided codec for task, response and request serialization
*
* @param name - name of object
* @param codec - codec for task, response and request
* @param options - options for executor
* @return ScheduledExecutorService object
*/
RScheduledExecutorService getExecutorService(String name, Codec codec, ExecutorOptions options);
/**
* Returns object for remote operations prefixed with the default name (redisson_remote_service)

@ -582,7 +582,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (Integer slot : removedSlots) {
MasterSlaveEntry entry = removeEntry(slot);
if (entry.getSlotRanges().isEmpty()) {
entry.shutdownMasterAsync();
entry.shutdownAsync();
log.info("{} master and slaves for it removed", entry.getClient().getAddr());
}
}

@ -1,75 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.codec;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import com.fasterxml.jackson.databind.BeanDescription;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.deser.ValueInstantiator;
import com.fasterxml.jackson.databind.deser.ValueInstantiators.Base;
import com.fasterxml.jackson.databind.module.SimpleModule;
/**
* Fix for https://github.com/FasterXML/jackson-databind/issues/1599
*
* @author Nikita Koksharov
*
* TODO remove after update to latest version of Jackson
*
*/
public class DefenceModule extends SimpleModule {
private static final long serialVersionUID = -429891510707420220L;
public static class DefenceValueInstantiator extends Base {
protected final static Set<String> DEFAULT_NO_DESER_CLASS_NAMES;
static {
Set<String> s = new HashSet<String>();
// Courtesy of [https://github.com/kantega/notsoserial]:
// (and wrt [databind#1599]
s.add("org.apache.commons.collections.functors.InvokerTransformer");
s.add("org.apache.commons.collections.functors.InstantiateTransformer");
s.add("org.apache.commons.collections4.functors.InvokerTransformer");
s.add("org.apache.commons.collections4.functors.InstantiateTransformer");
s.add("org.codehaus.groovy.runtime.ConvertedClosure");
s.add("org.codehaus.groovy.runtime.MethodClosure");
s.add("org.springframework.beans.factory.ObjectFactory");
s.add("com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl");
DEFAULT_NO_DESER_CLASS_NAMES = Collections.unmodifiableSet(s);
}
@Override
public ValueInstantiator findValueInstantiator(DeserializationConfig config, BeanDescription beanDesc,
ValueInstantiator defaultInstantiator) {
if (DEFAULT_NO_DESER_CLASS_NAMES.contains(beanDesc.getClassInfo().getRawType().getName())) {
throw new IllegalArgumentException("Illegal type " + beanDesc.getClassInfo().getRawType().getName() + " to deserialize: prevented for security reasons");
}
return super.findValueInstantiator(config, beanDesc, defaultInstantiator);
}
}
@Override
public void setupModule(SetupContext context) {
context.addValueInstantiators(new DefenceValueInstantiator());
}
}

@ -153,8 +153,6 @@ public class JsonJacksonCodec extends BaseCodec {
}
protected void init(ObjectMapper objectMapper) {
objectMapper.registerModule(new DefenceModule());
objectMapper.setSerializationInclusion(Include.NON_NULL);
objectMapper.setVisibility(objectMapper.getSerializationConfig()
.getDefaultVisibilityChecker()

@ -67,8 +67,6 @@ public interface ConnectionManager {
IdleConnectionWatcher getConnectionWatcher();
void shutdownAsync(RedisClient client);
int calcSlot(String key);
MasterSlaveServersConfig getConfig();

@ -46,11 +46,11 @@ import org.redisson.config.BaseMasterSlaveServersConfig;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.TransportMode;
import org.redisson.misc.CountableListener;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.URIBuilder;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.PublishSubscribeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -435,11 +435,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return client;
}
@Override
public void shutdownAsync(RedisClient client) {
client.shutdownAsync();
}
@Override
public RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout, String sslHostname) {
RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname);
@ -633,16 +628,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
dnsMonitor.stop();
}
timer.stop();
shutdownLatch.close();
shutdownPromise.trySuccess(true);
shutdownLatch.awaitUninterruptibly();
for (MasterSlaveEntry entry : getEntrySet()) {
entry.shutdown();
}
if (cfg.getExecutor() == null) {
executor.shutdown();
try {
@ -651,7 +636,20 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
Thread.currentThread().interrupt();
}
}
timer.stop();
shutdownLatch.close();
shutdownPromise.trySuccess(true);
shutdownLatch.awaitUninterruptibly();
RPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, getEntrySet().size());
for (MasterSlaveEntry entry : getEntrySet()) {
entry.shutdownAsync().addListener(listener);
}
result.awaitUninterruptibly(timeout, unit);
resolverGroup.close();
if (cfg.getEventLoopGroup() == null) {

@ -139,25 +139,28 @@ public class MasterSlaveEntry {
return;
}
masterEntry = new ClientConnectionsEntry(
client,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSubscriptionConnectionMinimumIdleSize(),
config.getSubscriptionConnectionPoolSize(),
connectionManager,
NodeType.MASTER);
CountableListener<RedisClient> listener = new CountableListener<RedisClient>(result, client);
RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
listener.incCounter();
writeFuture.addListener(listener);
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry);
listener.incCounter();
pubSubFuture.addListener(listener);
}
masterEntry = new ClientConnectionsEntry(
client,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSubscriptionConnectionMinimumIdleSize(),
config.getSubscriptionConnectionPoolSize(),
connectionManager,
NodeType.MASTER);
int counter = 1;
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
counter++;
}
CountableListener<RedisClient> listener = new CountableListener<RedisClient>(result, client, counter);
RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
writeFuture.addListener(listener);
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry);
pubSubFuture.addListener(listener);
}
}
});
@ -465,19 +468,22 @@ public class MasterSlaveEntry {
&& slaveBalancer.getAvailableClients() > 1) {
slaveDown(newMasterClient.getAddr(), FreezeReason.SYSTEM);
}
connectionManager.shutdownAsync(oldMaster.getClient());
oldMaster.getClient().shutdownAsync();
log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), masterEntry.getClient().getAddr());
}
});
}
public void shutdownMasterAsync() {
public RFuture<Void> shutdownAsync() {
if (!active.compareAndSet(true, false)) {
return;
return RedissonPromise.<Void>newSucceededFuture(null);
}
connectionManager.shutdownAsync(masterEntry.getClient());
slaveBalancer.shutdownAsync();
RPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, 2);
masterEntry.getClient().shutdownAsync().addListener(listener);
slaveBalancer.shutdownAsync().addListener(listener);
return result;
}
public RFuture<RedisConnection> connectionWriteOp(RedisCommand<?> command) {
@ -526,15 +532,6 @@ public class MasterSlaveEntry {
slaveBalancer.returnConnection(connection);
}
public void shutdown() {
if (!active.compareAndSet(true, false)) {
return;
}
masterEntry.getClient().shutdown();
slaveBalancer.shutdown();
}
public void addSlotRange(Integer range) {
slots.add(range);
}

@ -83,19 +83,17 @@ public class LoadBalancerManager {
public RFuture<Void> add(final ClientConnectionsEntry entry) {
RPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null) {
CountableListener<Void> listener = new CountableListener<Void>(result, null, 2) {
@Override
protected void onSuccess(Void value) {
client2Entry.put(entry.getClient(), entry);
}
client2Entry.put(entry.getClient(), entry);
}
};
RFuture<Void> slaveFuture = slaveConnectionPool.add(entry);
listener.incCounter();
slaveFuture.addListener(listener);
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry);
listener.incCounter();
pubSubFuture.addListener(listener);
return result;
}
@ -249,16 +247,16 @@ public class LoadBalancerManager {
slaveConnectionPool.returnConnection(entry, connection);
}
public void shutdown() {
for (ClientConnectionsEntry entry : client2Entry.values()) {
entry.getClient().shutdown();
public RFuture<Void> shutdownAsync() {
if (client2Entry.values().isEmpty()) {
return RedissonPromise.<Void>newSucceededFuture(null);
}
}
public void shutdownAsync() {
RPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, client2Entry.values().size());
for (ClientConnectionsEntry entry : client2Entry.values()) {
connectionManager.shutdownAsync(entry.getClient());
entry.getClient().shutdownAsync().addListener(listener);
}
return result;
}
}

@ -0,0 +1,46 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor;
import java.util.concurrent.ConcurrentMap;
import org.redisson.RedissonRemoteService;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandExecutor;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.ResponseEntry;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonExecutorRemoteService extends RedissonRemoteService {
public RedissonExecutorRemoteService(Codec codec, RedissonClient redisson, String name,
CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);
}
@Override
protected RFuture<RemoteServiceRequest> getTask(String requestId, RMap<String, RemoteServiceRequest> tasks) {
return tasks.getAsync(requestId);
}
}

@ -23,12 +23,15 @@ import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
@ -37,8 +40,6 @@ import org.redisson.remote.ResponseEntry;
public class ScheduledTasksService extends TasksService {
private RequestId requestId;
private String schedulerQueueName;
private String schedulerChannelName;
public ScheduledTasksService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String redissonId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, redissonId, responses);
@ -48,14 +49,6 @@ public class ScheduledTasksService extends TasksService {
this.requestId = requestId;
}
public void setSchedulerChannelName(String schedulerChannelName) {
this.schedulerChannelName = schedulerChannelName;
}
public void setSchedulerQueueName(String scheduledQueueName) {
this.schedulerQueueName = scheduledQueueName;
}
@Override
protected RFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request) {
int requestIndex = 0;
@ -72,29 +65,19 @@ public class ScheduledTasksService extends TasksService {
request.getArgs()[requestIndex] = request.getId();
Long startTime = (Long)request.getArgs()[3];
if (requestId != null) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// check if executor service not in shutdown state and previous task exists
"if redis.call('exists', KEYS[2]) == 0 and redis.call('hexists', KEYS[5], ARGV[2]) == 1 then "
+ "redis.call('zadd', KEYS[3], ARGV[1], ARGV[2]);"
+ "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);"
+ "redis.call('incr', KEYS[1]);"
// if new task added to queue head then publish its startTime
// to all scheduler workers
+ "local v = redis.call('zrange', KEYS[3], 0, 0); "
+ "if v[1] == ARGV[2] then "
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end "
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName),
startTime, request.getId(), encode(request));
}
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// check if executor service not in shutdown state
"if redis.call('exists', KEYS[2]) == 0 then "
+ "local retryInterval = redis.call('get', KEYS[6]); "
+ "if retryInterval ~= false then "
+ "local time = tonumber(ARGV[4]) + tonumber(retryInterval);"
+ "redis.call('zadd', KEYS[3], time, 'ff' .. ARGV[2]);"
+ "elseif tonumber(ARGV[5]) > 0 then "
+ "redis.call('set', KEYS[6], ARGV[5]);"
+ "local time = tonumber(ARGV[4]) + tonumber(ARGV[5]);"
+ "redis.call('zadd', KEYS[3], time, 'ff' .. ARGV[2]);"
+ "end; "
+ "redis.call('zadd', KEYS[3], ARGV[1], ARGV[2]);"
+ "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);"
+ "redis.call('incr', KEYS[1]);"
@ -107,31 +90,29 @@ public class ScheduledTasksService extends TasksService {
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName),
startTime, request.getId(), encode(request));
Arrays.<Object>asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName, tasksRetryIntervalName),
startTime, request.getId(), encode(request), System.currentTimeMillis(), tasksRetryInterval);
}
@Override
protected RFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove from scheduler queue
"if redis.call('zrem', KEYS[2], ARGV[1]) > 0 then "
+ "redis.call('hdel', KEYS[6], ARGV[1]); "
+ "if redis.call('decr', KEYS[3]) == 0 then "
+ "redis.call('del', KEYS[3]);"
+ "if redis.call('get', KEYS[4]) == ARGV[2] then "
+ "redis.call('set', KEYS[4], ARGV[3]);"
+ "redis.call('publish', KEYS[5], ARGV[3]);"
+ "end;"
+ "end;"
"if redis.call('exists', KEYS[3]) == 0 then "
+ "return 1;"
+ "end;"
+ "local task = redis.call('hget', KEYS[6], ARGV[1]); "
// remove from executor queue
+ "if task ~= false and redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then "
+ "redis.call('hdel', KEYS[6], ARGV[1]); "
+ "redis.call('hdel', KEYS[6], ARGV[1]); "
+ "redis.call('zrem', KEYS[2], 'ff' .. ARGV[1]); "
+ "local removedScheduled = redis.call('zrem', KEYS[2], ARGV[1]); "
+ "local removed = redis.call('lrem', KEYS[1], 1, ARGV[1]); "
// remove from executor queue
+ "if task ~= false and (removed > 0 or removedScheduled > 0) then "
+ "if redis.call('decr', KEYS[3]) == 0 then "
+ "redis.call('del', KEYS[3]);"
+ "redis.call('del', KEYS[3], KEYS[7]);"
+ "if redis.call('get', KEYS[4]) == ARGV[2] then "
+ "redis.call('set', KEYS[4], ARGV[3]);"
+ "redis.call('publish', KEYS[5], ARGV[3]);"
@ -139,17 +120,22 @@ public class ScheduledTasksService extends TasksService {
+ "end;"
+ "return 1;"
+ "end;"
// delete scheduled task
+ "redis.call('hdel', KEYS[6], ARGV[1]); "
+ "if task == false then "
+ "return 1; "
+ "end;"
+ "return 0;",
Arrays.<Object>asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName),
Arrays.<Object>asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName, tasksRetryIntervalName),
taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
}
@Override
protected RequestId generateRequestId() {
if (requestId == null) {
return super.generateRequestId();
byte[] id = new byte[17];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
PlatformDependent.threadLocalRandom().nextBytes(id);
id[0] = 1;
return new RequestId(id);
}
return requestId;
}

@ -20,7 +20,9 @@ import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.Redisson;
import org.redisson.RedissonExecutorService;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RFuture;
@ -28,6 +30,8 @@ import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.Injector;
@ -36,6 +40,10 @@ import org.redisson.remote.ResponseEntry;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
* Executor service runs Callable and Runnable tasks.
@ -60,6 +68,7 @@ public class TasksRunnerService implements RemoteExecutorService {
private String tasksName;
private String schedulerQueueName;
private String schedulerChannelName;
private String tasksRetryIntervalName;
private ConcurrentMap<String, ResponseEntry> responses;
public TasksRunnerService(CommandExecutor commandExecutor, RedissonClient redisson, Codec codec, String name, ConcurrentMap<String, ResponseEntry> responses) {
@ -76,6 +85,10 @@ public class TasksRunnerService implements RemoteExecutorService {
}
}
public void setTasksRetryIntervalName(String tasksRetryInterval) {
this.tasksRetryIntervalName = tasksRetryInterval;
}
public void setSchedulerQueueName(String schedulerQueueName) {
this.schedulerQueueName = schedulerQueueName;
}
@ -105,7 +118,7 @@ public class TasksRunnerService implements RemoteExecutorService {
long newStartTime = System.currentTimeMillis() + period;
RFuture<Void> future = asyncScheduledServiceAtFixed(executorId, requestId).scheduleAtFixedRate(className, classBody, state, newStartTime, period, executorId, requestId);
try {
executeRunnable(className, classBody, state, null);
executeRunnable(className, classBody, state, requestId);
} catch (RuntimeException e) {
// cancel task if it throws an exception
future.cancel(true);
@ -118,7 +131,7 @@ public class TasksRunnerService implements RemoteExecutorService {
Date nextStartDate = new CronExpression(cronExpression).getNextValidTimeAfter(new Date());
RFuture<Void> future = asyncScheduledServiceAtFixed(executorId, requestId).schedule(className, classBody, state, nextStartDate.getTime(), cronExpression, executorId, requestId);
try {
executeRunnable(className, classBody, state, null);
executeRunnable(className, classBody, state, requestId);
} catch (RuntimeException e) {
// cancel task if it throws an exception
future.cancel(true);
@ -141,13 +154,14 @@ public class TasksRunnerService implements RemoteExecutorService {
scheduledRemoteService.setSchedulerChannelName(schedulerChannelName);
scheduledRemoteService.setTasksName(tasksName);
scheduledRemoteService.setRequestId(new RequestId(requestId));
scheduledRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName);
RemoteExecutorServiceAsync asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
return asyncScheduledServiceAtFixed;
}
@Override
public void scheduleWithFixedDelay(String className, byte[] classBody, byte[] state, long startTime, long delay, String executorId, String requestId) {
executeRunnable(className, classBody, state, null);
executeRunnable(className, classBody, state, requestId);
long newStartTime = System.currentTimeMillis() + delay;
asyncScheduledServiceAtFixed(executorId, requestId).scheduleWithFixedDelay(className, classBody, state, newStartTime, delay, executorId, requestId);
}
@ -164,6 +178,8 @@ public class TasksRunnerService implements RemoteExecutorService {
@Override
public Object executeCallable(String className, byte[] classBody, byte[] state, String requestId) {
renewRetryTime(requestId);
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(state.length);
try {
buf.writeBytes(state);
@ -187,6 +203,51 @@ public class TasksRunnerService implements RemoteExecutorService {
}
}
protected void scheduleRetryTimeRenewal(final String requestId) {
((Redisson)redisson).getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
renewRetryTime(requestId);
}
}, 5, TimeUnit.SECONDS);
}
protected void renewRetryTime(final String requestId) {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// check if executor service not in shutdown state
"local name = ARGV[2];"
+ "local scheduledName = ARGV[2];"
+ "if string.sub(scheduledName, 1, 2) ~= 'ff' then "
+ "scheduledName = 'ff' .. scheduledName; "
+ "else "
+ "name = string.sub(name, 3, string.len(name)); "
+ "end;"
+ "local retryInterval = redis.call('get', KEYS[4]);"
+ "if redis.call('exists', KEYS[1]) == 0 and retryInterval ~= false and redis.call('hexists', KEYS[5], name) == 1 then "
+ "local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);"
+ "redis.call('zadd', KEYS[2], startTime, scheduledName);"
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
// if new task added to queue head then publish its startTime
// to all scheduler workers
+ "if v[1] == ARGV[2] then "
+ "redis.call('publish', KEYS[3], startTime); "
+ "end;"
+ "return 1; "
+ "end;"
+ "return 0;",
Arrays.<Object>asList(statusName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName, tasksName),
System.currentTimeMillis(), requestId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess() || future.get()) {
scheduleRetryTimeRenewal(requestId);
}
}
});
}
@SuppressWarnings("unchecked")
private <T> T decode(ByteBuf buf) throws IOException {
@ -197,6 +258,10 @@ public class TasksRunnerService implements RemoteExecutorService {
@Override
public void executeRunnable(String className, byte[] classBody, byte[] state, String requestId) {
if (requestId != null && requestId.startsWith("00")) {
renewRetryTime(requestId);
}
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(state.length);
try {
buf.writeBytes(state);
@ -227,36 +292,26 @@ public class TasksRunnerService implements RemoteExecutorService {
* If <code>scheduledRequestId</code> is not null then
* delete scheduled task
*
* @param scheduledRequestId
* @param requestId
*/
private void finish(String scheduledRequestId) {
private void finish(String requestId) {
classLoader.clearCurrentClassLoader();
if (scheduledRequestId != null) {
commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_VOID,
"redis.call('hdel', KEYS[4], ARGV[3]); " +
"if redis.call('decr', KEYS[1]) == 0 then "
+ "redis.call('del', KEYS[1]);"
+ "if redis.call('get', KEYS[2]) == ARGV[1] then "
+ "redis.call('set', KEYS[2], ARGV[2]);"
+ "redis.call('publish', KEYS[3], ARGV[2]);"
+ "end;"
+ "end;",
Arrays.<Object>asList(tasksCounterName, statusName, terminationTopicName, tasksName),
RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, scheduledRequestId);
return;
}
commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_VOID,
"if redis.call('decr', KEYS[1]) == 0 then "
+ "redis.call('del', KEYS[1]);"
commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);"
+ "if scheduled == false then "
+ "redis.call('hdel', KEYS[4], ARGV[3]); "
+ "end;" +
"redis.call('zrem', KEYS[5], 'ff' .. ARGV[3]);" +
"if redis.call('decr', KEYS[1]) == 0 then "
+ "redis.call('del', KEYS[1], KEYS[6]);"
+ "if redis.call('get', KEYS[2]) == ARGV[1] then "
+ "redis.call('set', KEYS[2], ARGV[2]);"
+ "redis.call('publish', KEYS[3], ARGV[2]);"
+ "end;"
+ "end;",
Arrays.<Object>asList(tasksCounterName, statusName, terminationTopicName),
RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
+ "end;",
Arrays.<Object>asList(tasksCounterName, statusName, terminationTopicName, tasksName, schedulerQueueName, tasksRetryIntervalName),
RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, requestId);
}
}

@ -26,7 +26,9 @@ import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
@ -50,11 +52,23 @@ public class TasksService extends BaseRemoteService {
protected String tasksCounterName;
protected String statusName;
protected String tasksName;
protected String schedulerQueueName;
protected String schedulerChannelName;
protected String tasksRetryIntervalName;
protected long tasksRetryInterval;
public TasksService(Codec codec, RedissonClient redisson, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);
}
public void setTasksRetryIntervalName(String tasksRetryIntervalName) {
this.tasksRetryIntervalName = tasksRetryIntervalName;
}
public void setTasksRetryInterval(long tasksRetryInterval) {
this.tasksRetryInterval = tasksRetryInterval;
}
public void setTerminationTopicName(String terminationTopicName) {
this.terminationTopicName = terminationTopicName;
}
@ -70,6 +84,14 @@ public class TasksService extends BaseRemoteService {
public void setTasksName(String tasksName) {
this.tasksName = tasksName;
}
public void setSchedulerChannelName(String schedulerChannelName) {
this.schedulerChannelName = schedulerChannelName;
}
public void setSchedulerQueueName(String scheduledQueueName) {
this.schedulerQueueName = scheduledQueueName;
}
@Override
protected final RFuture<Boolean> addAsync(String requestQueueName,
@ -104,49 +126,64 @@ public class TasksService extends BaseRemoteService {
protected RFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request) {
request.getArgs()[3] = request.getId();
long retryStartTime = 0;
if (tasksRetryInterval > 0) {
retryStartTime = System.currentTimeMillis() + tasksRetryInterval;
}
return getAddCommandExecutor().evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return getAddCommandExecutor().evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// check if executor service not in shutdown state
"if redis.call('exists', KEYS[2]) == 0 then "
+ "redis.call('hset', KEYS[4], ARGV[1], ARGV[2]);"
+ "redis.call('rpush', KEYS[3], ARGV[1]); "
+ "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);"
+ "redis.call('rpush', KEYS[6], ARGV[2]); "
+ "redis.call('incr', KEYS[1]);"
+ "if tonumber(ARGV[1]) > 0 then "
+ "redis.call('set', KEYS[7], ARGV[4]);"
+ "redis.call('zadd', KEYS[3], ARGV[1], 'ff' .. ARGV[2]);"
+ "local v = redis.call('zrange', KEYS[3], 0, 0); "
// if new task added to queue head then publish its startTime
// to all scheduler workers
+ "if v[1] == ARGV[2] then "
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end "
+ "end;"
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(tasksCounterName, statusName, requestQueueName, tasksName),
request.getId(), encode(request));
Arrays.<Object>asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName, requestQueueName, tasksRetryIntervalName),
retryStartTime, request.getId(), encode(request), tasksRetryInterval);
}
@Override
protected RFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local task = redis.call('hget', KEYS[5], ARGV[1]); " +
"if task ~= false and redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then "
+ "redis.call('hdel', KEYS[5], ARGV[1]); "
+ "if redis.call('decr', KEYS[2]) == 0 then "
+ "redis.call('del', KEYS[2]);"
+ "if redis.call('get', KEYS[3]) == ARGV[2] then "
+ "redis.call('set', KEYS[3], ARGV[3]);"
+ "redis.call('publish', KEYS[4], ARGV[3]);"
+ "end;"
+ "end;"
+ "return 1;"
"redis.call('zrem', KEYS[2], 'ff' .. ARGV[1]); "
+ "local task = redis.call('hget', KEYS[6], ARGV[1]); "
+ "redis.call('hdel', KEYS[6], ARGV[1]); "
// remove from executor queue
+ "if task ~= false and redis.call('exists', KEYS[3]) == 1 and redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then "
+ "if redis.call('decr', KEYS[3]) == 0 then "
+ "redis.call('del', KEYS[3], KEYS[7]);"
+ "if redis.call('get', KEYS[4]) == ARGV[2] then "
+ "redis.call('set', KEYS[4], ARGV[3]);"
+ "redis.call('publish', KEYS[5], ARGV[3]);"
+ "end;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(requestQueueName, tasksCounterName, statusName, terminationTopicName, tasksName),
taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
+ "return 1;"
+ "end;"
+ "if task == false then "
+ "return 1; "
+ "end;"
+ "return 0;",
Arrays.<Object>asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName, tasksRetryIntervalName),
taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
}
public RFuture<Boolean> cancelExecutionAsync(final RequestId requestId) {
final Class<?> syncInterface = RemoteExecutorService.class;
if (!redisson.getMap(tasksName, LongCodec.INSTANCE).containsKey(requestId)) {
return RedissonPromise.newSucceededFuture(false);
}
final RPromise<Boolean> result = new RedissonPromise<Boolean>();
String requestQueueName = getRequestQueueName(syncInterface);
String requestQueueName = getRequestQueueName(RemoteExecutorService.class);
RFuture<Boolean> removeFuture = removeAsync(requestQueueName, requestId);
removeFuture.addListener(new FutureListener<Boolean>() {
@Override
@ -159,7 +196,7 @@ public class TasksService extends BaseRemoteService {
if (future.getNow()) {
result.trySuccess(true);
} else {
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, codec);
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
canceledRequests.putAsync(requestId.toString(), new RemoteServiceCancelRequest(true, true));
canceledRequests.expireAsync(60, TimeUnit.SECONDS);

@ -36,19 +36,19 @@ public class CountableListener<T> implements FutureListener<Object> {
}
public CountableListener(RPromise<T> result, T value) {
super();
this(null, null, 0);
}
public CountableListener(RPromise<T> result, T value, int count) {
this.result = result;
this.value = value;
this.counter.set(count);
}
public void setCounter(int newValue) {
counter.set(newValue);
}
public void incCounter() {
counter.incrementAndGet();
}
public void decCounter() {
if (counter.decrementAndGet() == 0) {
onSuccess(value);

@ -15,9 +15,9 @@
*/
package org.redisson.remote;
import io.netty.buffer.ByteBuf;
import java.util.Arrays;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
/**
*
@ -26,41 +26,26 @@ import io.netty.buffer.Unpooled;
*/
public class RequestId {
private final long id0;
private final long id1;
private final byte[] id;
public RequestId(String id) {
this(ByteBufUtil.decodeHexDump(id));
}
public RequestId(byte[] buf) {
ByteBuf b = Unpooled.wrappedBuffer(buf);
try {
id0 = b.readLong();
id1 = b.readLong();
} finally {
b.release();
}
id = buf;
}
@Override
public String toString() {
ByteBuf id = Unpooled.buffer(16);
try {
id.writeLong(id0);
id.writeLong(id1);
return ByteBufUtil.hexDump(id);
} finally {
id.release();
}
return ByteBufUtil.hexDump(id);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (id0 ^ (id0 >>> 32));
result = prime * result + (int) (id1 ^ (id1 >>> 32));
result = prime * result + Arrays.hashCode(id);
return result;
}
@ -73,12 +58,9 @@ public class RequestId {
if (getClass() != obj.getClass())
return false;
RequestId other = (RequestId) obj;
if (id0 != other.id0)
return false;
if (id1 != other.id1)
if (!Arrays.equals(id, other.id))
return false;
return true;
}
}

@ -470,11 +470,9 @@ public class RedissonTransaction implements RTransaction {
}
final CountableListener<Map<HashKey, HashValue>> listener =
new CountableListener<Map<HashKey, HashValue>>(result, hashes);
listener.setCounter(hashes.size());
new CountableListener<Map<HashKey, HashValue>>(result, hashes, hashes.size());
RPromise<Void> subscriptionFuture = new RedissonPromise<Void>();
final CountableListener<Void> subscribedFutures = new CountableListener<Void>(subscriptionFuture, null);
subscribedFutures.setCounter(hashes.size());
final CountableListener<Void> subscribedFutures = new CountableListener<Void>(subscriptionFuture, null, hashes.size());
final List<RTopic<Object>> topics = new ArrayList<RTopic<Object>>();
for (final Entry<HashKey, HashValue> entry : hashes.entrySet()) {

@ -424,7 +424,7 @@ public class RedissonRemoteServiceTest extends BaseTest {
r2.shutdown();
}
}
@Test
public void testInvocations() {
RedissonClient r1 = createInstance();
@ -451,6 +451,7 @@ public class RedissonRemoteServiceTest extends BaseTest {
assertThat(e.getCause().getMessage()).isEqualTo("/ by zero");
}
assertThat(r1.getKeys().count()).isZero();
r1.shutdown();
r2.shutdown();
}

@ -0,0 +1,27 @@
package org.redisson.executor;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RInject;
public class FailoverTask implements Runnable {
@RInject
private RedissonClient redisson;
private String objectName;
public FailoverTask() {
}
public FailoverTask(String objectName) {
super();
this.objectName = objectName;
}
@Override
public void run() {
for (long i = 0; i < 20_000_000_000L; i++) {
}
redisson.getBucket(objectName).set(true);
}
}

@ -14,6 +14,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Duration;
import org.junit.After;
@ -21,12 +22,17 @@ import org.junit.Before;
import org.junit.Test;
import org.redisson.BaseTest;
import org.redisson.RedissonNode;
import org.redisson.api.ExecutorOptions;
import org.redisson.api.RExecutorBatchFuture;
import org.redisson.api.RExecutorFuture;
import org.redisson.api.RExecutorService;
import org.redisson.config.Config;
import org.redisson.config.RedissonNodeConfig;
import mockit.Invocation;
import mockit.Mock;
import mockit.MockUp;
public class RedissonExecutorServiceTest extends BaseTest {
private static RedissonNode node;
@ -68,6 +74,9 @@ public class RedissonExecutorServiceTest extends BaseTest {
future.get(5, TimeUnit.SECONDS);
future.getTaskFutures().stream().forEach(x -> x.syncUninterruptibly());
redisson.getKeys().delete("myCounter");
assertThat(redisson.getKeys().count()).isZero();
}
@Test
@ -78,6 +87,9 @@ public class RedissonExecutorServiceTest extends BaseTest {
future.get(5, TimeUnit.SECONDS);
future.getTaskFutures().stream().forEach(x -> assertThat(x.getNow()).isEqualTo("1234"));
redisson.getKeys().delete("myCounter");
assertThat(redisson.getKeys().count()).isZero();
}
@ -86,6 +98,72 @@ public class RedissonExecutorServiceTest extends BaseTest {
RExecutorService e = redisson.getExecutorService("test");
e.execute();
}
@Test
public void testTaskFinishing() throws Exception {
AtomicInteger counter = new AtomicInteger();
new MockUp<TasksRunnerService>() {
@Mock
private void finish(Invocation invocation, String requestId) {
if (counter.incrementAndGet() > 1) {
invocation.proceed();
}
}
};
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1));
node = RedissonNode.create(nodeConfig);
node.start();
RExecutorService executor = redisson.getExecutorService("test2");
RExecutorFuture<?> f = executor.submit(new FailoverTask("finished"));
Thread.sleep(2000);
node.shutdown();
f.get();
assertThat(redisson.<Boolean>getBucket("finished").get()).isTrue();
}
@Test
public void testTaskFailover() throws Exception {
AtomicInteger counter = new AtomicInteger();
new MockUp<TasksRunnerService>() {
@Mock
private void finish(Invocation invocation, String requestId) {
if (counter.incrementAndGet() > 1) {
invocation.proceed();
}
}
};
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1));
node = RedissonNode.create(nodeConfig);
node.start();
RExecutorService executor = redisson.getExecutorService("test2", ExecutorOptions.defaults().taskRetryInterval(10, TimeUnit.SECONDS));
RExecutorFuture<?> f = executor.submit(new IncrementRunnableTask("counter"));
f.get();
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(1);
Thread.sleep(2000);
node.shutdown();
node = RedissonNode.create(nodeConfig);
node.start();
Thread.sleep(8500);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2);
Thread.sleep(16000);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2);
redisson.getKeys().delete("counter");
assertThat(redisson.getKeys().count()).isEqualTo(1);
}
@Test
public void testBatchExecute() {
@ -94,6 +172,8 @@ public class RedissonExecutorServiceTest extends BaseTest {
new IncrementRunnableTask("myCounter"), new IncrementRunnableTask("myCounter"));
await().atMost(Duration.FIVE_SECONDS).until(() -> redisson.getAtomicLong("myCounter").get() == 4);
redisson.getKeys().delete("myCounter");
assertThat(redisson.getKeys().count()).isZero();
}
@Test
@ -103,11 +183,13 @@ public class RedissonExecutorServiceTest extends BaseTest {
Thread.sleep(2000);
cancel(future);
assertThat(redisson.<Long>getBucket("executed1").get()).isBetween(1000L, Long.MAX_VALUE);
RExecutorFuture<?> futureAsync = executor.submitAsync(new ScheduledLongRunnableTask("executed2"));
Thread.sleep(2000);
assertThat(executor.cancelTask(futureAsync.getTaskId())).isTrue();
assertThat(redisson.<Long>getBucket("executed2").get()).isBetween(1000L, Long.MAX_VALUE);
redisson.getKeys().delete("executed1", "executed2");
assertThat(redisson.getKeys().count()).isZero();
}
@Test
@ -134,14 +216,13 @@ public class RedissonExecutorServiceTest extends BaseTest {
for (Future<String> future : allResult) {
assertThat(future.get()).isEqualTo(CallableTask.RESULT);
}
List<CallableTask> invokeAllParams1 = Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask());
List<Future<String>> allResult1 = e.invokeAll(invokeAllParams1, 5, TimeUnit.SECONDS);
assertThat(allResult1).hasSize(invokeAllParams.size());
for (Future<String> future : allResult1) {
assertThat(future.get()).isEqualTo(CallableTask.RESULT);
}
}
@Test(expected = RejectedExecutionException.class)
@ -158,6 +239,8 @@ public class RedissonExecutorServiceTest extends BaseTest {
assertThat(e.isShutdown()).isTrue();
e.execute(new RunnableTask());
assertThat(redisson.getKeys().count()).isZero();
}
@Test(expected = RejectedExecutionException.class)
@ -174,6 +257,8 @@ public class RedissonExecutorServiceTest extends BaseTest {
assertThat(e.isShutdown()).isTrue();
e.submit(new RunnableTask2());
assertThat(redisson.getKeys().count()).isZero();
}
@Test(expected = RejectedExecutionException.class)
@ -190,6 +275,8 @@ public class RedissonExecutorServiceTest extends BaseTest {
assertThat(e.isShutdown()).isTrue();
e.submit(new CallableTask());
assertThat(redisson.getKeys().count()).isZero();
}
@Test(expected = RejectedExecutionException.class)
@ -199,6 +286,8 @@ public class RedissonExecutorServiceTest extends BaseTest {
assertThat(e.isShutdown()).isTrue();
e.submit(new RunnableTask2());
assertThat(redisson.getKeys().count()).isZero();
}
@ -255,6 +344,9 @@ public class RedissonExecutorServiceTest extends BaseTest {
s4.get();
assertThat(redisson.getAtomicLong("runnableCounter").get()).isEqualTo(100L);
redisson.getKeys().delete("runnableCounter", "counter");
assertThat(redisson.getKeys().count()).isZero();
}
@Test

@ -10,6 +10,7 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Assert;
@ -18,11 +19,17 @@ import org.junit.Test;
import org.redisson.BaseTest;
import org.redisson.RedissonNode;
import org.redisson.api.CronSchedule;
import org.redisson.api.ExecutorOptions;
import org.redisson.api.RExecutorFuture;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RScheduledFuture;
import org.redisson.config.Config;
import org.redisson.config.RedissonNodeConfig;
import mockit.Invocation;
import mockit.Mock;
import mockit.MockUp;
public class RedissonScheduledExecutorServiceTest extends BaseTest {
private static RedissonNode node;
@ -44,6 +51,44 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
super.after();
node.shutdown();
}
@Test
public void testTaskFailover() throws Exception {
AtomicInteger counter = new AtomicInteger();
new MockUp<TasksRunnerService>() {
@Mock
private void finish(Invocation invocation, String requestId) {
if (counter.incrementAndGet() > 1) {
invocation.proceed();
}
}
};
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1));
node = RedissonNode.create(nodeConfig);
node.start();
RScheduledExecutorService executor = redisson.getExecutorService("test2", ExecutorOptions.defaults().taskRetryInterval(10, TimeUnit.SECONDS));
RExecutorFuture<?> f = executor.schedule(new IncrementRunnableTask("counter"), 1, TimeUnit.SECONDS);
f.get();
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(1);
Thread.sleep(2000);
node.shutdown();
node = RedissonNode.create(nodeConfig);
node.start();
Thread.sleep(8500);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2);
Thread.sleep(16000);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2);
redisson.getKeys().delete("counter");
assertThat(redisson.getKeys().count()).isEqualTo(1);
}
@Test(timeout = 7000)
public void testTaskResume() throws InterruptedException, ExecutionException {
@ -79,7 +124,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
}
for (RScheduledFuture<?> future : futures) {
assertThat(future.awaitUninterruptibly(5000)).isTrue();
assertThat(future.awaitUninterruptibly(5100)).isTrue();
}
node.shutdown();
@ -111,6 +156,9 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
cancel(future1);
Thread.sleep(2000);
assertThat(redisson.getAtomicLong("executed1").isExists()).isFalse();
redisson.getKeys().delete("executed1");
assertThat(redisson.getKeys().count()).isZero();
}
@Test
@ -121,6 +169,9 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
Thread.sleep(2000);
assertThat(redisson.getAtomicLong("executed1").isExists()).isFalse();
assertThat(executor.delete()).isFalse();
redisson.getKeys().delete("executed1");
assertThat(redisson.getKeys().count()).isZero();
}
@Test
@ -130,11 +181,14 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
Thread.sleep(2000);
cancel(future);
assertThat(redisson.<Long>getBucket("executed1").get()).isBetween(1000L, Long.MAX_VALUE);
RScheduledFuture<?> futureAsync = executor.scheduleAsync(new ScheduledLongRunnableTask("executed2"), 1, TimeUnit.SECONDS);
Thread.sleep(2000);
assertThat(executor.cancelTask(futureAsync.getTaskId())).isTrue();
assertThat(redisson.<Long>getBucket("executed2").get()).isBetween(1000L, Long.MAX_VALUE);
redisson.getKeys().delete("executed1", "executed2");
assertThat(redisson.getKeys().count()).isZero();
}
@Test
@ -155,6 +209,9 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
assertThat(executor.cancelTask(futureAsync.getTaskId())).isTrue();
Thread.sleep(3000);
assertThat(redisson.getAtomicLong("executed2").get()).isEqualTo(2);
redisson.getKeys().delete("executed1", "executed2");
assertThat(redisson.getKeys().count()).isZero();
}
@Test
@ -180,13 +237,16 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
Thread.sleep(3000);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);
redisson.getKeys().delete("counter", "executed1", "executed2");
assertThat(redisson.getKeys().count()).isZero();
}
private void cancel(ScheduledFuture<?> future1) throws InterruptedException, ExecutionException {
assertThat(future1.cancel(true)).isTrue();
try {
future1.get();
Assert.fail("CancellationException should be arise");
Assert.fail("CancellationException should arise");
} catch (CancellationException e) {
// skip
}
@ -204,6 +264,9 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
Thread.sleep(3000);
assertThat(redisson.getAtomicLong("executed1").get()).isEqualTo(5);
redisson.getKeys().delete("executed1");
assertThat(redisson.getKeys().count()).isZero();
}
@ -223,6 +286,9 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
assertThat(redisson.getAtomicLong("executed1").get()).isEqualTo(1);
assertThat(redisson.getAtomicLong("executed2").get()).isEqualTo(1);
assertThat(redisson.getAtomicLong("executed3").get()).isEqualTo(1);
redisson.getKeys().delete("executed1", "executed2", "executed3");
assertThat(redisson.getKeys().count()).isZero();
}
@Test
@ -239,6 +305,9 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
assertThat(redisson.getAtomicLong("executed1").get()).isEqualTo(1);
assertThat(redisson.getAtomicLong("executed2").get()).isEqualTo(1);
assertThat(redisson.getAtomicLong("executed3").get()).isEqualTo(1);
redisson.getKeys().delete("executed1", "executed2", "executed3");
assertThat(redisson.getKeys().count()).isZero();
}
@Test
@ -249,6 +318,9 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
future.get();
assertThat(System.currentTimeMillis() - startTime).isBetween(5000L, 5200L);
assertThat(redisson.getAtomicLong("executed").get()).isEqualTo(1);
redisson.getKeys().delete("executed");
assertThat(redisson.getKeys().count()).isZero();
}
@Test

@ -27,5 +27,5 @@ public class ScheduledLongRunnableTask implements Runnable {
}
}
}
}

Loading…
Cancel
Save