|
|
|
@ -16,7 +16,6 @@
|
|
|
|
|
package org.redisson.spring.data.connection;
|
|
|
|
|
|
|
|
|
|
import java.nio.ByteBuffer;
|
|
|
|
|
import java.util.HashMap;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.Map.Entry;
|
|
|
|
|
import java.util.Set;
|
|
|
|
@ -128,7 +127,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private final AtomicReference<Flux<Message<ByteBuffer, ByteBuffer>>> flux = new AtomicReference<>();
|
|
|
|
|
private Disposable disposable;
|
|
|
|
|
private volatile Disposable disposable;
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Flux<Message<ByteBuffer, ByteBuffer>> receive() {
|
|
|
|
@ -138,19 +137,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
|
|
|
|
|
|
|
|
|
|
Flux<Message<ByteBuffer, ByteBuffer>> f = Flux.<Message<ByteBuffer, ByteBuffer>>create(emitter -> {
|
|
|
|
|
emitter.onRequest(n -> {
|
|
|
|
|
Map<ByteBuffer, BaseRedisPubSubListener> channelMap = new HashMap<ByteBuffer, BaseRedisPubSubListener>();
|
|
|
|
|
Map<ByteBuffer, BaseRedisPubSubListener> patternMap = new HashMap<ByteBuffer, BaseRedisPubSubListener>();
|
|
|
|
|
|
|
|
|
|
disposable = () -> {
|
|
|
|
|
for (Entry<ByteBuffer, BaseRedisPubSubListener> entry : channelMap.entrySet()) {
|
|
|
|
|
PubSubConnectionEntry e = channels.get(entry.getKey());
|
|
|
|
|
e.removeListener(toChannelName(entry.getKey()), entry.getValue());
|
|
|
|
|
}
|
|
|
|
|
for (Entry<ByteBuffer, BaseRedisPubSubListener> entry : patternMap.entrySet()) {
|
|
|
|
|
PubSubConnectionEntry e = patterns.get(entry.getKey());
|
|
|
|
|
e.removeListener(toChannelName(entry.getKey()), entry.getValue());
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
AtomicLong counter = new AtomicLong(n);
|
|
|
|
|
BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
|
|
|
|
@ -175,25 +161,24 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
disposable = () -> {
|
|
|
|
|
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : channels.entrySet()) {
|
|
|
|
|
entry.getValue().removeListener(toChannelName(entry.getKey()), listener);
|
|
|
|
|
}
|
|
|
|
|
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : patterns.entrySet()) {
|
|
|
|
|
entry.getValue().removeListener(toChannelName(entry.getKey()), listener);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : channels.entrySet()) {
|
|
|
|
|
channelMap.put(entry.getKey(), listener);
|
|
|
|
|
entry.getValue().addListener(toChannelName(entry.getKey()), listener);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : patterns.entrySet()) {
|
|
|
|
|
patternMap.put(entry.getKey(), listener);
|
|
|
|
|
entry.getValue().addListener(toChannelName(entry.getKey()), listener);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
disposable = () -> {
|
|
|
|
|
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : channels.entrySet()) {
|
|
|
|
|
entry.getValue().removeListener(toChannelName(entry.getKey()), listener);
|
|
|
|
|
}
|
|
|
|
|
for (Entry<ByteBuffer, PubSubConnectionEntry> entry : patterns.entrySet()) {
|
|
|
|
|
entry.getValue().removeListener(toChannelName(entry.getKey()), listener);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
emitter.onDispose(disposable);
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|