|
|
@ -33,11 +33,12 @@ import java.util.*;
|
|
|
|
import java.util.Map.Entry;
|
|
|
|
import java.util.Map.Entry;
|
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
*
|
|
|
|
*
|
|
|
|
* @author Nikita Koksharov
|
|
|
|
* @author Nikita Koksharov
|
|
|
|
*
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
*/
|
|
|
@ -45,19 +46,16 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
|
|
|
|
|
|
|
|
|
|
|
|
public static class ListenableCounter {
|
|
|
|
public static class ListenableCounter {
|
|
|
|
|
|
|
|
|
|
|
|
private int state;
|
|
|
|
private final AtomicInteger state = new AtomicInteger();
|
|
|
|
private Runnable r;
|
|
|
|
private Runnable r;
|
|
|
|
|
|
|
|
|
|
|
|
public synchronized void acquire() {
|
|
|
|
public void acquire() {
|
|
|
|
state++;
|
|
|
|
state.incrementAndGet();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public void release() {
|
|
|
|
public void release() {
|
|
|
|
synchronized (this) {
|
|
|
|
if (state.decrementAndGet() != 0) {
|
|
|
|
state--;
|
|
|
|
return;
|
|
|
|
if (state != 0) {
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (r != null) {
|
|
|
|
if (r != null) {
|
|
|
@ -66,12 +64,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public synchronized void addListener(Runnable r) {
|
|
|
|
public void addListener(Runnable r) {
|
|
|
|
synchronized (this) {
|
|
|
|
if (state.get() != 0) {
|
|
|
|
if (state != 0) {
|
|
|
|
this.r = r;
|
|
|
|
this.r = r;
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
r.run();
|
|
|
|
r.run();
|
|
|
@ -85,7 +81,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
|
|
|
|
private final ListenableCounter monosListener = new ListenableCounter();
|
|
|
|
private final ListenableCounter monosListener = new ListenableCounter();
|
|
|
|
|
|
|
|
|
|
|
|
private final PublishSubscribeService subscribeService;
|
|
|
|
private final PublishSubscribeService subscribeService;
|
|
|
|
|
|
|
|
|
|
|
|
public RedissonReactiveSubscription(ConnectionManager connectionManager) {
|
|
|
|
public RedissonReactiveSubscription(ConnectionManager connectionManager) {
|
|
|
|
this.subscribeService = connectionManager.getSubscribeService();
|
|
|
|
this.subscribeService = connectionManager.getSubscribeService();
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -108,7 +104,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
return Mono.fromFuture(future);
|
|
|
|
return Mono.fromFuture(future);
|
|
|
|
});
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
protected ChannelName toChannelName(ByteBuffer channel) {
|
|
|
|
protected ChannelName toChannelName(ByteBuffer channel) {
|
|
|
@ -149,17 +144,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
|
|
|
|
ChannelName cn = toChannelName(channel);
|
|
|
|
ChannelName cn = toChannelName(channel);
|
|
|
|
CompletableFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
|
|
|
|
CompletableFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE);
|
|
|
|
f = f.whenComplete((res, e) -> {
|
|
|
|
f = f.whenComplete((res, e) -> {
|
|
|
|
synchronized (RedissonReactiveSubscription.this.channels) {
|
|
|
|
RedissonReactiveSubscription.this.channels.computeIfPresent(cn, (key, entries) -> {
|
|
|
|
Collection<PubSubConnectionEntry> entries = RedissonReactiveSubscription.this.channels.get(cn);
|
|
|
|
entries.removeIf(entry -> !entry.hasListeners(cn));
|
|
|
|
for (PubSubConnectionEntry entry : entries) {
|
|
|
|
if (entries.isEmpty()) {
|
|
|
|
if (!entry.hasListeners(cn)) {
|
|
|
|
return null;
|
|
|
|
entries.remove(entry);
|
|
|
|
|
|
|
|
if (entries.isEmpty()) {
|
|
|
|
|
|
|
|
RedissonReactiveSubscription.this.channels.remove(cn);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return entries;
|
|
|
|
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
futures.add(f);
|
|
|
|
futures.add(f);
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -186,12 +177,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
|
|
|
|
ChannelName cn = toChannelName(channel);
|
|
|
|
ChannelName cn = toChannelName(channel);
|
|
|
|
CompletableFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE);
|
|
|
|
CompletableFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE);
|
|
|
|
f = f.whenComplete((res, e) -> {
|
|
|
|
f = f.whenComplete((res, e) -> {
|
|
|
|
synchronized (RedissonReactiveSubscription.this.patterns) {
|
|
|
|
RedissonReactiveSubscription.this.patterns.computeIfPresent(cn, (key, entries) -> {
|
|
|
|
Collection<PubSubConnectionEntry> entries = RedissonReactiveSubscription.this.patterns.get(cn);
|
|
|
|
entries.removeIf(entry -> !entry.hasListeners(cn));
|
|
|
|
entries.stream()
|
|
|
|
if (entries.isEmpty()) {
|
|
|
|
.filter(en -> en.hasListeners(cn))
|
|
|
|
return null;
|
|
|
|
.forEach(ee -> RedissonReactiveSubscription.this.patterns.remove(cn));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return entries;
|
|
|
|
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
futures.add(f);
|
|
|
|
futures.add(f);
|
|
|
|
}
|
|
|
|
}
|
|
|
|