Fixed - ReliableTopic data loss. #4223

pull/5038/head
Nikita Koksharov 2 years ago
parent ebdbbd595f
commit ea2c8f66cb

@ -235,10 +235,8 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
+ "local score = redis.call('hget', KEYS[3], v[1]); "
+ "local range = redis.call('xrange', KEYS[1], score, '+'); "
+ "if #range == 0 then "
+ "redis.call('del', KEYS[1]); "
+ "elseif #range == 1 and range[1][1] == score then "
+ "redis.call('del', KEYS[1]); "
+ "if #range == 0 or (#range == 1 and range[1][1] == score) then "
+ "redis.call('xtrim', KEYS[1], 'maxlen', 0); "
+ "else "
+ "redis.call('xtrim', KEYS[1], 'maxlen', #range); "
+ "end;"

@ -4,13 +4,13 @@ import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.redisson.api.RReliableTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.config.Config;
import java.time.Duration;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
@ -22,6 +22,41 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
public class RedissonReliableTopicTest extends BaseTest {
@Test
public void testConcurrency() throws InterruptedException {
RReliableTopic rt = redisson.getReliableTopic("test1");
AtomicInteger sent = new AtomicInteger();
ExecutorService ee = Executors.newFixedThreadPool(8);
for (int i = 0; i < 500; i++) {
int j = i;
ee.submit(() -> {
rt.publish(j);
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(10));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
sent.incrementAndGet();
});
}
AtomicInteger ii = new AtomicInteger();
rt.addListener(Integer.class, new MessageListener<Integer>() {
@Override
public void onMessage(CharSequence channel, Integer msg) {
ii.incrementAndGet();
}
});
ee.shutdown();
assertThat(ee.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
assertThat(sent.get()).isEqualTo(500);
assertThat(ii.get()).isEqualTo(500);
rt.removeAllListeners();
}
@Test
public void testRemoveExpiredSubscribers() throws InterruptedException {
RReliableTopic rt = redisson.getReliableTopic("test1");
@ -73,7 +108,7 @@ public class RedissonReliableTopicTest extends BaseTest {
assertThat(rt.publish(i)).isEqualTo(2);
}
Awaitility.waitAtMost(Duration.ofSeconds(1)).until(() -> counter.get() == 20);
Awaitility.waitAtMost(Duration.ofSeconds(2)).until(() -> counter.get() == 20);
assertThat(rt.size()).isEqualTo(0);
}

Loading…
Cancel
Save