Subscription connection leak in PubSubConnectionEntry::removeListener

& unsubscribe process optimization. #237
pull/238/head
Nikita 10 years ago
parent 2f99a27103
commit 48dea3709e

@ -103,11 +103,11 @@ public class PubSubConnectionEntry {
}
}
public void removeListener(String channelName, RedisPubSubListener listener) {
private void removeListener(String channelName, RedisPubSubListener listener) {
Queue<RedisPubSubListener> queue = channelListeners.get(channelName);
synchronized (queue) {
if (queue.remove(listener)) {
channelListeners.remove(channelName, new ConcurrentLinkedQueue<RedisPubSubListener>());
if (queue.remove(listener) && queue.isEmpty()) {
channelListeners.remove(channelName);
}
}
conn.removeListener(listener);
@ -139,34 +139,36 @@ public class PubSubConnectionEntry {
@Override
public boolean onStatus(PubSubType type, String ch) {
if (type == PubSubType.UNSUBSCRIBE && channel.equals(ch)) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channel);
if (listeners != null) {
for (RedisPubSubListener listener : listeners) {
removeListener(channel, listener);
}
}
subscribedChannelsAmount.release();
removeListeners(channel);
return true;
}
return false;
}
});
conn.addOneShotListener(listener);
conn.unsubscribe(channel);
}
private void removeListeners(String channel) {
Queue<RedisPubSubListener> queue = channelListeners.get(channel);
if (queue != null) {
synchronized (queue) {
channelListeners.remove(channel);
}
for (RedisPubSubListener listener : queue) {
conn.removeListener(listener);
}
}
subscribedChannelsAmount.release();
}
public void punsubscribe(final String channel, RedisPubSubListener listener) {
conn.addOneShotListener(new BaseRedisPubSubListener<Object>() {
@Override
public boolean onStatus(PubSubType type, String ch) {
if (type == PubSubType.PUNSUBSCRIBE && channel.equals(ch)) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channel);
if (listeners != null) {
for (RedisPubSubListener listener : listeners) {
removeListener(channel, listener);
}
}
subscribedChannelsAmount.release();
removeListeners(channel);
return true;
}
return false;

Loading…
Cancel
Save