diff --git a/src/main/java/org/redisson/MasterSlaveServersConfig.java b/src/main/java/org/redisson/MasterSlaveServersConfig.java index 0c92a439e..a567867d7 100644 --- a/src/main/java/org/redisson/MasterSlaveServersConfig.java +++ b/src/main/java/org/redisson/MasterSlaveServersConfig.java @@ -16,8 +16,8 @@ package org.redisson; import java.net.URI; -import java.util.ArrayList; -import java.util.List; +import java.util.HashSet; +import java.util.Set; import org.redisson.misc.URIBuilder; @@ -26,7 +26,7 @@ public class MasterSlaveServersConfig extends BaseMasterSlaveServersConfig slaveAddresses = new ArrayList(); + private Set slaveAddresses = new HashSet(); /** * Redis master server address @@ -75,10 +75,10 @@ public class MasterSlaveServersConfig extends BaseMasterSlaveServersConfig getSlaveAddresses() { + public Set getSlaveAddresses() { return slaveAddresses; } - void setSlaveAddresses(List readAddresses) { + public void setSlaveAddresses(Set readAddresses) { this.slaveAddresses = readAddresses; } diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 1b22012b9..1fb6b6d95 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -124,8 +124,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } MasterSlaveServersConfig config = create(cfg); - log.info("added master: {} for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); + log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); config.setMasterAddress(partition.getMasterAddress()); + config.setSlaveAddresses(partition.getSlaveAddresses()); + log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges()); SingleEntry entry = new SingleEntry(partition.getSlotRanges(), this, config); entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); @@ -164,9 +166,43 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { Collection newPartitions = parsePartitions(nodesValue); checkMasterNodesChange(newPartitions); + checkSlaveNodesChange(newPartitions); checkSlotsChange(cfg, newPartitions); } + private void checkSlaveNodesChange(Collection newPartitions) { + for (ClusterPartition newPart : newPartitions) { + for (ClusterPartition currentPart : lastPartitions.values()) { + if (!newPart.getMasterAddress().equals(currentPart.getMasterAddress())) { + continue; + } + MasterSlaveEntry entry = getEntry(currentPart.getMasterAddr()); + + Set removedSlaves = new HashSet(currentPart.getSlaveAddresses()); + removedSlaves.removeAll(newPart.getSlaveAddresses()); + + for (URI uri : removedSlaves) { + currentPart.removeSlaveAddress(uri); + + slaveDown(entry, uri.getHost(), uri.getPort()); + log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges()); + } + + Set addedSlaves = new HashSet(newPart.getSlaveAddresses()); + addedSlaves.removeAll(currentPart.getSlaveAddresses()); + for (URI uri : addedSlaves) { + currentPart.addSlaveAddress(uri); + + entry.addSlave(uri.getHost(), uri.getPort()); + entry.slaveUp(uri.getHost(), uri.getPort()); + log.info("slave {} added for slot ranges: {}", uri, currentPart.getSlotRanges()); + } + + break; + } + } + } + private Collection slots(Collection partitions) { List result = new ArrayList(); for (ClusterPartition clusterPartition : partitions) { @@ -221,7 +257,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { removedSlots.removeAll(newPartitionsSlots); lastPartitions.keySet().removeAll(removedSlots); if (!removedSlots.isEmpty()) { - log.info("{} slot ranges found to remove", removedSlots.size()); + log.info("{} slot ranges found to remove", removedSlots); } for (ClusterSlotRange slot : removedSlots) { @@ -237,7 +273,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { Set addedSlots = new HashSet(newPartitionsSlots); addedSlots.removeAll(lastPartitions.keySet()); if (!addedSlots.isEmpty()) { - log.info("{} slots found to add", addedSlots.size()); + log.info("{} slots found to add", addedSlots); } for (ClusterSlotRange slot : addedSlots) { ClusterPartition partition = find(newPartitions, slot); @@ -267,19 +303,21 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { Set addedSlots = new HashSet(newPartition.getSlotRanges()); addedSlots.removeAll(currentPartition.getSlotRanges()); MasterSlaveEntry entry = getEntry(currentPartition.getSlotRanges().iterator().next()); + currentPartition.addSlotRanges(addedSlots); for (ClusterSlotRange slot : addedSlots) { entry.addSlotRange(slot); addEntry(slot, entry); - log.info("slot {} added for {}", slot, entry.getClient().getAddr()); + log.info("{} slot added for {}", slot, entry.getClient().getAddr()); lastPartitions.put(slot, currentPartition); } Set removedSlots = new HashSet(currentPartition.getSlotRanges()); removedSlots.removeAll(newPartition.getSlotRanges()); lastPartitions.keySet().removeAll(removedSlots); + currentPartition.removeSlotRanges(removedSlots); for (ClusterSlotRange slot : removedSlots) { - log.info("slot {} removed for {}", slot, entry.getClient().getAddr()); + log.info("{} slot removed for {}", slot, entry.getClient().getAddr()); entry.removeSlotRange(slot); removeMaster(slot); } diff --git a/src/main/java/org/redisson/cluster/ClusterNodeInfo.java b/src/main/java/org/redisson/cluster/ClusterNodeInfo.java index 0f14390d1..82406e81a 100644 --- a/src/main/java/org/redisson/cluster/ClusterNodeInfo.java +++ b/src/main/java/org/redisson/cluster/ClusterNodeInfo.java @@ -17,7 +17,9 @@ package org.redisson.cluster; import java.net.URI; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.redisson.misc.URIBuilder; @@ -30,7 +32,7 @@ public class ClusterNodeInfo { private final List flags = new ArrayList(); private String slaveOf; - private final List slotRanges = new ArrayList(); + private final Set slotRanges = new HashSet(); public String getNodeId() { return nodeId; @@ -49,7 +51,7 @@ public class ClusterNodeInfo { public void addSlotRange(ClusterSlotRange range) { slotRanges.add(range); } - public List getSlotRanges() { + public Set getSlotRanges() { return slotRanges; } diff --git a/src/main/java/org/redisson/cluster/ClusterPartition.java b/src/main/java/org/redisson/cluster/ClusterPartition.java index e6418ec19..37bafdce3 100644 --- a/src/main/java/org/redisson/cluster/ClusterPartition.java +++ b/src/main/java/org/redisson/cluster/ClusterPartition.java @@ -17,10 +17,8 @@ package org.redisson.cluster; import java.net.InetSocketAddress; import java.net.URI; -import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedHashSet; -import java.util.List; import java.util.Set; import org.redisson.misc.URIBuilder; @@ -30,7 +28,7 @@ public class ClusterPartition { private final String nodeId; private boolean masterFail; private URI masterAddress; - private List slaveAddresses = new ArrayList(); + private Set slaveAddresses = new HashSet(); private final Set slotRanges = new HashSet(); public ClusterPartition(String nodeId) { @@ -49,9 +47,12 @@ public class ClusterPartition { return masterFail; } - public void addSlotRanges(List ranges) { + public void addSlotRanges(Set ranges) { slotRanges.addAll(ranges); } + public void removeSlotRanges(Set ranges) { + slotRanges.removeAll(ranges); + } public Set getSlotRanges() { return slotRanges; } @@ -80,8 +81,11 @@ public class ClusterPartition { public void addSlaveAddress(URI address) { slaveAddresses.add(address); } - public List getSlaveAddresses() { + public Set getSlaveAddresses() { return slaveAddresses; } + public void removeSlaveAddress(URI uri) { + slaveAddresses.remove(uri); + } } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index e20fdc71f..c8873d4fc 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -45,6 +45,8 @@ import io.netty.util.concurrent.Promise; //TODO ping support public interface ConnectionManager { + void slaveDown(MasterSlaveEntry entry, String host, int port); + Collection getClients(); void shutdownAsync(RedisClient client); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 17f2ff3fe..d381eee1a 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -488,24 +488,24 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return null; } - protected void slaveDown(ClusterSlotRange slotRange, String host, int port) { - Collection allPubSubConnections = getEntry(slotRange).slaveDown(host, port); + public void slaveDown(MasterSlaveEntry entry, String host, int port) { + Collection allPubSubConnections = entry.slaveDown(host, port); // reattach listeners to other channels for (Entry mapEntry : name2PubSubConnection.entrySet()) { for (RedisPubSubConnection redisPubSubConnection : allPubSubConnections) { - PubSubConnectionEntry entry = mapEntry.getValue(); + PubSubConnectionEntry pubSubEntry = mapEntry.getValue(); final String channelName = mapEntry.getKey(); - if (!entry.getConnection().equals(redisPubSubConnection)) { + if (!pubSubEntry.getConnection().equals(redisPubSubConnection)) { continue; } - synchronized (entry) { - entry.close(); + synchronized (pubSubEntry) { + pubSubEntry.close(); - final Collection listeners = entry.getListeners(channelName); - if (entry.getConnection().getPatternChannels().get(channelName) != null) { + final Collection listeners = pubSubEntry.getListeners(channelName); + if (pubSubEntry.getConnection().getPatternChannels().get(channelName) != null) { Codec subscribeCodec = punsubscribe(channelName); if (!listeners.isEmpty()) { Future future = psubscribe(channelName, subscribeCodec); @@ -544,6 +544,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } + protected void slaveDown(ClusterSlotRange slotRange, String host, int port) { + MasterSlaveEntry entry = getEntry(slotRange); + slaveDown(entry, host, port); + } + protected void changeMaster(ClusterSlotRange slotRange, String host, int port) { getEntry(slotRange).changeMaster(host, port); } diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index acc495ab7..64562a850 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -71,7 +71,7 @@ public class MasterSlaveEntry { this.config.getSlaveConnectionPoolSize(), this.config.getSlaveSubscriptionConnectionPoolSize())); } - if (config.getSlaveAddresses().size() > 1) { + if (!config.getSlaveAddresses().isEmpty()) { slaveDown(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); } @@ -110,7 +110,7 @@ public class MasterSlaveEntry { public void slaveUp(String host, int port) { InetSocketAddress addr = masterEntry.getClient().getAddr(); if (!addr.getHostName().equals(host) && port != addr.getPort()) { - slaveDown(addr.getHostName(), addr.getPort()); + connectionManager.slaveDown(this, addr.getHostName(), addr.getPort()); } slaveBalancer.unfreeze(host, port); } @@ -126,7 +126,8 @@ public class MasterSlaveEntry { setupMasterEntry(host, port); writeConnectionHolder.remove(oldMaster); if (slaveBalancer.getAvailableClients() > 1) { - slaveDown(host, port); + // more than one slave avaliable, so master could be removed from slaves + connectionManager.slaveDown(this, host, port); } connectionManager.shutdownAsync(oldMaster.getClient()); }