Fixed - ExecutorService tasks duplication after task retry event. #2308

pull/2323/head
Nikita Koksharov 5 years ago
parent d4d66216ad
commit 7ac980b3f5

@ -222,17 +222,17 @@ public class TasksRunnerService implements RemoteExecutorService {
} }
} }
protected void scheduleRetryTimeRenewal(final String requestId) { protected void scheduleRetryTimeRenewal(String requestId, long retryInterval) {
((Redisson) redisson).getConnectionManager().newTimeout(new TimerTask() { ((Redisson) redisson).getConnectionManager().newTimeout(new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
renewRetryTime(requestId); renewRetryTime(requestId);
} }
}, 5, TimeUnit.SECONDS); }, Math.max(1000, retryInterval / 2), TimeUnit.MILLISECONDS);
} }
protected void renewRetryTime(final String requestId) { protected void renewRetryTime(String requestId) {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, RFuture<Long> future = commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
// check if executor service not in shutdown state // check if executor service not in shutdown state
"local name = ARGV[2];" "local name = ARGV[2];"
+ "local scheduledName = ARGV[2];" + "local scheduledName = ARGV[2];"
@ -252,14 +252,19 @@ public class TasksRunnerService implements RemoteExecutorService {
+ "if v[1] == ARGV[2] then " + "if v[1] == ARGV[2] then "
+ "redis.call('publish', KEYS[3], startTime); " + "redis.call('publish', KEYS[3], startTime); "
+ "end;" + "end;"
+ "return 1; " + "return retryInterval; "
+ "end;" + "end;"
+ "return 0;", + "return nil;",
Arrays.<Object>asList(statusName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName, tasksName), Arrays.<Object>asList(statusName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName, tasksName),
System.currentTimeMillis(), requestId); System.currentTimeMillis(), requestId);
future.onComplete((res, e) -> { future.onComplete((res, e) -> {
if (e != null || res) { if (e != null) {
scheduleRetryTimeRenewal(requestId); scheduleRetryTimeRenewal(requestId, 10000);
return;
}
if (res != null) {
scheduleRetryTimeRenewal(requestId, res);
} }
}); });
} }

Loading…
Cancel
Save