Fixed - duplicate subscriptions with RedisMessageListenerContainer in Spring Data 2.7 #4415

pull/4452/head
Nikita Koksharov 3 years ago
parent d771052f32
commit bc22133834

@ -1,10 +1,5 @@
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
@ -16,13 +11,49 @@ import org.redisson.config.Config;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import java.io.IOException;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonSubscribeTest extends BaseConnectionTest {
@Test
public void testListenersDuplication() throws InterruptedException {
Queue<byte[]> msg = new ConcurrentLinkedQueue<>();
MessageListener aListener = (message, pattern) -> {
msg.add(message.getBody());
};
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(aListener,
Arrays.asList(new ChannelTopic("a"), new ChannelTopic("b")));
container.addMessageListener(aListener,
Arrays.asList(new PatternTopic("c*")));
container.afterPropertiesSet();
container.start();
Thread.sleep(200);
RedisConnection c = factory.getConnection();
c.publish("a".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
.untilAsserted(() -> {
assertThat(msg).containsExactly("msg".getBytes());
});
}
@Test
public void testPatterTopic() throws IOException, InterruptedException {
RedisRunner.RedisProcess instance = new RedisRunner()

@ -53,6 +53,10 @@ public class RedissonSubscription extends AbstractSubscription {
List<CompletableFuture<?>> list = new ArrayList<>();
Queue<byte[]> subscribed = new ConcurrentLinkedQueue<>();
for (byte[] channel : channels) {
if (subscribeService.getPubSubEntry(new ChannelName(channel)) != null) {
continue;
}
CompletableFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
public void onMessage(CharSequence ch, Object message) {
@ -107,6 +111,10 @@ public class RedissonSubscription extends AbstractSubscription {
List<CompletableFuture<?>> list = new ArrayList<>();
Queue<byte[]> subscribed = new ConcurrentLinkedQueue<>();
for (byte[] channel : patterns) {
if (subscribeService.getPubSubEntry(new ChannelName(channel)) != null) {
continue;
}
CompletableFuture<Collection<PubSubConnectionEntry>> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) {

@ -1,11 +1,5 @@
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Test;
@ -16,13 +10,47 @@ import org.redisson.config.Config;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import java.io.IOException;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonSubscribeTest extends BaseConnectionTest {
@Test
public void testListenersDuplication() {
Queue<byte[]> msg = new ConcurrentLinkedQueue<>();
MessageListener aListener = (message, pattern) -> {
msg.add(message.getBody());
};
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(aListener,
Arrays.asList(new ChannelTopic("a"), new ChannelTopic("b")));
container.addMessageListener(aListener,
Arrays.asList(new PatternTopic("c*")));
container.afterPropertiesSet();
container.start();
RedisConnection c = factory.getConnection();
c.publish("a".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
.untilAsserted(() -> {
assertThat(msg).containsExactly("msg".getBytes());
});
}
@Test
public void testPatterTopic() throws IOException, InterruptedException {
RedisRunner.RedisProcess instance = new RedisRunner()
@ -59,7 +87,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, new PatternTopic("__keyevent@0__:del"));
container.afterPropertiesSet();
container.start();
Assertions.assertThat(container.isRunning()).isTrue();
assertThat(container.isRunning()).isTrue();
RedisConnection c = factory.getConnection();
c.set("mykey".getBytes(), "2".getBytes());

Loading…
Cancel
Save