From 3981c96f78f5bcf6a135b99e26200b65473c8731 Mon Sep 17 00:00:00 2001 From: seakider <seakider@gmail.com> Date: Mon, 6 Jan 2025 21:56:54 +0800 Subject: [PATCH] Fixed - cancel schedule task occurs concurrency problem Signed-off-by: seakider <seakider@gmail.com> --- .../executor/ScheduledTasksService.java | 22 +++++++++++-------- .../redisson/executor/TasksRunnerService.java | 3 --- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java index 51a17be67..3f9a0f643 100644 --- a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java +++ b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java @@ -53,8 +53,15 @@ public class ScheduledTasksService extends TasksService { if (params.getTtl() > 0) { expireTime = System.currentTimeMillis() + params.getTtl(); } - - RFuture<Boolean> f = commandExecutor.evalWriteNoRetryAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + + String script = ""; + if (requestId != null) { + script += "if redis.call('hget', KEYS[5], ARGV[2]) == false then " + + "return 0;" + + "end;"; + } + + script += // check if executor service not in shutdown state "if redis.call('exists', KEYS[2]) == 0 then " + "local retryInterval = redis.call('get', KEYS[6]); " @@ -82,7 +89,9 @@ public class ScheduledTasksService extends TasksService { + "end " + "return 1;" + "end;" - + "return 0;", + + "return 0;"; + + RFuture<Boolean> f = commandExecutor.evalWriteNoRetryAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, script, Arrays.asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName, tasksRetryIntervalName, tasksExpirationTimeName), params.getStartTime(), request.getId(), encode(request), tasksRetryInterval, expireTime); @@ -92,12 +101,7 @@ public class ScheduledTasksService extends TasksService { @Override protected CompletableFuture<Boolean> removeAsync(String requestQueueName, String taskId) { RFuture<Boolean> f = commandExecutor.evalWriteNoRetryAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - // remove from scheduler queue - "if redis.call('exists', KEYS[3]) == 0 then " - + "return nil;" - + "end;" - - + "local task = redis.call('hget', KEYS[6], ARGV[1]); " + "local task = redis.call('hget', KEYS[6], ARGV[1]); " + "redis.call('hdel', KEYS[6], ARGV[1]); " + "redis.call('zrem', KEYS[2], 'ff:' .. ARGV[1]); " diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index adebd6dae..c2744f0db 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -176,9 +176,6 @@ public class TasksRunnerService implements RemoteExecutorService { @Override public void scheduleWithFixedDelay(ScheduledWithFixedDelayParameters params) { executeRunnable(params, false); - if (!redisson.getMap(tasksName, StringCodec.INSTANCE).containsKey(params.getRequestId())) { - return; - } long newStartTime = System.currentTimeMillis() + params.getDelay(); params.setStartTime(newStartTime);