|
|
|
@ -21,7 +21,6 @@ import org.redisson.api.BatchOptions;
|
|
|
|
|
import org.redisson.api.BatchResult;
|
|
|
|
|
import org.redisson.api.RFuture;
|
|
|
|
|
import org.redisson.api.RLock;
|
|
|
|
|
import org.redisson.client.RedisClusterDownException;
|
|
|
|
|
import org.redisson.client.RedisException;
|
|
|
|
|
import org.redisson.client.codec.Codec;
|
|
|
|
|
import org.redisson.client.codec.LongCodec;
|
|
|
|
@ -31,7 +30,6 @@ import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
|
|
|
|
|
import org.redisson.client.protocol.decoder.MapValueDecoder;
|
|
|
|
|
import org.redisson.command.CommandAsyncExecutor;
|
|
|
|
|
import org.redisson.command.CommandBatchService;
|
|
|
|
|
import org.redisson.connection.MasterSlaveEntry;
|
|
|
|
|
import org.redisson.misc.CompletableFutureWrapper;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
@ -208,14 +206,13 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
|
|
|
|
|
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getRawName());
|
|
|
|
|
|
|
|
|
|
CompletionStage<Map<String, String>> replicationFuture = CompletableFuture.completedFuture(Collections.emptyMap());
|
|
|
|
|
if (!(commandExecutor instanceof CommandBatchService) && entry != null && entry.getAvailableSlaves() > 0) {
|
|
|
|
|
replicationFuture = commandExecutor.writeAsync(entry, null, RedisCommands.INFO_REPLICATION);
|
|
|
|
|
if (!(commandExecutor instanceof CommandBatchService)
|
|
|
|
|
&& !commandExecutor.getServiceManager().getConfig().checkSkipSlavesInit()) {
|
|
|
|
|
replicationFuture = commandExecutor.writeAsync(getRawName(), RedisCommands.INFO_REPLICATION);
|
|
|
|
|
}
|
|
|
|
|
CompletionStage<T> resFuture = replicationFuture.thenCompose(r -> {
|
|
|
|
|
Integer availableSlaves = Integer.valueOf(r.getOrDefault("connected_slaves", "0"));
|
|
|
|
|
int availableSlaves = Integer.parseInt(r.getOrDefault("connected_slaves", "0"));
|
|
|
|
|
|
|
|
|
|
CommandBatchService executorService = createCommandBatchService(availableSlaves);
|
|
|
|
|
RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
|
|
|
|
@ -406,12 +403,10 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
|
|
|
|
|
protected <T> CompletionStage<T> handleNoSync(long threadId, RFuture<T> ttlRemainingFuture) {
|
|
|
|
|
CompletionStage<T> s = ttlRemainingFuture.handle((r, ex) -> {
|
|
|
|
|
if (ex != null) {
|
|
|
|
|
if (ex.getCause().getMessage().equals("None of slaves were synced")
|
|
|
|
|
|| ex.getCause() instanceof RedisClusterDownException) {
|
|
|
|
|
if (ex.getCause().getMessage().equals("None of slaves were synced")) {
|
|
|
|
|
return unlockInnerAsync(threadId).handle((r1, e) -> {
|
|
|
|
|
if (e != null) {
|
|
|
|
|
if (e.getCause().getMessage().equals("None of slaves were synced")
|
|
|
|
|
|| ex.getCause() instanceof RedisClusterDownException) {
|
|
|
|
|
if (e.getCause().getMessage().equals("None of slaves were synced")) {
|
|
|
|
|
throw new CompletionException(ex.getCause());
|
|
|
|
|
}
|
|
|
|
|
e.getCause().addSuppressed(ex.getCause());
|
|
|
|
|