Fixed - possible pubsub listeners leak. #1268

pull/1300/head
Nikita 7 years ago
parent 707b419d97
commit a481259aa5

@ -54,7 +54,6 @@ import org.redisson.config.TransportMode;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import org.redisson.misc.URIBuilder;
import org.redisson.pubsub.AsyncSemaphore;
import org.slf4j.Logger;
@ -432,33 +431,35 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public RFuture<PubSubConnectionEntry> psubscribe(final String channelName, final Codec codec, final RedisPubSubListener<?>... listeners) {
final AsyncSemaphore lock = getSemaphore(channelName);
final RPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>();
lock.acquire(new Runnable() {
@Override
public void run() {
RFuture<PubSubConnectionEntry> future = psubscribe(channelName, codec, lock, listeners);
future.addListener(new TransferListener<PubSubConnectionEntry>(result));
}
});
return result;
public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?>... listeners) {
return subscribe(PubSubType.PSUBSCRIBE, codec, channelName, listeners);
}
@Override
public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
subscribe(codec, channelName, promise, PubSubType.PSUBSCRIBE, semaphore, listeners);
return promise;
}
public RFuture<PubSubConnectionEntry> subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?>... listeners) {
@Override
public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?>... listeners) {
return subscribe(PubSubType.SUBSCRIBE, codec, channelName, listeners);
}
protected RFuture<PubSubConnectionEntry> subscribe(PubSubType type, final Codec codec, final String channelName,
final RedisPubSubListener<?>... listeners) {
final AsyncSemaphore lock = getSemaphore(channelName);
final RPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>();
lock.acquire(new Runnable() {
@Override
public void run() {
RFuture<PubSubConnectionEntry> future = subscribe(codec, channelName, lock, listeners);
future.addListener(new TransferListener<PubSubConnectionEntry>(result));
if (result.isDone()) {
lock.release();
return;
}
subscribe(codec, channelName, result, type, lock, listeners);
}
});
return result;
@ -487,6 +488,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public void run() {
if (promise.isDone()) {
lock.release();
freePubSubLock.release();
return;
}
@ -536,8 +539,18 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
connEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
lock.release();
promise.trySuccess(connEntry);
if (!promise.trySuccess(connEntry)) {
for (RedisPubSubListener<?> listener : listeners) {
connEntry.removeListener(channelName, listener);
}
if (!connEntry.hasListeners(channelName)) {
unsubscribe(channelName, lock);
} else {
lock.release();
}
} else {
lock.release();
}
}
});
}

Loading…
Cancel
Save