refactoring

pull/4531/head
Nikita Koksharov 3 years ago
parent bb72b65210
commit 31260a32ac

@ -31,7 +31,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* *

@ -463,17 +463,17 @@ public class PublishSubscribeService {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
CompletableFuture<Codec> result = new CompletableFuture<>();
AsyncSemaphore lock = getSemaphore(channelName); AsyncSemaphore lock = getSemaphore(channelName);
lock.acquire(() -> { CompletableFuture<Void> f = lock.acquire();
return f.thenCompose(v -> {
PubSubConnectionEntry entry = name2PubSubConnection.remove(new PubSubKey(channelName, e)); PubSubConnectionEntry entry = name2PubSubConnection.remove(new PubSubKey(channelName, e));
if (entry == null) { if (entry == null) {
lock.release(); lock.release();
result.complete(null); return CompletableFuture.completedFuture(null);
return;
} }
freePubSubLock.acquire(() -> { CompletableFuture<Void> psf = freePubSubLock.acquire();
return psf.thenCompose(r -> {
PubSubEntry ee = entry2PubSubConnection.getOrDefault(e, new PubSubEntry()); PubSubEntry ee = entry2PubSubConnection.getOrDefault(e, new PubSubEntry());
Queue<PubSubConnectionEntry> freePubSubConnections = ee.getEntries(); Queue<PubSubConnectionEntry> freePubSubConnections = ee.getEntries();
freePubSubConnections.remove(entry); freePubSubConnections.remove(entry);
@ -488,6 +488,7 @@ public class PublishSubscribeService {
entryCodec = entry.getConnection().getChannels().get(channelName); entryCodec = entry.getConnection().getChannels().get(channelName);
} }
CompletableFuture<Codec> result = new CompletableFuture<>();
RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() { RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
@Override @Override
@ -503,10 +504,9 @@ public class PublishSubscribeService {
}; };
entry.unsubscribe(topicType, channelName, listener); entry.unsubscribe(topicType, channelName, listener);
return result;
}); });
}); });
return result;
} }
private void addFreeConnectionEntry(ChannelName channelName, PubSubConnectionEntry entry) { private void addFreeConnectionEntry(ChannelName channelName, PubSubConnectionEntry entry) {

Loading…
Cancel
Save