Fixed - RTopic.removeAllListeners leads to PubSub connections leak. #1696

pull/1721/head
Nikita Koksharov 6 years ago
parent b4135a9994
commit 040c659f30

@ -147,8 +147,7 @@ public class RedissonPatternTopic implements RPatternTopic {
return; return;
} }
entry.removeAllListeners(channelName); if (entry.removeAllListeners(channelName)) {
if (!entry.hasListeners(channelName)) {
subscribeService.punsubscribe(channelName, semaphore); subscribeService.punsubscribe(channelName, semaphore);
} else { } else {
semaphore.release(); semaphore.release();

@ -146,7 +146,20 @@ public class RedissonTopic implements RTopic {
@Override @Override
public void removeAllListeners() { public void removeAllListeners() {
subscribeService.unsubscribe(channelName, PubSubType.UNSUBSCRIBE); AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
acquire(semaphore);
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName);
if (entry == null) {
semaphore.release();
return;
}
if (entry.removeAllListeners(channelName)) {
subscribeService.unsubscribe(channelName, semaphore);
} else {
semaphore.release();
}
} }
protected void acquire(AsyncSemaphore semaphore) { protected void acquire(AsyncSemaphore semaphore) {

@ -100,7 +100,7 @@ public class PubSubConnectionEntry {
for (RedisPubSubListener<?> listener : listeners) { for (RedisPubSubListener<?> listener : listeners) {
removeListener(channelName, listener); removeListener(channelName, listener);
} }
return !listeners.isEmpty(); return listeners.isEmpty();
} }
// TODO optimize // TODO optimize

@ -369,9 +369,11 @@ public class RedissonTopicTest {
public void testRemoveAllListeners() throws InterruptedException { public void testRemoveAllListeners() throws InterruptedException {
RedissonClient redisson = BaseTest.createInstance(); RedissonClient redisson = BaseTest.createInstance();
RTopic topic1 = redisson.getTopic("topic1"); RTopic topic1 = redisson.getTopic("topic1");
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
topic1.addListener(Message.class, (channel, msg) -> { topic1.addListener(Message.class, (channel, msg) -> {
Assert.fail(); counter.incrementAndGet();
}); });
} }
@ -379,6 +381,33 @@ public class RedissonTopicTest {
topic1.removeAllListeners(); topic1.removeAllListeners();
topic1.publish(new Message("123")); topic1.publish(new Message("123"));
Thread.sleep(1000);
assertThat(counter.get()).isZero();
redisson.shutdown();
}
@Test
public void testRemoveAllListeners2() throws InterruptedException {
RedissonClient redisson = BaseTest.createInstance();
RTopic topic1 = redisson.getTopic("topic1");
AtomicInteger counter = new AtomicInteger();
for (int j = 0; j < 100; j++) {
for (int i = 0; i < 10; i++) {
topic1.addListener(Message.class, (channel, msg) -> {
counter.incrementAndGet();
});
}
topic1 = redisson.getTopic("topic1");
topic1.removeAllListeners();
topic1.publish(new Message("123"));
}
Thread.sleep(1000);
assertThat(counter.get()).isZero();
redisson.shutdown(); redisson.shutdown();
} }

Loading…
Cancel
Save