Fixed - connection isn't reconnected on WRONGPASS Redis error #4713

pull/5564/head
Nikita Koksharov 1 year ago
parent 511b3a6b68
commit 60faed5f10

@ -46,10 +46,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
/**
@ -78,6 +75,7 @@ public class RedisExecutor<V, R> {
final int responseTimeout;
CompletableFuture<RedisConnection> connectionFuture;
boolean reuseConnection;
NodeSource source;
MasterSlaveEntry entry;
Codec codec;
@ -502,6 +500,20 @@ public class RedisExecutor<V, R> {
mainPromiseListener = null;
Throwable cause = cause(attemptFuture);
if (cause instanceof RedisWrongPasswordException) {
if (attempt < attempts) {
onException();
reuseConnection = true;
CompletionStage<Void> f = connectionFuture.join().forceFastReconnectAsync();
f.thenAccept(v -> {
attempt++;
execute();
});
return;
}
}
if (cause instanceof RedisMovedException && !ignoreRedirect) {
RedisMovedException ex = (RedisMovedException) cause;
if (source.getRedirect() == Redirect.MOVED
@ -646,6 +658,12 @@ public class RedisExecutor<V, R> {
return;
}
Throwable cause = cause(attemptPromise);
if (cause instanceof RedisWrongPasswordException
&& attempt < attempts) {
return;
}
RedisConnection connection = getNow(connectionFuture);
connectionManager.getServiceManager().getShutdownLatch().release();
if (connectionManager.getServiceManager().getConfig().getMasterConnectionPoolSize() < 10) {
@ -682,6 +700,10 @@ public class RedisExecutor<V, R> {
}
protected CompletableFuture<RedisConnection> getConnection(CompletableFuture<R> attemptPromise) {
if (reuseConnection) {
reuseConnection = false;
return connectionFuture;
}
if (readOnlyMode) {
connectionFuture = connectionReadOp(command, attemptPromise);
} else {

@ -218,11 +218,13 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
protected CompletableFuture<RedisConnection> getConnection(CompletableFuture<R> attemptPromise) {
MasterSlaveEntry msEntry = getEntry();
ConnectionEntry entry = connections.computeIfAbsent(msEntry, k -> {
if (!reuseConnection) {
if (this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) {
connectionFuture = connectionWriteOp(null, attemptPromise);
} else {
connectionFuture = connectionReadOp(null, attemptPromise);
}
}
ConnectionEntry ce = new ConnectionEntry(connectionFuture);
ce.setCancelCallback(() -> {

Loading…
Cancel
Save