diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 27ab8f3ed..f57cc253f 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -110,27 +110,20 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { lastClusterNode = addr; Collection partitions = parsePartitions(nodes); - List>>> futures = new ArrayList>>>(); + List> masterFutures = new ArrayList<>(); for (ClusterPartition partition : partitions) { if (partition.isMasterFail()) { failedMasters.add(partition.getMasterAddress().toString()); continue; } - RFuture>> masterFuture = addMasterEntry(partition, cfg); - futures.add(masterFuture); + RFuture masterFuture = addMasterEntry(partition, cfg); + masterFutures.add(masterFuture); } - for (RFuture>> masterFuture : futures) { + for (RFuture masterFuture : masterFutures) { masterFuture.awaitUninterruptibly(); if (!masterFuture.isSuccess()) { lastException = masterFuture.cause(); - continue; - } - for (RFuture future : masterFuture.getNow()) { - future.awaitUninterruptibly(); - if (!future.isSuccess()) { - lastException = future.cause(); - } } } break; @@ -168,7 +161,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return result; } - private RFuture>> addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) { + private RFuture addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) { if (partition.isMasterFail()) { RedisException e = new RedisException("Failed to add master: " + partition.getMasterAddress() + " for slot ranges: " + @@ -181,7 +174,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return RedissonPromise.newFailedFuture(e); } - RPromise>> result = new RedissonPromise>>(); + RPromise result = new RedissonPromise<>(); RFuture connectionFuture = connectToNode(cfg, partition.getMasterAddress(), null, configEndpointHostName); connectionFuture.onComplete((connection, ex1) -> { if (ex1 != null) { @@ -193,49 +186,64 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { MasterSlaveServersConfig config = create(cfg); config.setMasterAddress(partition.getMasterAddress().toString()); - MasterSlaveEntry e; - List> futures = new ArrayList>(); + MasterSlaveEntry entry; if (config.checkSkipSlavesInit()) { - e = new SingleEntry(ClusterConnectionManager.this, config); + entry = new SingleEntry(ClusterConnectionManager.this, config); } else { Set slaveAddresses = partition.getSlaveAddresses().stream().map(r -> r.toString()).collect(Collectors.toSet()); config.setSlaveAddresses(slaveAddresses); - e = new MasterSlaveEntry(ClusterConnectionManager.this, config); - - List> fs = e.initSlaveBalancer(partition.getFailedSlaveAddresses()); - futures.addAll(fs); - if (!partition.getSlaveAddresses().isEmpty()) { - log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges()); - if (!partition.getFailedSlaveAddresses().isEmpty()) { - log.warn("slaves: {} is down for slot ranges: {}", partition.getFailedSlaveAddresses(), partition.getSlotRanges()); - } - } + entry = new MasterSlaveEntry(ClusterConnectionManager.this, config); } - RFuture f = e.setupMasterEntry(new RedisURI(config.getMasterAddress())); - RPromise initFuture = new RedissonPromise(); - futures.add(initFuture); - f.onComplete((res, ex3) -> { + RFuture f = entry.setupMasterEntry(new RedisURI(config.getMasterAddress())); + f.onComplete((masterClient, ex3) -> { if (ex3 != null) { log.error("Can't add master: " + partition.getMasterAddress() + " for slot ranges: " + partition.getSlotRanges(), ex3); - initFuture.tryFailure(ex3); + result.tryFailure(ex3); return; } for (Integer slot : partition.getSlots()) { - addEntry(slot, e); + addEntry(slot, entry); lastPartitions.put(slot, partition); } - log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); - if (!initFuture.trySuccess(null)) { - throw new IllegalStateException(); + if (!config.checkSkipSlavesInit()) { + List> fs = entry.initSlaveBalancer(partition.getFailedSlaveAddresses(), masterClient); + AtomicInteger counter = new AtomicInteger(fs.size()); + for (RFuture future : fs) { + future.onComplete((r, ex) -> { + if (ex != null) { + log.error("unable to add slave for: " + partition.getMasterAddress() + + " slot ranges: " + partition.getSlotRanges(), ex); + } + + if (counter.decrementAndGet() == 0) { + if (!partition.getSlaveAddresses().isEmpty()) { + log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges()); + if (!partition.getFailedSlaveAddresses().isEmpty()) { + log.warn("slaves: {} are down for slot ranges: {}", partition.getFailedSlaveAddresses(), partition.getSlotRanges()); + } + } + + if (result.trySuccess(null)) { + log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); + } else { + log.error("unable to add master: {} for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); + } + } + }); + } + } else { + if (result.trySuccess(null)) { + log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); + } else { + log.error("unable to add master: {} for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); + } } + }); - if (!result.trySuccess(futures)) { - throw new IllegalStateException(); - } }); return result; @@ -385,7 +393,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { for (RedisURI uri : aliveSlaves) { currentPart.removeFailedSlaveAddress(uri); if (entry.hasSlave(uri) && entry.slaveUp(uri, FreezeReason.MANAGER)) { - log.info("slave: {} has up for slot ranges: {}", uri, currentPart.getSlotRanges()); + log.info("slave: {} is up for slot ranges: {}", uri, currentPart.getSlotRanges()); } } @@ -495,30 +503,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return RedissonPromise.newSucceededFuture(null); } - RPromise result = new RedissonPromise(); + RPromise result = new RedissonPromise<>(); AtomicInteger masters = new AtomicInteger(addedPartitions.size()); - Queue> futures = new ConcurrentLinkedQueue>(); for (ClusterPartition newPart : addedPartitions.values()) { - RFuture>> future = addMasterEntry(newPart, cfg); + RFuture future = addMasterEntry(newPart, cfg); future.onComplete((res, e) -> { - if (e == null) { - futures.addAll(res); - } - if (masters.decrementAndGet() == 0) { - if (futures.isEmpty()) { - result.trySuccess(null); - return; - } - - AtomicInteger nodes = new AtomicInteger(futures.size()); - for (RFuture nodeFuture : futures) { - nodeFuture.onComplete((r, ex) -> { - if (nodes.decrementAndGet() == 0) { - result.trySuccess(null); - } - }); - } + result.trySuccess(null); } }); } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 877424fc3..73124c2be 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -16,13 +16,7 @@ package org.redisson.connection; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -363,16 +357,23 @@ public class MasterSlaveConnectionManager implements ConnectionManager { if (config.checkSkipSlavesInit()) { entry = new SingleEntry(this, config); } else { - entry = createMasterSlaveEntry(config); + entry = new MasterSlaveEntry(this, config); } - RFuture f = entry.setupMasterEntry(new RedisURI(config.getMasterAddress())); - f.syncUninterruptibly(); - + RFuture masterFuture = entry.setupMasterEntry(new RedisURI(config.getMasterAddress())); + masterFuture.syncUninterruptibly(); + + if (!config.checkSkipSlavesInit()) { + List> fs = entry.initSlaveBalancer(getDisconnectedNodes(), masterFuture.getNow()); + for (RFuture future : fs) { + future.syncUninterruptibly(); + } + } + for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) { addEntry(slot, entry); } - startDNSMonitoring(f.getNow()); + startDNSMonitoring(masterFuture.getNow()); } catch (Exception e) { stopThreads(); throw e; @@ -387,16 +388,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { dnsMonitor.start(); } } - - protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config) { - MasterSlaveEntry entry = new MasterSlaveEntry(this, config); - List> fs = entry.initSlaveBalancer(java.util.Collections.emptySet()); - for (RFuture future : fs) { - future.syncUninterruptibly(); - } - return entry; - } + protected Collection getDisconnectedNodes() { + return Collections.emptySet(); + } + protected MasterSlaveServersConfig create(BaseMasterSlaveServersConfig cfg) { MasterSlaveServersConfig c = new MasterSlaveServersConfig(); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index f95395f48..5dc800071 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -85,13 +85,13 @@ public class MasterSlaveEntry { return config; } - public List> initSlaveBalancer(Collection disconnectedNodes) { + public List> initSlaveBalancer(Collection disconnectedNodes, RedisClient master) { boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty() && !config.checkSkipSlavesInit() && disconnectedNodes.size() < config.getSlaveAddresses().size(); List> result = new LinkedList>(); - RFuture f = addSlave(new RedisURI(config.getMasterAddress()), freezeMasterAsSlave, NodeType.MASTER); + RFuture f = addSlave(master.getAddr(), master.getConfig().getAddress(), freezeMasterAsSlave, NodeType.MASTER); result.add(f); for (String address : config.getSlaveAddresses()) { RedisURI uri = new RedisURI(address); @@ -456,7 +456,7 @@ public class MasterSlaveEntry { synchronized (oldMaster) { oldMaster.setFreezeReason(FreezeReason.MANAGER); } - slaveDown(oldMaster); + nodeDown(oldMaster); slaveBalancer.changeType(oldMaster.getClient().getAddr(), NodeType.SLAVE); slaveBalancer.changeType(newMasterClient.getAddr(), NodeType.MASTER); diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index e6029f42b..875f10b5b 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -498,7 +498,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { if (sentinel != null) { disconnectNode(sentinel); sentinel.shutdownAsync(); - log.warn("sentinel: {} was down", uri); + log.warn("sentinel: {} is down", uri); } } } @@ -509,13 +509,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } @Override - protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config) { - MasterSlaveEntry entry = new MasterSlaveEntry(this, config); - List> fs = entry.initSlaveBalancer(disconnectedSlaves); - for (RFuture future : fs) { - future.syncUninterruptibly(); - } - return entry; + protected Collection getDisconnectedNodes() { + return disconnectedSlaves; } private RFuture registerSentinel(RedisURI addr, MasterSlaveServersConfig c, String sslHostname) { @@ -599,11 +594,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { private void slaveDown(RedisURI uri) { if (config.checkSkipSlavesInit()) { - log.warn("slave: {} was down", uri); + log.warn("slave: {} is down", uri); } else { MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); if (entry.slaveDown(uri, FreezeReason.MANAGER)) { - log.warn("slave: {} was down", uri); + log.warn("slave: {} is down", uri); } } } @@ -620,12 +615,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { private void slaveUp(RedisURI uri) { if (config.checkSkipSlavesInit()) { - log.info("slave: {} has up", uri); + log.info("slave: {} is up", uri); return; } if (getEntry(singleSlotRange.getStartSlot()).slaveUp(uri, FreezeReason.MANAGER)) { - log.info("slave: {} has up", uri); + log.info("slave: {} is up", uri); } } diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index 00ca07936..fe0fe64c0 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -115,16 +115,16 @@ public class LoadBalancerManager { public boolean unfreeze(RedisURI address, FreezeReason freezeReason) { ClientConnectionsEntry entry = getEntry(address); if (entry == null) { - throw new IllegalStateException("Can't find " + address + " in slaves!"); + throw new IllegalStateException("Can't find " + address + " in slaves! Available slaves: " + client2Entry.keySet()); } return unfreeze(entry, freezeReason); } - public boolean unfreeze(InetSocketAddress address, FreezeReason freezeReason) { + public boolean unfreeze(InetSocketAddress address, FreezeReason freezeReason) { ClientConnectionsEntry entry = getEntry(address); if (entry == null) { - throw new IllegalStateException("Can't find " + address + " in slaves!"); + throw new IllegalStateException("Can't find " + address + " in slaves! Available slaves: " + client2Entry.keySet()); } return unfreeze(entry, freezeReason);