From 799510c342782db72e8fc936b657a545ff0fa59b Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 12 Oct 2022 14:25:40 +0300 Subject: [PATCH] refactoring --- .../main/java/org/redisson/pubsub/PublishSubscribe.java | 5 ++--- .../java/org/redisson/pubsub/PublishSubscribeService.java | 8 ++++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java index 7046a1803..67f3dc1ef 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java @@ -44,7 +44,7 @@ abstract class PublishSubscribe> { public void unsubscribe(E entry, String entryName, String channelName) { AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName)); - semaphore.acquire(() -> { + semaphore.acquire().thenAccept(c -> { if (entry.release() == 0) { entries.remove(entryName); service.unsubscribe(PubSubType.UNSUBSCRIBE, new ChannelName(channelName)) @@ -55,7 +55,6 @@ abstract class PublishSubscribe> { semaphore.release(); } }); - } public void timeout(CompletableFuture promise) { @@ -70,7 +69,7 @@ abstract class PublishSubscribe> { AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName)); CompletableFuture newPromise = new CompletableFuture<>(); - semaphore.acquire(() -> { + semaphore.acquire().thenAccept(c -> { if (newPromise.isDone()) { semaphore.release(); return; diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index 6282bdbc6..38d3b4779 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -197,7 +197,7 @@ public class PublishSubscribeService { "Unable to acquire subscription lock after " + timeout + "ms. " + "Try to increase 'timeout', 'subscriptionsPerConnection', 'subscriptionConnectionPoolSize' parameters.")); }, timeout, TimeUnit.MILLISECONDS); - lock.acquire(() -> { + lock.acquire().thenAccept(r -> { if (!lockTimeout.cancel() || promise.isDone()) { lock.release(); return; @@ -257,7 +257,7 @@ public class PublishSubscribeService { return; } - freePubSubLock.acquire(() -> { + freePubSubLock.acquire().thenAccept(c -> { if (promise.isDone()) { lock.release(); freePubSubLock.release(); @@ -390,7 +390,7 @@ public class PublishSubscribeService { return; } - freePubSubLock.acquire(() -> { + freePubSubLock.acquire().thenAccept(c -> { PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, connectionManager); int remainFreeAmount = entry.tryAcquire(); @@ -553,7 +553,7 @@ public class PublishSubscribeService { continue; } - freePubSubLock.acquire(() -> { + freePubSubLock.acquire().thenAccept(r -> { e.getValue().getEntries().remove(entry); freePubSubLock.release(); });