Fixed - Redis RemoteExecutorService "expiration" sorted set growing undefinitely

Signed-off-by: seakider <seakider@gmail.com>
pull/6442/head
seakider 1 week ago
parent 042b660185
commit 06fd0a9e44

@ -392,6 +392,7 @@ public class TasksRunnerService implements RemoteExecutorService {
script += "local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);"
+ "if scheduled == false then "
+ "redis.call('hdel', KEYS[4], ARGV[3]); "
+ "redis.call('zrem', KEYS[7], ARGV[3]); "
+ "end;";
}
script += "redis.call('zrem', KEYS[5], 'ff:' .. ARGV[3]);" +
@ -406,7 +407,7 @@ public class TasksRunnerService implements RemoteExecutorService {
RFuture<Object> f = commandExecutor.evalWriteNoRetryAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
script,
Arrays.asList(tasksCounterName, statusName, terminationTopicName, tasksName, schedulerQueueName, tasksRetryIntervalName),
Arrays.asList(tasksCounterName, statusName, terminationTopicName, tasksName, schedulerQueueName, tasksRetryIntervalName, tasksExpirationTimeName),
RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, requestId);
commandExecutor.get(f);
}

@ -611,6 +611,19 @@ public class RedissonExecutorServiceTest extends RedisDockerTest {
assertThat(redisson.getKeys().countExists("testparam")).isEqualTo(0);
}
@Test
public void testExpiration() throws InterruptedException, ExecutionException {
RScheduledExecutorService executor = redisson.getExecutorService("test");
Future<?> future = executor.submit(new ScheduledRunnableTask("testparam"), 10, TimeUnit.SECONDS);
future.get();
assertThat(redisson.getKeys().countExists("testparam")).isEqualTo(1);
String tasksExpirationTimeName = Reflect.on(executor).get("tasksExpirationTimeName");
RScoredSortedSet<String> set = redisson.getScoredSortedSet(tasksExpirationTimeName);
assertThat(set.size()).isEqualTo(0);
}
@Test
public void testAnonymousRunnable() {
Assertions.assertThrows(IllegalArgumentException.class, () -> {

Loading…
Cancel
Save