From c1d8c4f84bb20be7890186fc1cec9285c9012e62 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 20 Jan 2025 12:34:24 +0300 Subject: [PATCH] Fixed - task id duplication check added to RScheduledExecutorService. #5409 --- .../org/redisson/RedissonExecutorService.java | 31 +++++++++++++++++-- .../executor/ScheduledTasksService.java | 5 ++- .../org/redisson/executor/TasksService.java | 10 +++++- 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 49b61cfa7..4713a7b4d 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -23,6 +23,7 @@ import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.config.MasterSlaveServersConfig; import org.redisson.executor.*; import org.redisson.executor.params.*; import org.redisson.misc.CompletableFutureWrapper; @@ -60,7 +61,8 @@ public class RedissonExecutorService implements RScheduledExecutorService { private final CommandAsyncExecutor commandExecutor; private final Codec codec; private final Redisson redisson; - + + private final String tasksLatchName; private final String tasksName; private final String schedulerQueueName; private final String schedulerChannelName; @@ -122,6 +124,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { String objectName = requestQueueName; tasksCounterName = objectName + ":counter"; tasksName = objectName + ":tasks"; + tasksLatchName = objectName + ":task-latch"; statusName = objectName + ":status"; terminationTopic = RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, objectName + ":termination-topic"); @@ -148,6 +151,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { executorRemoteService.setTasksCounterName(tasksCounterName); executorRemoteService.setStatusName(statusName); executorRemoteService.setTasksName(tasksName); + executorRemoteService.setTasksLatchName(tasksLatchName); executorRemoteService.setSchedulerChannelName(schedulerChannelName); executorRemoteService.setSchedulerQueueName(schedulerQueueName); executorRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName); @@ -163,6 +167,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { scheduledRemoteService.setSchedulerQueueName(schedulerQueueName); scheduledRemoteService.setSchedulerChannelName(schedulerChannelName); scheduledRemoteService.setTasksName(tasksName); + scheduledRemoteService.setTasksLatchName(tasksLatchName); scheduledRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName); scheduledRemoteService.setTasksExpirationTimeName(tasksExpirationTimeName); scheduledRemoteService.setTasksRetryInterval(options.getTaskRetryInterval()); @@ -1137,7 +1142,21 @@ public class RedissonExecutorService implements RScheduledExecutorService { private RemotePromise executeWithCheck(String id, Object task, Supplier> function) { check(task); - RFuture r = hasTaskAsync(id); + + MasterSlaveServersConfig config = commandExecutor.getServiceManager().getConfig(); + int timeout = (config.getTimeout() + config.getRetryInterval()) * config.getRetryAttempts(); + + String taskName = tasksLatchName + ":" + id; + + RFuture r = commandExecutor.evalWriteNoRetryAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "if redis.call('hexists', KEYS[1], ARGV[2]) == 0 then " + + "if redis.call('set', KEYS[2], 1, 'NX', 'PX', ARGV[1]) ~= nil then " + + "return 0; " + + "end;" + + "end;" + + "return 1; ", + Arrays.asList(tasksName, taskName), + timeout, id); AtomicReference> ref = new AtomicReference<>(); RemotePromise promise = new RemotePromise(id) { @@ -1153,7 +1172,13 @@ public class RedissonExecutorService implements RScheduledExecutorService { CompletableFuture addFuture = new CompletableFuture<>(); promise.setAddFuture(addFuture); - r.thenAccept(v -> { + r.whenComplete((v, e) -> { + if (e != null) { + addFuture.completeExceptionally(e); + promise.completeExceptionally(e); + return; + } + if (v) { addFuture.completeExceptionally(new IllegalArgumentException("Duplicated id: '" + id + "' is not allowed")); return; diff --git a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java index 3f9a0f643..b75485640 100644 --- a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java +++ b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java @@ -49,6 +49,8 @@ public class ScheduledTasksService extends TasksService { protected CompletableFuture addAsync(String requestQueueName, RemoteServiceRequest request) { ScheduledParameters params = (ScheduledParameters) request.getArgs()[0]; + String taskName = tasksLatchName + ":" + request.getId(); + long expireTime = 0; if (params.getTtl() > 0) { expireTime = System.currentTimeMillis() + params.getTtl(); @@ -80,6 +82,7 @@ public class ScheduledTasksService extends TasksService { + "redis.call('zadd', KEYS[3], ARGV[1], ARGV[2]);" + "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);" + + "redis.call('del', KEYS[8]);" + "redis.call('incr', KEYS[1]);" + "local v = redis.call('zrange', KEYS[3], 0, 0); " // if new task added to queue head then publish its startTime @@ -93,7 +96,7 @@ public class ScheduledTasksService extends TasksService { RFuture f = commandExecutor.evalWriteNoRetryAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, script, Arrays.asList(tasksCounterName, statusName, schedulerQueueName, - schedulerChannelName, tasksName, tasksRetryIntervalName, tasksExpirationTimeName), + schedulerChannelName, tasksName, tasksRetryIntervalName, tasksExpirationTimeName, taskName), params.getStartTime(), request.getId(), encode(request), tasksRetryInterval, expireTime); return f.toCompletableFuture(); } diff --git a/redisson/src/main/java/org/redisson/executor/TasksService.java b/redisson/src/main/java/org/redisson/executor/TasksService.java index 70b7e5e4b..42fa2ebf0 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksService.java @@ -43,6 +43,7 @@ public class TasksService extends BaseRemoteService { protected String tasksCounterName; protected String statusName; protected String tasksName; + protected String tasksLatchName; protected String schedulerQueueName; protected String schedulerChannelName; protected String tasksRetryIntervalName; @@ -53,6 +54,10 @@ public class TasksService extends BaseRemoteService { super(codec, name, commandExecutor, executorId); } + public void setTasksLatchName(String tasksLatchName) { + this.tasksLatchName = tasksLatchName; + } + public void setTasksExpirationTimeName(String tasksExpirationTimeName) { this.tasksExpirationTimeName = tasksExpirationTimeName; } @@ -111,6 +116,8 @@ public class TasksService extends BaseRemoteService { protected CompletableFuture addAsync(String requestQueueName, RemoteServiceRequest request) { TaskParameters params = (TaskParameters) request.getArgs()[0]; + String taskName = tasksLatchName + ":" + request.getId(); + long retryStartTime = 0; if (tasksRetryInterval > 0) { retryStartTime = System.currentTimeMillis() + tasksRetryInterval; @@ -124,6 +131,7 @@ public class TasksService extends BaseRemoteService { // check if executor service not in shutdown state "if redis.call('exists', KEYS[2]) == 0 then " + "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);" + + "redis.call('del', KEYS[9]);" + "redis.call('rpush', KEYS[6], ARGV[2]); " + "redis.call('incr', KEYS[1]);" @@ -146,7 +154,7 @@ public class TasksService extends BaseRemoteService { + "end;" + "return 0;", Arrays.asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, - tasksName, requestQueueName, tasksRetryIntervalName, tasksExpirationTimeName), + tasksName, requestQueueName, tasksRetryIntervalName, tasksExpirationTimeName, taskName), retryStartTime, request.getId(), encode(request), tasksRetryInterval, expireTime); return f.toCompletableFuture(); }