diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index dbcbd6d8f..0e0dc7ad0 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -153,8 +153,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private Future>> addMasterEntry(final ClusterPartition partition, final ClusterServersConfig cfg) { if (partition.isMasterFail()) { RedisException e = new RedisException("Failed to add master: " + - partition.getMasterAddress() + " for slot ranges: " + - partition.getSlotRanges() + ". Reason - server has FAIL flag"); + partition.getMasterAddress() + " for slot ranges: " + + partition.getSlotRanges() + ". Reason - server has FAIL flag"); + + if (partition.getSlotRanges().isEmpty()) { + e = new RedisException("Failed to add master: " + + partition.getMasterAddress() + ". Reason - server has FAIL flag"); + } return newFailedFuture(e); } @@ -200,11 +205,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { config.setSlaveAddresses(partition.getSlaveAddresses()); e = new MasterSlaveEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config); - List> fs = e.initSlaveBalancer(); - futures.addAll(fs); if (!partition.getSlaveAddresses().isEmpty()) { + List> fs = e.initSlaveBalancer(partition.getFailedSlaveAddresses()); + futures.addAll(fs); log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges()); + if (!partition.getFailedSlaveAddresses().isEmpty()) { + log.warn("slaves: {} is down for slot ranges: {}", partition.getFailedSlaveAddresses(), partition.getSlotRanges()); + } } } @@ -242,8 +250,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { List slaves = new ArrayList(); AtomicReference lastException = new AtomicReference(); for (ClusterPartition partition : lastPartitions.values()) { - nodes.add(partition.getMasterAddress()); - slaves.addAll(partition.getSlaveAddresses()); + if (!partition.isMasterFail()) { + nodes.add(partition.getMasterAddress()); + } + + Set partitionSlaves = new HashSet(partition.getSlaveAddresses()); + partitionSlaves.removeAll(partition.getFailedSlaveAddresses()); + slaves.addAll(partitionSlaves); } // master nodes first nodes.addAll(slaves); @@ -302,43 +315,69 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private void checkSlaveNodesChange(Collection newPartitions) { for (ClusterPartition newPart : newPartitions) { - for (final ClusterPartition currentPart : lastPartitions.values()) { + for (ClusterPartition currentPart : lastPartitions.values()) { if (!newPart.getMasterAddress().equals(currentPart.getMasterAddress())) { continue; } - final MasterSlaveEntry entry = getEntry(currentPart.getMasterAddr()); - Set removedSlaves = new HashSet(currentPart.getSlaveAddresses()); - removedSlaves.removeAll(newPart.getSlaveAddresses()); + MasterSlaveEntry entry = getEntry(currentPart.getMasterAddr()); + // should be invoked first in order to removed stale failedSlaveAddresses + addRemoveSlaves(entry, currentPart, newPart); + // Does some slaves change failed state to alive? + upDownSlaves(entry, currentPart, newPart); - for (URI uri : removedSlaves) { - currentPart.removeSlaveAddress(uri); + break; + } + } + } - slaveDown(entry, uri.getHost(), uri.getPort(), FreezeReason.MANAGER); - log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges()); - } + private void upDownSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart) { + Set aliveSlaves = new HashSet(currentPart.getFailedSlaveAddresses()); + aliveSlaves.removeAll(newPart.getFailedSlaveAddresses()); + for (URI uri : aliveSlaves) { + currentPart.removeFailedSlaveAddress(uri); + if (entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { + log.info("slave: {} has up for slot ranges: {}", uri, currentPart.getSlotRanges()); + } + } - Set addedSlaves = new HashSet(newPart.getSlaveAddresses()); - addedSlaves.removeAll(currentPart.getSlaveAddresses()); - for (final URI uri : addedSlaves) { - Future future = entry.addSlave(uri.getHost(), uri.getPort()); - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - log.error("Can't add slave: " + uri, future.cause()); - return; - } + Set failedSlaves = new HashSet(newPart.getFailedSlaveAddresses()); + failedSlaves.removeAll(currentPart.getFailedSlaveAddresses()); + for (URI uri : failedSlaves) { + currentPart.addFailedSlaveAddress(uri); + slaveDown(entry, uri.getHost(), uri.getPort(), FreezeReason.MANAGER); + log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges()); + } + } - currentPart.addSlaveAddress(uri); - entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER); - log.info("slave {} added for slot ranges: {}", uri, currentPart.getSlotRanges()); - } - }); - } + private void addRemoveSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart) { + Set removedSlaves = new HashSet(currentPart.getSlaveAddresses()); + removedSlaves.removeAll(newPart.getSlaveAddresses()); - break; - } + for (URI uri : removedSlaves) { + currentPart.removeSlaveAddress(uri); + + slaveDown(entry, uri.getHost(), uri.getPort(), FreezeReason.MANAGER); + log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges()); + } + + Set addedSlaves = new HashSet(newPart.getSlaveAddresses()); + addedSlaves.removeAll(currentPart.getSlaveAddresses()); + for (final URI uri : addedSlaves) { + Future future = entry.addSlave(uri.getHost(), uri.getPort()); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + log.error("Can't add slave: " + uri, future.cause()); + return; + } + + currentPart.addSlaveAddress(uri); + entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER); + log.info("slave {} added for slot ranges: {}", uri, currentPart.getSlotRanges()); + } + }); } } @@ -511,7 +550,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } if (clusterNodeInfo.containsFlag(Flag.FAIL)) { - partition.setMasterFail(true); + if (clusterNodeInfo.containsFlag(Flag.SLAVE)) { + partition.addFailedSlaveAddress(clusterNodeInfo.getAddress()); + } else { + partition.setMasterFail(true); + } } if (clusterNodeInfo.containsFlag(Flag.SLAVE)) { diff --git a/src/main/java/org/redisson/cluster/ClusterNodeInfo.java b/src/main/java/org/redisson/cluster/ClusterNodeInfo.java index 82406e81a..6c1df8914 100644 --- a/src/main/java/org/redisson/cluster/ClusterNodeInfo.java +++ b/src/main/java/org/redisson/cluster/ClusterNodeInfo.java @@ -16,9 +16,7 @@ package org.redisson.cluster; import java.net.URI; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Set; import org.redisson.misc.URIBuilder; @@ -29,7 +27,7 @@ public class ClusterNodeInfo { private String nodeId; private URI address; - private final List flags = new ArrayList(); + private final Set flags = new HashSet(); private String slaveOf; private final Set slotRanges = new HashSet(); diff --git a/src/main/java/org/redisson/cluster/ClusterPartition.java b/src/main/java/org/redisson/cluster/ClusterPartition.java index 37bafdce3..f6486bd9f 100644 --- a/src/main/java/org/redisson/cluster/ClusterPartition.java +++ b/src/main/java/org/redisson/cluster/ClusterPartition.java @@ -17,8 +17,8 @@ package org.redisson.cluster; import java.net.InetSocketAddress; import java.net.URI; +import java.util.Collections; import java.util.HashSet; -import java.util.LinkedHashSet; import java.util.Set; import org.redisson.misc.URIBuilder; @@ -28,7 +28,8 @@ public class ClusterPartition { private final String nodeId; private boolean masterFail; private URI masterAddress; - private Set slaveAddresses = new HashSet(); + private final Set slaveAddresses = new HashSet(); + private final Set failedSlaves = new HashSet(); private final Set slotRanges = new HashSet(); public ClusterPartition(String nodeId) { @@ -71,21 +72,25 @@ public class ClusterPartition { this.masterAddress = masterAddress; } - public Set getAllAddresses() { - Set result = new LinkedHashSet(); - result.add(masterAddress); - result.addAll(slaveAddresses); - return result; + public void addFailedSlaveAddress(URI address) { + failedSlaves.add(address); + } + public Set getFailedSlaveAddresses() { + return Collections.unmodifiableSet(failedSlaves); + } + public void removeFailedSlaveAddress(URI uri) { + failedSlaves.remove(uri); } public void addSlaveAddress(URI address) { slaveAddresses.add(address); } public Set getSlaveAddresses() { - return slaveAddresses; + return Collections.unmodifiableSet(slaveAddresses); } public void removeSlaveAddress(URI uri) { slaveAddresses.remove(uri); + failedSlaves.remove(uri); } } diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 7f4298045..ed414caa5 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -16,6 +16,7 @@ package org.redisson.connection; import java.net.InetSocketAddress; +import java.net.URI; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -213,7 +214,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config, HashSet slots) { MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config); - List> fs = entry.initSlaveBalancer(java.util.Collections.emptyList()); + List> fs = entry.initSlaveBalancer(java.util.Collections.emptySet()); for (Future future : fs) { future.syncUninterruptibly(); } diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index f19d5122a..a5a0acc31 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -135,6 +135,7 @@ public class MasterSlaveEntry { if (config.getReadMode() == ReadMode.SLAVE && (!addr.getHostName().equals(host) || port != addr.getPort())) { connectionManager.slaveDown(this, addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM); + log.info("master {}:{} excluded from slaves", addr.getHostName(), addr.getPort()); } return true; }