Fixed - RReliableTopic doesn't remove all expired subscribers at once. #4765

pull/4773/head
Nikita Koksharov 2 years ago
parent ac93cbc48f
commit 69b71104cf

@ -225,11 +225,11 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
+ "redis.call('hset', KEYS[3], ARGV[2], ARGV[1]); "
+ "end; "
+ "local t = redis.call('zrange', KEYS[5], 0, 0, 'WITHSCORES'); "
+ "if #t == 2 and tonumber(t[2]) < tonumber(ARGV[3]) then "
+ "redis.call('hdel', KEYS[3], t[1]); "
+ "redis.call('zrem', KEYS[2], t[1]); "
+ "redis.call('zrem', KEYS[5], t[1]); "
+ "local expired = redis.call('zrangebyscore', KEYS[5], 0, tonumber(ARGV[3]) - 1); "
+ "for i, v in ipairs(expired) do "
+ "redis.call('hdel', KEYS[3], v); "
+ "redis.call('zrem', KEYS[2], v); "
+ "redis.call('zrem', KEYS[5], v); "
+ "end; "
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "

@ -3,6 +3,8 @@ package org.redisson;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.redisson.api.RReliableTopic;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import java.time.Duration;
import java.util.LinkedList;
@ -20,6 +22,41 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
public class RedissonReliableTopicTest extends BaseTest {
@Test
public void testRemoveExpiredSubscribers() throws InterruptedException {
RReliableTopic rt = redisson.getReliableTopic("test1");
AtomicInteger counter = new AtomicInteger();
rt.addListener(Integer.class, (ch, m) -> {
counter.incrementAndGet();
});
Config config = new Config();
config.setReliableTopicWatchdogTimeout(1000);
config.useSingleServer()
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
RedissonClient secondInstance = Redisson.create(config);
RReliableTopic rt2 = secondInstance.getReliableTopic("test1");
rt2.addListener(Integer.class, (ch, m) -> {
counter.incrementAndGet();
});
assertThat(rt2.countSubscribers()).isEqualTo(2);
secondInstance.shutdown();
Thread.sleep(1500);
for (int i = 0; i < 10; i++) {
rt.publish(i);
}
Thread.sleep(100);
assertThat(rt.countSubscribers()).isEqualTo(1);
assertThat(counter.get()).isEqualTo(10);
assertThat(rt.size()).isEqualTo(0);
}
@Test
public void testAutoTrim() {
RReliableTopic rt = redisson.getReliableTopic("test1");

Loading…
Cancel
Save