|
|
|
@ -19,6 +19,7 @@ import io.netty.channel.ChannelFuture;
|
|
|
|
|
import io.netty.channel.ChannelFutureListener;
|
|
|
|
|
import org.redisson.PubSubMessageListener;
|
|
|
|
|
import org.redisson.PubSubPatternMessageListener;
|
|
|
|
|
import org.redisson.PubSubStatusListener;
|
|
|
|
|
import org.redisson.client.*;
|
|
|
|
|
import org.redisson.client.codec.Codec;
|
|
|
|
|
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
|
|
|
|
@ -123,11 +124,11 @@ public class PubSubConnectionEntry {
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public boolean removeListener(ChannelName channelName, int listenerId) {
|
|
|
|
|
Queue<RedisPubSubListener<?>> listeners = channelListeners.getOrDefault(channelName, EMPTY_QUEUE);
|
|
|
|
|
for (RedisPubSubListener<?> listener : listeners) {
|
|
|
|
|
if (System.identityHashCode(listener) == listenerId) {
|
|
|
|
|
if (hasId(listener, listenerId)) {
|
|
|
|
|
removeListener(channelName, listener);
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
@ -135,6 +136,28 @@ public class PubSubConnectionEntry {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private boolean hasId(RedisPubSubListener<?> listener, int listenerId) {
|
|
|
|
|
if (System.identityHashCode(listener) == listenerId) {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (listener instanceof PubSubStatusListener) {
|
|
|
|
|
PubSubStatusListener pubSubStatusListener = (PubSubStatusListener) listener;
|
|
|
|
|
if (System.identityHashCode(pubSubStatusListener.getListener()) == listenerId) {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (listener instanceof PubSubMessageListener) {
|
|
|
|
|
PubSubMessageListener pubSubStatusListener = (PubSubMessageListener) listener;
|
|
|
|
|
if (System.identityHashCode(pubSubStatusListener.getListener()) == listenerId) {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void removeListener(ChannelName channelName, RedisPubSubListener<?> listener) {
|
|
|
|
|
channelListeners.computeIfPresent(channelName, (k, queue) -> {
|
|
|
|
|
if (queue.remove(listener) && queue.isEmpty()) {
|
|
|
|
|