From c3cdb4f5a8d1988c577c113fd7230c5d4d4c18ff Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 31 May 2018 15:55:44 +0300 Subject: [PATCH] ExecutorService task failover implemented. #1291, #1120 --- .../java/org/redisson/RedissonExecutorService.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 6d43a5471..a5d0d66eb 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -92,7 +92,7 @@ import io.netty.util.internal.PlatformDependent; */ public class RedissonExecutorService implements RScheduledExecutorService { - private static final Logger log = LoggerFactory.getLogger(RedissonExecutorService.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RedissonExecutorService.class); private static final RemoteInvocationOptions RESULT_OPTIONS = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.HOURS); @@ -250,6 +250,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { + "redis.call('zrem', KEYS[2], unpack(expiredTaskIds));" + "if retryInterval ~= false then " + "local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);" + + "for i = 1, #expiredTaskIds, 1 do " + "local name = expiredTaskIds[i];" + "local scheduledName = expiredTaskIds[i];" @@ -266,7 +267,12 @@ public class RedissonExecutorService implements RScheduledExecutorService { + "if v[1] == expiredTaskIds[i] then " + "redis.call('publish', KEYS[3], startTime); " + "end;" - + "redis.call('rpush', KEYS[1], name);" + + + "if redis.call('linsert', KEYS[1], 'before', name, name) < 1 then " + + "redis.call('rpush', KEYS[1], name); " + + "else " + + "redis.call('lrem', KEYS[1], -1, name); " + + "end; " + "end; " + "else " + "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));" @@ -279,7 +285,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { + "end " + "return nil;", Arrays.asList(requestQueueName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName), - System.currentTimeMillis(), 100); + System.currentTimeMillis(), 50); } }; queueTransferService.schedule(getName(), task); @@ -303,8 +309,6 @@ public class RedissonExecutorService implements RScheduledExecutorService { } }); } - - private long repeatInterval = 5000; @Override public void execute(Runnable task) {