Fixed - task scheduled with cron pattern isn't executed with single worker. #1734

pull/1792/head
Nikita Koksharov 6 years ago
parent 02f1fd7e20
commit db1ba0c928

@ -146,7 +146,7 @@ public class TasksRunnerService implements RemoteExecutorService {
future = service.schedule(params);
}
try {
executeRunnable(params);
executeRunnable(params, nextStartDate);
} catch (RuntimeException e) {
// cancel task if it throws an exception
if (future != null) {
@ -209,7 +209,7 @@ public class TasksRunnerService implements RemoteExecutorService {
} catch (Exception e) {
throw new IllegalArgumentException(e);
} finally {
finish(params.getRequestId());
finish(params.getRequestId(), null);
}
}
@ -293,8 +293,7 @@ public class TasksRunnerService implements RemoteExecutorService {
}
}
@Override
public void executeRunnable(TaskParameters params) {
public void executeRunnable(TaskParameters params, Date nextDate) {
if (params.getRequestId() != null && params.getRequestId().startsWith("00")) {
renewRetryTime(params.getRequestId());
}
@ -309,9 +308,14 @@ public class TasksRunnerService implements RemoteExecutorService {
} catch (Exception e) {
throw new IllegalArgumentException(e);
} finally {
finish(params.getRequestId());
finish(params.getRequestId(), nextDate);
}
}
@Override
public void executeRunnable(TaskParameters params) {
executeRunnable(params, null);
}
/**
* Check shutdown state. If tasksCounter equals <code>0</code>
@ -323,22 +327,26 @@ public class TasksRunnerService implements RemoteExecutorService {
*
* @param requestId
*/
private void finish(String requestId) {
private void finish(String requestId, Date nextDate) {
String script = "";
if (nextDate == null) {
script += "local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);"
+ "if scheduled == false then "
+ "redis.call('hdel', KEYS[4], ARGV[3]); "
+ "end;";
}
script += "redis.call('zrem', KEYS[5], 'ff' .. ARGV[3]);" +
"if redis.call('decr', KEYS[1]) == 0 then "
+ "redis.call('del', KEYS[1]);"
+ "if redis.call('get', KEYS[2]) == ARGV[1] then "
+ "redis.call('del', KEYS[6]);"
+ "redis.call('set', KEYS[2], ARGV[2]);"
+ "redis.call('publish', KEYS[3], ARGV[2]);"
+ "end;"
+ "end;";
commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);"
+ "if scheduled == false then "
+ "redis.call('hdel', KEYS[4], ARGV[3]); "
+ "end;" +
"redis.call('zrem', KEYS[5], 'ff' .. ARGV[3]);" +
"if redis.call('decr', KEYS[1]) == 0 then "
+ "redis.call('del', KEYS[1]);"
+ "if redis.call('get', KEYS[2]) == ARGV[1] then "
+ "redis.call('del', KEYS[6]);"
+ "redis.call('set', KEYS[2], ARGV[2]);"
+ "redis.call('publish', KEYS[3], ARGV[2]);"
+ "end;"
+ "end;",
script,
Arrays.<Object>asList(tasksCounterName, statusName, terminationTopicName, tasksName, schedulerQueueName, tasksRetryIntervalName),
RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, requestId);
}

@ -17,6 +17,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.redisson.BaseTest;
import org.redisson.Redisson;
import org.redisson.RedissonExecutorService;
import org.redisson.RedissonNode;
import org.redisson.api.CronSchedule;
@ -24,7 +25,9 @@ import org.redisson.api.ExecutorOptions;
import org.redisson.api.RExecutorFuture;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RScheduledFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.annotation.RInject;
import org.redisson.config.Config;
import org.redisson.config.RedissonNodeConfig;
@ -55,6 +58,43 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
node.shutdown();
}
public static class TestTask implements Runnable {
@RInject
RedissonClient redisson;
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
redisson.getAtomicLong("counter").incrementAndGet();
}
}
@Test
public void testSingleWorker() throws InterruptedException {
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.getExecutorServiceWorkers().put("JobA", 1);
RedissonNode node = RedissonNode.create(nodeConfig);
node.start();
RedissonClient client = Redisson.create(config);
RScheduledExecutorService executorService = client.getExecutorService("JobA");
executorService.schedule(new TestTask() , CronSchedule.of("0/1 * * * * ?"));
TimeUnit.MILLISECONDS.sleep(4800);
assertThat(client.getAtomicLong("counter").get()).isEqualTo(4);
client.shutdown();
node.shutdown();
}
@Test
public void testDelay() throws InterruptedException {
RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(5, TimeUnit.SECONDS));
@ -106,6 +146,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1));
node.shutdown();
node = RedissonNode.create(nodeConfig);
node.start();

Loading…
Cancel
Save