Feature - schedule() methods with timeToLive parameter added to RScheduledExecutorService #2469

pull/2485/head
Nikita Koksharov 5 years ago
parent a2550ae331
commit 7ff9ed67e4

@ -140,6 +140,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
workersTopic = redisson.getTopic(workersChannelName); workersTopic = redisson.getTopic(workersChannelName);
remoteService.setStatusName(statusName); remoteService.setStatusName(statusName);
remoteService.setSchedulerQueueName(schedulerQueueName);
remoteService.setTasksCounterName(tasksCounterName); remoteService.setTasksCounterName(tasksCounterName);
remoteService.setTasksExpirationTimeName(tasksExpirationTimeName); remoteService.setTasksExpirationTimeName(tasksExpirationTimeName);
remoteService.setTasksRetryIntervalName(tasksRetryIntervalName); remoteService.setTasksRetryIntervalName(tasksRetryIntervalName);
@ -166,6 +167,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
scheduledRemoteService.setSchedulerChannelName(schedulerChannelName); scheduledRemoteService.setSchedulerChannelName(schedulerChannelName);
scheduledRemoteService.setTasksName(tasksName); scheduledRemoteService.setTasksName(tasksName);
scheduledRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName); scheduledRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName);
scheduledRemoteService.setTasksExpirationTimeName(tasksExpirationTimeName);
scheduledRemoteService.setTasksRetryInterval(options.getTaskRetryInterval()); scheduledRemoteService.setTasksRetryInterval(options.getTaskRetryInterval());
asyncScheduledService = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); asyncScheduledService = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
@ -301,6 +303,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
service.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); service.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
service.setSchedulerChannelName(schedulerChannelName); service.setSchedulerChannelName(schedulerChannelName);
service.setSchedulerQueueName(schedulerQueueName); service.setSchedulerQueueName(schedulerQueueName);
service.setTasksExpirationTimeName(tasksExpirationTimeName);
service.setTasksRetryIntervalName(tasksRetryIntervalName); service.setTasksRetryIntervalName(tasksRetryIntervalName);
service.setBeanFactory(options.getBeanFactory()); service.setBeanFactory(options.getBeanFactory());
@ -870,31 +873,64 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override @Override
public RScheduledFuture<?> scheduleAsync(Runnable task, long delay, TimeUnit unit) { public RScheduledFuture<?> scheduleAsync(Runnable task, long delay, TimeUnit unit) {
return scheduleAsync(task, delay, unit, 0, null);
}
@Override
public <V> RScheduledFuture<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
RedissonScheduledFuture<V> future = (RedissonScheduledFuture<V>) scheduleAsync(task, delay, unit);
RemotePromise<?> rp = (RemotePromise<?>) future.getInnerPromise();
syncExecute(rp);
return future;
}
@Override
public <V> RScheduledFuture<V> scheduleAsync(Callable<V> task, long delay, TimeUnit unit) {
return scheduleAsync(task, delay, unit, 0, null);
}
@Override
public RScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit, long ttl, TimeUnit ttlUnit) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAsync(command, delay, unit, ttl, ttlUnit);
RemotePromise<?> rp = (RemotePromise<?>) future.getInnerPromise();
syncExecute(rp);
return future;
}
@Override
public RScheduledFuture<?> scheduleAsync(Runnable task, long delay, TimeUnit unit, long timeToLive, TimeUnit ttlUnit) {
check(task); check(task);
ClassBody classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(delay); long startTime = System.currentTimeMillis() + unit.toMillis(delay);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledService.scheduleRunnable(new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime)); ScheduledParameters params = new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime);
if (timeToLive > 0) {
params.setTtl(ttlUnit.toMillis(timeToLive));
}
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledService.scheduleRunnable(params);
addListener(result); addListener(result);
return createFuture(result, startTime); return createFuture(result, startTime);
} }
@Override @Override
public <V> RScheduledFuture<V> schedule(Callable<V> task, long delay, TimeUnit unit) { public <V> RScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit, long timeToLive, TimeUnit ttlUnit) {
RedissonScheduledFuture<V> future = (RedissonScheduledFuture<V>) scheduleAsync(task, delay, unit); RedissonScheduledFuture<V> future = (RedissonScheduledFuture<V>) scheduleAsync(callable, delay, unit, timeToLive, ttlUnit);
RemotePromise<?> rp = (RemotePromise<?>) future.getInnerPromise(); RemotePromise<?> rp = (RemotePromise<?>) future.getInnerPromise();
syncExecute(rp); syncExecute(rp);
return future; return future;
} }
@Override @Override
public <V> RScheduledFuture<V> scheduleAsync(Callable<V> task, long delay, TimeUnit unit) { public <V> RScheduledFuture<V> scheduleAsync(Callable<V> task, long delay, TimeUnit unit, long timeToLive, TimeUnit ttlUnit) {
check(task); check(task);
ClassBody classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(delay); long startTime = System.currentTimeMillis() + unit.toMillis(delay);
RemotePromise<V> result = (RemotePromise<V>) asyncScheduledService.scheduleCallable(new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime)); ScheduledParameters params = new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime);
if (timeToLive > 0) {
params.setTtl(ttlUnit.toMillis(timeToLive));
}
RemotePromise<V> result = (RemotePromise<V>) asyncScheduledService.scheduleCallable(params);
addListener(result); addListener(result);
return createFuture(result, startTime); return createFuture(result, startTime);
} }

