Fixed - no retry attempts are made for "None of slaves were synced" error. #4822

pull/4931/head
Nikita Koksharov 2 years ago
parent 1095441cf0
commit 65320f76fc

@ -36,7 +36,9 @@ import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.function.Supplier;
/**
* Base class for implementing distributed locks
@ -316,14 +318,51 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
return unlockAsync(threadId);
}
protected final <T> RFuture<T> execute(Supplier<RFuture<T>> supplier) {
CompletableFuture<T> result = new CompletableFuture<>();
int retryAttempts = commandExecutor.getServiceManager().getConfig().getRetryAttempts();
AtomicInteger attempts = new AtomicInteger(retryAttempts);
execute(attempts, result, supplier);
return new CompletableFutureWrapper<>(result);
}
private <T> void execute(AtomicInteger attempts, CompletableFuture<T> result, Supplier<RFuture<T>> supplier) {
RFuture<T> future = supplier.get();
future.whenComplete((r, e) -> {
if (e != null) {
if (e.getCause().getMessage().equals("None of slaves were synced")) {
if (attempts.decrementAndGet() < 0) {
result.completeExceptionally(e);
return;
}
commandExecutor.getServiceManager().newTimeout(t -> execute(attempts, result, supplier),
commandExecutor.getServiceManager().getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
return;
}
result.completeExceptionally(e);
return;
}
result.complete(r);
});
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
RFuture<Boolean> future = unlockInnerAsync(threadId);
return execute(() -> unlockAsync0(threadId));
}
private RFuture<Void> unlockAsync0(long threadId) {
CompletionStage<Boolean> future = unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((opStatus, e) -> {
cancelExpirationRenewal(threadId);
if (e != null) {
if (e instanceof CompletionException) {
throw (CompletionException) e;
}
throw new CompletionException(e);
}
if (opStatus == null) {
@ -400,7 +439,7 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
return tryLockAsync(waitTime, leaseTime, unit, currentThreadId);
}
protected <T> CompletionStage<T> handleNoSync(long threadId, RFuture<T> ttlRemainingFuture) {
protected <T> CompletionStage<T> handleNoSync(long threadId, CompletionStage<T> ttlRemainingFuture) {
CompletionStage<T> s = ttlRemainingFuture.handle((r, ex) -> {
if (ex != null) {
if (ex.getCause().getMessage().equals("None of slaves were synced")) {

@ -16,7 +16,6 @@
package org.redisson;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.redisson.api.RFuture;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.LongCodec;
@ -141,11 +140,15 @@ public class RedissonLock extends RedissonBaseLock {
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
return get(tryAcquireAsync0(waitTime, leaseTime, unit, threadId));
}
private RFuture<Long> tryAcquireAsync0(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Boolean> acquiredFuture;
CompletionStage<Boolean> acquiredFuture;
if (leaseTime > 0) {
acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
@ -153,8 +156,7 @@ public class RedissonLock extends RedissonBaseLock {
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
CompletionStage<Boolean> s = handleNoSync(threadId, acquiredFuture);
acquiredFuture = new CompletableFutureWrapper<>(s);
acquiredFuture = handleNoSync(threadId, acquiredFuture);
CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
// lock acquired
@ -346,7 +348,7 @@ public class RedissonLock extends RedissonBaseLock {
@Override
public RFuture<Void> lockAsync(long leaseTime, TimeUnit unit, long currentThreadId) {
CompletableFuture<Void> result = new CompletableFuture<>();
RFuture<Long> ttlFuture = tryAcquireAsync(-1, leaseTime, unit, currentThreadId);
RFuture<Long> ttlFuture = tryAcquireAsync0(-1, leaseTime, unit, currentThreadId);
ttlFuture.whenComplete((ttl, e) -> {
if (e != null) {
result.completeExceptionally(e);
@ -378,7 +380,7 @@ public class RedissonLock extends RedissonBaseLock {
private void lockAsync(long leaseTime, TimeUnit unit,
RedissonLockEntry entry, CompletableFuture<Void> result, long currentThreadId) {
RFuture<Long> ttlFuture = tryAcquireAsync(-1, leaseTime, unit, currentThreadId);
RFuture<Long> ttlFuture = tryAcquireAsync0(-1, leaseTime, unit, currentThreadId);
ttlFuture.whenComplete((ttl, e) -> {
if (e != null) {
unsubscribe(entry, currentThreadId);
@ -423,7 +425,7 @@ public class RedissonLock extends RedissonBaseLock {
@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
return tryAcquireOnceAsync(-1, -1, null, threadId);
return execute(() -> tryAcquireOnceAsync(-1, -1, null, threadId));
}
@Override
@ -433,7 +435,7 @@ public class RedissonLock extends RedissonBaseLock {
AtomicLong time = new AtomicLong(unit.toMillis(waitTime));
long currentTime = System.currentTimeMillis();
RFuture<Long> ttlFuture = tryAcquireAsync(waitTime, leaseTime, unit, currentThreadId);
RFuture<Long> ttlFuture = tryAcquireAsync0(waitTime, leaseTime, unit, currentThreadId);
ttlFuture.whenComplete((ttl, e) -> {
if (e != null) {
result.completeExceptionally(e);
@ -457,7 +459,7 @@ public class RedissonLock extends RedissonBaseLock {
}
long current = System.currentTimeMillis();
AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
AtomicReference<Timeout> futureRef = new AtomicReference<>();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(currentThreadId);
pubSub.timeout(subscribeFuture, time.get());
subscribeFuture.whenComplete((r, ex) -> {
@ -476,13 +478,10 @@ public class RedissonLock extends RedissonBaseLock {
tryLockAsync(time, waitTime, leaseTime, unit, r, result, currentThreadId);
});
if (!subscribeFuture.isDone()) {
Timeout scheduledFuture = commandExecutor.getServiceManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (!subscribeFuture.isDone()) {
subscribeFuture.cancel(false);
trySuccessFalse(currentThreadId, result);
}
Timeout scheduledFuture = commandExecutor.getServiceManager().newTimeout(timeout -> {
if (!subscribeFuture.isDone()) {
subscribeFuture.cancel(false);
trySuccessFalse(currentThreadId, result);
}
}, time.get(), TimeUnit.MILLISECONDS);
futureRef.set(scheduledFuture);
@ -507,7 +506,7 @@ public class RedissonLock extends RedissonBaseLock {
}
long curr = System.currentTimeMillis();
RFuture<Long> ttlFuture = tryAcquireAsync(waitTime, leaseTime, unit, currentThreadId);
RFuture<Long> ttlFuture = tryAcquireAsync0(waitTime, leaseTime, unit, currentThreadId);
ttlFuture.whenComplete((ttl, e) -> {
if (e != null) {
unsubscribe(entry, currentThreadId);

Loading…
Cancel
Save