Fixed - cancel schedule task occurs concurrency problem

Signed-off-by: seakider <seakider@gmail.com>
pull/6379/head
seakider 4 weeks ago
parent e39d99f298
commit 3981c96f78

@ -54,7 +54,14 @@ public class ScheduledTasksService extends TasksService {
expireTime = System.currentTimeMillis() + params.getTtl();
}
RFuture<Boolean> f = commandExecutor.evalWriteNoRetryAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
String script = "";
if (requestId != null) {
script += "if redis.call('hget', KEYS[5], ARGV[2]) == false then "
+ "return 0;"
+ "end;";
}
script +=
// check if executor service not in shutdown state
"if redis.call('exists', KEYS[2]) == 0 then "
+ "local retryInterval = redis.call('get', KEYS[6]); "
@ -82,7 +89,9 @@ public class ScheduledTasksService extends TasksService {
+ "end "
+ "return 1;"
+ "end;"
+ "return 0;",
+ "return 0;";
RFuture<Boolean> f = commandExecutor.evalWriteNoRetryAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, script,
Arrays.asList(tasksCounterName, statusName, schedulerQueueName,
schedulerChannelName, tasksName, tasksRetryIntervalName, tasksExpirationTimeName),
params.getStartTime(), request.getId(), encode(request), tasksRetryInterval, expireTime);
@ -92,12 +101,7 @@ public class ScheduledTasksService extends TasksService {
@Override
protected CompletableFuture<Boolean> removeAsync(String requestQueueName, String taskId) {
RFuture<Boolean> f = commandExecutor.evalWriteNoRetryAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove from scheduler queue
"if redis.call('exists', KEYS[3]) == 0 then "
+ "return nil;"
+ "end;"
+ "local task = redis.call('hget', KEYS[6], ARGV[1]); "
"local task = redis.call('hget', KEYS[6], ARGV[1]); "
+ "redis.call('hdel', KEYS[6], ARGV[1]); "
+ "redis.call('zrem', KEYS[2], 'ff:' .. ARGV[1]); "

@ -176,9 +176,6 @@ public class TasksRunnerService implements RemoteExecutorService {
@Override
public void scheduleWithFixedDelay(ScheduledWithFixedDelayParameters params) {
executeRunnable(params, false);
if (!redisson.getMap(tasksName, StringCodec.INSTANCE).containsKey(params.getRequestId())) {
return;
}
long newStartTime = System.currentTimeMillis() + params.getDelay();
params.setStartTime(newStartTime);

Loading…
Cancel
Save