Fixed - memory leak in publish subscribe. #1326

pull/1336/head
Nikita 7 years ago
parent 7ca8d857a8
commit 8fd4fd69b8

@ -68,8 +68,9 @@ public class CommandPubSubDecoder extends CommandDecoder {
if (result instanceof Message) { if (result instanceof Message) {
checkpoint(); checkpoint();
final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel);
String channelName = ((Message) result).getChannel();
if (result instanceof PubSubStatusMessage) { if (result instanceof PubSubStatusMessage) {
String channelName = ((PubSubStatusMessage) result).getChannel();
String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase(); String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase();
PubSubKey key = new PubSubKey(channelName, operation); PubSubKey key = new PubSubKey(channelName, operation);
CommandData<Object, Object> d = commands.get(key); CommandData<Object, Object> d = commands.get(key);
@ -80,20 +81,24 @@ public class CommandPubSubDecoder extends CommandDecoder {
if (Arrays.asList(RedisCommands.PUNSUBSCRIBE.getName(), RedisCommands.UNSUBSCRIBE.getName()).contains(d.getCommand().getName())) { if (Arrays.asList(RedisCommands.PUNSUBSCRIBE.getName(), RedisCommands.UNSUBSCRIBE.getName()).contains(d.getCommand().getName())) {
commands.remove(key); commands.remove(key);
entries.remove(channelName); if (result instanceof PubSubPatternMessage) {
channelName = ((PubSubPatternMessage)result).getPattern();
}
PubSubEntry entry = entries.remove(channelName);
if (keepOrder) {
enqueueMessage(result, pubSubConnection, entry);
}
} }
} }
final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel);
if (keepOrder) { if (keepOrder) {
String channelName = ((Message) result).getChannel();
if (result instanceof PubSubPatternMessage) { if (result instanceof PubSubPatternMessage) {
channelName = ((PubSubPatternMessage)result).getPattern(); channelName = ((PubSubPatternMessage)result).getPattern();
} }
PubSubEntry item = entries.get(channelName); PubSubEntry entry = entries.get(channelName);
if (item != null) { if (entry != null) {
enqueueMessage(result, pubSubConnection, item); enqueueMessage(result, pubSubConnection, entry);
} }
} else { } else {
executor.execute(new Runnable() { executor.execute(new Runnable() {

@ -25,11 +25,13 @@ import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.RedisProcess; import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.RPatternTopic;
import org.redisson.api.RSet; import org.redisson.api.RSet;
import org.redisson.api.RTopic; import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.listener.BaseStatusListener; import org.redisson.api.listener.BaseStatusListener;
import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.PatternMessageListener;
import org.redisson.api.listener.StatusListener; import org.redisson.api.listener.StatusListener;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
@ -166,19 +168,29 @@ public class RedissonTopicTest {
RTopic<String> stringTopic = redisson.getTopic("test1", StringCodec.INSTANCE); RTopic<String> stringTopic = redisson.getTopic("test1", StringCodec.INSTANCE);
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
AtomicBoolean stringMessageReceived = new AtomicBoolean(); AtomicInteger stringMessageReceived = new AtomicInteger();
int listenerId = stringTopic.addListener(new MessageListener<String>() { int listenerId = stringTopic.addListener(new MessageListener<String>() {
@Override @Override
public void onMessage(String channel, String msg) { public void onMessage(String channel, String msg) {
assertThat(msg).isEqualTo("testmsg"); assertThat(msg).isEqualTo("testmsg");
stringMessageReceived.set(true); stringMessageReceived.incrementAndGet();
} }
}); });
RPatternTopic<String> patternTopic = redisson.getPatternTopic("test*", StringCodec.INSTANCE);
int patternListenerId = patternTopic.addListener(new PatternMessageListener<String>() {
@Override
public void onMessage(String pattern, String channel, String msg) {
assertThat(msg).isEqualTo("testmsg");
stringMessageReceived.incrementAndGet();
}
});
stringTopic.publish("testmsg"); stringTopic.publish("testmsg");
await().atMost(Duration.ONE_SECOND).untilTrue(stringMessageReceived); await().atMost(Duration.ONE_SECOND).until(() -> stringMessageReceived.get() == 2);
stringTopic.removeListener(listenerId); stringTopic.removeListener(listenerId);
patternTopic.removeListener(patternListenerId);
} }
redisson.shutdown(); redisson.shutdown();
@ -632,7 +644,7 @@ public class RedissonTopicTest {
redisson.getTopic("topic").publish(1); redisson.getTopic("topic").publish(1);
await().atMost(10, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); await().atMost(20, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2);
Assert.assertTrue(executed.get()); Assert.assertTrue(executed.get());
redisson.shutdown(); redisson.shutdown();

Loading…
Cancel
Save