From 31260a32ac04679c9bc8fbae611e1097981fb752 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 6 Sep 2022 11:13:37 +0300 Subject: [PATCH] refactoring --- .../connection/ClientConnectionsEntry.java | 1 - .../redisson/pubsub/PublishSubscribeService.java | 14 +++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index 156a7995d..1218a123a 100644 --- a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -31,7 +31,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicLong; /** * diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index 54e47d0c2..af72e7ae3 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -463,17 +463,17 @@ public class PublishSubscribeService { return CompletableFuture.completedFuture(null); } - CompletableFuture result = new CompletableFuture<>(); AsyncSemaphore lock = getSemaphore(channelName); - lock.acquire(() -> { + CompletableFuture f = lock.acquire(); + return f.thenCompose(v -> { PubSubConnectionEntry entry = name2PubSubConnection.remove(new PubSubKey(channelName, e)); if (entry == null) { lock.release(); - result.complete(null); - return; + return CompletableFuture.completedFuture(null); } - freePubSubLock.acquire(() -> { + CompletableFuture psf = freePubSubLock.acquire(); + return psf.thenCompose(r -> { PubSubEntry ee = entry2PubSubConnection.getOrDefault(e, new PubSubEntry()); Queue freePubSubConnections = ee.getEntries(); freePubSubConnections.remove(entry); @@ -488,6 +488,7 @@ public class PublishSubscribeService { entryCodec = entry.getConnection().getChannels().get(channelName); } + CompletableFuture result = new CompletableFuture<>(); RedisPubSubListener listener = new BaseRedisPubSubListener() { @Override @@ -503,10 +504,9 @@ public class PublishSubscribeService { }; entry.unsubscribe(topicType, channelName, listener); + return result; }); }); - - return result; } private void addFreeConnectionEntry(ChannelName channelName, PubSubConnectionEntry entry) {