Fixed - task id duplication check added to RScheduledExecutorService. #5409

pull/6406/head
Nikita Koksharov 2 weeks ago
parent c8799bd12d
commit c1d8c4f84b

@ -23,6 +23,7 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.executor.*;
import org.redisson.executor.params.*;
import org.redisson.misc.CompletableFutureWrapper;
@ -60,7 +61,8 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final CommandAsyncExecutor commandExecutor;
private final Codec codec;
private final Redisson redisson;
private final String tasksLatchName;
private final String tasksName;
private final String schedulerQueueName;
private final String schedulerChannelName;
@ -122,6 +124,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
String objectName = requestQueueName;
tasksCounterName = objectName + ":counter";
tasksName = objectName + ":tasks";
tasksLatchName = objectName + ":task-latch";
statusName = objectName + ":status";
terminationTopic = RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, objectName + ":termination-topic");
@ -148,6 +151,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
executorRemoteService.setTasksCounterName(tasksCounterName);
executorRemoteService.setStatusName(statusName);
executorRemoteService.setTasksName(tasksName);
executorRemoteService.setTasksLatchName(tasksLatchName);
executorRemoteService.setSchedulerChannelName(schedulerChannelName);
executorRemoteService.setSchedulerQueueName(schedulerQueueName);
executorRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName);
@ -163,6 +167,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
scheduledRemoteService.setSchedulerQueueName(schedulerQueueName);
scheduledRemoteService.setSchedulerChannelName(schedulerChannelName);
scheduledRemoteService.setTasksName(tasksName);
scheduledRemoteService.setTasksLatchName(tasksLatchName);
scheduledRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName);
scheduledRemoteService.setTasksExpirationTimeName(tasksExpirationTimeName);
scheduledRemoteService.setTasksRetryInterval(options.getTaskRetryInterval());
@ -1137,7 +1142,21 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private <T> RemotePromise<T> executeWithCheck(String id, Object task, Supplier<RFuture<T>> function) {
check(task);
RFuture<Boolean> r = hasTaskAsync(id);
MasterSlaveServersConfig config = commandExecutor.getServiceManager().getConfig();
int timeout = (config.getTimeout() + config.getRetryInterval()) * config.getRetryAttempts();
String taskName = tasksLatchName + ":" + id;
RFuture<Boolean> r = commandExecutor.evalWriteNoRetryAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hexists', KEYS[1], ARGV[2]) == 0 then "
+ "if redis.call('set', KEYS[2], 1, 'NX', 'PX', ARGV[1]) ~= nil then "
+ "return 0; "
+ "end;"
+ "end;"
+ "return 1; ",
Arrays.asList(tasksName, taskName),
timeout, id);
AtomicReference<RemotePromise<T>> ref = new AtomicReference<>();
RemotePromise<T> promise = new RemotePromise<T>(id) {
@ -1153,7 +1172,13 @@ public class RedissonExecutorService implements RScheduledExecutorService {
CompletableFuture<Boolean> addFuture = new CompletableFuture<>();
promise.setAddFuture(addFuture);
r.thenAccept(v -> {
r.whenComplete((v, e) -> {
if (e != null) {
addFuture.completeExceptionally(e);
promise.completeExceptionally(e);
return;
}
if (v) {
addFuture.completeExceptionally(new IllegalArgumentException("Duplicated id: '" + id + "' is not allowed"));
return;

@ -49,6 +49,8 @@ public class ScheduledTasksService extends TasksService {
protected CompletableFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request) {
ScheduledParameters params = (ScheduledParameters) request.getArgs()[0];
String taskName = tasksLatchName + ":" + request.getId();
long expireTime = 0;
if (params.getTtl() > 0) {
expireTime = System.currentTimeMillis() + params.getTtl();
@ -80,6 +82,7 @@ public class ScheduledTasksService extends TasksService {
+ "redis.call('zadd', KEYS[3], ARGV[1], ARGV[2]);"
+ "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);"
+ "redis.call('del', KEYS[8]);"
+ "redis.call('incr', KEYS[1]);"
+ "local v = redis.call('zrange', KEYS[3], 0, 0); "
// if new task added to queue head then publish its startTime
@ -93,7 +96,7 @@ public class ScheduledTasksService extends TasksService {
RFuture<Boolean> f = commandExecutor.evalWriteNoRetryAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, script,
Arrays.asList(tasksCounterName, statusName, schedulerQueueName,
schedulerChannelName, tasksName, tasksRetryIntervalName, tasksExpirationTimeName),
schedulerChannelName, tasksName, tasksRetryIntervalName, tasksExpirationTimeName, taskName),
params.getStartTime(), request.getId(), encode(request), tasksRetryInterval, expireTime);
return f.toCompletableFuture();
}

@ -43,6 +43,7 @@ public class TasksService extends BaseRemoteService {
protected String tasksCounterName;
protected String statusName;
protected String tasksName;
protected String tasksLatchName;
protected String schedulerQueueName;
protected String schedulerChannelName;
protected String tasksRetryIntervalName;
@ -53,6 +54,10 @@ public class TasksService extends BaseRemoteService {
super(codec, name, commandExecutor, executorId);
}
public void setTasksLatchName(String tasksLatchName) {
this.tasksLatchName = tasksLatchName;
}
public void setTasksExpirationTimeName(String tasksExpirationTimeName) {
this.tasksExpirationTimeName = tasksExpirationTimeName;
}
@ -111,6 +116,8 @@ public class TasksService extends BaseRemoteService {
protected CompletableFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request) {
TaskParameters params = (TaskParameters) request.getArgs()[0];
String taskName = tasksLatchName + ":" + request.getId();
long retryStartTime = 0;
if (tasksRetryInterval > 0) {
retryStartTime = System.currentTimeMillis() + tasksRetryInterval;
@ -124,6 +131,7 @@ public class TasksService extends BaseRemoteService {
// check if executor service not in shutdown state
"if redis.call('exists', KEYS[2]) == 0 then "
+ "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);"
+ "redis.call('del', KEYS[9]);"
+ "redis.call('rpush', KEYS[6], ARGV[2]); "
+ "redis.call('incr', KEYS[1]);"
@ -146,7 +154,7 @@ public class TasksService extends BaseRemoteService {
+ "end;"
+ "return 0;",
Arrays.asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName,
tasksName, requestQueueName, tasksRetryIntervalName, tasksExpirationTimeName),
tasksName, requestQueueName, tasksRetryIntervalName, tasksExpirationTimeName, taskName),
retryStartTime, request.getId(), encode(request), tasksRetryInterval, expireTime);
return f.toCompletableFuture();
}

Loading…
Cancel
Save