|
|
|
@ -15,7 +15,6 @@
|
|
|
|
|
*/
|
|
|
|
|
package org.redisson.spring.data.connection;
|
|
|
|
|
|
|
|
|
|
import org.redisson.api.RFuture;
|
|
|
|
|
import org.redisson.client.BaseRedisPubSubListener;
|
|
|
|
|
import org.redisson.client.ChannelName;
|
|
|
|
|
import org.redisson.client.RedisPubSubListener;
|
|
|
|
@ -23,8 +22,6 @@ import org.redisson.client.codec.ByteArrayCodec;
|
|
|
|
|
import org.redisson.client.codec.Codec;
|
|
|
|
|
import org.redisson.client.protocol.pubsub.PubSubType;
|
|
|
|
|
import org.redisson.connection.ConnectionManager;
|
|
|
|
|
import org.redisson.misc.CountableListener;
|
|
|
|
|
import org.redisson.misc.RedissonPromise;
|
|
|
|
|
import org.redisson.pubsub.PubSubConnectionEntry;
|
|
|
|
|
import org.redisson.pubsub.PublishSubscribeService;
|
|
|
|
|
import org.springframework.data.redis.connection.ReactiveSubscription;
|
|
|
|
@ -39,7 +36,6 @@ import java.util.*;
|
|
|
|
|
import java.util.Map.Entry;
|
|
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
@ -175,15 +171,11 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
|
|
|
|
|
public Mono<Void> unsubscribe(ByteBuffer... channels) {
|
|
|
|
|
monosListener.acquire();
|
|
|
|
|
return Mono.defer(() -> {
|
|
|
|
|
RedissonPromise<Void> result = new RedissonPromise<>();
|
|
|
|
|
result.onComplete((r, ex) -> {
|
|
|
|
|
monosListener.release();
|
|
|
|
|
});
|
|
|
|
|
CountableListener<Void> listener = new CountableListener<>(result, null, channels.length);
|
|
|
|
|
List<CompletableFuture<?>> futures = new ArrayList<>(channels.length);
|
|
|
|
|
for (ByteBuffer channel : channels) {
|
|
|
|
|
ChannelName cn = toChannelName(channel);
|
|
|
|
|
RFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
|
|
|
|
|
f.onComplete((res, e) -> {
|
|
|
|
|
CompletableFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
|
|
|
|
|
f = f.whenComplete((res, e) -> {
|
|
|
|
|
synchronized (RedissonReactiveSubscription.this.channels) {
|
|
|
|
|
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.channels.get(cn);
|
|
|
|
|
if (!entry.hasListeners(cn)) {
|
|
|
|
@ -191,9 +183,14 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
f.onComplete(listener);
|
|
|
|
|
futures.add(f);
|
|
|
|
|
}
|
|
|
|
|
return Mono.fromFuture(result);
|
|
|
|
|
|
|
|
|
|
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
|
|
|
|
|
future = future.whenComplete((r, e) -> {
|
|
|
|
|
monosListener.release();
|
|
|
|
|
});
|
|
|
|
|
return Mono.fromFuture(future);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -206,15 +203,11 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
|
|
|
|
|
public Mono<Void> pUnsubscribe(ByteBuffer... patterns) {
|
|
|
|
|
monosListener.acquire();
|
|
|
|
|
return Mono.defer(() -> {
|
|
|
|
|
RedissonPromise<Void> result = new RedissonPromise<>();
|
|
|
|
|
result.onComplete((r, ex) -> {
|
|
|
|
|
monosListener.release();
|
|
|
|
|
});
|
|
|
|
|
CountableListener<Void> listener = new CountableListener<>(result, null, patterns.length);
|
|
|
|
|
List<CompletableFuture<?>> futures = new ArrayList<>(patterns.length);
|
|
|
|
|
for (ByteBuffer channel : patterns) {
|
|
|
|
|
ChannelName cn = toChannelName(channel);
|
|
|
|
|
RFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE);
|
|
|
|
|
f.onComplete((res, e) -> {
|
|
|
|
|
CompletableFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE);
|
|
|
|
|
f = f.whenComplete((res, e) -> {
|
|
|
|
|
synchronized (RedissonReactiveSubscription.this.patterns) {
|
|
|
|
|
Collection<PubSubConnectionEntry> entries = RedissonReactiveSubscription.this.patterns.get(cn);
|
|
|
|
|
entries.stream()
|
|
|
|
@ -222,9 +215,14 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
|
|
|
|
|
.forEach(ee -> RedissonReactiveSubscription.this.patterns.remove(cn));
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
f.onComplete(listener);
|
|
|
|
|
futures.add(f);
|
|
|
|
|
}
|
|
|
|
|
return Mono.fromFuture(result);
|
|
|
|
|
|
|
|
|
|
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
|
|
|
|
|
future = future.whenComplete((r, e) -> {
|
|
|
|
|
monosListener.release();
|
|
|
|
|
});
|
|
|
|
|
return Mono.fromFuture(future);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|