Feature - WorkerOptions.taskTimeout setting added. #2491

pull/2452/merge
Nikita Koksharov 5 years ago
parent 387fb465f0
commit 0ba729eb7f

@ -311,7 +311,8 @@ public class RedissonExecutorService implements RScheduledExecutorService {
if (options.getExecutorService() != null) {
es = options.getExecutorService();
}
remoteService.setTaskTimeout(options.getTaskTimeout());
remoteService.register(RemoteExecutorService.class, service, options.getWorkers(), es);
workersGroupListenerId = workersTopic.addListener(String.class, new MessageListener<String>() {
@Override

@ -352,7 +352,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
});
}
private <T> void invokeMethod(Class<T> remoteInterface,
protected <T> void invokeMethod(Class<T> remoteInterface,
RBlockingQueue<String> requestQueue, RemoteServiceRequest request,
RemoteServiceMethod method, String responseName, ExecutorService executor,
RFuture<RemoteServiceCancelRequest> cancelRequestFuture, AtomicReference<RRemoteServiceResponse> responseHolder) {

@ -16,6 +16,7 @@
package org.redisson.api;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.redisson.config.Config;
import org.springframework.beans.factory.BeanFactory;
@ -31,6 +32,7 @@ public final class WorkerOptions {
private int workers = 1;
private ExecutorService executorService;
private BeanFactory beanFactory;
private long taskTimeout;
private WorkerOptions() {
}
@ -87,4 +89,20 @@ public final class WorkerOptions {
return this;
}
/**
* Defines task timeout since moment of task execution start
*
* @param timeout
* @param unit
* @return
*/
public WorkerOptions taskTimeout(long timeout, TimeUnit unit) {
this.taskTimeout = unit.toMillis(timeout);
return this;
}
public long getTaskTimeout() {
return taskTimeout;
}
}

@ -17,16 +17,20 @@ package org.redisson.executor;
import org.redisson.RedissonExecutorService;
import org.redisson.RedissonRemoteService;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncService;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.ResponseEntry;
import org.redisson.misc.RPromise;
import org.redisson.remote.*;
import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
*
@ -41,6 +45,7 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
private String tasksRetryIntervalName;
private String terminationTopicName;
private String schedulerQueueName;
private long taskTimeout;
public RedissonExecutorRemoteService(Codec codec, String name,
CommandAsyncService commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
@ -75,6 +80,20 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
requestId, System.currentTimeMillis(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
}
@Override
protected <T> void invokeMethod(Class<T> remoteInterface, RBlockingQueue<String> requestQueue, RemoteServiceRequest request, RemoteServiceMethod method, String responseName, ExecutorService executor, RFuture<RemoteServiceCancelRequest> cancelRequestFuture, AtomicReference<RRemoteServiceResponse> responseHolder) {
if (taskTimeout > 0) {
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
((RPromise)cancelRequestFuture).trySuccess(new RemoteServiceCancelRequest(true, false));
}, taskTimeout, TimeUnit.MILLISECONDS);
}
super.invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, cancelRequestFuture, responseHolder);
}
public void setTaskTimeout(long taskTimeout) {
this.taskTimeout = taskTimeout;
}
public void setSchedulerQueueName(String schedulerQueueName) {
this.schedulerQueueName = schedulerQueueName;
}

@ -312,6 +312,18 @@ public class RedissonExecutorServiceTest extends BaseTest {
assertThat(id).hasSize(34);
}
@Test
public void testTaskTimeout() throws InterruptedException {
RExecutorService executor = redisson.getExecutorService("test1");
executor.registerWorkers(WorkerOptions.defaults().taskTimeout(1, TimeUnit.SECONDS));
RExecutorFuture<?> future = executor.submit(new ScheduledLongRunnableTask("executed1"));
Thread.sleep(1050);
assertThat(future.isCancelled()).isTrue();
}
@Test
public void testCancelAndInterrupt() throws InterruptedException, ExecutionException {
RExecutorService executor = redisson.getExecutorService("test");

Loading…
Cancel
Save