diff --git a/redisson/src/main/java/org/redisson/RedissonBaseLock.java b/redisson/src/main/java/org/redisson/RedissonBaseLock.java index 99b232d3f..124a0e6f8 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseLock.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseLock.java @@ -318,40 +318,9 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc return unlockAsync(threadId); } - protected final <T> RFuture<T> execute(Supplier<RFuture<T>> supplier) { - CompletableFuture<T> result = new CompletableFuture<>(); - int retryAttempts = commandExecutor.getServiceManager().getConfig().getRetryAttempts(); - AtomicInteger attempts = new AtomicInteger(retryAttempts); - execute(attempts, result, supplier); - return new CompletableFutureWrapper<>(result); - } - - private <T> void execute(AtomicInteger attempts, CompletableFuture<T> result, Supplier<RFuture<T>> supplier) { - RFuture<T> future = supplier.get(); - future.whenComplete((r, e) -> { - if (e != null) { - if (e.getCause().getMessage().equals("None of slaves were synced")) { - if (attempts.decrementAndGet() < 0) { - result.completeExceptionally(e); - return; - } - - commandExecutor.getServiceManager().newTimeout(t -> execute(attempts, result, supplier), - commandExecutor.getServiceManager().getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - return; - } - - result.completeExceptionally(e); - return; - } - - result.complete(r); - }); - } - @Override public RFuture<Void> unlockAsync(long threadId) { - return execute(() -> unlockAsync0(threadId)); + return commandExecutor.getServiceManager().execute(() -> unlockAsync0(threadId)); } private RFuture<Void> unlockAsync0(long threadId) { @@ -439,26 +408,8 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc return tryLockAsync(waitTime, leaseTime, unit, currentThreadId); } - protected <T> CompletionStage<T> handleNoSync(long threadId, CompletionStage<T> ttlRemainingFuture) { - CompletionStage<T> s = ttlRemainingFuture.handle((r, ex) -> { - if (ex != null) { - if (ex.getCause().getMessage().equals("None of slaves were synced")) { - return unlockInnerAsync(threadId).handle((r1, e) -> { - if (e != null) { - if (e.getCause().getMessage().equals("None of slaves were synced")) { - throw new CompletionException(ex.getCause()); - } - e.getCause().addSuppressed(ex.getCause()); - } - throw new CompletionException(ex.getCause()); - }); - } else { - throw new CompletionException(ex.getCause()); - } - } - return CompletableFuture.completedFuture(r); - }).thenCompose(f -> (CompletionStage<T>) f); - return s; + protected final <T> CompletionStage<T> handleNoSync(long threadId, CompletionStage<T> ttlRemainingFuture) { + return commandExecutor.getServiceManager().handleNoSync(ttlRemainingFuture, () -> unlockInnerAsync(threadId)); } } diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java index bfa9a4d69..bfe69f965 100644 --- a/redisson/src/main/java/org/redisson/RedissonLock.java +++ b/redisson/src/main/java/org/redisson/RedissonLock.java @@ -144,7 +144,7 @@ public class RedissonLock extends RedissonBaseLock { } private RFuture<Long> tryAcquireAsync0(long waitTime, long leaseTime, TimeUnit unit, long threadId) { - return execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId)); + return commandExecutor.getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId)); } private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { @@ -425,7 +425,7 @@ public class RedissonLock extends RedissonBaseLock { @Override public RFuture<Boolean> tryLockAsync(long threadId) { - return execute(() -> tryAcquireOnceAsync(-1, -1, null, threadId)); + return commandExecutor.getServiceManager().execute(() -> tryAcquireOnceAsync(-1, -1, null, threadId)); } @Override diff --git a/redisson/src/main/java/org/redisson/connection/ServiceManager.java b/redisson/src/main/java/org/redisson/connection/ServiceManager.java index 972866f46..3f8f8b0e9 100644 --- a/redisson/src/main/java/org/redisson/connection/ServiceManager.java +++ b/redisson/src/main/java/org/redisson/connection/ServiceManager.java @@ -40,11 +40,13 @@ import io.netty.util.internal.PlatformDependent; import org.redisson.ElementsSubscribeService; import org.redisson.Version; import org.redisson.api.NatMapper; +import org.redisson.api.RFuture; import org.redisson.cache.LRUCacheMap; import org.redisson.client.RedisNodeNotFoundException; import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.TransportMode; +import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.RedisURI; import org.slf4j.Logger; @@ -56,6 +58,8 @@ import java.net.UnknownHostException; import java.security.MessageDigest; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; /** * @@ -373,4 +377,57 @@ public class ServiceManager { }); } + public <T> RFuture<T> execute(Supplier<RFuture<T>> supplier) { + CompletableFuture<T> result = new CompletableFuture<>(); + int retryAttempts = config.getRetryAttempts(); + AtomicInteger attempts = new AtomicInteger(retryAttempts); + execute(attempts, result, supplier); + return new CompletableFutureWrapper<>(result); + } + + private <T> void execute(AtomicInteger attempts, CompletableFuture<T> result, Supplier<RFuture<T>> supplier) { + RFuture<T> future = supplier.get(); + future.whenComplete((r, e) -> { + if (e != null) { + if (e.getCause().getMessage().equals("None of slaves were synced")) { + if (attempts.decrementAndGet() < 0) { + result.completeExceptionally(e); + return; + } + + newTimeout(t -> execute(attempts, result, supplier), + config.getRetryInterval(), TimeUnit.MILLISECONDS); + return; + } + + result.completeExceptionally(e); + return; + } + + result.complete(r); + }); + } + + public <T> CompletionStage<T> handleNoSync(CompletionStage<T> stage, Supplier<CompletionStage<?>> supplier) { + CompletionStage<T> s = stage.handle((r, ex) -> { + if (ex != null) { + if (ex.getCause().getMessage().equals("None of slaves were synced")) { + return supplier.get().handle((r1, e) -> { + if (e != null) { + if (e.getCause().getMessage().equals("None of slaves were synced")) { + throw new CompletionException(ex.getCause()); + } + e.getCause().addSuppressed(ex.getCause()); + } + throw new CompletionException(ex.getCause()); + }); + } else { + throw new CompletionException(ex.getCause()); + } + } + return CompletableFuture.completedFuture(r); + }).thenCompose(f -> (CompletionStage<T>) f); + return s; + } + }