From 466387764c6e4b8af3bbca689bb40ea8ab3b2848 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 26 Jul 2024 11:50:36 +0300 Subject: [PATCH] Fixed - RedissonShutdownException is thrown by RLock object #6029 --- .../main/java/org/redisson/RedissonBaseLock.java | 6 +++++- .../src/main/java/org/redisson/RedissonLock.java | 16 ++++++++++++++-- .../RedissonPermitExpirableSemaphore.java | 2 +- .../java/org/redisson/RedissonSemaphore.java | 2 +- .../org/redisson/connection/ServiceManager.java | 12 ++++++++---- 5 files changed, 29 insertions(+), 9 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBaseLock.java b/redisson/src/main/java/org/redisson/RedissonBaseLock.java index 92a10a5a4..220f2c51e 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseLock.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseLock.java @@ -144,6 +144,10 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc CompletionStage future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { + if (e instanceof RedissonShutdownException) { + return; + } + if (e != null) { log.error("Can't update lock {} expiration", getRawName(), e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); @@ -287,7 +291,7 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc @Override public RFuture unlockAsync(long threadId) { - return getServiceManager().execute(() -> unlockAsync0(threadId)); + return getServiceManager().execute(() -> unlockAsync0(threadId), null); } private RFuture unlockAsync0(long threadId) { diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java index 08c92b3ae..ab6cc8da1 100644 --- a/redisson/src/main/java/org/redisson/RedissonLock.java +++ b/redisson/src/main/java/org/redisson/RedissonLock.java @@ -110,6 +110,9 @@ public class RedissonLock extends RedissonBaseLock { if (ttl == null) { return; } + if (ttl == Long.MIN_VALUE) { + throw new InterruptedException(); + } CompletableFuture future = subscribe(threadId); pubSub.timeout(future); @@ -127,6 +130,9 @@ public class RedissonLock extends RedissonBaseLock { if (ttl == null) { break; } + if (ttl == Long.MIN_VALUE) { + throw new InterruptedException(); + } // waiting for message if (ttl >= 0) { @@ -157,7 +163,7 @@ public class RedissonLock extends RedissonBaseLock { } private RFuture tryAcquireAsync0(long waitTime, long leaseTime, TimeUnit unit, long threadId) { - return getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId)); + return getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId), Long.MIN_VALUE); } private RFuture tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { @@ -233,6 +239,9 @@ public class RedissonLock extends RedissonBaseLock { long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); + if (ttl == Long.MIN_VALUE) { + return false; + } // lock acquired if (ttl == null) { return true; @@ -280,6 +289,9 @@ public class RedissonLock extends RedissonBaseLock { if (ttl == null) { return true; } + if (ttl == Long.MIN_VALUE) { + return false; + } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { @@ -447,7 +459,7 @@ public class RedissonLock extends RedissonBaseLock { @Override public RFuture tryLockAsync(long threadId) { - return getServiceManager().execute(() -> tryAcquireOnceAsync(-1, -1, null, threadId)); + return getServiceManager().execute(() -> tryAcquireOnceAsync(-1, -1, null, threadId), false); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java index 04a18e43c..7372ced20 100644 --- a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java @@ -371,7 +371,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen return getServiceManager().execute(() -> { RFuture> future = tryAcquireAsync(ids, timeoutDate); return commandExecutor.handleNoSync(future, () -> releaseAsync(ids)); - }); + }, Collections.emptyList()); } private RFuture> tryAcquireAsync(List ids, long timeoutDate) { diff --git a/redisson/src/main/java/org/redisson/RedissonSemaphore.java b/redisson/src/main/java/org/redisson/RedissonSemaphore.java index 5a57080f3..9d4867c22 100644 --- a/redisson/src/main/java/org/redisson/RedissonSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonSemaphore.java @@ -263,7 +263,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { return commandExecutor.getServiceManager().execute(() -> { RFuture future = tryAcquireAsync0(permits); return commandExecutor.handleNoSync(future, () -> releaseAsync(permits)); - }); + }, false); } private RFuture tryAcquireAsync0(int permits) { diff --git a/redisson/src/main/java/org/redisson/connection/ServiceManager.java b/redisson/src/main/java/org/redisson/connection/ServiceManager.java index b2c0a2ab4..fb1560f17 100644 --- a/redisson/src/main/java/org/redisson/connection/ServiceManager.java +++ b/redisson/src/main/java/org/redisson/connection/ServiceManager.java @@ -503,18 +503,22 @@ public final class ServiceManager { }); } - public RFuture execute(Supplier> supplier) { + public RFuture execute(Supplier> supplier, T defaultValue) { CompletableFuture result = new CompletableFuture<>(); int retryAttempts = config.getRetryAttempts(); AtomicInteger attempts = new AtomicInteger(retryAttempts); - execute(attempts, result, supplier); + execute(attempts, result, supplier, defaultValue); return new CompletableFutureWrapper<>(result); } - private void execute(AtomicInteger attempts, CompletableFuture result, Supplier> supplier) { + private void execute(AtomicInteger attempts, CompletableFuture result, Supplier> supplier, T defaultValue) { CompletionStage future = supplier.get(); future.whenComplete((r, e) -> { if (e != null) { + if (e instanceof RedissonShutdownException) { + result.complete(defaultValue); + return; + } if (e.getCause().getMessage() != null && e.getCause().getMessage().equals("None of slaves were synced")) { if (attempts.decrementAndGet() < 0) { @@ -522,7 +526,7 @@ public final class ServiceManager { return; } - newTimeout(t -> execute(attempts, result, supplier), + newTimeout(t -> execute(attempts, result, supplier, defaultValue), config.getRetryInterval(), TimeUnit.MILLISECONDS); return; }