diff --git a/redisson-spring-data/redisson-spring-data-26/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java b/redisson-spring-data/redisson-spring-data-26/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java index f6f6cd69d..55f80f128 100644 --- a/redisson-spring-data/redisson-spring-data-26/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java +++ b/redisson-spring-data/redisson-spring-data-26/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java @@ -1,10 +1,5 @@ package org.redisson.spring.data.connection; -import java.io.IOException; -import java.util.Arrays; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; import org.awaitility.Duration; @@ -16,13 +11,49 @@ import org.redisson.config.Config; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.RedisConnection; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import java.io.IOException; +import java.util.Arrays; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; + public class RedissonSubscribeTest extends BaseConnectionTest { + @Test + public void testListenersDuplication() throws InterruptedException { + Queue msg = new ConcurrentLinkedQueue<>(); + MessageListener aListener = (message, pattern) -> { + msg.add(message.getBody()); + }; + + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(factory); + container.addMessageListener(aListener, + Arrays.asList(new ChannelTopic("a"), new ChannelTopic("b"))); + container.addMessageListener(aListener, + Arrays.asList(new PatternTopic("c*"))); + container.afterPropertiesSet(); + container.start(); + + Thread.sleep(200); + + RedisConnection c = factory.getConnection(); + c.publish("a".getBytes(), "msg".getBytes()); + + Awaitility.await().atMost(Duration.ONE_SECOND) + .untilAsserted(() -> { + assertThat(msg).containsExactly("msg".getBytes()); + }); + } + @Test public void testPatterTopic() throws IOException, InterruptedException { RedisRunner.RedisProcess instance = new RedisRunner() diff --git a/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java b/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java index 59ee1a4ff..2f99ab108 100644 --- a/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java +++ b/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java @@ -53,6 +53,10 @@ public class RedissonSubscription extends AbstractSubscription { List> list = new ArrayList<>(); Queue subscribed = new ConcurrentLinkedQueue<>(); for (byte[] channel : channels) { + if (subscribeService.getPubSubEntry(new ChannelName(channel)) != null) { + continue; + } + CompletableFuture f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() { @Override public void onMessage(CharSequence ch, Object message) { @@ -107,6 +111,10 @@ public class RedissonSubscription extends AbstractSubscription { List> list = new ArrayList<>(); Queue subscribed = new ConcurrentLinkedQueue<>(); for (byte[] channel : patterns) { + if (subscribeService.getPubSubEntry(new ChannelName(channel)) != null) { + continue; + } + CompletableFuture> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() { @Override public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) { diff --git a/redisson-spring-data/redisson-spring-data-27/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java b/redisson-spring-data/redisson-spring-data-27/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java index f6f6cd69d..6cfdcc19d 100644 --- a/redisson-spring-data/redisson-spring-data-27/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java +++ b/redisson-spring-data/redisson-spring-data-27/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java @@ -1,11 +1,5 @@ package org.redisson.spring.data.connection; -import java.io.IOException; -import java.util.Arrays; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; import org.awaitility.Duration; import org.junit.Test; @@ -16,13 +10,47 @@ import org.redisson.config.Config; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.RedisConnection; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import java.io.IOException; +import java.util.Arrays; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; + public class RedissonSubscribeTest extends BaseConnectionTest { + @Test + public void testListenersDuplication() { + Queue msg = new ConcurrentLinkedQueue<>(); + MessageListener aListener = (message, pattern) -> { + msg.add(message.getBody()); + }; + + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(factory); + container.addMessageListener(aListener, + Arrays.asList(new ChannelTopic("a"), new ChannelTopic("b"))); + container.addMessageListener(aListener, + Arrays.asList(new PatternTopic("c*"))); + container.afterPropertiesSet(); + container.start(); + + RedisConnection c = factory.getConnection(); + c.publish("a".getBytes(), "msg".getBytes()); + + Awaitility.await().atMost(Duration.ONE_SECOND) + .untilAsserted(() -> { + assertThat(msg).containsExactly("msg".getBytes()); + }); + } + @Test public void testPatterTopic() throws IOException, InterruptedException { RedisRunner.RedisProcess instance = new RedisRunner() @@ -59,7 +87,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest { }, new PatternTopic("__keyevent@0__:del")); container.afterPropertiesSet(); container.start(); - Assertions.assertThat(container.isRunning()).isTrue(); + assertThat(container.isRunning()).isTrue(); RedisConnection c = factory.getConnection(); c.set("mykey".getBytes(), "2".getBytes());