From 9460b2556fccbd0a955528b8648d145b3ca569cb Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 20 Apr 2023 12:42:22 +0300 Subject: [PATCH] refactoring --- .../java/org/redisson/RedissonBaseLock.java | 55 +----------------- .../main/java/org/redisson/RedissonLock.java | 4 +- .../redisson/connection/ServiceManager.java | 57 +++++++++++++++++++ 3 files changed, 62 insertions(+), 54 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBaseLock.java b/redisson/src/main/java/org/redisson/RedissonBaseLock.java index 99b232d3f..124a0e6f8 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseLock.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseLock.java @@ -318,40 +318,9 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc return unlockAsync(threadId); } - protected final RFuture execute(Supplier> supplier) { - CompletableFuture result = new CompletableFuture<>(); - int retryAttempts = commandExecutor.getServiceManager().getConfig().getRetryAttempts(); - AtomicInteger attempts = new AtomicInteger(retryAttempts); - execute(attempts, result, supplier); - return new CompletableFutureWrapper<>(result); - } - - private void execute(AtomicInteger attempts, CompletableFuture result, Supplier> supplier) { - RFuture future = supplier.get(); - future.whenComplete((r, e) -> { - if (e != null) { - if (e.getCause().getMessage().equals("None of slaves were synced")) { - if (attempts.decrementAndGet() < 0) { - result.completeExceptionally(e); - return; - } - - commandExecutor.getServiceManager().newTimeout(t -> execute(attempts, result, supplier), - commandExecutor.getServiceManager().getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - return; - } - - result.completeExceptionally(e); - return; - } - - result.complete(r); - }); - } - @Override public RFuture unlockAsync(long threadId) { - return execute(() -> unlockAsync0(threadId)); + return commandExecutor.getServiceManager().execute(() -> unlockAsync0(threadId)); } private RFuture unlockAsync0(long threadId) { @@ -439,26 +408,8 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc return tryLockAsync(waitTime, leaseTime, unit, currentThreadId); } - protected CompletionStage handleNoSync(long threadId, CompletionStage ttlRemainingFuture) { - CompletionStage s = ttlRemainingFuture.handle((r, ex) -> { - if (ex != null) { - if (ex.getCause().getMessage().equals("None of slaves were synced")) { - return unlockInnerAsync(threadId).handle((r1, e) -> { - if (e != null) { - if (e.getCause().getMessage().equals("None of slaves were synced")) { - throw new CompletionException(ex.getCause()); - } - e.getCause().addSuppressed(ex.getCause()); - } - throw new CompletionException(ex.getCause()); - }); - } else { - throw new CompletionException(ex.getCause()); - } - } - return CompletableFuture.completedFuture(r); - }).thenCompose(f -> (CompletionStage) f); - return s; + protected final CompletionStage handleNoSync(long threadId, CompletionStage ttlRemainingFuture) { + return commandExecutor.getServiceManager().handleNoSync(ttlRemainingFuture, () -> unlockInnerAsync(threadId)); } } diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java index bfa9a4d69..bfe69f965 100644 --- a/redisson/src/main/java/org/redisson/RedissonLock.java +++ b/redisson/src/main/java/org/redisson/RedissonLock.java @@ -144,7 +144,7 @@ public class RedissonLock extends RedissonBaseLock { } private RFuture tryAcquireAsync0(long waitTime, long leaseTime, TimeUnit unit, long threadId) { - return execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId)); + return commandExecutor.getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId)); } private RFuture tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { @@ -425,7 +425,7 @@ public class RedissonLock extends RedissonBaseLock { @Override public RFuture tryLockAsync(long threadId) { - return execute(() -> tryAcquireOnceAsync(-1, -1, null, threadId)); + return commandExecutor.getServiceManager().execute(() -> tryAcquireOnceAsync(-1, -1, null, threadId)); } @Override diff --git a/redisson/src/main/java/org/redisson/connection/ServiceManager.java b/redisson/src/main/java/org/redisson/connection/ServiceManager.java index 972866f46..3f8f8b0e9 100644 --- a/redisson/src/main/java/org/redisson/connection/ServiceManager.java +++ b/redisson/src/main/java/org/redisson/connection/ServiceManager.java @@ -40,11 +40,13 @@ import io.netty.util.internal.PlatformDependent; import org.redisson.ElementsSubscribeService; import org.redisson.Version; import org.redisson.api.NatMapper; +import org.redisson.api.RFuture; import org.redisson.cache.LRUCacheMap; import org.redisson.client.RedisNodeNotFoundException; import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.TransportMode; +import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.RedisURI; import org.slf4j.Logger; @@ -56,6 +58,8 @@ import java.net.UnknownHostException; import java.security.MessageDigest; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; /** * @@ -373,4 +377,57 @@ public class ServiceManager { }); } + public RFuture execute(Supplier> supplier) { + CompletableFuture result = new CompletableFuture<>(); + int retryAttempts = config.getRetryAttempts(); + AtomicInteger attempts = new AtomicInteger(retryAttempts); + execute(attempts, result, supplier); + return new CompletableFutureWrapper<>(result); + } + + private void execute(AtomicInteger attempts, CompletableFuture result, Supplier> supplier) { + RFuture future = supplier.get(); + future.whenComplete((r, e) -> { + if (e != null) { + if (e.getCause().getMessage().equals("None of slaves were synced")) { + if (attempts.decrementAndGet() < 0) { + result.completeExceptionally(e); + return; + } + + newTimeout(t -> execute(attempts, result, supplier), + config.getRetryInterval(), TimeUnit.MILLISECONDS); + return; + } + + result.completeExceptionally(e); + return; + } + + result.complete(r); + }); + } + + public CompletionStage handleNoSync(CompletionStage stage, Supplier> supplier) { + CompletionStage s = stage.handle((r, ex) -> { + if (ex != null) { + if (ex.getCause().getMessage().equals("None of slaves were synced")) { + return supplier.get().handle((r1, e) -> { + if (e != null) { + if (e.getCause().getMessage().equals("None of slaves were synced")) { + throw new CompletionException(ex.getCause()); + } + e.getCause().addSuppressed(ex.getCause()); + } + throw new CompletionException(ex.getCause()); + }); + } else { + throw new CompletionException(ex.getCause()); + } + } + return CompletableFuture.completedFuture(r); + }).thenCompose(f -> (CompletionStage) f); + return s; + } + }