From db1ba0c9289c64723c9b4dca228ce45f307cac2b Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 16 Nov 2018 21:15:03 +0300 Subject: [PATCH] Fixed - task scheduled with cron pattern isn't executed with single worker. #1734 --- .../redisson/executor/TasksRunnerService.java | 48 +++++++++++-------- .../RedissonScheduledExecutorServiceTest.java | 41 ++++++++++++++++ 2 files changed, 69 insertions(+), 20 deletions(-) diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index a5e5c575f..5b3492432 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -146,7 +146,7 @@ public class TasksRunnerService implements RemoteExecutorService { future = service.schedule(params); } try { - executeRunnable(params); + executeRunnable(params, nextStartDate); } catch (RuntimeException e) { // cancel task if it throws an exception if (future != null) { @@ -209,7 +209,7 @@ public class TasksRunnerService implements RemoteExecutorService { } catch (Exception e) { throw new IllegalArgumentException(e); } finally { - finish(params.getRequestId()); + finish(params.getRequestId(), null); } } @@ -293,8 +293,7 @@ public class TasksRunnerService implements RemoteExecutorService { } } - @Override - public void executeRunnable(TaskParameters params) { + public void executeRunnable(TaskParameters params, Date nextDate) { if (params.getRequestId() != null && params.getRequestId().startsWith("00")) { renewRetryTime(params.getRequestId()); } @@ -309,9 +308,14 @@ public class TasksRunnerService implements RemoteExecutorService { } catch (Exception e) { throw new IllegalArgumentException(e); } finally { - finish(params.getRequestId()); + finish(params.getRequestId(), nextDate); } } + + @Override + public void executeRunnable(TaskParameters params) { + executeRunnable(params, null); + } /** * Check shutdown state. If tasksCounter equals 0 @@ -323,22 +327,26 @@ public class TasksRunnerService implements RemoteExecutorService { * * @param requestId */ - private void finish(String requestId) { + private void finish(String requestId, Date nextDate) { + String script = ""; + if (nextDate == null) { + script += "local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);" + + "if scheduled == false then " + + "redis.call('hdel', KEYS[4], ARGV[3]); " + + "end;"; + } + script += "redis.call('zrem', KEYS[5], 'ff' .. ARGV[3]);" + + "if redis.call('decr', KEYS[1]) == 0 then " + + "redis.call('del', KEYS[1]);" + + "if redis.call('get', KEYS[2]) == ARGV[1] then " + + "redis.call('del', KEYS[6]);" + + "redis.call('set', KEYS[2], ARGV[2]);" + + "redis.call('publish', KEYS[3], ARGV[2]);" + + "end;" + + "end;"; + commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID, - "local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);" - + "if scheduled == false then " - + "redis.call('hdel', KEYS[4], ARGV[3]); " - + "end;" + - - "redis.call('zrem', KEYS[5], 'ff' .. ARGV[3]);" + - "if redis.call('decr', KEYS[1]) == 0 then " - + "redis.call('del', KEYS[1]);" - + "if redis.call('get', KEYS[2]) == ARGV[1] then " - + "redis.call('del', KEYS[6]);" - + "redis.call('set', KEYS[2], ARGV[2]);" - + "redis.call('publish', KEYS[3], ARGV[2]);" - + "end;" - + "end;", + script, Arrays.asList(tasksCounterName, statusName, terminationTopicName, tasksName, schedulerQueueName, tasksRetryIntervalName), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, requestId); } diff --git a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java index 54d8778bb..67ed44764 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java @@ -17,6 +17,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.redisson.BaseTest; +import org.redisson.Redisson; import org.redisson.RedissonExecutorService; import org.redisson.RedissonNode; import org.redisson.api.CronSchedule; @@ -24,7 +25,9 @@ import org.redisson.api.ExecutorOptions; import org.redisson.api.RExecutorFuture; import org.redisson.api.RScheduledExecutorService; import org.redisson.api.RScheduledFuture; +import org.redisson.api.RedissonClient; import org.redisson.api.RemoteInvocationOptions; +import org.redisson.api.annotation.RInject; import org.redisson.config.Config; import org.redisson.config.RedissonNodeConfig; @@ -55,6 +58,43 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { node.shutdown(); } + public static class TestTask implements Runnable { + + @RInject + RedissonClient redisson; + + @Override + public void run() { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + redisson.getAtomicLong("counter").incrementAndGet(); + } + + } + + @Test + public void testSingleWorker() throws InterruptedException { + Config config = createConfig(); + RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); + nodeConfig.getExecutorServiceWorkers().put("JobA", 1); + RedissonNode node = RedissonNode.create(nodeConfig); + node.start(); + + RedissonClient client = Redisson.create(config); + RScheduledExecutorService executorService = client.getExecutorService("JobA"); + executorService.schedule(new TestTask() , CronSchedule.of("0/1 * * * * ?")); + + TimeUnit.MILLISECONDS.sleep(4800); + + assertThat(client.getAtomicLong("counter").get()).isEqualTo(4); + + client.shutdown(); + node.shutdown(); + } + @Test public void testDelay() throws InterruptedException { RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(5, TimeUnit.SECONDS)); @@ -106,6 +146,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { Config config = createConfig(); RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1)); + node.shutdown(); node = RedissonNode.create(nodeConfig); node.start();