|
|
|
@ -28,10 +28,7 @@ import org.redisson.connection.ServiceManager;
|
|
|
|
|
import org.redisson.misc.AsyncSemaphore;
|
|
|
|
|
import org.redisson.misc.WrappedLock;
|
|
|
|
|
|
|
|
|
|
import java.util.EventListener;
|
|
|
|
|
import java.util.LinkedList;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.Queue;
|
|
|
|
|
import java.util.*;
|
|
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
|
@ -58,6 +55,14 @@ public class PubSubConnectionEntry {
|
|
|
|
|
private final ServiceManager serviceManager;
|
|
|
|
|
private final PublishSubscribeService subscribeService;
|
|
|
|
|
|
|
|
|
|
private static final Map<PubSubType, PubSubType> SUBSCRIBE2UNSUBSCRIBE = new HashMap<>();
|
|
|
|
|
|
|
|
|
|
static {
|
|
|
|
|
SUBSCRIBE2UNSUBSCRIBE.put(PubSubType.SUBSCRIBE, PubSubType.UNSUBSCRIBE);
|
|
|
|
|
SUBSCRIBE2UNSUBSCRIBE.put(PubSubType.SSUBSCRIBE, PubSubType.SUNSUBSCRIBE);
|
|
|
|
|
SUBSCRIBE2UNSUBSCRIBE.put(PubSubType.PSUBSCRIBE, PubSubType.PUNSUBSCRIBE);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public PubSubConnectionEntry(RedisPubSubConnection conn, ConnectionManager connectionManager) {
|
|
|
|
|
super();
|
|
|
|
|
this.conn = conn;
|
|
|
|
@ -170,7 +175,7 @@ public class PubSubConnectionEntry {
|
|
|
|
|
CompletableFuture<PubSubConnectionEntry> pp = new CompletableFuture<>();
|
|
|
|
|
pp.whenComplete((r, e) -> {
|
|
|
|
|
if (e != null) {
|
|
|
|
|
PubSubType unsubscribeType = PublishSubscribeService.SUBSCRIBE2UNSUBSCRIBE.get(type);
|
|
|
|
|
PubSubType unsubscribeType = SUBSCRIBE2UNSUBSCRIBE.get(type);
|
|
|
|
|
CompletableFuture<Codec> f = subscribeService.unsubscribe(channelName, unsubscribeType);
|
|
|
|
|
f.whenComplete((rr, ee) -> {
|
|
|
|
|
pm.completeExceptionally(e);
|
|
|
|
|