From 0dcefca39c2a3add9258536b987232f4aa40a272 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 18 Jan 2019 13:54:30 +0300 Subject: [PATCH] Fixed - running scheduleWithFixedDelay Job couldn't be canceled. #1869 --- .../redisson/executor/TasksRunnerService.java | 22 +++++++----- .../RedissonScheduledExecutorServiceTest.java | 36 +++++++++++++++++++ 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index 5b3492432..e201a1284 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -146,7 +146,7 @@ public class TasksRunnerService implements RemoteExecutorService { future = service.schedule(params); } try { - executeRunnable(params, nextStartDate); + executeRunnable(params, nextStartDate == null); } catch (RuntimeException e) { // cancel task if it throws an exception if (future != null) { @@ -178,7 +178,11 @@ public class TasksRunnerService implements RemoteExecutorService { @Override public void scheduleWithFixedDelay(ScheduledWithFixedDelayParameters params) { - executeRunnable(params); + executeRunnable(params, false); + if (!redisson.getMap(tasksName, StringCodec.INSTANCE).containsKey(params.getRequestId())) { + return; + } + long newStartTime = System.currentTimeMillis() + params.getDelay(); params.setStartTime(newStartTime); asyncScheduledServiceAtFixed(params.getExecutorId(), params.getRequestId()).scheduleWithFixedDelay(params); @@ -209,7 +213,7 @@ public class TasksRunnerService implements RemoteExecutorService { } catch (Exception e) { throw new IllegalArgumentException(e); } finally { - finish(params.getRequestId(), null); + finish(params.getRequestId(), true); } } @@ -293,7 +297,7 @@ public class TasksRunnerService implements RemoteExecutorService { } } - public void executeRunnable(TaskParameters params, Date nextDate) { + public void executeRunnable(TaskParameters params, boolean removeTask) { if (params.getRequestId() != null && params.getRequestId().startsWith("00")) { renewRetryTime(params.getRequestId()); } @@ -308,13 +312,13 @@ public class TasksRunnerService implements RemoteExecutorService { } catch (Exception e) { throw new IllegalArgumentException(e); } finally { - finish(params.getRequestId(), nextDate); + finish(params.getRequestId(), removeTask); } } @Override public void executeRunnable(TaskParameters params) { - executeRunnable(params, null); + executeRunnable(params, true); } /** @@ -327,9 +331,9 @@ public class TasksRunnerService implements RemoteExecutorService { * * @param requestId */ - private void finish(String requestId, Date nextDate) { + private void finish(String requestId, boolean removeTask) { String script = ""; - if (nextDate == null) { + if (removeTask) { script += "local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);" + "if scheduled == false then " + "redis.call('hdel', KEYS[4], ARGV[3]); " @@ -345,7 +349,7 @@ public class TasksRunnerService implements RemoteExecutorService { + "end;" + "end;"; - commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID, + commandExecutor.evalWrite(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID, script, Arrays.asList(tasksCounterName, statusName, terminationTopicName, tasksName, schedulerQueueName, tasksRetryIntervalName), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, requestId); diff --git a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java index f7c4e18c4..4053373df 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java @@ -344,6 +344,42 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { // skip } } + + public static class ScheduledRunnableTask2 implements Runnable, Serializable { + private static final long serialVersionUID = -3523561767248576192L; + private String key; + + @RInject + private RedissonClient redisson; + + public ScheduledRunnableTask2(String key) { + this.key = key; + } + + @Override + public void run() { + System.out.println("job is running"); + try { + redisson.getAtomicLong(key).incrementAndGet(); + Thread.sleep(15000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println("job is over"); + } + } + + @Test + public void testCancelAtFixedDelay2() throws InterruptedException, ExecutionException { + RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(30, TimeUnit.MINUTES)); + executor.registerWorkers(5); + RScheduledFuture future1 = executor.scheduleWithFixedDelay(new ScheduledRunnableTask2("executed1"), 1, 2, TimeUnit.SECONDS); + Thread.sleep(5000); + assertThat(redisson.getAtomicLong("executed1").get()).isEqualTo(1); + assertThat(executor.cancelTask(future1.getTaskId())).isTrue(); + Thread.sleep(30000); + assertThat(redisson.getAtomicLong("executed1").get()).isEqualTo(1); + } @Test