From eb6a9126ed4b2ad8c0655bca697d943fba2130c0 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 21 Jan 2021 14:02:43 +0300 Subject: [PATCH] Fixed - Spring Redis Data PatternTopic listeners are invoked multiple times per message. #3369 --- .../data/connection/RedissonSubscription.java | 22 ++++--- .../data/connection/RedissonSubscription.java | 22 ++++--- .../data/connection/RedissonSubscription.java | 22 ++++--- .../data/connection/RedissonSubscription.java | 22 ++++--- .../data/connection/RedissonSubscription.java | 22 ++++--- .../data/connection/RedissonSubscription.java | 31 +++++++--- .../data/connection/RedissonSubscription.java | 22 ++++--- .../connection/RedissonSubscribeTest.java | 62 +++++++++++++++++++ .../data/connection/RedissonSubscription.java | 22 ++++--- .../connection/RedissonSubscribeTest.java | 62 +++++++++++++++++++ 10 files changed, 229 insertions(+), 80 deletions(-) diff --git a/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java b/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java index 7e29e539c..545b06fc1 100644 --- a/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java +++ b/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java @@ -80,18 +80,20 @@ public class RedissonSubscription extends AbstractSubscription { @Override protected void doPsubscribe(byte[]... patterns) { - RedisPubSubListener listener2 = new BaseRedisPubSubListener() { - @Override - public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { - byte[] m = toBytes(message); - DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), m); - getListener().onMessage(msg, ((ChannelName)pattern).getName()); - } - }; - List> list = new ArrayList>(); for (byte[] channel : patterns) { - RFuture f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2); + RFuture f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() { + @Override + public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) { + if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) { + return; + } + + byte[] m = toBytes(message); + DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m); + getListener().onMessage(msg, ((ChannelName)pattern).getName()); + } + }); list.add(f); } for (RFuture future : list) { diff --git a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java index 7e29e539c..545b06fc1 100644 --- a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java +++ b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java @@ -80,18 +80,20 @@ public class RedissonSubscription extends AbstractSubscription { @Override protected void doPsubscribe(byte[]... patterns) { - RedisPubSubListener listener2 = new BaseRedisPubSubListener() { - @Override - public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { - byte[] m = toBytes(message); - DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), m); - getListener().onMessage(msg, ((ChannelName)pattern).getName()); - } - }; - List> list = new ArrayList>(); for (byte[] channel : patterns) { - RFuture f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2); + RFuture f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() { + @Override + public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) { + if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) { + return; + } + + byte[] m = toBytes(message); + DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m); + getListener().onMessage(msg, ((ChannelName)pattern).getName()); + } + }); list.add(f); } for (RFuture future : list) { diff --git a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java index 7e29e539c..545b06fc1 100644 --- a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java +++ b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java @@ -80,18 +80,20 @@ public class RedissonSubscription extends AbstractSubscription { @Override protected void doPsubscribe(byte[]... patterns) { - RedisPubSubListener listener2 = new BaseRedisPubSubListener() { - @Override - public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { - byte[] m = toBytes(message); - DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), m); - getListener().onMessage(msg, ((ChannelName)pattern).getName()); - } - }; - List> list = new ArrayList>(); for (byte[] channel : patterns) { - RFuture f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2); + RFuture f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() { + @Override + public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) { + if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) { + return; + } + + byte[] m = toBytes(message); + DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m); + getListener().onMessage(msg, ((ChannelName)pattern).getName()); + } + }); list.add(f); } for (RFuture future : list) { diff --git a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java index 7e29e539c..545b06fc1 100644 --- a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java +++ b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java @@ -80,18 +80,20 @@ public class RedissonSubscription extends AbstractSubscription { @Override protected void doPsubscribe(byte[]... patterns) { - RedisPubSubListener listener2 = new BaseRedisPubSubListener() { - @Override - public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { - byte[] m = toBytes(message); - DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), m); - getListener().onMessage(msg, ((ChannelName)pattern).getName()); - } - }; - List> list = new ArrayList>(); for (byte[] channel : patterns) { - RFuture f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2); + RFuture f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() { + @Override + public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) { + if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) { + return; + } + + byte[] m = toBytes(message); + DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m); + getListener().onMessage(msg, ((ChannelName)pattern).getName()); + } + }); list.add(f); } for (RFuture future : list) { diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java index 7e29e539c..545b06fc1 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java @@ -80,18 +80,20 @@ public class RedissonSubscription extends AbstractSubscription { @Override protected void doPsubscribe(byte[]... patterns) { - RedisPubSubListener listener2 = new BaseRedisPubSubListener() { - @Override - public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { - byte[] m = toBytes(message); - DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), m); - getListener().onMessage(msg, ((ChannelName)pattern).getName()); - } - }; - List> list = new ArrayList>(); for (byte[] channel : patterns) { - RFuture f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2); + RFuture f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() { + @Override + public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) { + if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) { + return; + } + + byte[] m = toBytes(message); + DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m); + getListener().onMessage(msg, ((ChannelName)pattern).getName()); + } + }); list.add(f); } for (RFuture future : list) { diff --git a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java index b666c6f82..545b06fc1 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java +++ b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java @@ -59,7 +59,8 @@ public class RedissonSubscription extends AbstractSubscription { return; } - DefaultMessage msg = new DefaultMessage(((ChannelName) ch).getName(), (byte[])message); + byte[] m = toBytes(message); + DefaultMessage msg = new DefaultMessage(((ChannelName) ch).getName(), m); getListener().onMessage(msg, null); } }); @@ -79,17 +80,20 @@ public class RedissonSubscription extends AbstractSubscription { @Override protected void doPsubscribe(byte[]... patterns) { - RedisPubSubListener listener2 = new BaseRedisPubSubListener() { - @Override - public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { - DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), (byte[])message); - getListener().onMessage(msg, ((ChannelName)pattern).getName()); - } - }; - List> list = new ArrayList>(); for (byte[] channel : patterns) { - RFuture f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2); + RFuture f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() { + @Override + public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) { + if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) { + return; + } + + byte[] m = toBytes(message); + DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m); + getListener().onMessage(msg, ((ChannelName)pattern).getName()); + } + }); list.add(f); } for (RFuture future : list) { @@ -97,6 +101,13 @@ public class RedissonSubscription extends AbstractSubscription { } } + private byte[] toBytes(Object message) { + if (message instanceof String) { + return ((String) message).getBytes(); + } + return (byte[]) message; + } + @Override protected void doPUnsubscribe(boolean all, byte[]... patterns) { for (byte[] pattern : patterns) { diff --git a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java index 7e29e539c..545b06fc1 100644 --- a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java +++ b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java @@ -80,18 +80,20 @@ public class RedissonSubscription extends AbstractSubscription { @Override protected void doPsubscribe(byte[]... patterns) { - RedisPubSubListener listener2 = new BaseRedisPubSubListener() { - @Override - public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { - byte[] m = toBytes(message); - DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), m); - getListener().onMessage(msg, ((ChannelName)pattern).getName()); - } - }; - List> list = new ArrayList>(); for (byte[] channel : patterns) { - RFuture f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2); + RFuture f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() { + @Override + public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) { + if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) { + return; + } + + byte[] m = toBytes(message); + DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m); + getListener().onMessage(msg, ((ChannelName)pattern).getName()); + } + }); list.add(f); } for (RFuture future : list) { diff --git a/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java b/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java index 629de7d65..3d0e3bfb1 100644 --- a/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java +++ b/redisson-spring-data/redisson-spring-data-23/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java @@ -1,16 +1,78 @@ package org.redisson.spring.data.connection; +import java.io.IOException; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; import org.awaitility.Duration; import org.junit.Test; +import org.redisson.RedisRunner; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.listener.ChannelTopic; +import org.springframework.data.redis.listener.PatternTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; public class RedissonSubscribeTest extends BaseConnectionTest { + @Test + public void testPatterTopic() throws IOException, InterruptedException { + RedisRunner.RedisProcess instance = new RedisRunner() + .nosave() + .randomPort() + .randomDir() + .notifyKeyspaceEvents( + RedisRunner.KEYSPACE_EVENTS_OPTIONS.K, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.g, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.$) + .run(); + + Config config = new Config(); + config.useSingleServer().setAddress(instance.getRedisServerAddressAndPort()).setPingConnectionInterval(0); + RedissonClient redisson = Redisson.create(config); + + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(factory); + AtomicInteger counterTest = new AtomicInteger(); + container.addMessageListener(new MessageListener() { + @Override + public void onMessage(Message message, byte[] pattern) { + counterTest.incrementAndGet(); + } + }, new PatternTopic("__keyspace@0__:mykey")); + container.addMessageListener(new MessageListener() { + @Override + public void onMessage(Message message, byte[] pattern) { + counterTest.incrementAndGet(); + } + }, new PatternTopic("__keyevent@0__:del")); + container.afterPropertiesSet(); + container.start(); + Assertions.assertThat(container.isRunning()).isTrue(); + + RedisConnection c = factory.getConnection(); + c.set("mykey".getBytes(), "2".getBytes()); + c.del("mykey".getBytes()); + + Awaitility.await().atMost(Duration.ONE_SECOND).until(() -> { + return counterTest.get() == 3; + }); + + container.stop(); + redisson.shutdown(); + } + @Test public void testSubscribe() { RedissonConnection connection = new RedissonConnection(redisson); diff --git a/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java b/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java index 7e29e539c..545b06fc1 100644 --- a/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java +++ b/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonSubscription.java @@ -80,18 +80,20 @@ public class RedissonSubscription extends AbstractSubscription { @Override protected void doPsubscribe(byte[]... patterns) { - RedisPubSubListener listener2 = new BaseRedisPubSubListener() { - @Override - public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { - byte[] m = toBytes(message); - DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), m); - getListener().onMessage(msg, ((ChannelName)pattern).getName()); - } - }; - List> list = new ArrayList>(); for (byte[] channel : patterns) { - RFuture f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2); + RFuture f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() { + @Override + public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) { + if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) { + return; + } + + byte[] m = toBytes(message); + DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m); + getListener().onMessage(msg, ((ChannelName)pattern).getName()); + } + }); list.add(f); } for (RFuture future : list) { diff --git a/redisson-spring-data/redisson-spring-data-24/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java b/redisson-spring-data/redisson-spring-data-24/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java index 629de7d65..3d0e3bfb1 100644 --- a/redisson-spring-data/redisson-spring-data-24/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java +++ b/redisson-spring-data/redisson-spring-data-24/src/test/java/org/redisson/spring/data/connection/RedissonSubscribeTest.java @@ -1,16 +1,78 @@ package org.redisson.spring.data.connection; +import java.io.IOException; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; import org.awaitility.Duration; import org.junit.Test; +import org.redisson.RedisRunner; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.listener.ChannelTopic; +import org.springframework.data.redis.listener.PatternTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; public class RedissonSubscribeTest extends BaseConnectionTest { + @Test + public void testPatterTopic() throws IOException, InterruptedException { + RedisRunner.RedisProcess instance = new RedisRunner() + .nosave() + .randomPort() + .randomDir() + .notifyKeyspaceEvents( + RedisRunner.KEYSPACE_EVENTS_OPTIONS.K, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.g, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.E, + RedisRunner.KEYSPACE_EVENTS_OPTIONS.$) + .run(); + + Config config = new Config(); + config.useSingleServer().setAddress(instance.getRedisServerAddressAndPort()).setPingConnectionInterval(0); + RedissonClient redisson = Redisson.create(config); + + RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson); + + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(factory); + AtomicInteger counterTest = new AtomicInteger(); + container.addMessageListener(new MessageListener() { + @Override + public void onMessage(Message message, byte[] pattern) { + counterTest.incrementAndGet(); + } + }, new PatternTopic("__keyspace@0__:mykey")); + container.addMessageListener(new MessageListener() { + @Override + public void onMessage(Message message, byte[] pattern) { + counterTest.incrementAndGet(); + } + }, new PatternTopic("__keyevent@0__:del")); + container.afterPropertiesSet(); + container.start(); + Assertions.assertThat(container.isRunning()).isTrue(); + + RedisConnection c = factory.getConnection(); + c.set("mykey".getBytes(), "2".getBytes()); + c.del("mykey".getBytes()); + + Awaitility.await().atMost(Duration.ONE_SECOND).until(() -> { + return counterTest.get() == 3; + }); + + container.stop(); + redisson.shutdown(); + } + @Test public void testSubscribe() { RedissonConnection connection = new RedissonConnection(redisson);