diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index 89c50ce34..5d529dbd5 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -523,10 +523,6 @@ public class CommandExecutorService implements CommandExecutor { TimerTask timeoutTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - if (attemptPromise.isDone()) { - return; - } - attemptPromise.tryFailure(exceptionRef.get()); } }; @@ -554,18 +550,18 @@ public class CommandExecutorService implements CommandExecutor { if (future.cause() instanceof RedisMovedException) { RedisMovedException ex = (RedisMovedException)future.cause(); - async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), messageDecoder, codec, command, params, mainPromise, 0); + async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), messageDecoder, codec, command, params, mainPromise, attempt); return; } if (future.cause() instanceof RedisAskException) { RedisAskException ex = (RedisAskException)future.cause(); - async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), messageDecoder, codec, command, params, mainPromise, 0); + async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), messageDecoder, codec, command, params, mainPromise, attempt); return; } if (future.cause() instanceof RedisLoadingException) { - async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, 0); + async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, attempt); return; } diff --git a/src/main/java/org/redisson/connection/BaseLoadBalancer.java b/src/main/java/org/redisson/connection/BaseLoadBalancer.java index 7b5532fb7..edead0ce7 100644 --- a/src/main/java/org/redisson/connection/BaseLoadBalancer.java +++ b/src/main/java/org/redisson/connection/BaseLoadBalancer.java @@ -118,7 +118,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { // close all pub/sub connections while (true) { - RedisPubSubConnection connection = connectionEntry.pollFreeSubscribeConnection(); + RedisPubSubConnection connection = connectionEntry.pollSubscribeConnection(); if (connection == null) { break; } diff --git a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java b/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java index 56b4a79a1..c8d2bcbd0 100644 --- a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java +++ b/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java @@ -41,7 +41,7 @@ public class SubscribesConnectionEntry extends ConnectionEntry { return allSubscribeConnections; } - public RedisPubSubConnection pollFreeSubscribeConnection() { + public RedisPubSubConnection pollSubscribeConnection() { return freeSubscribeConnections.poll(); } diff --git a/src/main/java/org/redisson/misc/ConnectionPool.java b/src/main/java/org/redisson/misc/ConnectionPool.java index 55793dac4..35c0dfbca 100644 --- a/src/main/java/org/redisson/misc/ConnectionPool.java +++ b/src/main/java/org/redisson/misc/ConnectionPool.java @@ -282,50 +282,51 @@ public class ConnectionPool { return; } final RedisConnection c = future.getNow(); - if (c.isActive()) { - Future f = c.asyncWithTimeout(null, RedisCommands.PING); - f.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - try { - if (entry.getFreezeReason() != FreezeReason.RECONNECT - || !entry.isFreezed()) { - return; - } - - if (future.isSuccess() && "PONG".equals(future.getNow())) { - entry.resetFailedAttempts(); - initConnections(entry, new Runnable() { - @Override - public void run() { - if (entry.getNodeType() == NodeType.SLAVE) { - handleQueue(entry, false); - masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); - } else { - synchronized (entry) { - if (entry.getFreezeReason() == FreezeReason.RECONNECT) { - handleQueue(entry, false); - - entry.setFreezed(false); - entry.setFreezeReason(null); - } + if (!c.isActive()) { + c.closeAsync(); + scheduleCheck(entry); + return; + } + + Future f = c.asyncWithTimeout(null, RedisCommands.PING); + f.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + try { + if (entry.getFreezeReason() != FreezeReason.RECONNECT + || !entry.isFreezed()) { + return; + } + + if (future.isSuccess() && "PONG".equals(future.getNow())) { + entry.resetFailedAttempts(); + initConnections(entry, new Runnable() { + @Override + public void run() { + if (entry.getNodeType() == NodeType.SLAVE) { + handleQueue(entry, false); + masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); + } else { + synchronized (entry) { + if (entry.getFreezeReason() == FreezeReason.RECONNECT) { + handleQueue(entry, false); + + entry.setFreezed(false); + entry.setFreezeReason(null); } } } - }, false); + } + }, false); - } else { - scheduleCheck(entry); - } - } finally { - c.closeAsync(); + } else { + scheduleCheck(entry); } + } finally { + c.closeAsync(); } - }); - } else { - c.closeAsync(); - scheduleCheck(entry); - } + } + }); } }); } diff --git a/src/main/java/org/redisson/misc/PubSubConnectionPoll.java b/src/main/java/org/redisson/misc/PubSubConnectionPoll.java index 483c6cdbf..c273086a1 100644 --- a/src/main/java/org/redisson/misc/PubSubConnectionPoll.java +++ b/src/main/java/org/redisson/misc/PubSubConnectionPoll.java @@ -33,7 +33,7 @@ public class PubSubConnectionPoll extends ConnectionPool @Override protected RedisPubSubConnection poll(SubscribesConnectionEntry entry) { - return entry.pollFreeSubscribeConnection(); + return entry.pollSubscribeConnection(); } @Override