From 7ff9ed67e4f6678fe5eb9fa20c48710e790302b6 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 11 Dec 2019 11:35:14 +0300 Subject: [PATCH] Feature - schedule() methods with timeToLive parameter added to RScheduledExecutorService #2469 --- .../org/redisson/RedissonExecutorService.java | 54 ++++++++++-- .../org/redisson/api/RExecutorFuture.java | 2 +- .../org/redisson/api/RExecutorService.java | 4 +- .../api/RScheduledExecutorService.java | 85 +++++++++++++------ .../api/RScheduledExecutorServiceAsync.java | 83 ++++++++++++------ .../RedissonExecutorRemoteService.java | 12 ++- .../executor/ScheduledTasksService.java | 16 +++- .../redisson/executor/TasksRunnerService.java | 9 +- .../RedissonScheduledExecutorServiceTest.java | 18 +++- 9 files changed, 206 insertions(+), 77 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 08b45cfee..70478b976 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -140,6 +140,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { workersTopic = redisson.getTopic(workersChannelName); remoteService.setStatusName(statusName); + remoteService.setSchedulerQueueName(schedulerQueueName); remoteService.setTasksCounterName(tasksCounterName); remoteService.setTasksExpirationTimeName(tasksExpirationTimeName); remoteService.setTasksRetryIntervalName(tasksRetryIntervalName); @@ -166,6 +167,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { scheduledRemoteService.setSchedulerChannelName(schedulerChannelName); scheduledRemoteService.setTasksName(tasksName); scheduledRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName); + scheduledRemoteService.setTasksExpirationTimeName(tasksExpirationTimeName); scheduledRemoteService.setTasksRetryInterval(options.getTaskRetryInterval()); asyncScheduledService = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); @@ -301,6 +303,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { service.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); service.setSchedulerChannelName(schedulerChannelName); service.setSchedulerQueueName(schedulerQueueName); + service.setTasksExpirationTimeName(tasksExpirationTimeName); service.setTasksRetryIntervalName(tasksRetryIntervalName); service.setBeanFactory(options.getBeanFactory()); @@ -870,35 +873,68 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public RScheduledFuture scheduleAsync(Runnable task, long delay, TimeUnit unit) { + return scheduleAsync(task, delay, unit, 0, null); + } + + @Override + public RScheduledFuture schedule(Callable task, long delay, TimeUnit unit) { + RedissonScheduledFuture future = (RedissonScheduledFuture) scheduleAsync(task, delay, unit); + RemotePromise rp = (RemotePromise) future.getInnerPromise(); + syncExecute(rp); + return future; + } + + @Override + public RScheduledFuture scheduleAsync(Callable task, long delay, TimeUnit unit) { + return scheduleAsync(task, delay, unit, 0, null); + } + + @Override + public RScheduledFuture schedule(Runnable command, long delay, TimeUnit unit, long ttl, TimeUnit ttlUnit) { + RedissonScheduledFuture future = (RedissonScheduledFuture) scheduleAsync(command, delay, unit, ttl, ttlUnit); + RemotePromise rp = (RemotePromise) future.getInnerPromise(); + syncExecute(rp); + return future; + } + + @Override + public RScheduledFuture scheduleAsync(Runnable task, long delay, TimeUnit unit, long timeToLive, TimeUnit ttlUnit) { check(task); ClassBody classBody = getClassBody(task); byte[] state = encode(task); long startTime = System.currentTimeMillis() + unit.toMillis(delay); - RemotePromise result = (RemotePromise) asyncScheduledService.scheduleRunnable(new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime)); + ScheduledParameters params = new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime); + if (timeToLive > 0) { + params.setTtl(ttlUnit.toMillis(timeToLive)); + } + RemotePromise result = (RemotePromise) asyncScheduledService.scheduleRunnable(params); addListener(result); - return createFuture(result, startTime); } - + @Override - public RScheduledFuture schedule(Callable task, long delay, TimeUnit unit) { - RedissonScheduledFuture future = (RedissonScheduledFuture) scheduleAsync(task, delay, unit); + public RScheduledFuture schedule(Callable callable, long delay, TimeUnit unit, long timeToLive, TimeUnit ttlUnit) { + RedissonScheduledFuture future = (RedissonScheduledFuture) scheduleAsync(callable, delay, unit, timeToLive, ttlUnit); RemotePromise rp = (RemotePromise) future.getInnerPromise(); syncExecute(rp); return future; } - + @Override - public RScheduledFuture scheduleAsync(Callable task, long delay, TimeUnit unit) { + public RScheduledFuture scheduleAsync(Callable task, long delay, TimeUnit unit, long timeToLive, TimeUnit ttlUnit) { check(task); ClassBody classBody = getClassBody(task); byte[] state = encode(task); long startTime = System.currentTimeMillis() + unit.toMillis(delay); - RemotePromise result = (RemotePromise) asyncScheduledService.scheduleCallable(new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime)); + ScheduledParameters params = new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime); + if (timeToLive > 0) { + params.setTtl(ttlUnit.toMillis(timeToLive)); + } + RemotePromise result = (RemotePromise) asyncScheduledService.scheduleCallable(params); addListener(result); return createFuture(result, startTime); } - + @Override public RScheduledFuture scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { RedissonScheduledFuture future = (RedissonScheduledFuture) scheduleAtFixedRateAsync(task, initialDelay, period, unit); diff --git a/redisson/src/main/java/org/redisson/api/RExecutorFuture.java b/redisson/src/main/java/org/redisson/api/RExecutorFuture.java index 836468bfe..27dedfea4 100644 --- a/redisson/src/main/java/org/redisson/api/RExecutorFuture.java +++ b/redisson/src/main/java/org/redisson/api/RExecutorFuture.java @@ -33,5 +33,5 @@ public interface RExecutorFuture extends RFuture { * @return task id */ String getTaskId(); - + } diff --git a/redisson/src/main/java/org/redisson/api/RExecutorService.java b/redisson/src/main/java/org/redisson/api/RExecutorService.java index a71c70cfd..b3e235757 100644 --- a/redisson/src/main/java/org/redisson/api/RExecutorService.java +++ b/redisson/src/main/java/org/redisson/api/RExecutorService.java @@ -73,7 +73,7 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync /** * Synchronously submits a Runnable task for execution asynchronously - * and returns a Future representing that task. The Future's {@code get} method will + * and returns a RExecutorFuture representing that task. The Future's {@code get} method will * return the given result upon successful completion. * * @param task the task to submit @@ -86,7 +86,7 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync /** * Synchronously submits a Runnable task for execution asynchronously. - * Returns a Future representing task completion. The Future's {@code get} method will + * Returns a RExecutorFuture representing task completion. The Future's {@code get} method will * return {@code null} upon successful completion. * * @param task the task to submit diff --git a/redisson/src/main/java/org/redisson/api/RScheduledExecutorService.java b/redisson/src/main/java/org/redisson/api/RScheduledExecutorService.java index 9e4c75ae6..06a536a84 100644 --- a/redisson/src/main/java/org/redisson/api/RScheduledExecutorService.java +++ b/redisson/src/main/java/org/redisson/api/RScheduledExecutorService.java @@ -28,8 +28,9 @@ import java.util.concurrent.TimeUnit; public interface RScheduledExecutorService extends RExecutorService, ScheduledExecutorService, RScheduledExecutorServiceAsync { /** - * Creates and executes a one-shot action that becomes enabled - * after the given delay. + * Synchronously schedules a Runnable task for execution asynchronously + * after the given delay. Returns a RScheduledFuture representing that task. + * The Future's {@code get} method will return the given result upon successful completion. * * @param command the task to execute * @param delay the time from now to delay execution @@ -43,8 +44,27 @@ public interface RScheduledExecutorService extends RExecutorService, ScheduledEx long delay, TimeUnit unit); /** - * Creates and executes a ScheduledFuture that becomes enabled after the - * given delay. + * Synchronously schedules a Runnable task with defined timeToLive parameter + * for execution asynchronously after the given delay. + * Returns a RScheduledFuture representing that task. + * The Future's {@code get} method will return the given result upon successful completion. + * + * @param command the task to execute + * @param delay the time from now to delay execution + * @param unit the time unit of the delay parameter + * @param timeToLive - time to live interval + * @param ttlUnit - unit of time to live interval + * @return a ScheduledFuture representing pending completion of + * the task and whose {@code get()} method will return + * {@code null} upon completion + */ + RScheduledFuture schedule(Runnable command, + long delay, TimeUnit unit, long timeToLive, TimeUnit ttlUnit); + + /** + * Synchronously schedules a value-returning task for execution asynchronously + * after the given delay. Returns a RScheduledFuture representing that task. + * The Future's {@code get} method will return the given result upon successful completion. * * @param callable the function to execute * @param delay the time from now to delay execution @@ -57,17 +77,29 @@ public interface RScheduledExecutorService extends RExecutorService, ScheduledEx long delay, TimeUnit unit); /** - * Creates and executes a periodic action that becomes enabled first - * after the given initial delay, and subsequently with the given - * period; that is executions will commence after - * {@code initialDelay} then {@code initialDelay+period}, then - * {@code initialDelay + 2 * period}, and so on. - * If any execution of the task - * encounters an exception, subsequent executions are suppressed. - * Otherwise, the task will only terminate via cancellation or - * termination of the executor. If any execution of this task - * takes longer than its period, then subsequent executions - * may start late, but will not concurrently execute. + * Synchronously schedules a value-returning task with defined timeToLive parameter + * for execution asynchronously after the given delay. + * Returns a RScheduledFuture representing that task. + * The Future's {@code get} method will return the given result upon successful completion. + * + * @param callable the function to execute + * @param delay the time from now to delay execution + * @param unit the time unit of the delay parameter + * @param timeToLive - time to live interval + * @param ttlUnit - unit of time to live interval + * @param the type of the callable's result + * @return a ScheduledFuture that can be used to extract result or cancel + */ + RScheduledFuture schedule(Callable callable, + long delay, TimeUnit unit, long timeToLive, TimeUnit ttlUnit); + + /** + * Synchronously schedules a Runnable task for execution asynchronously + * after the given initialDelay, and subsequently with the given + * period. + * Subsequent executions are stopped if any execution of the task throws an exception. + * Otherwise, task could be terminated via cancellation or + * termination of the executor. * * @param command the task to execute * @param initialDelay the time to delay first execution @@ -84,12 +116,11 @@ public interface RScheduledExecutorService extends RExecutorService, ScheduledEx TimeUnit unit); /** - * Creates and executes a periodic action that becomes enabled first - * after the given initial delay, and subsequently with the - * given delay between the termination of one execution and the - * commencement of the next. If any execution of the task - * encounters an exception, subsequent executions are suppressed. - * Otherwise, the task will only terminate via cancellation or + * Synchronously schedules a Runnable task for execution asynchronously + * after the given initialDelay, and subsequently with the given + * delay started from the task finishing moment. + * Subsequent executions are stopped if any execution of the task throws an exception. + * Otherwise, task could be terminated via cancellation or * termination of the executor. * * @param command the task to execute @@ -108,13 +139,11 @@ public interface RScheduledExecutorService extends RExecutorService, ScheduledEx TimeUnit unit); /** - * Creates and executes a periodic action with cron schedule object. - * If any execution of the task - * encounters an exception, subsequent executions are suppressed. - * Otherwise, the task will only terminate via cancellation or - * termination of the executor. If any execution of this task - * takes longer than its period, then subsequent executions - * may start late, but will not concurrently execute. + * Synchronously schedules a Runnable task for execution asynchronously + * cron schedule object. + * Subsequent executions are stopped if any execution of the task throws an exception. + * Otherwise, task could be terminated via cancellation or + * termination of the executor. * * @param task - command the task to execute * @param cronSchedule- cron schedule object diff --git a/redisson/src/main/java/org/redisson/api/RScheduledExecutorServiceAsync.java b/redisson/src/main/java/org/redisson/api/RScheduledExecutorServiceAsync.java index f6949bb46..57d8f1d77 100644 --- a/redisson/src/main/java/org/redisson/api/RScheduledExecutorServiceAsync.java +++ b/redisson/src/main/java/org/redisson/api/RScheduledExecutorServiceAsync.java @@ -27,8 +27,9 @@ import java.util.concurrent.TimeUnit; public interface RScheduledExecutorServiceAsync extends RExecutorServiceAsync { /** - * Creates in async mode and executes a one-shot action that becomes enabled - * after the given delay. + * Schedules a Runnable task for execution asynchronously + * after the given delay. Returns a RScheduledFuture representing that task. + * The Future's {@code get} method will return the given result upon successful completion. * * @param task the task to execute * @param delay the time from now to delay execution @@ -36,10 +37,26 @@ public interface RScheduledExecutorServiceAsync extends RExecutorServiceAsync { * @return RScheduledFuture with listeners support */ RScheduledFuture scheduleAsync(Runnable task, long delay, TimeUnit unit); - + /** - * Creates in async mode and executes a ScheduledFuture that becomes enabled after the - * given delay. + * Schedules a Runnable task with defined timeToLive parameter + * for execution asynchronously after the given delay. + * Returns a RScheduledFuture representing that task. + * The Future's {@code get} method will return the given result upon successful completion. + * + * @param task the task to execute + * @param delay the time from now to delay execution + * @param unit the time unit of the delay parameter + * @param timeToLive - time to live interval + * @param ttlUnit - unit of time to live interval + * @return RScheduledFuture with listeners support + */ + RScheduledFuture scheduleAsync(Runnable task, long delay, TimeUnit unit, long timeToLive, TimeUnit ttlUnit); + + /** + * Schedules a value-returning task for execution asynchronously + * after the given delay. Returns a RScheduledFuture representing that task. + * The Future's {@code get} method will return the given result upon successful completion. * * @param task the function to execute * @param delay the time from now to delay execution @@ -48,17 +65,30 @@ public interface RScheduledExecutorServiceAsync extends RExecutorServiceAsync { * @return RScheduledFuture with listeners support */ RScheduledFuture scheduleAsync(Callable task, long delay, TimeUnit unit); - + /** - * Creates in async mode and executes a periodic action that becomes enabled first - * after the given initial delay, and subsequently with the given - * period. - * If any execution of the task - * encounters an exception, subsequent executions are suppressed. - * Otherwise, the task will only terminate via cancellation or - * termination of the executor. If any execution of this task - * takes longer than its period, then subsequent executions - * may start late, but will not concurrently execute. + * Schedules a value-returning task with defined timeToLive parameter + * for execution asynchronously after the given delay. + * Returns a RScheduledFuture representing that task. + * The Future's {@code get} method will return the given result upon successful completion. + * + * @param task the function to execute + * @param delay the time from now to delay execution + * @param unit the time unit of the delay parameter + * @param timeToLive - time to live interval + * @param ttlUnit - unit of time to live interval + * @param the type of the callable's result + * @return RScheduledFuture with listeners support + */ + RScheduledFuture scheduleAsync(Callable task, long delay, TimeUnit unit, long timeToLive, TimeUnit ttlUnit); + + /** + * Schedules a Runnable task for execution asynchronously + * after the given initialDelay, and subsequently with the given + * period. + * Subsequent executions are stopped if any execution of the task throws an exception. + * Otherwise, task could be terminated via cancellation or + * termination of the executor. * * @param task the task to execute * @param initialDelay the time to delay first execution @@ -69,12 +99,11 @@ public interface RScheduledExecutorServiceAsync extends RExecutorServiceAsync { RScheduledFuture scheduleAtFixedRateAsync(Runnable task, long initialDelay, long period, TimeUnit unit); /** - * Creates in async mode and executes a periodic action that becomes enabled first - * after the given initial delay, and subsequently with the - * given delay between the termination of one execution and the - * commencement of the next. If any execution of the task - * encounters an exception, subsequent executions are suppressed. - * Otherwise, the task will only terminate via cancellation or + * Schedules a Runnable task for execution asynchronously + * after the given initialDelay, and subsequently with the given + * delay started from the task finishing moment. + * Subsequent executions are stopped if any execution of the task throws an exception. + * Otherwise, task could be terminated via cancellation or * termination of the executor. * * @param task the task to execute @@ -87,13 +116,11 @@ public interface RScheduledExecutorServiceAsync extends RExecutorServiceAsync { RScheduledFuture scheduleWithFixedDelayAsync(Runnable task, long initialDelay, long delay, TimeUnit unit); /** - * Creates in async mode and executes a periodic action with cron schedule object. - * If any execution of the task - * encounters an exception, subsequent executions are suppressed. - * Otherwise, the task will only terminate via cancellation or - * termination of the executor. If any execution of this task - * takes longer than its period, then subsequent executions - * may start late, but will not concurrently execute. + * Synchronously schedules a Runnable task for execution asynchronously + * cron schedule object. + * Subsequent executions are stopped if any execution of the task throws an exception. + * Otherwise, task could be terminated via cancellation or + * termination of the executor. * * @param task the task to execute * @param cronSchedule cron schedule object diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java index 455f17c7c..c0833cc2a 100644 --- a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java @@ -40,6 +40,7 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { private String statusName; private String tasksRetryIntervalName; private String terminationTopicName; + private String schedulerQueueName; public RedissonExecutorRemoteService(Codec codec, String name, CommandAsyncService commandExecutor, String executorId, ConcurrentMap responses) { @@ -52,6 +53,10 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { "local value = redis.call('zscore', KEYS[2], ARGV[1]); " + "if (value ~= false and tonumber(value) < tonumber(ARGV[2])) then " + "redis.call('zrem', KEYS[2], ARGV[1]); " + + + "redis.call('zrem', KEYS[7], ARGV[1]); " + + "redis.call('zrem', KEYS[7], 'ff' .. ARGV[1]);" + + "redis.call('hdel', KEYS[1], ARGV[1]); " + "if redis.call('decr', KEYS[3]) == 0 then " + "redis.call('del', KEYS[3]);" @@ -65,10 +70,15 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { + "return nil;" + "end;" + "return redis.call('hget', KEYS[1], ARGV[1]); ", - Arrays.asList(tasks.getName(), tasksExpirationTimeName, tasksCounterName, statusName, tasksRetryIntervalName, terminationTopicName), + Arrays.asList(tasks.getName(), tasksExpirationTimeName, tasksCounterName, statusName, + tasksRetryIntervalName, terminationTopicName, schedulerQueueName), requestId, System.currentTimeMillis(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); } + public void setSchedulerQueueName(String schedulerQueueName) { + this.schedulerQueueName = schedulerQueueName; + } + public void setTasksExpirationTimeName(String tasksExpirationTimeName) { this.tasksExpirationTimeName = tasksExpirationTimeName; } diff --git a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java index d783a5480..814951235 100644 --- a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java +++ b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java @@ -52,7 +52,12 @@ public class ScheduledTasksService extends TasksService { protected RFuture addAsync(String requestQueueName, RemoteServiceRequest request) { ScheduledParameters params = (ScheduledParameters) request.getArgs()[0]; params.setRequestId(request.getId()); - + + long expireTime = 0; + if (params.getTtl() > 0) { + expireTime = System.currentTimeMillis() + params.getTtl(); + } + return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // check if executor service not in shutdown state "if redis.call('exists', KEYS[2]) == 0 then " @@ -66,6 +71,10 @@ public class ScheduledTasksService extends TasksService { + "redis.call('zadd', KEYS[3], time, 'ff' .. ARGV[2]);" + "end; " + + "if tonumber(ARGV[5]) > 0 then " + + "redis.call('zadd', KEYS[7], ARGV[5], ARGV[2]);" + + "end; " + + "redis.call('zadd', KEYS[3], ARGV[1], ARGV[2]);" + "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);" + "redis.call('incr', KEYS[1]);" @@ -78,8 +87,9 @@ public class ScheduledTasksService extends TasksService { + "return 1;" + "end;" + "return 0;", - Arrays.asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName, tasksRetryIntervalName), - params.getStartTime(), request.getId(), encode(request), tasksRetryInterval); + Arrays.asList(tasksCounterName, statusName, schedulerQueueName, + schedulerChannelName, tasksName, tasksRetryIntervalName, tasksExpirationTimeName), + params.getStartTime(), request.getId(), encode(request), tasksRetryInterval, expireTime); } @Override diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index cf40a5d51..f8f949421 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -75,6 +75,8 @@ public class TasksRunnerService implements RemoteExecutorService { private String schedulerQueueName; private String schedulerChannelName; private String tasksRetryIntervalName; + private String tasksExpirationTimeName; + private BeanFactory beanFactory; private ConcurrentMap responses; @@ -90,7 +92,11 @@ public class TasksRunnerService implements RemoteExecutorService { public void setBeanFactory(BeanFactory beanFactory) { this.beanFactory = beanFactory; } - + + public void setTasksExpirationTimeName(String tasksExpirationTimeName) { + this.tasksExpirationTimeName = tasksExpirationTimeName; + } + public void setTasksRetryIntervalName(String tasksRetryInterval) { this.tasksRetryIntervalName = tasksRetryInterval; } @@ -170,6 +176,7 @@ public class TasksRunnerService implements RemoteExecutorService { scheduledRemoteService.setSchedulerChannelName(schedulerChannelName); scheduledRemoteService.setTasksName(tasksName); scheduledRemoteService.setRequestId(new RequestId(requestId)); + scheduledRemoteService.setTasksExpirationTimeName(tasksExpirationTimeName); scheduledRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName); RemoteExecutorServiceAsync asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); return asyncScheduledServiceAtFixed; diff --git a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java index db8e955ab..06d237208 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java @@ -7,10 +7,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import org.joor.Reflect; @@ -69,6 +66,19 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { } } + + @Test + public void testTTL() throws InterruptedException { + RScheduledExecutorService executor = redisson.getExecutorService("test"); + executor.submit(new DelayedTask(3000, "test")); + Future future = executor.schedule(new ScheduledRunnableTask("testparam"), 1, TimeUnit.SECONDS,2, TimeUnit.SECONDS); + Thread.sleep(500); + assertThat(executor.getTaskCount()).isEqualTo(2); + Thread.sleep(3000); + assertThat(executor.getTaskCount()).isEqualTo(0); + assertThat(redisson.getKeys().countExists("testparam")).isEqualTo(0); + } + @Test public void testSingleWorker() throws InterruptedException {