Fixed - slaves synchronization timeout isn't respected during RLock.lock() method invocation. #3253

pull/3292/head
Nikita Koksharov 4 years ago
parent c9f7883113
commit 4f1a5c3648

@ -18,6 +18,7 @@ package org.redisson;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.client.RedisException;
@ -345,10 +346,21 @@ public class RedissonLock extends RedissonExpirable implements RLock {
protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
CommandBatchService executorService = createCommandBatchService();
RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
if (!(commandExecutor instanceof CommandBatchService)) {
executorService.executeAsync();
if (commandExecutor instanceof CommandBatchService) {
return result;
}
return result;
RPromise<T> r = new RedissonPromise<>();
RFuture<BatchResult<?>> future = executorService.executeAsync();
future.onComplete((res, ex) -> {
if (ex != null) {
r.tryFailure(ex);
return;
}
r.trySuccess(result.getNow());
});
return r;
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {

@ -65,7 +65,9 @@ public class RedisCommonBatchExecutor extends RedisExecutor<Object, Void> {
if (options.getResponseTimeout() > 0) {
this.responseTimeout = options.getResponseTimeout();
}
if (options.getSyncSlaves() > 0) {
this.responseTimeout += options.getSyncTimeout();
}
}
@Override
@ -114,7 +116,12 @@ public class RedisCommonBatchExecutor extends RedisExecutor<Object, Void> {
writeFuture = connection.send(new CommandsData(attemptPromise, list, options.isSkipResult(), isAtomic, isQueued, options.getSyncSlaves() > 0));
}
@Override
protected boolean isResendAllowed(int attempt, int attempts) {
return options.getSyncSlaves() == 0 && super.isResendAllowed(attempt, attempts);
}
protected boolean isWaitCommand(CommandData<?, ?> c) {
return c.getCommand().getName().equals(RedisCommands.WAIT.getName());
}

@ -310,7 +310,7 @@ public class RedisExecutor<V, R> {
TimerTask timeoutTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (attempt < attempts) {
if (isResendAllowed(attempt, attempts)) {
if (!attemptPromise.cancel(false)) {
return;
}
@ -337,6 +337,10 @@ public class RedisExecutor<V, R> {
timeout = connectionManager.newTimeout(timeoutTask, timeoutTime, TimeUnit.MILLISECONDS);
}
protected boolean isResendAllowed(int attempt, int attempts) {
return attempt < attempts;
}
private void handleBlockingOperations(RPromise<R> attemptPromise, RedisConnection connection, Long popTimeout) {
FutureListener<Void> listener = f -> {
mainPromise.tryFailure(new RedissonShutdownException("Redisson is shutdown"));

Loading…
Cancel
Save