From 60faed5f10e31d3d0168ded068f3f43962333be0 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Sat, 6 Jan 2024 09:54:44 +0300 Subject: [PATCH] Fixed - connection isn't reconnected on WRONGPASS Redis error #4713 --- .../org/redisson/command/RedisExecutor.java | 30 ++++++++++++++++--- .../command/RedisQueuedBatchExecutor.java | 10 ++++--- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/redisson/src/main/java/org/redisson/command/RedisExecutor.java b/redisson/src/main/java/org/redisson/command/RedisExecutor.java index f94fdd2b3..be68d6476 100644 --- a/redisson/src/main/java/org/redisson/command/RedisExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisExecutor.java @@ -46,10 +46,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.function.BiConsumer; /** @@ -78,6 +75,7 @@ public class RedisExecutor { final int responseTimeout; CompletableFuture connectionFuture; + boolean reuseConnection; NodeSource source; MasterSlaveEntry entry; Codec codec; @@ -502,6 +500,20 @@ public class RedisExecutor { mainPromiseListener = null; Throwable cause = cause(attemptFuture); + if (cause instanceof RedisWrongPasswordException) { + if (attempt < attempts) { + onException(); + + reuseConnection = true; + CompletionStage f = connectionFuture.join().forceFastReconnectAsync(); + f.thenAccept(v -> { + attempt++; + execute(); + }); + return; + } + } + if (cause instanceof RedisMovedException && !ignoreRedirect) { RedisMovedException ex = (RedisMovedException) cause; if (source.getRedirect() == Redirect.MOVED @@ -646,6 +658,12 @@ public class RedisExecutor { return; } + Throwable cause = cause(attemptPromise); + if (cause instanceof RedisWrongPasswordException + && attempt < attempts) { + return; + } + RedisConnection connection = getNow(connectionFuture); connectionManager.getServiceManager().getShutdownLatch().release(); if (connectionManager.getServiceManager().getConfig().getMasterConnectionPoolSize() < 10) { @@ -682,6 +700,10 @@ public class RedisExecutor { } protected CompletableFuture getConnection(CompletableFuture attemptPromise) { + if (reuseConnection) { + reuseConnection = false; + return connectionFuture; + } if (readOnlyMode) { connectionFuture = connectionReadOp(command, attemptPromise); } else { diff --git a/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java b/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java index 11c7627cd..903030e03 100644 --- a/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java @@ -218,10 +218,12 @@ public class RedisQueuedBatchExecutor extends BaseRedisBatchExecutor protected CompletableFuture getConnection(CompletableFuture attemptPromise) { MasterSlaveEntry msEntry = getEntry(); ConnectionEntry entry = connections.computeIfAbsent(msEntry, k -> { - if (this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) { - connectionFuture = connectionWriteOp(null, attemptPromise); - } else { - connectionFuture = connectionReadOp(null, attemptPromise); + if (!reuseConnection) { + if (this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) { + connectionFuture = connectionWriteOp(null, attemptPromise); + } else { + connectionFuture = connectionReadOp(null, attemptPromise); + } } ConnectionEntry ce = new ConnectionEntry(connectionFuture);