diff --git a/redisson-spring-data/redisson-spring-data-26/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java b/redisson-spring-data/redisson-spring-data-26/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java index 55f80f128..f9cf940c4 100644 --- a/redisson-spring-data/redisson-spring-data-26/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java +++ b/redisson-spring-data/redisson-spring-data-26/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java @@ -26,6 +26,64 @@ import static org.assertj.core.api.Assertions.assertThat; public class RedissonSubscribeTest extends BaseConnectionTest { + @Test + public void testContainer() { + RedissonConnectionFactory f = new RedissonConnectionFactory(redisson); + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(f); + container.afterPropertiesSet(); + container.start(); + +// for (int i = 0; i < 2; i++) { +// container.addMessageListener(new MessageListener() { +// @Override +// public void onMessage(Message message, byte[] pattern) { +// } +// }, ChannelTopic.of("test")); +// } +// +// container.stop(); +// +// container = new RedisMessageListenerContainer(); +// container.setConnectionFactory(f); +// container.afterPropertiesSet(); +// container.start(); +// for (int i = 0; i < 2; i++) { +// container.addMessageListener(new MessageListener() { +// @Override +// public void onMessage(Message message, byte[] pattern) { +// } +// }, PatternTopic.of("*")); +// } +// container.stop(); +// +// container= new RedisMessageListenerContainer(); +// container.setConnectionFactory(f); +// container.afterPropertiesSet(); +// container.start(); + for (int i = 0; i < 2; i++) { + container.addMessageListener(new MessageListener() { + @Override + public void onMessage(Message message, byte[] pattern) { + } + }, ChannelTopic.of("test"+i)); + } + container.stop(); + + container= new RedisMessageListenerContainer(); + container.setConnectionFactory(f); + container.afterPropertiesSet(); + container.start(); + for (int i = 0; i < 2; i++) { + container.addMessageListener(new MessageListener() { + @Override + public void onMessage(Message message, byte[] pattern) { + } + }, PatternTopic.of("*" + i)); + } + container.stop(); + } + @Test public void testListenersDuplication() throws InterruptedException { Queue<byte[]> msg = new ConcurrentLinkedQueue<>(); diff --git a/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java b/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java index 956a9dcaf..223b20b63 100644 --- a/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java +++ b/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java @@ -28,12 +28,15 @@ import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.SubscriptionListener; import org.springframework.data.redis.connection.util.AbstractSubscription; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; /** - * + * * @author Nikita Koksharov * */ @@ -41,7 +44,7 @@ public class RedissonSubscription extends AbstractSubscription { private final CommandAsyncExecutor commandExecutor; private final PublishSubscribeService subscribeService; - + public RedissonSubscription(CommandAsyncExecutor commandExecutor, PublishSubscribeService subscribeService, MessageListener listener) { super(listener, null, null); this.commandExecutor = commandExecutor; @@ -52,6 +55,7 @@ public class RedissonSubscription extends AbstractSubscription { protected void doSubscribe(byte[]... channels) { List<CompletableFuture<?>> list = new ArrayList<>(); Queue<byte[]> subscribed = new ConcurrentLinkedQueue<>(); + CountDownLatch latch = new CountDownLatch(1); for (byte[] channel : channels) { if (subscribeService.hasEntry(new ChannelName(channel))) { continue; @@ -79,6 +83,10 @@ public class RedissonSubscription extends AbstractSubscription { subscribed.add(channel); } super.onStatus(type, ch); + + if (type == PubSubType.UNSUBSCRIBE) { + latch.countDown(); + } } }); @@ -90,6 +98,22 @@ public class RedissonSubscription extends AbstractSubscription { for (byte[] channel : subscribed) { ((SubscriptionListener) getListener()).onChannelSubscribed(channel, 1); } + + // hack for RedisMessageListenerContainer + if (getListener().getClass().getName().equals("org.springframework.data.redis.listener.SynchronizingMessageListener")) { + StringWriter sw = new StringWriter(); + new Exception().printStackTrace(new PrintWriter(sw)); + String[] r = sw.toString().split("\n"); + if (r.length != 7) { + return; + } + + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } @Override @@ -110,6 +134,7 @@ public class RedissonSubscription extends AbstractSubscription { protected void doPsubscribe(byte[]... patterns) { List<CompletableFuture<?>> list = new ArrayList<>(); Queue<byte[]> subscribed = new ConcurrentLinkedQueue<>(); + CountDownLatch latch = new CountDownLatch(1); for (byte[] channel : patterns) { if (subscribeService.hasEntry(new ChannelName(channel))) { continue; @@ -137,6 +162,9 @@ public class RedissonSubscription extends AbstractSubscription { subscribed.add(channel); } super.onStatus(type, pattern); + if (type == PubSubType.PUNSUBSCRIBE) { + latch.countDown(); + } } }); list.add(f); @@ -147,6 +175,22 @@ public class RedissonSubscription extends AbstractSubscription { for (byte[] channel : subscribed) { ((SubscriptionListener) getListener()).onPatternSubscribed(channel, 1); } + + // hack for RedisMessageListenerContainer + if (getListener().getClass().getName().equals("org.springframework.data.redis.listener.SynchronizingMessageListener")) { + StringWriter sw = new StringWriter(); + new Exception().printStackTrace(new PrintWriter(sw)); + String[] r = sw.toString().split("\n"); + if (r.length != 7) { + return; + } + + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } private byte[] toBytes(Object message) { diff --git a/redisson-spring-data/redisson-spring-data-27/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java b/redisson-spring-data/redisson-spring-data-27/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java index f681bb714..d770e64bb 100644 --- a/redisson-spring-data/redisson-spring-data-27/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java +++ b/redisson-spring-data/redisson-spring-data-27/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java @@ -27,6 +27,64 @@ import static org.assertj.core.api.Assertions.assertThat; public class RedissonSubscribeTest extends BaseConnectionTest { + @Test + public void testContainer() { + RedissonConnectionFactory f = new RedissonConnectionFactory(redisson); + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(f); + container.afterPropertiesSet(); + container.start(); + +// for (int i = 0; i < 2; i++) { +// container.addMessageListener(new MessageListener() { +// @Override +// public void onMessage(Message message, byte[] pattern) { +// } +// }, ChannelTopic.of("test")); +// } +// +// container.stop(); +// +// container = new RedisMessageListenerContainer(); +// container.setConnectionFactory(f); +// container.afterPropertiesSet(); +// container.start(); +// for (int i = 0; i < 2; i++) { +// container.addMessageListener(new MessageListener() { +// @Override +// public void onMessage(Message message, byte[] pattern) { +// } +// }, PatternTopic.of("*")); +// } +// container.stop(); +// +// container= new RedisMessageListenerContainer(); +// container.setConnectionFactory(f); +// container.afterPropertiesSet(); +// container.start(); + for (int i = 0; i < 2; i++) { + container.addMessageListener(new MessageListener() { + @Override + public void onMessage(Message message, byte[] pattern) { + } + }, ChannelTopic.of("test"+i)); + } + container.stop(); + + container= new RedisMessageListenerContainer(); + container.setConnectionFactory(f); + container.afterPropertiesSet(); + container.start(); + for (int i = 0; i < 2; i++) { + container.addMessageListener(new MessageListener() { + @Override + public void onMessage(Message message, byte[] pattern) { + } + }, PatternTopic.of("*" + i)); + } + container.stop(); + } + @Test public void testCluster() throws IOException, InterruptedException { RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave() .notifyKeyspaceEvents( diff --git a/redisson-spring-data/redisson-spring-data-30/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java b/redisson-spring-data/redisson-spring-data-30/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java index 956a9dcaf..223b20b63 100644 --- a/redisson-spring-data/redisson-spring-data-30/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java +++ b/redisson-spring-data/redisson-spring-data-30/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java @@ -28,12 +28,15 @@ import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.SubscriptionListener; import org.springframework.data.redis.connection.util.AbstractSubscription; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; /** - * + * * @author Nikita Koksharov * */ @@ -41,7 +44,7 @@ public class RedissonSubscription extends AbstractSubscription { private final CommandAsyncExecutor commandExecutor; private final PublishSubscribeService subscribeService; - + public RedissonSubscription(CommandAsyncExecutor commandExecutor, PublishSubscribeService subscribeService, MessageListener listener) { super(listener, null, null); this.commandExecutor = commandExecutor; @@ -52,6 +55,7 @@ public class RedissonSubscription extends AbstractSubscription { protected void doSubscribe(byte[]... channels) { List<CompletableFuture<?>> list = new ArrayList<>(); Queue<byte[]> subscribed = new ConcurrentLinkedQueue<>(); + CountDownLatch latch = new CountDownLatch(1); for (byte[] channel : channels) { if (subscribeService.hasEntry(new ChannelName(channel))) { continue; @@ -79,6 +83,10 @@ public class RedissonSubscription extends AbstractSubscription { subscribed.add(channel); } super.onStatus(type, ch); + + if (type == PubSubType.UNSUBSCRIBE) { + latch.countDown(); + } } }); @@ -90,6 +98,22 @@ public class RedissonSubscription extends AbstractSubscription { for (byte[] channel : subscribed) { ((SubscriptionListener) getListener()).onChannelSubscribed(channel, 1); } + + // hack for RedisMessageListenerContainer + if (getListener().getClass().getName().equals("org.springframework.data.redis.listener.SynchronizingMessageListener")) { + StringWriter sw = new StringWriter(); + new Exception().printStackTrace(new PrintWriter(sw)); + String[] r = sw.toString().split("\n"); + if (r.length != 7) { + return; + } + + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } @Override @@ -110,6 +134,7 @@ public class RedissonSubscription extends AbstractSubscription { protected void doPsubscribe(byte[]... patterns) { List<CompletableFuture<?>> list = new ArrayList<>(); Queue<byte[]> subscribed = new ConcurrentLinkedQueue<>(); + CountDownLatch latch = new CountDownLatch(1); for (byte[] channel : patterns) { if (subscribeService.hasEntry(new ChannelName(channel))) { continue; @@ -137,6 +162,9 @@ public class RedissonSubscription extends AbstractSubscription { subscribed.add(channel); } super.onStatus(type, pattern); + if (type == PubSubType.PUNSUBSCRIBE) { + latch.countDown(); + } } }); list.add(f); @@ -147,6 +175,22 @@ public class RedissonSubscription extends AbstractSubscription { for (byte[] channel : subscribed) { ((SubscriptionListener) getListener()).onPatternSubscribed(channel, 1); } + + // hack for RedisMessageListenerContainer + if (getListener().getClass().getName().equals("org.springframework.data.redis.listener.SynchronizingMessageListener")) { + StringWriter sw = new StringWriter(); + new Exception().printStackTrace(new PrintWriter(sw)); + String[] r = sw.toString().split("\n"); + if (r.length != 7) { + return; + } + + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } private byte[] toBytes(Object message) { diff --git a/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java b/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java index 956a9dcaf..f7707213a 100644 --- a/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java +++ b/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java @@ -28,9 +28,12 @@ import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.SubscriptionListener; import org.springframework.data.redis.connection.util.AbstractSubscription; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; /** * @@ -52,6 +55,7 @@ public class RedissonSubscription extends AbstractSubscription { protected void doSubscribe(byte[]... channels) { List<CompletableFuture<?>> list = new ArrayList<>(); Queue<byte[]> subscribed = new ConcurrentLinkedQueue<>(); + CountDownLatch latch = new CountDownLatch(1); for (byte[] channel : channels) { if (subscribeService.hasEntry(new ChannelName(channel))) { continue; @@ -79,6 +83,10 @@ public class RedissonSubscription extends AbstractSubscription { subscribed.add(channel); } super.onStatus(type, ch); + + if (type == PubSubType.UNSUBSCRIBE) { + latch.countDown(); + } } }); @@ -90,6 +98,22 @@ public class RedissonSubscription extends AbstractSubscription { for (byte[] channel : subscribed) { ((SubscriptionListener) getListener()).onChannelSubscribed(channel, 1); } + + // hack for RedisMessageListenerContainer + if (getListener().getClass().getName().equals("org.springframework.data.redis.listener.SynchronizingMessageListener")) { + StringWriter sw = new StringWriter(); + new Exception().printStackTrace(new PrintWriter(sw)); + String[] r = sw.toString().split("\n"); + if (r.length != 7) { + return; + } + + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } @Override @@ -110,6 +134,7 @@ public class RedissonSubscription extends AbstractSubscription { protected void doPsubscribe(byte[]... patterns) { List<CompletableFuture<?>> list = new ArrayList<>(); Queue<byte[]> subscribed = new ConcurrentLinkedQueue<>(); + CountDownLatch latch = new CountDownLatch(1); for (byte[] channel : patterns) { if (subscribeService.hasEntry(new ChannelName(channel))) { continue; @@ -137,6 +162,9 @@ public class RedissonSubscription extends AbstractSubscription { subscribed.add(channel); } super.onStatus(type, pattern); + if (type == PubSubType.PUNSUBSCRIBE) { + latch.countDown(); + } } }); list.add(f); @@ -147,6 +175,22 @@ public class RedissonSubscription extends AbstractSubscription { for (byte[] channel : subscribed) { ((SubscriptionListener) getListener()).onPatternSubscribed(channel, 1); } + + // hack for RedisMessageListenerContainer + if (getListener().getClass().getName().equals("org.springframework.data.redis.listener.SynchronizingMessageListener")) { + StringWriter sw = new StringWriter(); + new Exception().printStackTrace(new PrintWriter(sw)); + String[] r = sw.toString().split("\n"); + if (r.length != 7) { + return; + } + + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } private byte[] toBytes(Object message) { diff --git a/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java b/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java index f681bb714..d770e64bb 100644 --- a/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java +++ b/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java @@ -27,6 +27,64 @@ import static org.assertj.core.api.Assertions.assertThat; public class RedissonSubscribeTest extends BaseConnectionTest { + @Test + public void testContainer() { + RedissonConnectionFactory f = new RedissonConnectionFactory(redisson); + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(f); + container.afterPropertiesSet(); + container.start(); + +// for (int i = 0; i < 2; i++) { +// container.addMessageListener(new MessageListener() { +// @Override +// public void onMessage(Message message, byte[] pattern) { +// } +// }, ChannelTopic.of("test")); +// } +// +// container.stop(); +// +// container = new RedisMessageListenerContainer(); +// container.setConnectionFactory(f); +// container.afterPropertiesSet(); +// container.start(); +// for (int i = 0; i < 2; i++) { +// container.addMessageListener(new MessageListener() { +// @Override +// public void onMessage(Message message, byte[] pattern) { +// } +// }, PatternTopic.of("*")); +// } +// container.stop(); +// +// container= new RedisMessageListenerContainer(); +// container.setConnectionFactory(f); +// container.afterPropertiesSet(); +// container.start(); + for (int i = 0; i < 2; i++) { + container.addMessageListener(new MessageListener() { + @Override + public void onMessage(Message message, byte[] pattern) { + } + }, ChannelTopic.of("test"+i)); + } + container.stop(); + + container= new RedisMessageListenerContainer(); + container.setConnectionFactory(f); + container.afterPropertiesSet(); + container.start(); + for (int i = 0; i < 2; i++) { + container.addMessageListener(new MessageListener() { + @Override + public void onMessage(Message message, byte[] pattern) { + } + }, PatternTopic.of("*" + i)); + } + container.stop(); + } + @Test public void testCluster() throws IOException, InterruptedException { RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave() .notifyKeyspaceEvents(