diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 7c0c10e6d..ac7c22ed6 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -156,7 +156,7 @@ public class RedissonTopic implements RTopic { } if (entry.removeAllListeners(channelName)) { - subscribeService.unsubscribe(channelName, semaphore); + subscribeService.unsubscribe(channelName, semaphore).syncUninterruptibly(); } else { semaphore.release(); } diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index 981146865..c05f1dfe3 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -105,6 +105,14 @@ public class PublishSubscribeService { } final RPromise result = new RedissonPromise(); + promise.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + } + } + }); result.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -246,6 +254,14 @@ public class PublishSubscribeService { final RPromise promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener... listeners) { final int slot = connectionManager.calcSlot(channelName.getName()); RFuture connFuture = nextPubSubConnection(slot); + promise.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + ((RPromise)connFuture).tryFailure(future.cause()); + } + } + }); connFuture.addListener(new FutureListener() { @Override diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 389943363..5d2ea6627 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -7,7 +7,9 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.LinkedList; import java.util.List; +import java.util.Queue; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -26,7 +28,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner.RedisProcess; -import org.redisson.RedissonTopicPatternTest.Message; import org.redisson.api.RFuture; import org.redisson.api.RPatternTopic; import org.redisson.api.RSet; @@ -387,6 +388,54 @@ public class RedissonTopicTest { redisson.shutdown(); } + @Test + public void testSubscribeLimit() throws Exception { + RedisProcess runner = new RedisRunner() + .port(RedisRunner.findFreePort()) + .nosave() + .randomDir() + .run(); + + int connection = 10; + int subscription = 5; + Config config = new Config(); + config.useSingleServer() + .setAddress("redis://localhost:" + runner.getRedisServerPort()) + .setSubscriptionConnectionPoolSize(connection) + .setSubscriptionsPerConnection(subscription); + RedissonClient redissonClient = Redisson.create(config); + final Queue queue = new LinkedList<>(); + int i = 0; + boolean timeout = false; + while (true) { + try{ + if (timeout) { + System.out.println("destroy"); + queue.poll().removeAllListeners(); + } + RTopic topic = redissonClient.getTopic(++i + ""); + topic.addListener(Object.class, new MessageListener() { + @Override + public void onMessage(CharSequence channel, Object msg) { + // TODO Auto-generated method stub + + } + }); + queue.offer(topic); + if (i > 1000) { + break; + } + System.out.println(i + " - " + queue.size()); + }catch(Exception e){ + timeout = true; + e.printStackTrace(); + } + } + + redissonClient.shutdown(); + runner.stop(); + } + @Test public void testRemoveAllListeners2() throws InterruptedException { RedissonClient redisson = BaseTest.createInstance();