Feature - submit() methods with ttl parameter added to RExecutorService. #2445

pull/2459/head
Nikita Koksharov 5 years ago
parent f8b4b6b4ec
commit 33bfb37772

@ -69,6 +69,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final String schedulerQueueName;
private final String schedulerChannelName;
private final String tasksRetryIntervalName;
private final String tasksExpirationTimeName;
private final String workersChannelName;
private final String workersSemaphoreName;
@ -77,7 +78,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final String tasksCounterName;
private final String statusName;
private final RTopic terminationTopic;
private final RRemoteService remoteService;
private final RedissonExecutorRemoteService remoteService;
private final RTopic workersTopic;
private int workersGroupListenerId;
@ -119,8 +120,8 @@ public class RedissonExecutorService implements RScheduledExecutorService {
}
remoteService = new RedissonExecutorRemoteService(codec, name, connectionManager.getCommandExecutor(), executorId, responses);
requestQueueName = ((RedissonRemoteService) remoteService).getRequestQueueName(RemoteExecutorService.class);
responseQueueName = ((RedissonRemoteService) remoteService).getResponseQueueName(executorId);
requestQueueName = remoteService.getRequestQueueName(RemoteExecutorService.class);
responseQueueName = remoteService.getResponseQueueName(executorId);
String objectName = requestQueueName;
tasksCounterName = objectName + ":counter";
tasksName = objectName + ":tasks";
@ -128,6 +129,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
terminationTopic = redisson.getTopic(objectName + ":termination-topic", LongCodec.INSTANCE);
tasksRetryIntervalName = objectName + ":retry-interval";
tasksExpirationTimeName = objectName + ":expiration";
schedulerChannelName = objectName + ":scheduler-channel";
schedulerQueueName = objectName + ":scheduler";
@ -136,7 +138,13 @@ public class RedissonExecutorService implements RScheduledExecutorService {
workersCounterName = objectName + ":workers-counter";
workersTopic = redisson.getTopic(workersChannelName);
remoteService.setStatusName(statusName);
remoteService.setTasksCounterName(tasksCounterName);
remoteService.setTasksExpirationTimeName(tasksExpirationTimeName);
remoteService.setTasksRetryIntervalName(tasksRetryIntervalName);
remoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
executorRemoteService = new TasksService(codec, name, commandExecutor, executorId, responses);
executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
executorRemoteService.setTasksCounterName(tasksCounterName);
@ -145,6 +153,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
executorRemoteService.setSchedulerChannelName(schedulerChannelName);
executorRemoteService.setSchedulerQueueName(schedulerQueueName);
executorRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName);
executorRemoteService.setTasksExpirationTimeName(tasksExpirationTimeName);
executorRemoteService.setTasksRetryInterval(options.getTaskRetryInterval());
asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
@ -344,6 +353,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private TasksBatchService createBatchService() {
TasksBatchService executorRemoteService = new TasksBatchService(codec, name, commandExecutor, executorId, responses);
executorRemoteService.setTasksExpirationTimeName(tasksExpirationTimeName);
executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
executorRemoteService.setTasksCounterName(tasksCounterName);
executorRemoteService.setStatusName(statusName);
@ -556,7 +566,24 @@ public class RedissonExecutorService implements RScheduledExecutorService {
syncExecute(promise);
return createFuture(promise);
}
@Override
public <T> RExecutorFuture<T> submit(Callable<T> task, long timeToLive, TimeUnit timeUnit) {
RemotePromise<T> promise = (RemotePromise<T>) ((PromiseDelegator<T>) submitAsync(task, timeToLive, timeUnit)).getInnerPromise();
syncExecute(promise);
return createFuture(promise);
}
@Override
public <T> RExecutorFuture<T> submitAsync(Callable<T> task, long timeToLive, TimeUnit timeUnit) {
check(task);
TaskParameters taskParameters = createTaskParameters(task);
taskParameters.setTtl(timeUnit.toMillis(timeToLive));
RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(taskParameters);
addListener(result);
return createFuture(result);
}
@Override
public <T> RExecutorFuture<T> submitAsync(Callable<T> task) {
check(task);
@ -758,7 +785,24 @@ public class RedissonExecutorService implements RScheduledExecutorService {
syncExecute(promise);
return createFuture(promise);
}
@Override
public RExecutorFuture<?> submit(Runnable task, long timeToLive, TimeUnit timeUnit) {
RemotePromise<Void> promise = (RemotePromise<Void>) ((PromiseDelegator<Void>) submitAsync(task, timeToLive, timeUnit)).getInnerPromise();
syncExecute(promise);
return createFuture(promise);
}
@Override
public RExecutorFuture<?> submitAsync(Runnable task, long timeToLive, TimeUnit timeUnit) {
check(task);
TaskParameters taskParameters = createTaskParameters(task);
taskParameters.setTtl(timeUnit.toMillis(timeToLive));
RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(taskParameters);
addListener(result);
return createFuture(result);
}
@Override
public RExecutorFuture<?> submitAsync(Runnable task) {
check(task);

@ -18,6 +18,7 @@ package org.redisson.api;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Distributed implementation of {@link java.util.concurrent.ExecutorService}
@ -33,7 +34,7 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync
String MAPREDUCE_NAME = "redisson_mapreduce";
/**
* Submits a value-returning task for execution synchronously and returns a
* Synchronously submits a value-returning task for execution asynchronously and returns a
* Future representing the pending results of the task. The
* Future's {@code get} method will return the task's result upon
* successful completion.
@ -44,9 +45,24 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync
*/
@Override
<T> RExecutorFuture<T> submit(Callable<T> task);
/**
* Submits tasks batch for execution synchronously.
* Synchronously submits a value-returning task with defined <code>timeToLive</code> parameter
* for execution asynchronously. Returns a Future representing the pending
* results of the task. The Future's {@code get} method will return the
* task's result upon successful completion.
*
* @param task the task to submit
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @param <T> the type of the task's result
* @return a Future representing pending completion of the task
*/
<T> RExecutorFuture<T> submit(Callable<T> task, long timeToLive, TimeUnit timeUnit);
/**
* Synchronously submits tasks batch for execution asynchronously.
* All tasks are stored to executor request queue atomically,
* if case of any error none of tasks will be added.
*
@ -56,8 +72,8 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync
RExecutorBatchFuture submit(Callable<?>...tasks);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* Synchronously submits a Runnable task for execution asynchronously
* and returns a Future representing that task. The Future's {@code get} method will
* return the given result upon successful completion.
*
* @param task the task to submit
@ -69,8 +85,8 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync
<T> RExecutorFuture<T> submit(Runnable task, T result);;
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* Synchronously submits a Runnable task for execution asynchronously.
* Returns a Future representing task completion. The Future's {@code get} method will
* return {@code null} upon <em>successful</em> completion.
*
* @param task the task to submit
@ -80,7 +96,20 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync
RExecutorFuture<?> submit(Runnable task);
/**
* Submits tasks batch for execution synchronously.
* Synchronously submits a task with defined <code>timeToLive</code> parameter
* for execution asynchronously. Returns a Future representing task completion.
* The Future's {@code get} method will return the
* task's result upon successful completion.
*
* @param task the task to submit
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @return a Future representing pending completion of the task
*/
RExecutorFuture<?> submit(Runnable task, long timeToLive, TimeUnit timeUnit);
/**
* Synchronously submits tasks batch for execution asynchronously.
* All tasks are stored to executor request queue atomically,
* if case of any error none of tasks will be added.
*

@ -17,6 +17,7 @@ package org.redisson.api;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
* Distributed async implementation of {@link java.util.concurrent.ExecutorService}
@ -76,7 +77,22 @@ public interface RExecutorServiceAsync {
<T> RExecutorFuture<T> submitAsync(Callable<T> task);
/**
* Submits tasks batch for execution asynchronously. All tasks are stored to executor request queue atomically,
* Submits a value-returning task with defined <code>timeToLive</code> parameter
* for execution asynchronously. Returns a Future representing the pending
* results of the task. The Future's {@code get} method will return the
* task's result upon successful completion.
*
* @param task the task to submit
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @param <T> the type of the task's result
* @return a Future representing pending completion of the task
*/
<T> RExecutorFuture<T> submitAsync(Callable<T> task, long timeToLive, TimeUnit timeUnit);
/**
* Submits tasks batch for execution asynchronously.
* All tasks are stored to executor request queue atomically,
* if case of any error none of tasks will be added.
*
* @param tasks - tasks to execute
@ -92,6 +108,19 @@ public interface RExecutorServiceAsync {
*/
RExecutorFuture<?> submitAsync(Runnable task);
/**
* Submits a task with defined <code>timeToLive</code> parameter
* for execution asynchronously. Returns a Future representing task completion.
* The Future's {@code get} method will return the
* task's result upon successful completion.
*
* @param task the task to submit
* @param timeToLive - time to live interval
* @param timeUnit - unit of time to live interval
* @return a Future representing pending completion of the task
*/
RExecutorFuture<?> submitAsync(Runnable task, long timeToLive, TimeUnit timeUnit);
/**
* Submits tasks batch for execution asynchronously. All tasks are stored to executor request queue atomically,
* if case of any error none of tasks will be added.

@ -15,16 +15,19 @@
*/
package org.redisson.executor;
import java.util.concurrent.ConcurrentMap;
import org.redisson.RedissonExecutorService;
import org.redisson.RedissonRemoteService;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncService;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.ResponseEntry;
import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
/**
*
* @author Nikita Koksharov
@ -32,6 +35,12 @@ import org.redisson.remote.ResponseEntry;
*/
public class RedissonExecutorRemoteService extends RedissonRemoteService {
private String tasksExpirationTimeName;
private String tasksCounterName;
private String statusName;
private String tasksRetryIntervalName;
private String terminationTopicName;
public RedissonExecutorRemoteService(Codec codec, String name,
CommandAsyncService commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, executorId, responses);
@ -39,7 +48,44 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
@Override
protected RFuture<RemoteServiceRequest> getTask(String requestId, RMap<String, RemoteServiceRequest> tasks) {
return tasks.getAsync(requestId);
return commandExecutor.evalWriteAsync(tasks.getName(), codec, RedisCommands.EVAL_OBJECT,
"local value = redis.call('zscore', KEYS[2], ARGV[1]); " +
"if (value ~= false and tonumber(value) < tonumber(ARGV[2])) then "
+ "redis.call('zrem', KEYS[2], ARGV[1]); "
+ "redis.call('hdel', KEYS[1], ARGV[1]); "
+ "if redis.call('decr', KEYS[3]) == 0 then "
+ "redis.call('del', KEYS[3]);"
+ "if redis.call('get', KEYS[4]) == ARGV[3] then "
+ "redis.call('del', KEYS[5]);"
+ "redis.call('set', KEYS[4], ARGV[4]);"
+ "redis.call('publish', KEYS[6], ARGV[4]);"
+ "end;"
+ "end;"
+ "return nil;"
+ "end;"
+ "return redis.call('hget', KEYS[1], ARGV[1]); ",
Arrays.asList(tasks.getName(), tasksExpirationTimeName, tasksCounterName, statusName, tasksRetryIntervalName, terminationTopicName),
requestId, System.currentTimeMillis(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
}
public void setTasksExpirationTimeName(String tasksExpirationTimeName) {
this.tasksExpirationTimeName = tasksExpirationTimeName;
}
public void setTasksCounterName(String tasksCounterName) {
this.tasksCounterName = tasksCounterName;
}
public void setStatusName(String statusName) {
this.statusName = statusName;
}
public void setTasksRetryIntervalName(String tasksRetryIntervalName) {
this.tasksRetryIntervalName = tasksRetryIntervalName;
}
public void setTerminationTopicName(String terminationTopicName) {
this.terminationTopicName = terminationTopicName;
}
}

@ -15,17 +15,10 @@
*/
package org.redisson.executor;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.redisson.Redisson;
import org.redisson.RedissonExecutorService;
import org.redisson.RedissonShutdownException;
@ -40,11 +33,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CustomObjectInputStream;
import org.redisson.command.CommandExecutor;
import org.redisson.executor.params.ScheduledAtFixedRateParameters;
import org.redisson.executor.params.ScheduledCronExpressionParameters;
import org.redisson.executor.params.ScheduledParameters;
import org.redisson.executor.params.ScheduledWithFixedDelayParameters;
import org.redisson.executor.params.TaskParameters;
import org.redisson.executor.params.*;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
import org.redisson.misc.Injector;
@ -53,10 +42,15 @@ import org.redisson.remote.ResponseEntry;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.io.ByteArrayInputStream;
import java.io.ObjectInput;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/**
* Executor service runs Callable and Runnable tasks.

@ -47,12 +47,17 @@ public class TasksService extends BaseRemoteService {
protected String schedulerQueueName;
protected String schedulerChannelName;
protected String tasksRetryIntervalName;
protected String tasksExpirationTimeName;
protected long tasksRetryInterval;
public TasksService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, executorId, responses);
}
public void setTasksExpirationTimeName(String tasksExpirationTimeName) {
this.tasksExpirationTimeName = tasksExpirationTimeName;
}
public void setTasksRetryIntervalName(String tasksRetryIntervalName) {
this.tasksRetryIntervalName = tasksRetryIntervalName;
}
@ -121,6 +126,10 @@ public class TasksService extends BaseRemoteService {
if (tasksRetryInterval > 0) {
retryStartTime = System.currentTimeMillis() + tasksRetryInterval;
}
long expireTime = 0;
if (params.getTtl() > 0) {
expireTime = System.currentTimeMillis() + params.getTtl();
}
return getAddCommandExecutor().evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// check if executor service not in shutdown state
@ -128,7 +137,11 @@ public class TasksService extends BaseRemoteService {
+ "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);"
+ "redis.call('rpush', KEYS[6], ARGV[2]); "
+ "redis.call('incr', KEYS[1]);"
+ "if tonumber(ARGV[5]) > 0 then "
+ "redis.call('zadd', KEYS[8], ARGV[5], ARGV[2]);"
+ "end; "
+ "if tonumber(ARGV[1]) > 0 then "
+ "redis.call('set', KEYS[7], ARGV[4]);"
+ "redis.call('zadd', KEYS[3], ARGV[1], 'ff' .. ARGV[2]);"
@ -137,19 +150,21 @@ public class TasksService extends BaseRemoteService {
// to all scheduler workers
+ "if v[1] == ARGV[2] then "
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end "
+ "end; "
+ "end;"
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName, requestQueueName, tasksRetryIntervalName),
retryStartTime, request.getId(), encode(request), tasksRetryInterval);
Arrays.<Object>asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName,
tasksName, requestQueueName, tasksRetryIntervalName, tasksExpirationTimeName),
retryStartTime, request.getId(), encode(request), tasksRetryInterval, expireTime);
}
@Override
protected RFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('zrem', KEYS[2], 'ff' .. ARGV[1]); "
+ "redis.call('zrem', KEYS[8], ARGV[1]); "
+ "local task = redis.call('hget', KEYS[6], ARGV[1]); "
+ "redis.call('hdel', KEYS[6], ARGV[1]); "
// remove from executor queue
@ -168,7 +183,8 @@ public class TasksService extends BaseRemoteService {
+ "return 1; "
+ "end;"
+ "return 0;",
Arrays.<Object>asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName, tasksRetryIntervalName),
Arrays.<Object>asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName,
tasksName, tasksRetryIntervalName, tasksExpirationTimeName),
taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
}

@ -31,6 +31,7 @@ public class TaskParameters implements Serializable {
private byte[] lambdaBody;
private byte[] state;
private String requestId;
private long ttl;
public TaskParameters() {
}
@ -42,7 +43,14 @@ public class TaskParameters implements Serializable {
this.state = state;
this.lambdaBody = lambdaBody;
}
public long getTtl() {
return ttl;
}
public void setTtl(long ttl) {
this.ttl = ttl;
}
public byte[] getLambdaBody() {
return lambdaBody;
}

@ -25,11 +25,7 @@ import org.redisson.BaseTest;
import org.redisson.RedisRunner;
import org.redisson.Redisson;
import org.redisson.RedissonNode;
import org.redisson.api.ExecutorOptions;
import org.redisson.api.RExecutorBatchFuture;
import org.redisson.api.RExecutorFuture;
import org.redisson.api.RExecutorService;
import org.redisson.api.RedissonClient;
import org.redisson.api.*;
import org.redisson.api.annotation.RInject;
import org.redisson.config.Config;
import org.redisson.config.RedissonNodeConfig;
@ -524,7 +520,19 @@ public class RedissonExecutorServiceTest extends BaseTest {
Future<String> future = redisson.getExecutorService("test").submit(new ParameterizedTask("testparam"));
assertThat(future.get()).isEqualTo("testparam");
}
@Test
public void testTTL() throws InterruptedException {
RScheduledExecutorService executor = redisson.getExecutorService("test");
executor.submit(new DelayedTask(2000, "test"));
Future<?> future = executor.submit(new ScheduledRunnableTask("testparam"), 1, TimeUnit.SECONDS);
Thread.sleep(500);
assertThat(executor.getTaskCount()).isEqualTo(2);
Thread.sleep(2000);
assertThat(executor.getTaskCount()).isEqualTo(0);
assertThat(redisson.getKeys().countExists("testparam")).isEqualTo(0);
}
@Test(expected = IllegalArgumentException.class)
public void testAnonymousRunnable() {
redisson.getExecutorService("test").submit(new Runnable() {

Loading…
Cancel
Save