From 7ac980b3f57dcba694938f11294cab6cb515fb05 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 12 Sep 2019 12:35:07 +0300 Subject: [PATCH] Fixed - ExecutorService tasks duplication after task retry event. #2308 --- .../redisson/executor/TasksRunnerService.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index 18533b348..04f1073b0 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -222,17 +222,17 @@ public class TasksRunnerService implements RemoteExecutorService { } } - protected void scheduleRetryTimeRenewal(final String requestId) { + protected void scheduleRetryTimeRenewal(String requestId, long retryInterval) { ((Redisson) redisson).getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { renewRetryTime(requestId); } - }, 5, TimeUnit.SECONDS); + }, Math.max(1000, retryInterval / 2), TimeUnit.MILLISECONDS); } - protected void renewRetryTime(final String requestId) { - RFuture future = commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + protected void renewRetryTime(String requestId) { + RFuture future = commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_LONG, // check if executor service not in shutdown state "local name = ARGV[2];" + "local scheduledName = ARGV[2];" @@ -252,14 +252,19 @@ public class TasksRunnerService implements RemoteExecutorService { + "if v[1] == ARGV[2] then " + "redis.call('publish', KEYS[3], startTime); " + "end;" - + "return 1; " + + "return retryInterval; " + "end;" - + "return 0;", + + "return nil;", Arrays.asList(statusName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName, tasksName), System.currentTimeMillis(), requestId); future.onComplete((res, e) -> { - if (e != null || res) { - scheduleRetryTimeRenewal(requestId); + if (e != null) { + scheduleRetryTimeRenewal(requestId, 10000); + return; + } + + if (res != null) { + scheduleRetryTimeRenewal(requestId, res); } }); }