Fixed - Spring Redis Data PatternTopic listeners are invoked multiple times per message. #3369

pull/3384/head
Nikita Koksharov 4 years ago
parent 4722679004
commit eb6a9126ed

@ -80,18 +80,20 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doPsubscribe(byte[]... patterns) {
RedisPubSubListener<?> listener2 = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
byte[] m = toBytes(message);
DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), m);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
};
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2);
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) {
return;
}
byte[] m = toBytes(message);
DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
});
list.add(f);
}
for (RFuture<?> future : list) {

@ -80,18 +80,20 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doPsubscribe(byte[]... patterns) {
RedisPubSubListener<?> listener2 = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
byte[] m = toBytes(message);
DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), m);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
};
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2);
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) {
return;
}
byte[] m = toBytes(message);
DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
});
list.add(f);
}
for (RFuture<?> future : list) {

@ -80,18 +80,20 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doPsubscribe(byte[]... patterns) {
RedisPubSubListener<?> listener2 = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
byte[] m = toBytes(message);
DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), m);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
};
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2);
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) {
return;
}
byte[] m = toBytes(message);
DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
});
list.add(f);
}
for (RFuture<?> future : list) {

@ -80,18 +80,20 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doPsubscribe(byte[]... patterns) {
RedisPubSubListener<?> listener2 = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
byte[] m = toBytes(message);
DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), m);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
};
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2);
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) {
return;
}
byte[] m = toBytes(message);
DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
});
list.add(f);
}
for (RFuture<?> future : list) {

@ -80,18 +80,20 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doPsubscribe(byte[]... patterns) {
RedisPubSubListener<?> listener2 = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
byte[] m = toBytes(message);
DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), m);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
};
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2);
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) {
return;
}
byte[] m = toBytes(message);
DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
});
list.add(f);
}
for (RFuture<?> future : list) {

@ -59,7 +59,8 @@ public class RedissonSubscription extends AbstractSubscription {
return;
}
DefaultMessage msg = new DefaultMessage(((ChannelName) ch).getName(), (byte[])message);
byte[] m = toBytes(message);
DefaultMessage msg = new DefaultMessage(((ChannelName) ch).getName(), m);
getListener().onMessage(msg, null);
}
});
@ -79,17 +80,20 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doPsubscribe(byte[]... patterns) {
RedisPubSubListener<?> listener2 = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), (byte[])message);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
};
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2);
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) {
return;
}
byte[] m = toBytes(message);
DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
});
list.add(f);
}
for (RFuture<?> future : list) {
@ -97,6 +101,13 @@ public class RedissonSubscription extends AbstractSubscription {
}
}
private byte[] toBytes(Object message) {
if (message instanceof String) {
return ((String) message).getBytes();
}
return (byte[]) message;
}
@Override
protected void doPUnsubscribe(boolean all, byte[]... patterns) {
for (byte[] pattern : patterns) {

@ -80,18 +80,20 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doPsubscribe(byte[]... patterns) {
RedisPubSubListener<?> listener2 = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
byte[] m = toBytes(message);
DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), m);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
};
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2);
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) {
return;
}
byte[] m = toBytes(message);
DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
});
list.add(f);
}
for (RFuture<?> future : list) {

@ -1,16 +1,78 @@
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Test;
import org.redisson.RedisRunner;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
public class RedissonSubscribeTest extends BaseConnectionTest {
@Test
public void testPatterTopic() throws IOException, InterruptedException {
RedisRunner.RedisProcess instance = new RedisRunner()
.nosave()
.randomPort()
.randomDir()
.notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
.run();
Config config = new Config();
config.useSingleServer().setAddress(instance.getRedisServerAddressAndPort()).setPingConnectionInterval(0);
RedissonClient redisson = Redisson.create(config);
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
AtomicInteger counterTest = new AtomicInteger();
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
counterTest.incrementAndGet();
}
}, new PatternTopic("__keyspace@0__:mykey"));
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
counterTest.incrementAndGet();
}
}, new PatternTopic("__keyevent@0__:del"));
container.afterPropertiesSet();
container.start();
Assertions.assertThat(container.isRunning()).isTrue();
RedisConnection c = factory.getConnection();
c.set("mykey".getBytes(), "2".getBytes());
c.del("mykey".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND).until(() -> {
return counterTest.get() == 3;
});
container.stop();
redisson.shutdown();
}
@Test
public void testSubscribe() {
RedissonConnection connection = new RedissonConnection(redisson);

@ -80,18 +80,20 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doPsubscribe(byte[]... patterns) {
RedisPubSubListener<?> listener2 = new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
byte[] m = toBytes(message);
DefaultMessage msg = new DefaultMessage(((ChannelName)channel).getName(), m);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
};
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, listener2);
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) {
return;
}
byte[] m = toBytes(message);
DefaultMessage msg = new DefaultMessage(((ChannelName)ch).getName(), m);
getListener().onMessage(msg, ((ChannelName)pattern).getName());
}
});
list.add(f);
}
for (RFuture<?> future : list) {

@ -1,16 +1,78 @@
package org.redisson.spring.data.connection;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Test;
import org.redisson.RedisRunner;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
public class RedissonSubscribeTest extends BaseConnectionTest {
@Test
public void testPatterTopic() throws IOException, InterruptedException {
RedisRunner.RedisProcess instance = new RedisRunner()
.nosave()
.randomPort()
.randomDir()
.notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.K,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.$)
.run();
Config config = new Config();
config.useSingleServer().setAddress(instance.getRedisServerAddressAndPort()).setPingConnectionInterval(0);
RedissonClient redisson = Redisson.create(config);
RedissonConnectionFactory factory = new RedissonConnectionFactory(redisson);
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
AtomicInteger counterTest = new AtomicInteger();
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
counterTest.incrementAndGet();
}
}, new PatternTopic("__keyspace@0__:mykey"));
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
counterTest.incrementAndGet();
}
}, new PatternTopic("__keyevent@0__:del"));
container.afterPropertiesSet();
container.start();
Assertions.assertThat(container.isRunning()).isTrue();
RedisConnection c = factory.getConnection();
c.set("mykey".getBytes(), "2".getBytes());
c.del("mykey".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND).until(() -> {
return counterTest.get() == 3;
});
container.stop();
redisson.shutdown();
}
@Test
public void testSubscribe() {
RedissonConnection connection = new RedissonConnection(redisson);

Loading…
Cancel
Save