RScheduledExecutorService doesn't handle delay correctly.

tasksRetryIntervalName is deleted if tasks absent.
 #1605 #1617
pull/1624/head
Nikita 7 years ago
parent 9fce021131
commit 62aa3705b9

@ -22,7 +22,6 @@ import org.redisson.api.RFuture;
import org.redisson.api.RTopic;
import org.redisson.api.listener.BaseStatusListener;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.ChannelName;
import org.redisson.connection.ConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -107,7 +106,7 @@ public abstract class QueueTransferTask {
private void scheduleTask(final Long startTime) {
TimeoutTask oldTimeout = lastTimeout.get();
if (startTime == null || (oldTimeout != null && oldTimeout.getStartTime() < startTime)) {
if (startTime == null) {
return;
}

@ -430,7 +430,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
public RFuture<Boolean> deleteAsync() {
final RPromise<Boolean> result = new RedissonPromise<Boolean>();
RFuture<Long> deleteFuture = redisson.getKeys().deleteAsync(
requestQueueName, statusName, tasksCounterName, schedulerQueueName, tasksName);
requestQueueName, statusName, tasksCounterName, schedulerQueueName, tasksName, tasksRetryIntervalName);
deleteFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<Long> future) throws Exception {

@ -60,11 +60,11 @@ public class ScheduledTasksService extends TasksService {
"if redis.call('exists', KEYS[2]) == 0 then "
+ "local retryInterval = redis.call('get', KEYS[6]); "
+ "if retryInterval ~= false then "
+ "local time = tonumber(ARGV[4]) + tonumber(retryInterval);"
+ "local time = tonumber(ARGV[1]) + tonumber(retryInterval);"
+ "redis.call('zadd', KEYS[3], time, 'ff' .. ARGV[2]);"
+ "elseif tonumber(ARGV[5]) > 0 then "
+ "redis.call('set', KEYS[6], ARGV[5]);"
+ "local time = tonumber(ARGV[4]) + tonumber(ARGV[5]);"
+ "elseif tonumber(ARGV[4]) > 0 then "
+ "redis.call('set', KEYS[6], ARGV[4]);"
+ "local time = tonumber(ARGV[1]) + tonumber(ARGV[4]);"
+ "redis.call('zadd', KEYS[3], time, 'ff' .. ARGV[2]);"
+ "end; "
@ -81,7 +81,7 @@ public class ScheduledTasksService extends TasksService {
+ "end;"
+ "return 0;",
Arrays.<Object>asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName, tasksRetryIntervalName),
params.getStartTime(), request.getId(), encode(request), System.currentTimeMillis(), tasksRetryInterval);
params.getStartTime(), request.getId(), encode(request), tasksRetryInterval);
}
@Override
@ -102,8 +102,9 @@ public class ScheduledTasksService extends TasksService {
// remove from executor queue
+ "if task ~= false and (removed > 0 or removedScheduled > 0) then "
+ "if redis.call('decr', KEYS[3]) == 0 then "
+ "redis.call('del', KEYS[3], KEYS[7]);"
+ "redis.call('del', KEYS[3]);"
+ "if redis.call('get', KEYS[4]) == ARGV[2] then "
+ "redis.call('del', KEYS[7]);"
+ "redis.call('set', KEYS[4], ARGV[3]);"
+ "redis.call('publish', KEYS[5], ARGV[3]);"
+ "end;"

@ -63,7 +63,7 @@ import io.netty.util.concurrent.FutureListener;
*/
public class TasksRunnerService implements RemoteExecutorService {
private final Map<HashValue, Codec> codecs = new LRUCacheMap<HashValue, Codec>(500, 0, 0);
private static final Map<HashValue, Codec> codecs = new LRUCacheMap<HashValue, Codec>(500, 0, 0);
private final Codec codec;
private final String name;
@ -323,10 +323,12 @@ public class TasksRunnerService implements RemoteExecutorService {
+ "if scheduled == false then "
+ "redis.call('hdel', KEYS[4], ARGV[3]); "
+ "end;" +
"redis.call('zrem', KEYS[5], 'ff' .. ARGV[3]);" +
"if redis.call('decr', KEYS[1]) == 0 then "
+ "redis.call('del', KEYS[1], KEYS[6]);"
+ "redis.call('del', KEYS[1]);"
+ "if redis.call('get', KEYS[2]) == ARGV[1] then "
+ "redis.call('del', KEYS[6]);"
+ "redis.call('set', KEYS[2], ARGV[2]);"
+ "redis.call('publish', KEYS[3], ARGV[2]);"
+ "end;"

@ -167,8 +167,9 @@ public class TasksService extends BaseRemoteService {
// remove from executor queue
+ "if task ~= false and redis.call('exists', KEYS[3]) == 1 and redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then "
+ "if redis.call('decr', KEYS[3]) == 0 then "
+ "redis.call('del', KEYS[3], KEYS[7]);"
+ "redis.call('del', KEYS[3]);"
+ "if redis.call('get', KEYS[4]) == ARGV[2] then "
+ "redis.call('del', KEYS[7]);"
+ "redis.call('set', KEYS[4], ARGV[3]);"
+ "redis.call('publish', KEYS[5], ARGV[3]);"
+ "end;"

@ -161,6 +161,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
Thread.sleep(16000);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2);
executor.delete();
redisson.getKeys().delete("counter");
assertThat(redisson.getKeys().count()).isEqualTo(1);
}
@ -188,6 +189,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
assertThat(executor.cancelTask(futureAsync.getTaskId())).isTrue();
assertThat(redisson.<Long>getBucket("executed2").get()).isBetween(1000L, Long.MAX_VALUE);
executor.delete();
redisson.getKeys().delete("executed1", "executed2");
assertThat(redisson.getKeys().count()).isZero();
}
@ -354,6 +356,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
s4.get();
assertThat(redisson.getAtomicLong("runnableCounter").get()).isEqualTo(100L);
redisson.getExecutorService("test").delete();
redisson.getKeys().delete("runnableCounter", "counter");
assertThat(redisson.getKeys().count()).isZero();
}

@ -52,6 +52,23 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
node.shutdown();
}
@Test
public void testDelay() throws InterruptedException {
RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(5, TimeUnit.SECONDS));
long start = System.currentTimeMillis();
RScheduledFuture<?> f = executor.schedule(new ScheduledCallableTask(), 11, TimeUnit.SECONDS);
assertThat(f.awaitUninterruptibly(12000)).isTrue();
assertThat(System.currentTimeMillis() - start).isBetween(11000L, 11500L);
executor.scheduleWithFixedDelay(new IncrementRunnableTask("counter"), 0, 7, TimeUnit.SECONDS);
Thread.sleep(500);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(1);
Thread.sleep(7000);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(2);
Thread.sleep(7000);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);
}
@Test
public void testTaskFailover() throws Exception {
AtomicInteger counter = new AtomicInteger();
@ -71,8 +88,10 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
node.start();
RScheduledExecutorService executor = redisson.getExecutorService("test2", ExecutorOptions.defaults().taskRetryInterval(10, TimeUnit.SECONDS));
long start = System.currentTimeMillis();
RExecutorFuture<?> f = executor.schedule(new IncrementRunnableTask("counter"), 1, TimeUnit.SECONDS);
f.get();
f.syncUninterruptibly();
assertThat(System.currentTimeMillis() - start).isBetween(900L, 1200L);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(1);
Thread.sleep(2000);
node.shutdown();
@ -146,10 +165,10 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
@Test
public void testCronExpressionMultipleTasks() throws InterruptedException, ExecutionException {
RScheduledExecutorService executor = redisson.getExecutorService("test");
RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(2, TimeUnit.SECONDS));
executor.schedule(new ScheduledRunnableTask("executed1"), CronSchedule.of("0/5 * * * * ?"));
executor.schedule(new ScheduledRunnableTask("executed2"), CronSchedule.of("0/1 * * * * ?"));
Thread.sleep(30200);
Thread.sleep(30000);
assertThat(redisson.getAtomicLong("executed1").get()).isEqualTo(6);
assertThat(redisson.getAtomicLong("executed2").get()).isEqualTo(30);
}
@ -163,6 +182,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
Thread.sleep(2000);
assertThat(redisson.getAtomicLong("executed1").isExists()).isFalse();
executor.delete();
redisson.getKeys().delete("executed1");
assertThat(redisson.getKeys().count()).isZero();
}
@ -174,7 +194,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
cancel(future1);
Thread.sleep(2000);
assertThat(redisson.getAtomicLong("executed1").isExists()).isFalse();
assertThat(executor.delete()).isFalse();
executor.delete();
redisson.getKeys().delete("executed1");
assertThat(redisson.getKeys().count()).isZero();
@ -193,6 +213,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
assertThat(executor.cancelTask(futureAsync.getTaskId())).isTrue();
assertThat(redisson.<Long>getBucket("executed2").get()).isBetween(1000L, Long.MAX_VALUE);
executor.delete();
redisson.getKeys().delete("executed1", "executed2");
assertThat(redisson.getKeys().count()).isZero();
}
@ -216,6 +237,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
Thread.sleep(3000);
assertThat(redisson.getAtomicLong("executed2").get()).isEqualTo(2);
executor.delete();
redisson.getKeys().delete("executed1", "executed2");
assertThat(redisson.getKeys().count()).isZero();
}
@ -244,6 +266,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
Thread.sleep(3000);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);
executor.delete();
redisson.getKeys().delete("counter", "executed1", "executed2");
assertThat(redisson.getKeys().count()).isZero();
}
@ -271,6 +294,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
Thread.sleep(3000);
assertThat(redisson.getAtomicLong("executed1").get()).isEqualTo(5);
executor.delete();
redisson.getKeys().delete("executed1");
assertThat(redisson.getKeys().count()).isZero();
}
@ -293,6 +317,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
assertThat(redisson.getAtomicLong("executed2").get()).isEqualTo(1);
assertThat(redisson.getAtomicLong("executed3").get()).isEqualTo(1);
executor.delete();
redisson.getKeys().delete("executed1", "executed2", "executed3");
assertThat(redisson.getKeys().count()).isZero();
}
@ -312,6 +337,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
assertThat(redisson.getAtomicLong("executed2").get()).isEqualTo(1);
assertThat(redisson.getAtomicLong("executed3").get()).isEqualTo(1);
executor.delete();
redisson.getKeys().delete("executed1", "executed2", "executed3");
assertThat(redisson.getKeys().count()).isZero();
}
@ -325,6 +351,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
assertThat(System.currentTimeMillis() - startTime).isBetween(5000L, 5200L);
assertThat(redisson.getAtomicLong("executed").get()).isEqualTo(1);
executor.delete();
redisson.getKeys().delete("executed");
assertThat(redisson.getKeys().count()).isZero();
}

Loading…
Cancel
Save