|
|
|
@ -24,7 +24,6 @@ import org.redisson.connection.ConnectionManager;
|
|
|
|
|
import org.redisson.connection.ConnectionManager.PubSubEntry;
|
|
|
|
|
import org.redisson.core.MessageListener;
|
|
|
|
|
import org.redisson.core.RTopic;
|
|
|
|
|
import org.redisson.core.RedisPubSubTopicListener;
|
|
|
|
|
|
|
|
|
|
import com.lambdaworks.redis.RedisConnection;
|
|
|
|
|
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
|
|
|
|
@ -34,8 +33,8 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
|
|
|
|
|
private final CountDownLatch subscribeLatch = new CountDownLatch(1);
|
|
|
|
|
private final AtomicBoolean subscribeOnce = new AtomicBoolean();
|
|
|
|
|
|
|
|
|
|
private final Map<Integer, RedisPubSubTopicListener<String, M>> listeners =
|
|
|
|
|
new ConcurrentHashMap<Integer, RedisPubSubTopicListener<String, M>>();
|
|
|
|
|
private final Map<Integer, RedisPubSubTopicListenerWrapper<String, M>> listeners =
|
|
|
|
|
new ConcurrentHashMap<Integer, RedisPubSubTopicListenerWrapper<String, M>>();
|
|
|
|
|
private final ConnectionManager connectionManager;
|
|
|
|
|
|
|
|
|
|
private PubSubEntry pubSubEntry;
|
|
|
|
@ -80,7 +79,7 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public int addListener(MessageListener<M> listener) {
|
|
|
|
|
RedisPubSubTopicListener<String, M> pubSubListener = new RedisPubSubTopicListener<String, M>(listener, getName());
|
|
|
|
|
RedisPubSubTopicListenerWrapper<String, M> pubSubListener = new RedisPubSubTopicListenerWrapper<String, M>(listener, getName());
|
|
|
|
|
listeners.put(pubSubListener.hashCode(), pubSubListener);
|
|
|
|
|
pubSubEntry.addListener(pubSubListener);
|
|
|
|
|
return pubSubListener.hashCode();
|
|
|
|
@ -88,7 +87,7 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void removeListener(int listenerId) {
|
|
|
|
|
RedisPubSubTopicListener<String, M> pubSubListener = listeners.remove(listenerId);
|
|
|
|
|
RedisPubSubTopicListenerWrapper<String, M> pubSubListener = listeners.remove(listenerId);
|
|
|
|
|
pubSubEntry.removeListener(pubSubListener);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|