From 4f1a5c3648ffcd6a5b3589a4df3ebe52c1647e9e Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 8 Dec 2020 13:50:31 +0300 Subject: [PATCH] Fixed - slaves synchronization timeout isn't respected during RLock.lock() method invocation. #3253 --- .../main/java/org/redisson/RedissonLock.java | 18 +++++++++++++++--- .../command/RedisCommonBatchExecutor.java | 11 +++++++++-- .../org/redisson/command/RedisExecutor.java | 6 +++++- 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java index 33b3a14a5..7f62dcb5b 100644 --- a/redisson/src/main/java/org/redisson/RedissonLock.java +++ b/redisson/src/main/java/org/redisson/RedissonLock.java @@ -18,6 +18,7 @@ package org.redisson; import io.netty.util.Timeout; import io.netty.util.TimerTask; import org.redisson.api.BatchOptions; +import org.redisson.api.BatchResult; import org.redisson.api.RFuture; import org.redisson.api.RLock; import org.redisson.client.RedisException; @@ -345,10 +346,21 @@ public class RedissonLock extends RedissonExpirable implements RLock { protected RFuture evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { CommandBatchService executorService = createCommandBatchService(); RFuture result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params); - if (!(commandExecutor instanceof CommandBatchService)) { - executorService.executeAsync(); + if (commandExecutor instanceof CommandBatchService) { + return result; } - return result; + + RPromise r = new RedissonPromise<>(); + RFuture> future = executorService.executeAsync(); + future.onComplete((res, ex) -> { + if (ex != null) { + r.tryFailure(ex); + return; + } + + r.trySuccess(result.getNow()); + }); + return r; } RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { diff --git a/redisson/src/main/java/org/redisson/command/RedisCommonBatchExecutor.java b/redisson/src/main/java/org/redisson/command/RedisCommonBatchExecutor.java index 306200d64..0d7200329 100644 --- a/redisson/src/main/java/org/redisson/command/RedisCommonBatchExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisCommonBatchExecutor.java @@ -65,7 +65,9 @@ public class RedisCommonBatchExecutor extends RedisExecutor { if (options.getResponseTimeout() > 0) { this.responseTimeout = options.getResponseTimeout(); } - + if (options.getSyncSlaves() > 0) { + this.responseTimeout += options.getSyncTimeout(); + } } @Override @@ -114,7 +116,12 @@ public class RedisCommonBatchExecutor extends RedisExecutor { writeFuture = connection.send(new CommandsData(attemptPromise, list, options.isSkipResult(), isAtomic, isQueued, options.getSyncSlaves() > 0)); } - + + @Override + protected boolean isResendAllowed(int attempt, int attempts) { + return options.getSyncSlaves() == 0 && super.isResendAllowed(attempt, attempts); + } + protected boolean isWaitCommand(CommandData c) { return c.getCommand().getName().equals(RedisCommands.WAIT.getName()); } diff --git a/redisson/src/main/java/org/redisson/command/RedisExecutor.java b/redisson/src/main/java/org/redisson/command/RedisExecutor.java index 2c9f7d6b8..40b922172 100644 --- a/redisson/src/main/java/org/redisson/command/RedisExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisExecutor.java @@ -310,7 +310,7 @@ public class RedisExecutor { TimerTask timeoutTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - if (attempt < attempts) { + if (isResendAllowed(attempt, attempts)) { if (!attemptPromise.cancel(false)) { return; } @@ -337,6 +337,10 @@ public class RedisExecutor { timeout = connectionManager.newTimeout(timeoutTask, timeoutTime, TimeUnit.MILLISECONDS); } + protected boolean isResendAllowed(int attempt, int attempts) { + return attempt < attempts; + } + private void handleBlockingOperations(RPromise attemptPromise, RedisConnection connection, Long popTimeout) { FutureListener listener = f -> { mainPromise.tryFailure(new RedissonShutdownException("Redisson is shutdown"));