@ -73,7 +73,7 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync
/** /**
* Synchronously submits a Runnable task for execution asynchronously * Synchronously submits a Runnable task for execution asynchronously
* and returns a Future representing that task. The Future's {@code get} method will * and returns a RExecutorFuture representing that task. The Future's {@code get} method will
* return the given result upon successful completion. * return the given result upon successful completion.
* *
* @param task the task to submit * @param task the task to submit
@ -86,7 +86,7 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync
/** /**
* Synchronously submits a Runnable task for execution asynchronously. * Synchronously submits a Runnable task for execution asynchronously.
* Returns a Future representing task completion. The Future's {@code get} method will * Returns a RExecutorFuture representing task completion. The Future's {@code get} method will
* return {@code null} upon <em>successful</em> completion. * return {@code null} upon <em>successful</em> completion.
* *
* @param task the task to submit * @param task the task to submit

@ -28,8 +28,9 @@ import java.util.concurrent.TimeUnit;
public interface RScheduledExecutorService extends RExecutorService, ScheduledExecutorService, RScheduledExecutorServiceAsync { public interface RScheduledExecutorService extends RExecutorService, ScheduledExecutorService, RScheduledExecutorServiceAsync {
/** /**
* Creates and executes a one-shot action that becomes enabled * Synchronously schedules a Runnable task for execution asynchronously
* after the given delay. * after the given <code>delay</code>. Returns a RScheduledFuture representing that task.
* The Future's {@code get} method will return the given result upon successful completion.
* *
* @param command the task to execute * @param command the task to execute
* @param delay the time from now to delay execution * @param delay the time from now to delay execution
@ -43,8 +44,27 @@ public interface RScheduledExecutorService extends RExecutorService, ScheduledEx
long delay, TimeUnit unit); long delay, TimeUnit unit);
/** /**
* Creates and executes a ScheduledFuture that becomes enabled after the * Synchronously schedules a Runnable task with defined <code>timeToLive</code> parameter
* given delay. * for execution asynchronously after the given <code>delay</code>.
* Returns a RScheduledFuture representing that task.
* The Future's {@code get} method will return the given result upon successful completion.
*
* @param command the task to execute
* @param delay the time from now to delay execution
* @param unit the time unit of the delay parameter
* @param timeToLive - time to live interval
* @param ttlUnit - unit of time to live interval
* @return a ScheduledFuture representing pending completion of
* the task and whose {@code get()} method will return
* {@code null} upon completion
*/
RScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit, long timeToLive, TimeUnit ttlUnit);
/**
* Synchronously schedules a value-returning task for execution asynchronously
* after the given <code>delay</code>. Returns a RScheduledFuture representing that task.
* The Future's {@code get} method will return the given result upon successful completion.
* *
* @param callable the function to execute * @param callable the function to execute
* @param delay the time from now to delay execution * @param delay the time from now to delay execution
@ -57,17 +77,29 @@ public interface RScheduledExecutorService extends RExecutorService, ScheduledEx
long delay, TimeUnit unit); long delay, TimeUnit unit);
/** /**
* Creates and executes a periodic action that becomes enabled first * Synchronously schedules a value-returning task with defined <code>timeToLive</code> parameter
* after the given initial delay, and subsequently with the given * for execution asynchronously after the given <code>delay</code>.
* period; that is executions will commence after * Returns a RScheduledFuture representing that task.
* {@code initialDelay} then {@code initialDelay+period}, then * The Future's {@code get} method will return the given result upon successful completion.
* {@code initialDelay + 2 * period}, and so on. *
* If any execution of the task * @param callable the function to execute
* encounters an exception, subsequent executions are suppressed. * @param delay the time from now to delay execution
* Otherwise, the task will only terminate via cancellation or * @param unit the time unit of the delay parameter
* termination of the executor. If any execution of this task * @param timeToLive - time to live interval
* takes longer than its period, then subsequent executions * @param ttlUnit - unit of time to live interval
* may start late, but will not concurrently execute. * @param <V> the type of the callable's result
* @return a ScheduledFuture that can be used to extract result or cancel
*/
<V> RScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit, long timeToLive, TimeUnit ttlUnit);
/**
* Synchronously schedules a Runnable task for execution asynchronously
* after the given <code>initialDelay</code>, and subsequently with the given
* <code>period</code>.
* Subsequent executions are stopped if any execution of the task throws an exception.
* Otherwise, task could be terminated via cancellation or
* termination of the executor.
* *
* @param command the task to execute * @param command the task to execute
* @param initialDelay the time to delay first execution * @param initialDelay the time to delay first execution
@ -84,12 +116,11 @@ public interface RScheduledExecutorService extends RExecutorService, ScheduledEx
TimeUnit unit); TimeUnit unit);
/** /**
* Creates and executes a periodic action that becomes enabled first * Synchronously schedules a Runnable task for execution asynchronously
* after the given initial delay, and subsequently with the * after the given <code>initialDelay</code>, and subsequently with the given
* given delay between the termination of one execution and the * <code>delay</code> started from the task finishing moment.
* commencement of the next. If any execution of the task * Subsequent executions are stopped if any execution of the task throws an exception.
* encounters an exception, subsequent executions are suppressed. * Otherwise, task could be terminated via cancellation or
* Otherwise, the task will only terminate via cancellation or
* termination of the executor. * termination of the executor.
* *
* @param command the task to execute * @param command the task to execute
@ -108,13 +139,11 @@ public interface RScheduledExecutorService extends RExecutorService, ScheduledEx
TimeUnit unit); TimeUnit unit);
/** /**
* Creates and executes a periodic action with cron schedule object. * Synchronously schedules a Runnable task for execution asynchronously
* If any execution of the task * cron schedule object.
* encounters an exception, subsequent executions are suppressed. * Subsequent executions are stopped if any execution of the task throws an exception.
* Otherwise, the task will only terminate via cancellation or * Otherwise, task could be terminated via cancellation or
* termination of the executor. If any execution of this task * termination of the executor.
* takes longer than its period, then subsequent executions
* may start late, but will not concurrently execute.
* *
* @param task - command the task to execute * @param task - command the task to execute
* @param cronSchedule- cron schedule object * @param cronSchedule- cron schedule object

@ -27,8 +27,9 @@ import java.util.concurrent.TimeUnit;
public interface RScheduledExecutorServiceAsync extends RExecutorServiceAsync { public interface RScheduledExecutorServiceAsync extends RExecutorServiceAsync {
/** /**
* Creates in async mode and executes a one-shot action that becomes enabled * Schedules a Runnable task for execution asynchronously
* after the given delay. * after the given <code>delay</code>. Returns a RScheduledFuture representing that task.
* The Future's {@code get} method will return the given result upon successful completion.
* *
* @param task the task to execute * @param task the task to execute
* @param delay the time from now to delay execution * @param delay the time from now to delay execution
@ -38,8 +39,24 @@ public interface RScheduledExecutorServiceAsync extends RExecutorServiceAsync {
RScheduledFuture<?> scheduleAsync(Runnable task, long delay, TimeUnit unit); RScheduledFuture<?> scheduleAsync(Runnable task, long delay, TimeUnit unit);
/** /**
* Creates in async mode and executes a ScheduledFuture that becomes enabled after the * Schedules a Runnable task with defined <code>timeToLive</code> parameter
* given delay. * for execution asynchronously after the given <code>delay</code>.
* Returns a RScheduledFuture representing that task.
* The Future's {@code get} method will return the given result upon successful completion.
*
* @param task the task to execute
* @param delay the time from now to delay execution
* @param unit the time unit of the delay parameter
* @param timeToLive - time to live interval
* @param ttlUnit - unit of time to live interval
* @return RScheduledFuture with listeners support
*/
RScheduledFuture<?> scheduleAsync(Runnable task, long delay, TimeUnit unit, long timeToLive, TimeUnit ttlUnit);
/**
* Schedules a value-returning task for execution asynchronously
* after the given <code>delay</code>. Returns a RScheduledFuture representing that task.
* The Future's {@code get} method will return the given result upon successful completion.
* *
* @param task the function to execute * @param task the function to execute
* @param delay the time from now to delay execution * @param delay the time from now to delay execution
@ -50,15 +67,28 @@ public interface RScheduledExecutorServiceAsync extends RExecutorServiceAsync {
<V> RScheduledFuture<V> scheduleAsync(Callable<V> task, long delay, TimeUnit unit); <V> RScheduledFuture<V> scheduleAsync(Callable<V> task, long delay, TimeUnit unit);
/** /**
* Creates in async mode and executes a periodic action that becomes enabled first * Schedules a value-returning task with defined <code>timeToLive</code> parameter
* after the given initial delay, and subsequently with the given * for execution asynchronously after the given <code>delay</code>.
* period. * Returns a RScheduledFuture representing that task.
* If any execution of the task * The Future's {@code get} method will return the given result upon successful completion.
* encounters an exception, subsequent executions are suppressed. *
* Otherwise, the task will only terminate via cancellation or * @param task the function to execute
* termination of the executor. If any execution of this task * @param delay the time from now to delay execution
* takes longer than its period, then subsequent executions * @param unit the time unit of the delay parameter
* may start late, but will not concurrently execute. * @param timeToLive - time to live interval
* @param ttlUnit - unit of time to live interval
* @param <V> the type of the callable's result
* @return RScheduledFuture with listeners support
*/
<V> RScheduledFuture<V> scheduleAsync(Callable<V> task, long delay, TimeUnit unit, long timeToLive, TimeUnit ttlUnit);
/**
* Schedules a Runnable task for execution asynchronously
* after the given <code>initialDelay</code>, and subsequently with the given
* <code>period</code>.
* Subsequent executions are stopped if any execution of the task throws an exception.
* Otherwise, task could be terminated via cancellation or
* termination of the executor.
* *
* @param task the task to execute * @param task the task to execute
* @param initialDelay the time to delay first execution * @param initialDelay the time to delay first execution
@ -69,12 +99,11 @@ public interface RScheduledExecutorServiceAsync extends RExecutorServiceAsync {
RScheduledFuture<?> scheduleAtFixedRateAsync(Runnable task, long initialDelay, long period, TimeUnit unit); RScheduledFuture<?> scheduleAtFixedRateAsync(Runnable task, long initialDelay, long period, TimeUnit unit);
/** /**
* Creates in async mode and executes a periodic action that becomes enabled first * Schedules a Runnable task for execution asynchronously
* after the given initial delay, and subsequently with the * after the given <code>initialDelay</code>, and subsequently with the given
* given delay between the termination of one execution and the * <code>delay</code> started from the task finishing moment.
* commencement of the next. If any execution of the task * Subsequent executions are stopped if any execution of the task throws an exception.
* encounters an exception, subsequent executions are suppressed. * Otherwise, task could be terminated via cancellation or
* Otherwise, the task will only terminate via cancellation or
* termination of the executor. * termination of the executor.
* *
* @param task the task to execute * @param task the task to execute
@ -87,13 +116,11 @@ public interface RScheduledExecutorServiceAsync extends RExecutorServiceAsync {
RScheduledFuture<?> scheduleWithFixedDelayAsync(Runnable task, long initialDelay, long delay, TimeUnit unit); RScheduledFuture<?> scheduleWithFixedDelayAsync(Runnable task, long initialDelay, long delay, TimeUnit unit);
/** /**
* Creates in async mode and executes a periodic action with cron schedule object. * Synchronously schedules a Runnable task for execution asynchronously
* If any execution of the task * cron schedule object.
* encounters an exception, subsequent executions are suppressed. * Subsequent executions are stopped if any execution of the task throws an exception.
* Otherwise, the task will only terminate via cancellation or * Otherwise, task could be terminated via cancellation or
* termination of the executor. If any execution of this task * termination of the executor.
* takes longer than its period, then subsequent executions
* may start late, but will not concurrently execute.
* *
* @param task the task to execute * @param task the task to execute
* @param cronSchedule cron schedule object * @param cronSchedule cron schedule object

@ -40,6 +40,7 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
private String statusName; private String statusName;
private String tasksRetryIntervalName; private String tasksRetryIntervalName;
private String terminationTopicName; private String terminationTopicName;
private String schedulerQueueName;
public RedissonExecutorRemoteService(Codec codec, String name, public RedissonExecutorRemoteService(Codec codec, String name,
CommandAsyncService commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) { CommandAsyncService commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
@ -52,6 +53,10 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
"local value = redis.call('zscore', KEYS[2], ARGV[1]); " + "local value = redis.call('zscore', KEYS[2], ARGV[1]); " +
"if (value ~= false and tonumber(value) < tonumber(ARGV[2])) then " "if (value ~= false and tonumber(value) < tonumber(ARGV[2])) then "
+ "redis.call('zrem', KEYS[2], ARGV[1]); " + "redis.call('zrem', KEYS[2], ARGV[1]); "
+ "redis.call('zrem', KEYS[7], ARGV[1]); "
+ "redis.call('zrem', KEYS[7], 'ff' .. ARGV[1]);"
+ "redis.call('hdel', KEYS[1], ARGV[1]); " + "redis.call('hdel', KEYS[1], ARGV[1]); "
+ "if redis.call('decr', KEYS[3]) == 0 then " + "if redis.call('decr', KEYS[3]) == 0 then "
+ "redis.call('del', KEYS[3]);" + "redis.call('del', KEYS[3]);"
@ -65,10 +70,15 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
+ "return nil;" + "return nil;"
+ "end;" + "end;"
+ "return redis.call('hget', KEYS[1], ARGV[1]); ", + "return redis.call('hget', KEYS[1], ARGV[1]); ",
Arrays.asList(tasks.getName(), tasksExpirationTimeName, tasksCounterName, statusName, tasksRetryIntervalName, terminationTopicName), Arrays.asList(tasks.getName(), tasksExpirationTimeName, tasksCounterName, statusName,
tasksRetryIntervalName, terminationTopicName, schedulerQueueName),
requestId, System.currentTimeMillis(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); requestId, System.currentTimeMillis(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
} }
public void setSchedulerQueueName(String schedulerQueueName) {
this.schedulerQueueName = schedulerQueueName;
}
public void setTasksExpirationTimeName(String tasksExpirationTimeName) { public void setTasksExpirationTimeName(String tasksExpirationTimeName) {
this.tasksExpirationTimeName = tasksExpirationTimeName; this.tasksExpirationTimeName = tasksExpirationTimeName;
} }

@ -53,6 +53,11 @@ public class ScheduledTasksService extends TasksService {
ScheduledParameters params = (ScheduledParameters) request.getArgs()[0]; ScheduledParameters params = (ScheduledParameters) request.getArgs()[0];
params.setRequestId(request.getId()); params.setRequestId(request.getId());
long expireTime = 0;
if (params.getTtl() > 0) {
expireTime = System.currentTimeMillis() + params.getTtl();
}
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// check if executor service not in shutdown state // check if executor service not in shutdown state
"if redis.call('exists', KEYS[2]) == 0 then " "if redis.call('exists', KEYS[2]) == 0 then "
@ -66,6 +71,10 @@ public class ScheduledTasksService extends TasksService {
+ "redis.call('zadd', KEYS[3], time, 'ff' .. ARGV[2]);" + "redis.call('zadd', KEYS[3], time, 'ff' .. ARGV[2]);"
+ "end; " + "end; "
+ "if tonumber(ARGV[5]) > 0 then "
+ "redis.call('zadd', KEYS[7], ARGV[5], ARGV[2]);"
+ "end; "
+ "redis.call('zadd', KEYS[3], ARGV[1], ARGV[2]);" + "redis.call('zadd', KEYS[3], ARGV[1], ARGV[2]);"
+ "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);" + "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);"
+ "redis.call('incr', KEYS[1]);" + "redis.call('incr', KEYS[1]);"
@ -78,8 +87,9 @@ public class ScheduledTasksService extends TasksService {
+ "return 1;" + "return 1;"
+ "end;" + "end;"
+ "return 0;", + "return 0;",
Arrays.<Object>asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName, tasksRetryIntervalName), Arrays.asList(tasksCounterName, statusName, schedulerQueueName,
params.getStartTime(), request.getId(), encode(request), tasksRetryInterval); schedulerChannelName, tasksName, tasksRetryIntervalName, tasksExpirationTimeName),
params.getStartTime(), request.getId(), encode(request), tasksRetryInterval, expireTime);
} }
@Override @Override

@ -75,6 +75,8 @@ public class TasksRunnerService implements RemoteExecutorService {
private String schedulerQueueName; private String schedulerQueueName;
private String schedulerChannelName; private String schedulerChannelName;
private String tasksRetryIntervalName; private String tasksRetryIntervalName;
private String tasksExpirationTimeName;
private BeanFactory beanFactory; private BeanFactory beanFactory;
private ConcurrentMap<String, ResponseEntry> responses; private ConcurrentMap<String, ResponseEntry> responses;
@ -91,6 +93,10 @@ public class TasksRunnerService implements RemoteExecutorService {
this.beanFactory = beanFactory; this.beanFactory = beanFactory;
} }
public void setTasksExpirationTimeName(String tasksExpirationTimeName) {
this.tasksExpirationTimeName = tasksExpirationTimeName;
}
public void setTasksRetryIntervalName(String tasksRetryInterval) { public void setTasksRetryIntervalName(String tasksRetryInterval) {
this.tasksRetryIntervalName = tasksRetryInterval; this.tasksRetryIntervalName = tasksRetryInterval;
} }
@ -170,6 +176,7 @@ public class TasksRunnerService implements RemoteExecutorService {
scheduledRemoteService.setSchedulerChannelName(schedulerChannelName); scheduledRemoteService.setSchedulerChannelName(schedulerChannelName);
scheduledRemoteService.setTasksName(tasksName); scheduledRemoteService.setTasksName(tasksName);
scheduledRemoteService.setRequestId(new RequestId(requestId)); scheduledRemoteService.setRequestId(new RequestId(requestId));
scheduledRemoteService.setTasksExpirationTimeName(tasksExpirationTimeName);
scheduledRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName); scheduledRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName);
RemoteExecutorServiceAsync asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); RemoteExecutorServiceAsync asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
return asyncScheduledServiceAtFixed; return asyncScheduledServiceAtFixed;

@ -7,10 +7,7 @@ import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CancellationException; import java.util.concurrent.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.joor.Reflect; import org.joor.Reflect;
@ -70,6 +67,19 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
} }
@Test
public void testTTL() throws InterruptedException {
RScheduledExecutorService executor = redisson.getExecutorService("test");
executor.submit(new DelayedTask(3000, "test"));
Future<?> future = executor.schedule(new ScheduledRunnableTask("testparam"), 1, TimeUnit.SECONDS,2, TimeUnit.SECONDS);
Thread.sleep(500);
assertThat(executor.getTaskCount()).isEqualTo(2);
Thread.sleep(3000);
assertThat(executor.getTaskCount()).isEqualTo(0);
assertThat(redisson.getKeys().countExists("testparam")).isEqualTo(0);
}
@Test @Test
public void testSingleWorker() throws InterruptedException { public void testSingleWorker() throws InterruptedException {
Config config = createConfig(); Config config = createConfig();

Loading…
Cancel
Save