diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index d6bb82ea8..b39470023 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -527,7 +527,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { .collect(Collectors.toList()); nonFailedSlaves.forEach(uri -> { if (entry.hasSlave(uri)) { - CompletableFuture f = entry.slaveUpAsync(uri, FreezeReason.MANAGER); + CompletableFuture f = entry.slaveUpNoMasterExclusionAsync(uri, FreezeReason.MANAGER); f = f.thenCompose(v -> { if (v) { log.info("slave: {} is up for slot ranges: {}", uri, currentPart.getSlotRanges()); @@ -582,7 +582,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { for (RedisURI uri : addedSlaves) { ClientConnectionsEntry slaveEntry = entry.getEntry(uri); if (slaveEntry != null) { - CompletableFuture slaveUpFuture = entry.slaveUpAsync(uri, FreezeReason.MANAGER); + CompletableFuture slaveUpFuture = entry.slaveUpNoMasterExclusionAsync(uri, FreezeReason.MANAGER); slaveUpFuture = slaveUpFuture.thenCompose(v -> { if (v) { currentPart.addSlaveAddress(uri); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index cc2b5dfde..a0ac9e0a3 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -415,38 +415,26 @@ public class MasterSlaveEntry { return masterEntry.getClient(); } - public boolean slaveUp(ClientConnectionsEntry entry, FreezeReason freezeReason) { - if (!slaveBalancer.unfreeze(entry, freezeReason)) { - return false; - } - - InetSocketAddress addr = masterEntry.getClient().getAddr(); - // exclude master from slaves - if (!config.isSlaveNotUsed() - && !addr.equals(entry.getClient().getAddr())) { - if (slaveDown(addr, FreezeReason.SYSTEM)) { - log.info("master {} excluded from slaves", addr); - } - } + public CompletableFuture slaveUpAsync(ClientConnectionsEntry entry, FreezeReason freezeReason) { noPubSubSlaves.set(false); - return true; + CompletableFuture f = slaveBalancer.unfreezeAsync(entry, freezeReason); + return f.thenCompose(r -> { + if (r) { + return excludeMasterFromSlaves(entry.getClient().getAddr()); + } + return CompletableFuture.completedFuture(r); + }); } - public boolean slaveUp(RedisURI address, FreezeReason freezeReason) { - if (!slaveBalancer.unfreeze(address, freezeReason)) { - return false; - } - - InetSocketAddress addr = masterEntry.getClient().getAddr(); - // exclude master from slaves - if (!config.isSlaveNotUsed() - && !address.equals(addr)) { - if (slaveDown(addr, FreezeReason.SYSTEM)) { - log.info("master {} excluded from slaves", addr); - } - } + public CompletableFuture slaveUpAsync(RedisURI address, FreezeReason freezeReason) { noPubSubSlaves.set(false); - return true; + CompletableFuture f = slaveBalancer.unfreezeAsync(address, freezeReason); + return f.thenCompose(r -> { + if (r) { + return excludeMasterFromSlaves(address); + } + return CompletableFuture.completedFuture(r); + }); } public CompletableFuture excludeMasterFromSlaves(RedisURI address) { @@ -463,7 +451,7 @@ public class MasterSlaveEntry { }); } - public CompletableFuture excludeMasterFromSlaves(InetSocketAddress address) { + private CompletableFuture excludeMasterFromSlaves(InetSocketAddress address) { InetSocketAddress addr = masterEntry.getClient().getAddr(); if (config.isSlaveNotUsed() || addr.equals(address)) { return CompletableFuture.completedFuture(false); @@ -477,7 +465,7 @@ public class MasterSlaveEntry { }); } - public CompletableFuture slaveUpAsync(RedisURI address, FreezeReason freezeReason) { + public CompletableFuture slaveUpNoMasterExclusionAsync(RedisURI address, FreezeReason freezeReason) { noPubSubSlaves.set(false); return slaveBalancer.unfreezeAsync(address, freezeReason); } @@ -539,7 +527,7 @@ public class MasterSlaveEntry { slaveBalancer.changeType(oldMaster.getClient().getAddr(), NodeType.SLAVE); slaveBalancer.changeType(newMasterClient.getAddr(), NodeType.MASTER); // freeze in slaveBalancer - slaveDown(oldMaster.getClient().getAddr(), FreezeReason.MANAGER); + slaveDownAsync(oldMaster.getClient().getAddr(), FreezeReason.MANAGER); // check if at least one slave is available, use master as slave if false if (!config.isSlaveNotUsed()) { diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 9b7e047ca..c93072494 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -594,7 +594,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { }); } - CompletableFuture f = entry.slaveUpAsync(uri, FreezeReason.MANAGER); + CompletableFuture f = entry.slaveUpNoMasterExclusionAsync(uri, FreezeReason.MANAGER); return f.thenCompose(e -> { if (e) { log.info("slave: {} is up", uri); diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index 3f13d2e56..90c6bec83 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -104,16 +104,6 @@ public class LoadBalancerManager { return count; } - public boolean unfreeze(RedisURI address, FreezeReason freezeReason) { - ClientConnectionsEntry entry = getEntry(address); - if (entry == null) { - log.error("Can't find {} in slaves! Available slaves: {}", address, client2Entry.keySet()); - return false; - } - - return unfreeze(entry, freezeReason); - } - public CompletableFuture unfreezeAsync(RedisURI address, FreezeReason freezeReason) { ClientConnectionsEntry entry = getEntry(address); if (entry == null) { @@ -181,7 +171,7 @@ public class LoadBalancerManager { return false; } - private CompletableFuture unfreezeAsync(ClientConnectionsEntry entry, FreezeReason freezeReason) { + public CompletableFuture unfreezeAsync(ClientConnectionsEntry entry, FreezeReason freezeReason) { return unfreezeAsync(entry, freezeReason, 0); } diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 874194bab..6ee65baae 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -361,9 +361,12 @@ abstract class ConnectionPool { } if ("PONG".equals(t)) { - if (masterSlaveEntry.slaveUp(entry, FreezeReason.RECONNECT)) { - log.info("slave {} has been successfully reconnected", entry.getClient().getAddr()); - } + CompletableFuture ff = masterSlaveEntry.slaveUpAsync(entry, FreezeReason.RECONNECT); + ff.thenAccept(r -> { + if (r) { + log.info("slave {} has been successfully reconnected", entry.getClient().getAddr()); + } + }); } else { scheduleCheck(entry); }