diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 29ab24128..08b45cfee 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -69,6 +69,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { private final String schedulerQueueName; private final String schedulerChannelName; private final String tasksRetryIntervalName; + private final String tasksExpirationTimeName; private final String workersChannelName; private final String workersSemaphoreName; @@ -77,7 +78,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { private final String tasksCounterName; private final String statusName; private final RTopic terminationTopic; - private final RRemoteService remoteService; + private final RedissonExecutorRemoteService remoteService; private final RTopic workersTopic; private int workersGroupListenerId; @@ -119,8 +120,8 @@ public class RedissonExecutorService implements RScheduledExecutorService { } remoteService = new RedissonExecutorRemoteService(codec, name, connectionManager.getCommandExecutor(), executorId, responses); - requestQueueName = ((RedissonRemoteService) remoteService).getRequestQueueName(RemoteExecutorService.class); - responseQueueName = ((RedissonRemoteService) remoteService).getResponseQueueName(executorId); + requestQueueName = remoteService.getRequestQueueName(RemoteExecutorService.class); + responseQueueName = remoteService.getResponseQueueName(executorId); String objectName = requestQueueName; tasksCounterName = objectName + ":counter"; tasksName = objectName + ":tasks"; @@ -128,6 +129,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { terminationTopic = redisson.getTopic(objectName + ":termination-topic", LongCodec.INSTANCE); tasksRetryIntervalName = objectName + ":retry-interval"; + tasksExpirationTimeName = objectName + ":expiration"; schedulerChannelName = objectName + ":scheduler-channel"; schedulerQueueName = objectName + ":scheduler"; @@ -136,7 +138,13 @@ public class RedissonExecutorService implements RScheduledExecutorService { workersCounterName = objectName + ":workers-counter"; workersTopic = redisson.getTopic(workersChannelName); - + + remoteService.setStatusName(statusName); + remoteService.setTasksCounterName(tasksCounterName); + remoteService.setTasksExpirationTimeName(tasksExpirationTimeName); + remoteService.setTasksRetryIntervalName(tasksRetryIntervalName); + remoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); + executorRemoteService = new TasksService(codec, name, commandExecutor, executorId, responses); executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); executorRemoteService.setTasksCounterName(tasksCounterName); @@ -145,6 +153,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { executorRemoteService.setSchedulerChannelName(schedulerChannelName); executorRemoteService.setSchedulerQueueName(schedulerQueueName); executorRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName); + executorRemoteService.setTasksExpirationTimeName(tasksExpirationTimeName); executorRemoteService.setTasksRetryInterval(options.getTaskRetryInterval()); asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); @@ -344,6 +353,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { private TasksBatchService createBatchService() { TasksBatchService executorRemoteService = new TasksBatchService(codec, name, commandExecutor, executorId, responses); + executorRemoteService.setTasksExpirationTimeName(tasksExpirationTimeName); executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); executorRemoteService.setTasksCounterName(tasksCounterName); executorRemoteService.setStatusName(statusName); @@ -556,7 +566,24 @@ public class RedissonExecutorService implements RScheduledExecutorService { syncExecute(promise); return createFuture(promise); } - + + @Override + public RExecutorFuture submit(Callable task, long timeToLive, TimeUnit timeUnit) { + RemotePromise promise = (RemotePromise) ((PromiseDelegator) submitAsync(task, timeToLive, timeUnit)).getInnerPromise(); + syncExecute(promise); + return createFuture(promise); + } + + @Override + public RExecutorFuture submitAsync(Callable task, long timeToLive, TimeUnit timeUnit) { + check(task); + TaskParameters taskParameters = createTaskParameters(task); + taskParameters.setTtl(timeUnit.toMillis(timeToLive)); + RemotePromise result = (RemotePromise) asyncService.executeCallable(taskParameters); + addListener(result); + return createFuture(result); + } + @Override public RExecutorFuture submitAsync(Callable task) { check(task); @@ -758,7 +785,24 @@ public class RedissonExecutorService implements RScheduledExecutorService { syncExecute(promise); return createFuture(promise); } - + + @Override + public RExecutorFuture submit(Runnable task, long timeToLive, TimeUnit timeUnit) { + RemotePromise promise = (RemotePromise) ((PromiseDelegator) submitAsync(task, timeToLive, timeUnit)).getInnerPromise(); + syncExecute(promise); + return createFuture(promise); + } + + @Override + public RExecutorFuture submitAsync(Runnable task, long timeToLive, TimeUnit timeUnit) { + check(task); + TaskParameters taskParameters = createTaskParameters(task); + taskParameters.setTtl(timeUnit.toMillis(timeToLive)); + RemotePromise result = (RemotePromise) asyncService.executeRunnable(taskParameters); + addListener(result); + return createFuture(result); + } + @Override public RExecutorFuture submitAsync(Runnable task) { check(task); diff --git a/redisson/src/main/java/org/redisson/api/RExecutorService.java b/redisson/src/main/java/org/redisson/api/RExecutorService.java index ace35e5f4..a71c70cfd 100644 --- a/redisson/src/main/java/org/redisson/api/RExecutorService.java +++ b/redisson/src/main/java/org/redisson/api/RExecutorService.java @@ -18,6 +18,7 @@ package org.redisson.api; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; /** * Distributed implementation of {@link java.util.concurrent.ExecutorService} @@ -33,7 +34,7 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync String MAPREDUCE_NAME = "redisson_mapreduce"; /** - * Submits a value-returning task for execution synchronously and returns a + * Synchronously submits a value-returning task for execution asynchronously and returns a * Future representing the pending results of the task. The * Future's {@code get} method will return the task's result upon * successful completion. @@ -44,9 +45,24 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync */ @Override RExecutorFuture submit(Callable task); - + /** - * Submits tasks batch for execution synchronously. + * Synchronously submits a value-returning task with defined timeToLive parameter + * for execution asynchronously. Returns a Future representing the pending + * results of the task. The Future's {@code get} method will return the + * task's result upon successful completion. + * + * @param task the task to submit + * @param timeToLive - time to live interval + * @param timeUnit - unit of time to live interval + * @param the type of the task's result + * @return a Future representing pending completion of the task + */ + RExecutorFuture submit(Callable task, long timeToLive, TimeUnit timeUnit); + + + /** + * Synchronously submits tasks batch for execution asynchronously. * All tasks are stored to executor request queue atomically, * if case of any error none of tasks will be added. * @@ -56,8 +72,8 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync RExecutorBatchFuture submit(Callable...tasks); /** - * Submits a Runnable task for execution and returns a Future - * representing that task. The Future's {@code get} method will + * Synchronously submits a Runnable task for execution asynchronously + * and returns a Future representing that task. The Future's {@code get} method will * return the given result upon successful completion. * * @param task the task to submit @@ -69,8 +85,8 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync RExecutorFuture submit(Runnable task, T result);; /** - * Submits a Runnable task for execution and returns a Future - * representing that task. The Future's {@code get} method will + * Synchronously submits a Runnable task for execution asynchronously. + * Returns a Future representing task completion. The Future's {@code get} method will * return {@code null} upon successful completion. * * @param task the task to submit @@ -80,7 +96,20 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync RExecutorFuture submit(Runnable task); /** - * Submits tasks batch for execution synchronously. + * Synchronously submits a task with defined timeToLive parameter + * for execution asynchronously. Returns a Future representing task completion. + * The Future's {@code get} method will return the + * task's result upon successful completion. + * + * @param task the task to submit + * @param timeToLive - time to live interval + * @param timeUnit - unit of time to live interval + * @return a Future representing pending completion of the task + */ + RExecutorFuture submit(Runnable task, long timeToLive, TimeUnit timeUnit); + + /** + * Synchronously submits tasks batch for execution asynchronously. * All tasks are stored to executor request queue atomically, * if case of any error none of tasks will be added. * diff --git a/redisson/src/main/java/org/redisson/api/RExecutorServiceAsync.java b/redisson/src/main/java/org/redisson/api/RExecutorServiceAsync.java index 08bd5ead1..c2fb09661 100644 --- a/redisson/src/main/java/org/redisson/api/RExecutorServiceAsync.java +++ b/redisson/src/main/java/org/redisson/api/RExecutorServiceAsync.java @@ -17,6 +17,7 @@ package org.redisson.api; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; /** * Distributed async implementation of {@link java.util.concurrent.ExecutorService} @@ -76,7 +77,22 @@ public interface RExecutorServiceAsync { RExecutorFuture submitAsync(Callable task); /** - * Submits tasks batch for execution asynchronously. All tasks are stored to executor request queue atomically, + * Submits a value-returning task with defined timeToLive parameter + * for execution asynchronously. Returns a Future representing the pending + * results of the task. The Future's {@code get} method will return the + * task's result upon successful completion. + * + * @param task the task to submit + * @param timeToLive - time to live interval + * @param timeUnit - unit of time to live interval + * @param the type of the task's result + * @return a Future representing pending completion of the task + */ + RExecutorFuture submitAsync(Callable task, long timeToLive, TimeUnit timeUnit); + + /** + * Submits tasks batch for execution asynchronously. + * All tasks are stored to executor request queue atomically, * if case of any error none of tasks will be added. * * @param tasks - tasks to execute @@ -92,6 +108,19 @@ public interface RExecutorServiceAsync { */ RExecutorFuture submitAsync(Runnable task); + /** + * Submits a task with defined timeToLive parameter + * for execution asynchronously. Returns a Future representing task completion. + * The Future's {@code get} method will return the + * task's result upon successful completion. + * + * @param task the task to submit + * @param timeToLive - time to live interval + * @param timeUnit - unit of time to live interval + * @return a Future representing pending completion of the task + */ + RExecutorFuture submitAsync(Runnable task, long timeToLive, TimeUnit timeUnit); + /** * Submits tasks batch for execution asynchronously. All tasks are stored to executor request queue atomically, * if case of any error none of tasks will be added. diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java index 43b469f35..455f17c7c 100644 --- a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java @@ -15,16 +15,19 @@ */ package org.redisson.executor; -import java.util.concurrent.ConcurrentMap; - +import org.redisson.RedissonExecutorService; import org.redisson.RedissonRemoteService; import org.redisson.api.RFuture; import org.redisson.api.RMap; import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncService; import org.redisson.remote.RemoteServiceRequest; import org.redisson.remote.ResponseEntry; +import java.util.Arrays; +import java.util.concurrent.ConcurrentMap; + /** * * @author Nikita Koksharov @@ -32,6 +35,12 @@ import org.redisson.remote.ResponseEntry; */ public class RedissonExecutorRemoteService extends RedissonRemoteService { + private String tasksExpirationTimeName; + private String tasksCounterName; + private String statusName; + private String tasksRetryIntervalName; + private String terminationTopicName; + public RedissonExecutorRemoteService(Codec codec, String name, CommandAsyncService commandExecutor, String executorId, ConcurrentMap responses) { super(codec, name, commandExecutor, executorId, responses); @@ -39,7 +48,44 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { @Override protected RFuture getTask(String requestId, RMap tasks) { - return tasks.getAsync(requestId); + return commandExecutor.evalWriteAsync(tasks.getName(), codec, RedisCommands.EVAL_OBJECT, + "local value = redis.call('zscore', KEYS[2], ARGV[1]); " + + "if (value ~= false and tonumber(value) < tonumber(ARGV[2])) then " + + "redis.call('zrem', KEYS[2], ARGV[1]); " + + "redis.call('hdel', KEYS[1], ARGV[1]); " + + "if redis.call('decr', KEYS[3]) == 0 then " + + "redis.call('del', KEYS[3]);" + + "if redis.call('get', KEYS[4]) == ARGV[3] then " + + "redis.call('del', KEYS[5]);" + + "redis.call('set', KEYS[4], ARGV[4]);" + + "redis.call('publish', KEYS[6], ARGV[4]);" + + "end;" + + "end;" + + + "return nil;" + + "end;" + + "return redis.call('hget', KEYS[1], ARGV[1]); ", + Arrays.asList(tasks.getName(), tasksExpirationTimeName, tasksCounterName, statusName, tasksRetryIntervalName, terminationTopicName), + requestId, System.currentTimeMillis(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); + } + + public void setTasksExpirationTimeName(String tasksExpirationTimeName) { + this.tasksExpirationTimeName = tasksExpirationTimeName; + } + + public void setTasksCounterName(String tasksCounterName) { + this.tasksCounterName = tasksCounterName; + } + + public void setStatusName(String statusName) { + this.statusName = statusName; + } + + public void setTasksRetryIntervalName(String tasksRetryIntervalName) { + this.tasksRetryIntervalName = tasksRetryIntervalName; + } + + public void setTerminationTopicName(String terminationTopicName) { + this.terminationTopicName = terminationTopicName; } - } diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index 994d4cbcd..cf40a5d51 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -15,17 +15,10 @@ */ package org.redisson.executor; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.ObjectInput; -import java.util.Arrays; -import java.util.Date; -import java.util.Map; -import java.util.TimeZone; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; - +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import org.redisson.Redisson; import org.redisson.RedissonExecutorService; import org.redisson.RedissonShutdownException; @@ -40,11 +33,7 @@ import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.codec.CustomObjectInputStream; import org.redisson.command.CommandExecutor; -import org.redisson.executor.params.ScheduledAtFixedRateParameters; -import org.redisson.executor.params.ScheduledCronExpressionParameters; -import org.redisson.executor.params.ScheduledParameters; -import org.redisson.executor.params.ScheduledWithFixedDelayParameters; -import org.redisson.executor.params.TaskParameters; +import org.redisson.executor.params.*; import org.redisson.misc.Hash; import org.redisson.misc.HashValue; import org.redisson.misc.Injector; @@ -53,10 +42,15 @@ import org.redisson.remote.ResponseEntry; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; +import java.io.ByteArrayInputStream; +import java.io.ObjectInput; +import java.util.Arrays; +import java.util.Date; +import java.util.Map; +import java.util.TimeZone; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; /** * Executor service runs Callable and Runnable tasks. diff --git a/redisson/src/main/java/org/redisson/executor/TasksService.java b/redisson/src/main/java/org/redisson/executor/TasksService.java index 7aa6146a1..fb52f71da 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksService.java @@ -47,12 +47,17 @@ public class TasksService extends BaseRemoteService { protected String schedulerQueueName; protected String schedulerChannelName; protected String tasksRetryIntervalName; + protected String tasksExpirationTimeName; protected long tasksRetryInterval; public TasksService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap responses) { super(codec, name, commandExecutor, executorId, responses); } - + + public void setTasksExpirationTimeName(String tasksExpirationTimeName) { + this.tasksExpirationTimeName = tasksExpirationTimeName; + } + public void setTasksRetryIntervalName(String tasksRetryIntervalName) { this.tasksRetryIntervalName = tasksRetryIntervalName; } @@ -121,6 +126,10 @@ public class TasksService extends BaseRemoteService { if (tasksRetryInterval > 0) { retryStartTime = System.currentTimeMillis() + tasksRetryInterval; } + long expireTime = 0; + if (params.getTtl() > 0) { + expireTime = System.currentTimeMillis() + params.getTtl(); + } return getAddCommandExecutor().evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // check if executor service not in shutdown state @@ -128,7 +137,11 @@ public class TasksService extends BaseRemoteService { + "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);" + "redis.call('rpush', KEYS[6], ARGV[2]); " + "redis.call('incr', KEYS[1]);" - + + + "if tonumber(ARGV[5]) > 0 then " + + "redis.call('zadd', KEYS[8], ARGV[5], ARGV[2]);" + + "end; " + + "if tonumber(ARGV[1]) > 0 then " + "redis.call('set', KEYS[7], ARGV[4]);" + "redis.call('zadd', KEYS[3], ARGV[1], 'ff' .. ARGV[2]);" @@ -137,19 +150,21 @@ public class TasksService extends BaseRemoteService { // to all scheduler workers + "if v[1] == ARGV[2] then " + "redis.call('publish', KEYS[4], ARGV[1]); " - + "end " + + "end; " + "end;" + "return 1;" + "end;" + "return 0;", - Arrays.asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName, requestQueueName, tasksRetryIntervalName), - retryStartTime, request.getId(), encode(request), tasksRetryInterval); + Arrays.asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, + tasksName, requestQueueName, tasksRetryIntervalName, tasksExpirationTimeName), + retryStartTime, request.getId(), encode(request), tasksRetryInterval, expireTime); } @Override protected RFuture removeAsync(String requestQueueName, RequestId taskId) { return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "redis.call('zrem', KEYS[2], 'ff' .. ARGV[1]); " + + "redis.call('zrem', KEYS[8], ARGV[1]); " + "local task = redis.call('hget', KEYS[6], ARGV[1]); " + "redis.call('hdel', KEYS[6], ARGV[1]); " // remove from executor queue @@ -168,7 +183,8 @@ public class TasksService extends BaseRemoteService { + "return 1; " + "end;" + "return 0;", - Arrays.asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName, tasksRetryIntervalName), + Arrays.asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, + tasksName, tasksRetryIntervalName, tasksExpirationTimeName), taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); } diff --git a/redisson/src/main/java/org/redisson/executor/params/TaskParameters.java b/redisson/src/main/java/org/redisson/executor/params/TaskParameters.java index 9b535da25..665538457 100644 --- a/redisson/src/main/java/org/redisson/executor/params/TaskParameters.java +++ b/redisson/src/main/java/org/redisson/executor/params/TaskParameters.java @@ -31,6 +31,7 @@ public class TaskParameters implements Serializable { private byte[] lambdaBody; private byte[] state; private String requestId; + private long ttl; public TaskParameters() { } @@ -42,7 +43,14 @@ public class TaskParameters implements Serializable { this.state = state; this.lambdaBody = lambdaBody; } - + + public long getTtl() { + return ttl; + } + public void setTtl(long ttl) { + this.ttl = ttl; + } + public byte[] getLambdaBody() { return lambdaBody; } diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index 88130af76..0adc33cb4 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -25,11 +25,7 @@ import org.redisson.BaseTest; import org.redisson.RedisRunner; import org.redisson.Redisson; import org.redisson.RedissonNode; -import org.redisson.api.ExecutorOptions; -import org.redisson.api.RExecutorBatchFuture; -import org.redisson.api.RExecutorFuture; -import org.redisson.api.RExecutorService; -import org.redisson.api.RedissonClient; +import org.redisson.api.*; import org.redisson.api.annotation.RInject; import org.redisson.config.Config; import org.redisson.config.RedissonNodeConfig; @@ -524,7 +520,19 @@ public class RedissonExecutorServiceTest extends BaseTest { Future future = redisson.getExecutorService("test").submit(new ParameterizedTask("testparam")); assertThat(future.get()).isEqualTo("testparam"); } - + + @Test + public void testTTL() throws InterruptedException { + RScheduledExecutorService executor = redisson.getExecutorService("test"); + executor.submit(new DelayedTask(2000, "test")); + Future future = executor.submit(new ScheduledRunnableTask("testparam"), 1, TimeUnit.SECONDS); + Thread.sleep(500); + assertThat(executor.getTaskCount()).isEqualTo(2); + Thread.sleep(2000); + assertThat(executor.getTaskCount()).isEqualTo(0); + assertThat(redisson.getKeys().countExists("testparam")).isEqualTo(0); + } + @Test(expected = IllegalArgumentException.class) public void testAnonymousRunnable() { redisson.getExecutorService("test").submit(new Runnable() {