Fixed - RTopic and all objects based on it stop work properly if Subscribe timeout has happened. #1717

pull/1792/head
Nikita Koksharov 6 years ago
parent 1ab6ebf38a
commit bc7ab0e586

@ -156,7 +156,7 @@ public class RedissonTopic implements RTopic {
}
if (entry.removeAllListeners(channelName)) {
subscribeService.unsubscribe(channelName, semaphore);
subscribeService.unsubscribe(channelName, semaphore).syncUninterruptibly();
} else {
semaphore.release();
}

@ -105,6 +105,14 @@ public class PublishSubscribeService {
}
final RPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>();
promise.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
}
}
});
result.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
@ -246,6 +254,14 @@ public class PublishSubscribeService {
final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) {
final int slot = connectionManager.calcSlot(channelName.getName());
RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
promise.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
if (!future.isSuccess()) {
((RPromise<RedisPubSubConnection>)connFuture).tryFailure(future.cause());
}
}
});
connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
@Override

@ -7,7 +7,9 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -26,7 +28,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.RedissonTopicPatternTest.Message;
import org.redisson.api.RFuture;
import org.redisson.api.RPatternTopic;
import org.redisson.api.RSet;
@ -387,6 +388,54 @@ public class RedissonTopicTest {
redisson.shutdown();
}
@Test
public void testSubscribeLimit() throws Exception {
RedisProcess runner = new RedisRunner()
.port(RedisRunner.findFreePort())
.nosave()
.randomDir()
.run();
int connection = 10;
int subscription = 5;
Config config = new Config();
config.useSingleServer()
.setAddress("redis://localhost:" + runner.getRedisServerPort())
.setSubscriptionConnectionPoolSize(connection)
.setSubscriptionsPerConnection(subscription);
RedissonClient redissonClient = Redisson.create(config);
final Queue<RTopic> queue = new LinkedList<>();
int i = 0;
boolean timeout = false;
while (true) {
try{
if (timeout) {
System.out.println("destroy");
queue.poll().removeAllListeners();
}
RTopic topic = redissonClient.getTopic(++i + "");
topic.addListener(Object.class, new MessageListener<Object>() {
@Override
public void onMessage(CharSequence channel, Object msg) {
// TODO Auto-generated method stub
}
});
queue.offer(topic);
if (i > 1000) {
break;
}
System.out.println(i + " - " + queue.size());
}catch(Exception e){
timeout = true;
e.printStackTrace();
}
}
redissonClient.shutdown();
runner.stop();
}
@Test
public void testRemoveAllListeners2() throws InterruptedException {
RedissonClient redisson = BaseTest.createInstance();

Loading…
Cancel
Save