refactoring

pull/4979/head
Nikita Koksharov 2 years ago
parent 595dcfb3c9
commit 9460b2556f

@ -318,40 +318,9 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
return unlockAsync(threadId);
}
protected final <T> RFuture<T> execute(Supplier<RFuture<T>> supplier) {
CompletableFuture<T> result = new CompletableFuture<>();
int retryAttempts = commandExecutor.getServiceManager().getConfig().getRetryAttempts();
AtomicInteger attempts = new AtomicInteger(retryAttempts);
execute(attempts, result, supplier);
return new CompletableFutureWrapper<>(result);
}
private <T> void execute(AtomicInteger attempts, CompletableFuture<T> result, Supplier<RFuture<T>> supplier) {
RFuture<T> future = supplier.get();
future.whenComplete((r, e) -> {
if (e != null) {
if (e.getCause().getMessage().equals("None of slaves were synced")) {
if (attempts.decrementAndGet() < 0) {
result.completeExceptionally(e);
return;
}
commandExecutor.getServiceManager().newTimeout(t -> execute(attempts, result, supplier),
commandExecutor.getServiceManager().getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
return;
}
result.completeExceptionally(e);
return;
}
result.complete(r);
});
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
return execute(() -> unlockAsync0(threadId));
return commandExecutor.getServiceManager().execute(() -> unlockAsync0(threadId));
}
private RFuture<Void> unlockAsync0(long threadId) {
@ -439,26 +408,8 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
return tryLockAsync(waitTime, leaseTime, unit, currentThreadId);
}
protected <T> CompletionStage<T> handleNoSync(long threadId, CompletionStage<T> ttlRemainingFuture) {
CompletionStage<T> s = ttlRemainingFuture.handle((r, ex) -> {
if (ex != null) {
if (ex.getCause().getMessage().equals("None of slaves were synced")) {
return unlockInnerAsync(threadId).handle((r1, e) -> {
if (e != null) {
if (e.getCause().getMessage().equals("None of slaves were synced")) {
throw new CompletionException(ex.getCause());
}
e.getCause().addSuppressed(ex.getCause());
}
throw new CompletionException(ex.getCause());
});
} else {
throw new CompletionException(ex.getCause());
}
}
return CompletableFuture.completedFuture(r);
}).thenCompose(f -> (CompletionStage<T>) f);
return s;
protected final <T> CompletionStage<T> handleNoSync(long threadId, CompletionStage<T> ttlRemainingFuture) {
return commandExecutor.getServiceManager().handleNoSync(ttlRemainingFuture, () -> unlockInnerAsync(threadId));
}
}

@ -144,7 +144,7 @@ public class RedissonLock extends RedissonBaseLock {
}
private RFuture<Long> tryAcquireAsync0(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId));
return commandExecutor.getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
@ -425,7 +425,7 @@ public class RedissonLock extends RedissonBaseLock {
@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
return execute(() -> tryAcquireOnceAsync(-1, -1, null, threadId));
return commandExecutor.getServiceManager().execute(() -> tryAcquireOnceAsync(-1, -1, null, threadId));
}
@Override

@ -40,11 +40,13 @@ import io.netty.util.internal.PlatformDependent;
import org.redisson.ElementsSubscribeService;
import org.redisson.Version;
import org.redisson.api.NatMapper;
import org.redisson.api.RFuture;
import org.redisson.cache.LRUCacheMap;
import org.redisson.client.RedisNodeNotFoundException;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.TransportMode;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RedisURI;
import org.slf4j.Logger;
@ -56,6 +58,8 @@ import java.net.UnknownHostException;
import java.security.MessageDigest;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
/**
*
@ -373,4 +377,57 @@ public class ServiceManager {
});
}
public <T> RFuture<T> execute(Supplier<RFuture<T>> supplier) {
CompletableFuture<T> result = new CompletableFuture<>();
int retryAttempts = config.getRetryAttempts();
AtomicInteger attempts = new AtomicInteger(retryAttempts);
execute(attempts, result, supplier);
return new CompletableFutureWrapper<>(result);
}
private <T> void execute(AtomicInteger attempts, CompletableFuture<T> result, Supplier<RFuture<T>> supplier) {
RFuture<T> future = supplier.get();
future.whenComplete((r, e) -> {
if (e != null) {
if (e.getCause().getMessage().equals("None of slaves were synced")) {
if (attempts.decrementAndGet() < 0) {
result.completeExceptionally(e);
return;
}
newTimeout(t -> execute(attempts, result, supplier),
config.getRetryInterval(), TimeUnit.MILLISECONDS);
return;
}
result.completeExceptionally(e);
return;
}
result.complete(r);
});
}
public <T> CompletionStage<T> handleNoSync(CompletionStage<T> stage, Supplier<CompletionStage<?>> supplier) {
CompletionStage<T> s = stage.handle((r, ex) -> {
if (ex != null) {
if (ex.getCause().getMessage().equals("None of slaves were synced")) {
return supplier.get().handle((r1, e) -> {
if (e != null) {
if (e.getCause().getMessage().equals("None of slaves were synced")) {
throw new CompletionException(ex.getCause());
}
e.getCause().addSuppressed(ex.getCause());
}
throw new CompletionException(ex.getCause());
});
} else {
throw new CompletionException(ex.getCause());
}
}
return CompletableFuture.completedFuture(r);
}).thenCompose(f -> (CompletionStage<T>) f);
return s;
}
}

Loading…
Cancel
Save