Merge pull request #6360 from seakider/fix_retry

Fixed - Task not firing on the next executor if app was killed #5577
pull/6367/head
Nikita Koksharov 1 month ago committed by GitHub
commit a905e7a8ac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -251,7 +251,7 @@ public class TasksRunnerService implements RemoteExecutorService {
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
// if new task added to queue head then publish its startTime
// to all scheduler workers
+ "if v[1] == ARGV[2] then "
+ "if v[1] == scheduledName then "
+ "redis.call('publish', KEYS[3], startTime); "
+ "end;"
+ "return retryInterval; "

@ -4,18 +4,21 @@ import mockit.Invocation;
import mockit.Mock;
import mockit.MockUp;
import org.awaitility.Awaitility;
import org.joor.Reflect;
import org.junit.jupiter.api.*;
import org.redisson.RedisDockerTest;
import org.redisson.Redisson;
import org.redisson.RedissonNode;
import org.redisson.RedissonTopic;
import org.redisson.api.*;
import org.redisson.api.annotation.RInject;
import org.redisson.api.executor.TaskFinishedListener;
import org.redisson.api.executor.TaskStartedListener;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.LongCodec;
import org.redisson.config.Config;
import org.redisson.config.RedissonNodeConfig;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.Arrays;
@ -670,4 +673,25 @@ public class RedissonExecutorServiceTest extends RedisDockerTest {
});
}
@Test
public void testTaskDelay4TaskService() throws IllegalAccessException, NoSuchFieldException, InterruptedException {
RScheduledExecutorService test = redisson.getExecutorService("test");
String topicName = Reflect.on(test).get("schedulerChannelName");
RedissonTopic topic = RedissonTopic.createRaw(LongCodec.INSTANCE, ((Redisson) redisson).getCommandExecutor(), topicName);
AtomicInteger counter = new AtomicInteger();
topic.addListener(Long.class, new MessageListener<Long>() {
@Override
public void onMessage(CharSequence channel, Long msg) {
counter.incrementAndGet();
}
});
test.submitAsync(new DelayedTask(10000, "test-counter"));
Thread.sleep(2000);
assertThat(counter.get()).isGreaterThan(0);
}
}

Loading…
Cancel
Save