diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index c26175758..adebd6dae 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -251,7 +251,7 @@ public class TasksRunnerService implements RemoteExecutorService { + "local v = redis.call('zrange', KEYS[2], 0, 0); " // if new task added to queue head then publish its startTime // to all scheduler workers - + "if v[1] == ARGV[2] then " + + "if v[1] == scheduledName then " + "redis.call('publish', KEYS[3], startTime); " + "end;" + "return retryInterval; " diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index 0fe6ad322..4382a63e4 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -4,18 +4,21 @@ import mockit.Invocation; import mockit.Mock; import mockit.MockUp; import org.awaitility.Awaitility; +import org.joor.Reflect; import org.junit.jupiter.api.*; import org.redisson.RedisDockerTest; import org.redisson.Redisson; import org.redisson.RedissonNode; +import org.redisson.RedissonTopic; import org.redisson.api.*; import org.redisson.api.annotation.RInject; import org.redisson.api.executor.TaskFinishedListener; import org.redisson.api.executor.TaskStartedListener; +import org.redisson.api.listener.MessageListener; +import org.redisson.client.codec.LongCodec; import org.redisson.config.Config; import org.redisson.config.RedissonNodeConfig; -import java.io.IOException; import java.io.Serializable; import java.time.Duration; import java.util.Arrays; @@ -670,4 +673,25 @@ public class RedissonExecutorServiceTest extends RedisDockerTest { }); } + @Test + public void testTaskDelay4TaskService() throws IllegalAccessException, NoSuchFieldException, InterruptedException { + RScheduledExecutorService test = redisson.getExecutorService("test"); + String topicName = Reflect.on(test).get("schedulerChannelName"); + RedissonTopic topic = RedissonTopic.createRaw(LongCodec.INSTANCE, ((Redisson) redisson).getCommandExecutor(), topicName); + + AtomicInteger counter = new AtomicInteger(); + + topic.addListener(Long.class, new MessageListener() { + @Override + public void onMessage(CharSequence channel, Long msg) { + counter.incrementAndGet(); + } + }); + + test.submitAsync(new DelayedTask(10000, "test-counter")); + Thread.sleep(2000); + + assertThat(counter.get()).isGreaterThan(0); + } + }