diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 1f537dcf3..3de3bdee6 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -311,7 +311,8 @@ public class RedissonExecutorService implements RScheduledExecutorService { if (options.getExecutorService() != null) { es = options.getExecutorService(); } - + + remoteService.setTaskTimeout(options.getTaskTimeout()); remoteService.register(RemoteExecutorService.class, service, options.getWorkers(), es); workersGroupListenerId = workersTopic.addListener(String.class, new MessageListener() { @Override diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index e42b913b4..93dbcaa85 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -352,7 +352,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS }); } - private void invokeMethod(Class remoteInterface, + protected void invokeMethod(Class remoteInterface, RBlockingQueue requestQueue, RemoteServiceRequest request, RemoteServiceMethod method, String responseName, ExecutorService executor, RFuture cancelRequestFuture, AtomicReference responseHolder) { diff --git a/redisson/src/main/java/org/redisson/api/WorkerOptions.java b/redisson/src/main/java/org/redisson/api/WorkerOptions.java index 73d5d1e2b..4db640a75 100644 --- a/redisson/src/main/java/org/redisson/api/WorkerOptions.java +++ b/redisson/src/main/java/org/redisson/api/WorkerOptions.java @@ -16,6 +16,7 @@ package org.redisson.api; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import org.redisson.config.Config; import org.springframework.beans.factory.BeanFactory; @@ -31,6 +32,7 @@ public final class WorkerOptions { private int workers = 1; private ExecutorService executorService; private BeanFactory beanFactory; + private long taskTimeout; private WorkerOptions() { } @@ -87,4 +89,20 @@ public final class WorkerOptions { return this; } + /** + * Defines task timeout since moment of task execution start + * + * @param timeout + * @param unit + * @return + */ + public WorkerOptions taskTimeout(long timeout, TimeUnit unit) { + this.taskTimeout = unit.toMillis(timeout); + return this; + } + + public long getTaskTimeout() { + return taskTimeout; + } + } diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java index c0833cc2a..d88a08744 100644 --- a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java @@ -17,16 +17,20 @@ package org.redisson.executor; import org.redisson.RedissonExecutorService; import org.redisson.RedissonRemoteService; +import org.redisson.api.RBlockingQueue; import org.redisson.api.RFuture; import org.redisson.api.RMap; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncService; -import org.redisson.remote.RemoteServiceRequest; -import org.redisson.remote.ResponseEntry; +import org.redisson.misc.RPromise; +import org.redisson.remote.*; import java.util.Arrays; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; /** * @@ -41,6 +45,7 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { private String tasksRetryIntervalName; private String terminationTopicName; private String schedulerQueueName; + private long taskTimeout; public RedissonExecutorRemoteService(Codec codec, String name, CommandAsyncService commandExecutor, String executorId, ConcurrentMap responses) { @@ -75,6 +80,20 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { requestId, System.currentTimeMillis(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); } + @Override + protected void invokeMethod(Class remoteInterface, RBlockingQueue requestQueue, RemoteServiceRequest request, RemoteServiceMethod method, String responseName, ExecutorService executor, RFuture cancelRequestFuture, AtomicReference responseHolder) { + if (taskTimeout > 0) { + commandExecutor.getConnectionManager().getGroup().schedule(() -> { + ((RPromise)cancelRequestFuture).trySuccess(new RemoteServiceCancelRequest(true, false)); + }, taskTimeout, TimeUnit.MILLISECONDS); + } + super.invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, cancelRequestFuture, responseHolder); + } + + public void setTaskTimeout(long taskTimeout) { + this.taskTimeout = taskTimeout; + } + public void setSchedulerQueueName(String schedulerQueueName) { this.schedulerQueueName = schedulerQueueName; } diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index 0adc33cb4..dbdab927b 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -312,6 +312,18 @@ public class RedissonExecutorServiceTest extends BaseTest { assertThat(id).hasSize(34); } + @Test + public void testTaskTimeout() throws InterruptedException { + RExecutorService executor = redisson.getExecutorService("test1"); + executor.registerWorkers(WorkerOptions.defaults().taskTimeout(1, TimeUnit.SECONDS)); + + RExecutorFuture future = executor.submit(new ScheduledLongRunnableTask("executed1")); + + Thread.sleep(1050); + + assertThat(future.isCancelled()).isTrue(); + } + @Test public void testCancelAndInterrupt() throws InterruptedException, ExecutionException { RExecutorService executor = redisson.getExecutorService("test");