Fixed - Sprint Data Redis RedisMessageListenerContainer.addMessageListener() method hangs if called after container start. #5482

pull/5520/head
Nikita Koksharov 1 year ago
parent 27a4420590
commit 59fd8fcba0

@ -26,6 +26,64 @@ import static org.assertj.core.api.Assertions.assertThat;
public class RedissonSubscribeTest extends BaseConnectionTest {
@Test
public void testContainer() {
RedissonConnectionFactory f = new RedissonConnectionFactory(redisson);
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(f);
container.afterPropertiesSet();
container.start();
// for (int i = 0; i < 2; i++) {
// container.addMessageListener(new MessageListener() {
// @Override
// public void onMessage(Message message, byte[] pattern) {
// }
// }, ChannelTopic.of("test"));
// }
//
// container.stop();
//
// container = new RedisMessageListenerContainer();
// container.setConnectionFactory(f);
// container.afterPropertiesSet();
// container.start();
// for (int i = 0; i < 2; i++) {
// container.addMessageListener(new MessageListener() {
// @Override
// public void onMessage(Message message, byte[] pattern) {
// }
// }, PatternTopic.of("*"));
// }
// container.stop();
//
// container= new RedisMessageListenerContainer();
// container.setConnectionFactory(f);
// container.afterPropertiesSet();
// container.start();
for (int i = 0; i < 2; i++) {
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
}
}, ChannelTopic.of("test"+i));
}
container.stop();
container= new RedisMessageListenerContainer();
container.setConnectionFactory(f);
container.afterPropertiesSet();
container.start();
for (int i = 0; i < 2; i++) {
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
}
}, PatternTopic.of("*" + i));
}
container.stop();
}
@Test
public void testListenersDuplication() throws InterruptedException {
Queue<byte[]> msg = new ConcurrentLinkedQueue<>();

@ -28,12 +28,15 @@ import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.util.AbstractSubscription;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
/**
*
*
* @author Nikita Koksharov
*
*/
@ -41,7 +44,7 @@ public class RedissonSubscription extends AbstractSubscription {
private final CommandAsyncExecutor commandExecutor;
private final PublishSubscribeService subscribeService;
public RedissonSubscription(CommandAsyncExecutor commandExecutor, PublishSubscribeService subscribeService, MessageListener listener) {
super(listener, null, null);
this.commandExecutor = commandExecutor;
@ -52,6 +55,7 @@ public class RedissonSubscription extends AbstractSubscription {
protected void doSubscribe(byte[]... channels) {
List<CompletableFuture<?>> list = new ArrayList<>();
Queue<byte[]> subscribed = new ConcurrentLinkedQueue<>();
CountDownLatch latch = new CountDownLatch(1);
for (byte[] channel : channels) {
if (subscribeService.hasEntry(new ChannelName(channel))) {
continue;
@ -79,6 +83,10 @@ public class RedissonSubscription extends AbstractSubscription {
subscribed.add(channel);
}
super.onStatus(type, ch);
if (type == PubSubType.UNSUBSCRIBE) {
latch.countDown();
}
}
});
@ -90,6 +98,22 @@ public class RedissonSubscription extends AbstractSubscription {
for (byte[] channel : subscribed) {
((SubscriptionListener) getListener()).onChannelSubscribed(channel, 1);
}
// hack for RedisMessageListenerContainer
if (getListener().getClass().getName().equals("org.springframework.data.redis.listener.SynchronizingMessageListener")) {
StringWriter sw = new StringWriter();
new Exception().printStackTrace(new PrintWriter(sw));
String[] r = sw.toString().split("\n");
if (r.length != 7) {
return;
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
@Override
@ -110,6 +134,7 @@ public class RedissonSubscription extends AbstractSubscription {
protected void doPsubscribe(byte[]... patterns) {
List<CompletableFuture<?>> list = new ArrayList<>();
Queue<byte[]> subscribed = new ConcurrentLinkedQueue<>();
CountDownLatch latch = new CountDownLatch(1);
for (byte[] channel : patterns) {
if (subscribeService.hasEntry(new ChannelName(channel))) {
continue;
@ -137,6 +162,9 @@ public class RedissonSubscription extends AbstractSubscription {
subscribed.add(channel);
}
super.onStatus(type, pattern);
if (type == PubSubType.PUNSUBSCRIBE) {
latch.countDown();
}
}
});
list.add(f);
@ -147,6 +175,22 @@ public class RedissonSubscription extends AbstractSubscription {
for (byte[] channel : subscribed) {
((SubscriptionListener) getListener()).onPatternSubscribed(channel, 1);
}
// hack for RedisMessageListenerContainer
if (getListener().getClass().getName().equals("org.springframework.data.redis.listener.SynchronizingMessageListener")) {
StringWriter sw = new StringWriter();
new Exception().printStackTrace(new PrintWriter(sw));
String[] r = sw.toString().split("\n");
if (r.length != 7) {
return;
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private byte[] toBytes(Object message) {

@ -27,6 +27,64 @@ import static org.assertj.core.api.Assertions.assertThat;
public class RedissonSubscribeTest extends BaseConnectionTest {
@Test
public void testContainer() {
RedissonConnectionFactory f = new RedissonConnectionFactory(redisson);
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(f);
container.afterPropertiesSet();
container.start();
// for (int i = 0; i < 2; i++) {
// container.addMessageListener(new MessageListener() {
// @Override
// public void onMessage(Message message, byte[] pattern) {
// }
// }, ChannelTopic.of("test"));
// }
//
// container.stop();
//
// container = new RedisMessageListenerContainer();
// container.setConnectionFactory(f);
// container.afterPropertiesSet();
// container.start();
// for (int i = 0; i < 2; i++) {
// container.addMessageListener(new MessageListener() {
// @Override
// public void onMessage(Message message, byte[] pattern) {
// }
// }, PatternTopic.of("*"));
// }
// container.stop();
//
// container= new RedisMessageListenerContainer();
// container.setConnectionFactory(f);
// container.afterPropertiesSet();
// container.start();
for (int i = 0; i < 2; i++) {
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
}
}, ChannelTopic.of("test"+i));
}
container.stop();
container= new RedisMessageListenerContainer();
container.setConnectionFactory(f);
container.afterPropertiesSet();
container.start();
for (int i = 0; i < 2; i++) {
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
}
}, PatternTopic.of("*" + i));
}
container.stop();
}
@Test
public void testCluster() throws IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave() .notifyKeyspaceEvents(

@ -28,12 +28,15 @@ import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.util.AbstractSubscription;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
/**
*
*
* @author Nikita Koksharov
*
*/
@ -41,7 +44,7 @@ public class RedissonSubscription extends AbstractSubscription {
private final CommandAsyncExecutor commandExecutor;
private final PublishSubscribeService subscribeService;
public RedissonSubscription(CommandAsyncExecutor commandExecutor, PublishSubscribeService subscribeService, MessageListener listener) {
super(listener, null, null);
this.commandExecutor = commandExecutor;
@ -52,6 +55,7 @@ public class RedissonSubscription extends AbstractSubscription {
protected void doSubscribe(byte[]... channels) {
List<CompletableFuture<?>> list = new ArrayList<>();
Queue<byte[]> subscribed = new ConcurrentLinkedQueue<>();
CountDownLatch latch = new CountDownLatch(1);
for (byte[] channel : channels) {
if (subscribeService.hasEntry(new ChannelName(channel))) {
continue;
@ -79,6 +83,10 @@ public class RedissonSubscription extends AbstractSubscription {
subscribed.add(channel);
}
super.onStatus(type, ch);
if (type == PubSubType.UNSUBSCRIBE) {
latch.countDown();
}
}
});
@ -90,6 +98,22 @@ public class RedissonSubscription extends AbstractSubscription {
for (byte[] channel : subscribed) {
((SubscriptionListener) getListener()).onChannelSubscribed(channel, 1);
}
// hack for RedisMessageListenerContainer
if (getListener().getClass().getName().equals("org.springframework.data.redis.listener.SynchronizingMessageListener")) {
StringWriter sw = new StringWriter();
new Exception().printStackTrace(new PrintWriter(sw));
String[] r = sw.toString().split("\n");
if (r.length != 7) {
return;
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
@Override
@ -110,6 +134,7 @@ public class RedissonSubscription extends AbstractSubscription {
protected void doPsubscribe(byte[]... patterns) {
List<CompletableFuture<?>> list = new ArrayList<>();
Queue<byte[]> subscribed = new ConcurrentLinkedQueue<>();
CountDownLatch latch = new CountDownLatch(1);
for (byte[] channel : patterns) {
if (subscribeService.hasEntry(new ChannelName(channel))) {
continue;
@ -137,6 +162,9 @@ public class RedissonSubscription extends AbstractSubscription {
subscribed.add(channel);
}
super.onStatus(type, pattern);
if (type == PubSubType.PUNSUBSCRIBE) {
latch.countDown();
}
}
});
list.add(f);
@ -147,6 +175,22 @@ public class RedissonSubscription extends AbstractSubscription {
for (byte[] channel : subscribed) {
((SubscriptionListener) getListener()).onPatternSubscribed(channel, 1);
}
// hack for RedisMessageListenerContainer
if (getListener().getClass().getName().equals("org.springframework.data.redis.listener.SynchronizingMessageListener")) {
StringWriter sw = new StringWriter();
new Exception().printStackTrace(new PrintWriter(sw));
String[] r = sw.toString().split("\n");
if (r.length != 7) {
return;
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private byte[] toBytes(Object message) {

@ -28,9 +28,12 @@ import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.util.AbstractSubscription;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
/**
*
@ -52,6 +55,7 @@ public class RedissonSubscription extends AbstractSubscription {
protected void doSubscribe(byte[]... channels) {
List<CompletableFuture<?>> list = new ArrayList<>();
Queue<byte[]> subscribed = new ConcurrentLinkedQueue<>();
CountDownLatch latch = new CountDownLatch(1);
for (byte[] channel : channels) {
if (subscribeService.hasEntry(new ChannelName(channel))) {
continue;
@ -79,6 +83,10 @@ public class RedissonSubscription extends AbstractSubscription {
subscribed.add(channel);
}
super.onStatus(type, ch);
if (type == PubSubType.UNSUBSCRIBE) {
latch.countDown();
}
}
});
@ -90,6 +98,22 @@ public class RedissonSubscription extends AbstractSubscription {
for (byte[] channel : subscribed) {
((SubscriptionListener) getListener()).onChannelSubscribed(channel, 1);
}
// hack for RedisMessageListenerContainer
if (getListener().getClass().getName().equals("org.springframework.data.redis.listener.SynchronizingMessageListener")) {
StringWriter sw = new StringWriter();
new Exception().printStackTrace(new PrintWriter(sw));
String[] r = sw.toString().split("\n");
if (r.length != 7) {
return;
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
@Override
@ -110,6 +134,7 @@ public class RedissonSubscription extends AbstractSubscription {
protected void doPsubscribe(byte[]... patterns) {
List<CompletableFuture<?>> list = new ArrayList<>();
Queue<byte[]> subscribed = new ConcurrentLinkedQueue<>();
CountDownLatch latch = new CountDownLatch(1);
for (byte[] channel : patterns) {
if (subscribeService.hasEntry(new ChannelName(channel))) {
continue;
@ -137,6 +162,9 @@ public class RedissonSubscription extends AbstractSubscription {
subscribed.add(channel);
}
super.onStatus(type, pattern);
if (type == PubSubType.PUNSUBSCRIBE) {
latch.countDown();
}
}
});
list.add(f);
@ -147,6 +175,22 @@ public class RedissonSubscription extends AbstractSubscription {
for (byte[] channel : subscribed) {
((SubscriptionListener) getListener()).onPatternSubscribed(channel, 1);
}
// hack for RedisMessageListenerContainer
if (getListener().getClass().getName().equals("org.springframework.data.redis.listener.SynchronizingMessageListener")) {
StringWriter sw = new StringWriter();
new Exception().printStackTrace(new PrintWriter(sw));
String[] r = sw.toString().split("\n");
if (r.length != 7) {
return;
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private byte[] toBytes(Object message) {

@ -27,6 +27,64 @@ import static org.assertj.core.api.Assertions.assertThat;
public class RedissonSubscribeTest extends BaseConnectionTest {
@Test
public void testContainer() {
RedissonConnectionFactory f = new RedissonConnectionFactory(redisson);
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(f);
container.afterPropertiesSet();
container.start();
// for (int i = 0; i < 2; i++) {
// container.addMessageListener(new MessageListener() {
// @Override
// public void onMessage(Message message, byte[] pattern) {
// }
// }, ChannelTopic.of("test"));
// }
//
// container.stop();
//
// container = new RedisMessageListenerContainer();
// container.setConnectionFactory(f);
// container.afterPropertiesSet();
// container.start();
// for (int i = 0; i < 2; i++) {
// container.addMessageListener(new MessageListener() {
// @Override
// public void onMessage(Message message, byte[] pattern) {
// }
// }, PatternTopic.of("*"));
// }
// container.stop();
//
// container= new RedisMessageListenerContainer();
// container.setConnectionFactory(f);
// container.afterPropertiesSet();
// container.start();
for (int i = 0; i < 2; i++) {
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
}
}, ChannelTopic.of("test"+i));
}
container.stop();
container= new RedisMessageListenerContainer();
container.setConnectionFactory(f);
container.afterPropertiesSet();
container.start();
for (int i = 0; i < 2; i++) {
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
}
}, PatternTopic.of("*" + i));
}
container.stop();
}
@Test
public void testCluster() throws IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave() .notifyKeyspaceEvents(

Loading…
Cancel
Save