From 56544c6977070dd97ca4c068b89037af91e222c3 Mon Sep 17 00:00:00 2001 From: seakider Date: Thu, 26 Dec 2024 21:08:09 +0800 Subject: [PATCH 1/2] Fixed - Task not firing on the next executor if app was killed #5577 Signed-off-by: xuxiaolei --- .../redisson/executor/TasksRunnerService.java | 2 +- .../executor/RedissonExecutorServiceTest.java | 30 +++++++++++++++++-- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index c26175758..adebd6dae 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -251,7 +251,7 @@ public class TasksRunnerService implements RemoteExecutorService { + "local v = redis.call('zrange', KEYS[2], 0, 0); " // if new task added to queue head then publish its startTime // to all scheduler workers - + "if v[1] == ARGV[2] then " + + "if v[1] == scheduledName then " + "redis.call('publish', KEYS[3], startTime); " + "end;" + "return retryInterval; " diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index 0fe6ad322..5a443611d 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -5,18 +5,19 @@ import mockit.Mock; import mockit.MockUp; import org.awaitility.Awaitility; import org.junit.jupiter.api.*; -import org.redisson.RedisDockerTest; -import org.redisson.Redisson; -import org.redisson.RedissonNode; +import org.redisson.*; import org.redisson.api.*; import org.redisson.api.annotation.RInject; import org.redisson.api.executor.TaskFinishedListener; import org.redisson.api.executor.TaskStartedListener; +import org.redisson.api.listener.MessageListener; +import org.redisson.client.codec.LongCodec; import org.redisson.config.Config; import org.redisson.config.RedissonNodeConfig; import java.io.IOException; import java.io.Serializable; +import java.lang.reflect.Field; import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -670,4 +671,27 @@ public class RedissonExecutorServiceTest extends RedisDockerTest { }); } + @Test + public void testTaskDelay4TaskService() throws IllegalAccessException, NoSuchFieldException, InterruptedException { + RScheduledExecutorService test = redisson.getExecutorService("test"); + Field field = RedissonExecutorService.class.getDeclaredField("schedulerChannelName"); + field.setAccessible(true); + String topicName = (String) field.get(test); + RedissonTopic topic = RedissonTopic.createRaw(LongCodec.INSTANCE, ((Redisson) redisson).getCommandExecutor(), topicName); + + AtomicInteger counter = new AtomicInteger(); + + topic.addListener(Long.class, new MessageListener() { + @Override + public void onMessage(CharSequence channel, Long msg) { + counter.incrementAndGet(); + } + }); + + test.submitAsync(new DelayedTask(10000, "test-counter")); + Thread.sleep(2000); + + assertThat(counter.get()).isGreaterThan(0); + } + } From a115b66f28eee849491f397e6275e20b32e05808 Mon Sep 17 00:00:00 2001 From: seakider Date: Fri, 27 Dec 2024 10:12:04 +0800 Subject: [PATCH 2/2] Fixed - Task not firing on the next executor if app was killed #5577 Signed-off-by: xuxiaolei --- .../executor/RedissonExecutorServiceTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index 5a443611d..4382a63e4 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -4,8 +4,12 @@ import mockit.Invocation; import mockit.Mock; import mockit.MockUp; import org.awaitility.Awaitility; +import org.joor.Reflect; import org.junit.jupiter.api.*; -import org.redisson.*; +import org.redisson.RedisDockerTest; +import org.redisson.Redisson; +import org.redisson.RedissonNode; +import org.redisson.RedissonTopic; import org.redisson.api.*; import org.redisson.api.annotation.RInject; import org.redisson.api.executor.TaskFinishedListener; @@ -15,9 +19,7 @@ import org.redisson.client.codec.LongCodec; import org.redisson.config.Config; import org.redisson.config.RedissonNodeConfig; -import java.io.IOException; import java.io.Serializable; -import java.lang.reflect.Field; import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -674,9 +676,7 @@ public class RedissonExecutorServiceTest extends RedisDockerTest { @Test public void testTaskDelay4TaskService() throws IllegalAccessException, NoSuchFieldException, InterruptedException { RScheduledExecutorService test = redisson.getExecutorService("test"); - Field field = RedissonExecutorService.class.getDeclaredField("schedulerChannelName"); - field.setAccessible(true); - String topicName = (String) field.get(test); + String topicName = Reflect.on(test).get("schedulerChannelName"); RedissonTopic topic = RedissonTopic.createRaw(LongCodec.INSTANCE, ((Redisson) redisson).getCommandExecutor(), topicName); AtomicInteger counter = new AtomicInteger();