Fixed - RedissonShutdownException is thrown by RLock object

pull/6077/head
Nikita Koksharov
parent 48b3b085ff
commit 466387764c

@ -144,6 +144,10 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e instanceof RedissonShutdownException) {
return;
}
if (e != null) {
log.error("Can't update lock {} expiration", getRawName(), e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
@ -287,7 +291,7 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
@Override
public RFuture<Void> unlockAsync(long threadId) {
return getServiceManager().execute(() -> unlockAsync0(threadId));
return getServiceManager().execute(() -> unlockAsync0(threadId), null);
}
private RFuture<Void> unlockAsync0(long threadId) {

@ -110,6 +110,9 @@ public class RedissonLock extends RedissonBaseLock {
if (ttl == null) {
return;
}
if (ttl == Long.MIN_VALUE) {
throw new InterruptedException();
}
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
pubSub.timeout(future);
@ -127,6 +130,9 @@ public class RedissonLock extends RedissonBaseLock {
if (ttl == null) {
break;
}
if (ttl == Long.MIN_VALUE) {
throw new InterruptedException();
}
// waiting for message
if (ttl >= 0) {
@ -157,7 +163,7 @@ public class RedissonLock extends RedissonBaseLock {
}
private RFuture<Long> tryAcquireAsync0(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId));
return getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId), Long.MIN_VALUE);
}
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
@ -233,6 +239,9 @@ public class RedissonLock extends RedissonBaseLock {
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == Long.MIN_VALUE) {
return false;
}
// lock acquired
if (ttl == null) {
return true;
@ -280,6 +289,9 @@ public class RedissonLock extends RedissonBaseLock {
if (ttl == null) {
return true;
}
if (ttl == Long.MIN_VALUE) {
return false;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
@ -447,7 +459,7 @@ public class RedissonLock extends RedissonBaseLock {
@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
return getServiceManager().execute(() -> tryAcquireOnceAsync(-1, -1, null, threadId));
return getServiceManager().execute(() -> tryAcquireOnceAsync(-1, -1, null, threadId), false);
}
@Override

@ -371,7 +371,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return getServiceManager().execute(() -> {
RFuture<List<String>> future = tryAcquireAsync(ids, timeoutDate);
return commandExecutor.handleNoSync(future, () -> releaseAsync(ids));
});
}, Collections.emptyList());
}
private RFuture<List<String>> tryAcquireAsync(List<String> ids, long timeoutDate) {

@ -263,7 +263,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
return commandExecutor.getServiceManager().execute(() -> {
RFuture<Boolean> future = tryAcquireAsync0(permits);
return commandExecutor.handleNoSync(future, () -> releaseAsync(permits));
});
}, false);
}
private RFuture<Boolean> tryAcquireAsync0(int permits) {

@ -503,18 +503,22 @@ public final class ServiceManager {
});
}
public <T> RFuture<T> execute(Supplier<CompletionStage<T>> supplier) {
public <T> RFuture<T> execute(Supplier<CompletionStage<T>> supplier, T defaultValue) {
CompletableFuture<T> result = new CompletableFuture<>();
int retryAttempts = config.getRetryAttempts();
AtomicInteger attempts = new AtomicInteger(retryAttempts);
execute(attempts, result, supplier);
execute(attempts, result, supplier, defaultValue);
return new CompletableFutureWrapper<>(result);
}
private <T> void execute(AtomicInteger attempts, CompletableFuture<T> result, Supplier<CompletionStage<T>> supplier) {
private <T> void execute(AtomicInteger attempts, CompletableFuture<T> result, Supplier<CompletionStage<T>> supplier, T defaultValue) {
CompletionStage<T> future = supplier.get();
future.whenComplete((r, e) -> {
if (e != null) {
if (e instanceof RedissonShutdownException) {
result.complete(defaultValue);
return;
}
if (e.getCause().getMessage() != null
&& e.getCause().getMessage().equals("None of slaves were synced")) {
if (attempts.decrementAndGet() < 0) {
@ -522,7 +526,7 @@ public final class ServiceManager {
return;
}
newTimeout(t -> execute(attempts, result, supplier),
newTimeout(t -> execute(attempts, result, supplier, defaultValue),
config.getRetryInterval(), TimeUnit.MILLISECONDS);
return;
}

Loading…
Cancel
Save