ExecutorService task failover implemented. #1291, #1120

pull/1499/head
Nikita 7 years ago
parent 31d5140b76
commit c3cdb4f5a8

@ -92,7 +92,7 @@ import io.netty.util.internal.PlatformDependent;
*/ */
public class RedissonExecutorService implements RScheduledExecutorService { public class RedissonExecutorService implements RScheduledExecutorService {
private static final Logger log = LoggerFactory.getLogger(RedissonExecutorService.class); private static final Logger LOGGER = LoggerFactory.getLogger(RedissonExecutorService.class);
private static final RemoteInvocationOptions RESULT_OPTIONS = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.HOURS); private static final RemoteInvocationOptions RESULT_OPTIONS = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.HOURS);
@ -250,6 +250,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
+ "redis.call('zrem', KEYS[2], unpack(expiredTaskIds));" + "redis.call('zrem', KEYS[2], unpack(expiredTaskIds));"
+ "if retryInterval ~= false then " + "if retryInterval ~= false then "
+ "local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);" + "local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);"
+ "for i = 1, #expiredTaskIds, 1 do " + "for i = 1, #expiredTaskIds, 1 do "
+ "local name = expiredTaskIds[i];" + "local name = expiredTaskIds[i];"
+ "local scheduledName = expiredTaskIds[i];" + "local scheduledName = expiredTaskIds[i];"
@ -266,7 +267,12 @@ public class RedissonExecutorService implements RScheduledExecutorService {
+ "if v[1] == expiredTaskIds[i] then " + "if v[1] == expiredTaskIds[i] then "
+ "redis.call('publish', KEYS[3], startTime); " + "redis.call('publish', KEYS[3], startTime); "
+ "end;" + "end;"
+ "redis.call('rpush', KEYS[1], name);"
+ "if redis.call('linsert', KEYS[1], 'before', name, name) < 1 then "
+ "redis.call('rpush', KEYS[1], name); "
+ "else "
+ "redis.call('lrem', KEYS[1], -1, name); "
+ "end; "
+ "end; " + "end; "
+ "else " + "else "
+ "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));" + "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));"
@ -279,7 +285,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
+ "end " + "end "
+ "return nil;", + "return nil;",
Arrays.<Object>asList(requestQueueName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName), Arrays.<Object>asList(requestQueueName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName),
System.currentTimeMillis(), 100); System.currentTimeMillis(), 50);
} }
}; };
queueTransferService.schedule(getName(), task); queueTransferService.schedule(getName(), task);
@ -304,8 +310,6 @@ public class RedissonExecutorService implements RScheduledExecutorService {
}); });
} }
private long repeatInterval = 5000;
@Override @Override
public void execute(Runnable task) { public void execute(Runnable task) {
check(task); check(task);

Loading…
Cancel
Save