diff --git a/redisson/src/main/java/org/redisson/RedissonReliableTopic.java b/redisson/src/main/java/org/redisson/RedissonReliableTopic.java index 52bfdd0f1..dd6c30703 100644 --- a/redisson/src/main/java/org/redisson/RedissonReliableTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonReliableTopic.java @@ -235,10 +235,8 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl + "local v = redis.call('zrange', KEYS[2], 0, 0); " + "local score = redis.call('hget', KEYS[3], v[1]); " + "local range = redis.call('xrange', KEYS[1], score, '+'); " - + "if #range == 0 then " - + "redis.call('del', KEYS[1]); " - + "elseif #range == 1 and range[1][1] == score then " - + "redis.call('del', KEYS[1]); " + + "if #range == 0 or (#range == 1 and range[1][1] == score) then " + + "redis.call('xtrim', KEYS[1], 'maxlen', 0); " + "else " + "redis.call('xtrim', KEYS[1], 'maxlen', #range); " + "end;" diff --git a/redisson/src/test/java/org/redisson/RedissonReliableTopicTest.java b/redisson/src/test/java/org/redisson/RedissonReliableTopicTest.java index 23e3840ca..815b77c37 100644 --- a/redisson/src/test/java/org/redisson/RedissonReliableTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonReliableTopicTest.java @@ -4,13 +4,13 @@ import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.redisson.api.RReliableTopic; import org.redisson.api.RedissonClient; +import org.redisson.api.listener.MessageListener; import org.redisson.config.Config; import java.time.Duration; import java.util.LinkedList; import java.util.Queue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; @@ -22,6 +22,41 @@ import static org.assertj.core.api.Assertions.assertThat; */ public class RedissonReliableTopicTest extends BaseTest { + @Test + public void testConcurrency() throws InterruptedException { + RReliableTopic rt = redisson.getReliableTopic("test1"); + + AtomicInteger sent = new AtomicInteger(); + ExecutorService ee = Executors.newFixedThreadPool(8); + for (int i = 0; i < 500; i++) { + int j = i; + ee.submit(() -> { + rt.publish(j); + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(10)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + sent.incrementAndGet(); + }); + } + + AtomicInteger ii = new AtomicInteger(); + rt.addListener(Integer.class, new MessageListener() { + @Override + public void onMessage(CharSequence channel, Integer msg) { + ii.incrementAndGet(); + } + }); + + + ee.shutdown(); + assertThat(ee.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); + assertThat(sent.get()).isEqualTo(500); + assertThat(ii.get()).isEqualTo(500); + rt.removeAllListeners(); + } + @Test public void testRemoveExpiredSubscribers() throws InterruptedException { RReliableTopic rt = redisson.getReliableTopic("test1"); @@ -73,7 +108,7 @@ public class RedissonReliableTopicTest extends BaseTest { assertThat(rt.publish(i)).isEqualTo(2); } - Awaitility.waitAtMost(Duration.ofSeconds(1)).until(() -> counter.get() == 20); + Awaitility.waitAtMost(Duration.ofSeconds(2)).until(() -> counter.get() == 20); assertThat(rt.size()).isEqualTo(0); }