Cluster slaves change monitoring. #271

pull/282/head
Nikita 9 years ago
parent 6563a03c4e
commit 8f1d045422

@ -16,8 +16,8 @@
package org.redisson; package org.redisson;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.HashSet;
import java.util.List; import java.util.Set;
import org.redisson.misc.URIBuilder; import org.redisson.misc.URIBuilder;
@ -26,7 +26,7 @@ public class MasterSlaveServersConfig extends BaseMasterSlaveServersConfig<Maste
/** /**
* Redis slave servers addresses * Redis slave servers addresses
*/ */
private List<URI> slaveAddresses = new ArrayList<URI>(); private Set<URI> slaveAddresses = new HashSet<URI>();
/** /**
* Redis master server address * Redis master server address
@ -75,10 +75,10 @@ public class MasterSlaveServersConfig extends BaseMasterSlaveServersConfig<Maste
slaveAddresses.add(slaveAddress); slaveAddresses.add(slaveAddress);
return this; return this;
} }
public List<URI> getSlaveAddresses() { public Set<URI> getSlaveAddresses() {
return slaveAddresses; return slaveAddresses;
} }
void setSlaveAddresses(List<URI> readAddresses) { public void setSlaveAddresses(Set<URI> readAddresses) {
this.slaveAddresses = readAddresses; this.slaveAddresses = readAddresses;
} }

@ -124,8 +124,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
} }
MasterSlaveServersConfig config = create(cfg); MasterSlaveServersConfig config = create(cfg);
log.info("added master: {} for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
config.setMasterAddress(partition.getMasterAddress()); config.setMasterAddress(partition.getMasterAddress());
config.setSlaveAddresses(partition.getSlaveAddresses());
log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges());
SingleEntry entry = new SingleEntry(partition.getSlotRanges(), this, config); SingleEntry entry = new SingleEntry(partition.getSlotRanges(), this, config);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
@ -164,9 +166,43 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
Collection<ClusterPartition> newPartitions = parsePartitions(nodesValue); Collection<ClusterPartition> newPartitions = parsePartitions(nodesValue);
checkMasterNodesChange(newPartitions); checkMasterNodesChange(newPartitions);
checkSlaveNodesChange(newPartitions);
checkSlotsChange(cfg, newPartitions); checkSlotsChange(cfg, newPartitions);
} }
private void checkSlaveNodesChange(Collection<ClusterPartition> newPartitions) {
for (ClusterPartition newPart : newPartitions) {
for (ClusterPartition currentPart : lastPartitions.values()) {
if (!newPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
continue;
}
MasterSlaveEntry entry = getEntry(currentPart.getMasterAddr());
Set<URI> removedSlaves = new HashSet<URI>(currentPart.getSlaveAddresses());
removedSlaves.removeAll(newPart.getSlaveAddresses());
for (URI uri : removedSlaves) {
currentPart.removeSlaveAddress(uri);
slaveDown(entry, uri.getHost(), uri.getPort());
log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges());
}
Set<URI> addedSlaves = new HashSet<URI>(newPart.getSlaveAddresses());
addedSlaves.removeAll(currentPart.getSlaveAddresses());
for (URI uri : addedSlaves) {
currentPart.addSlaveAddress(uri);
entry.addSlave(uri.getHost(), uri.getPort());
entry.slaveUp(uri.getHost(), uri.getPort());
log.info("slave {} added for slot ranges: {}", uri, currentPart.getSlotRanges());
}
break;
}
}
}
private Collection<ClusterSlotRange> slots(Collection<ClusterPartition> partitions) { private Collection<ClusterSlotRange> slots(Collection<ClusterPartition> partitions) {
List<ClusterSlotRange> result = new ArrayList<ClusterSlotRange>(); List<ClusterSlotRange> result = new ArrayList<ClusterSlotRange>();
for (ClusterPartition clusterPartition : partitions) { for (ClusterPartition clusterPartition : partitions) {
@ -221,7 +257,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
removedSlots.removeAll(newPartitionsSlots); removedSlots.removeAll(newPartitionsSlots);
lastPartitions.keySet().removeAll(removedSlots); lastPartitions.keySet().removeAll(removedSlots);
if (!removedSlots.isEmpty()) { if (!removedSlots.isEmpty()) {
log.info("{} slot ranges found to remove", removedSlots.size()); log.info("{} slot ranges found to remove", removedSlots);
} }
for (ClusterSlotRange slot : removedSlots) { for (ClusterSlotRange slot : removedSlots) {
@ -237,7 +273,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
Set<ClusterSlotRange> addedSlots = new HashSet<ClusterSlotRange>(newPartitionsSlots); Set<ClusterSlotRange> addedSlots = new HashSet<ClusterSlotRange>(newPartitionsSlots);
addedSlots.removeAll(lastPartitions.keySet()); addedSlots.removeAll(lastPartitions.keySet());
if (!addedSlots.isEmpty()) { if (!addedSlots.isEmpty()) {
log.info("{} slots found to add", addedSlots.size()); log.info("{} slots found to add", addedSlots);
} }
for (ClusterSlotRange slot : addedSlots) { for (ClusterSlotRange slot : addedSlots) {
ClusterPartition partition = find(newPartitions, slot); ClusterPartition partition = find(newPartitions, slot);
@ -267,19 +303,21 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
Set<ClusterSlotRange> addedSlots = new HashSet<ClusterSlotRange>(newPartition.getSlotRanges()); Set<ClusterSlotRange> addedSlots = new HashSet<ClusterSlotRange>(newPartition.getSlotRanges());
addedSlots.removeAll(currentPartition.getSlotRanges()); addedSlots.removeAll(currentPartition.getSlotRanges());
MasterSlaveEntry entry = getEntry(currentPartition.getSlotRanges().iterator().next()); MasterSlaveEntry entry = getEntry(currentPartition.getSlotRanges().iterator().next());
currentPartition.addSlotRanges(addedSlots);
for (ClusterSlotRange slot : addedSlots) { for (ClusterSlotRange slot : addedSlots) {
entry.addSlotRange(slot); entry.addSlotRange(slot);
addEntry(slot, entry); addEntry(slot, entry);
log.info("slot {} added for {}", slot, entry.getClient().getAddr()); log.info("{} slot added for {}", slot, entry.getClient().getAddr());
lastPartitions.put(slot, currentPartition); lastPartitions.put(slot, currentPartition);
} }
Set<ClusterSlotRange> removedSlots = new HashSet<ClusterSlotRange>(currentPartition.getSlotRanges()); Set<ClusterSlotRange> removedSlots = new HashSet<ClusterSlotRange>(currentPartition.getSlotRanges());
removedSlots.removeAll(newPartition.getSlotRanges()); removedSlots.removeAll(newPartition.getSlotRanges());
lastPartitions.keySet().removeAll(removedSlots); lastPartitions.keySet().removeAll(removedSlots);
currentPartition.removeSlotRanges(removedSlots);
for (ClusterSlotRange slot : removedSlots) { for (ClusterSlotRange slot : removedSlots) {
log.info("slot {} removed for {}", slot, entry.getClient().getAddr()); log.info("{} slot removed for {}", slot, entry.getClient().getAddr());
entry.removeSlotRange(slot); entry.removeSlotRange(slot);
removeMaster(slot); removeMaster(slot);
} }

@ -17,7 +17,9 @@ package org.redisson.cluster;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import org.redisson.misc.URIBuilder; import org.redisson.misc.URIBuilder;
@ -30,7 +32,7 @@ public class ClusterNodeInfo {
private final List<Flag> flags = new ArrayList<Flag>(); private final List<Flag> flags = new ArrayList<Flag>();
private String slaveOf; private String slaveOf;
private final List<ClusterSlotRange> slotRanges = new ArrayList<ClusterSlotRange>(); private final Set<ClusterSlotRange> slotRanges = new HashSet<ClusterSlotRange>();
public String getNodeId() { public String getNodeId() {
return nodeId; return nodeId;
@ -49,7 +51,7 @@ public class ClusterNodeInfo {
public void addSlotRange(ClusterSlotRange range) { public void addSlotRange(ClusterSlotRange range) {
slotRanges.add(range); slotRanges.add(range);
} }
public List<ClusterSlotRange> getSlotRanges() { public Set<ClusterSlotRange> getSlotRanges() {
return slotRanges; return slotRanges;
} }

@ -17,10 +17,8 @@ package org.redisson.cluster;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
import org.redisson.misc.URIBuilder; import org.redisson.misc.URIBuilder;
@ -30,7 +28,7 @@ public class ClusterPartition {
private final String nodeId; private final String nodeId;
private boolean masterFail; private boolean masterFail;
private URI masterAddress; private URI masterAddress;
private List<URI> slaveAddresses = new ArrayList<URI>(); private Set<URI> slaveAddresses = new HashSet<URI>();
private final Set<ClusterSlotRange> slotRanges = new HashSet<ClusterSlotRange>(); private final Set<ClusterSlotRange> slotRanges = new HashSet<ClusterSlotRange>();
public ClusterPartition(String nodeId) { public ClusterPartition(String nodeId) {
@ -49,9 +47,12 @@ public class ClusterPartition {
return masterFail; return masterFail;
} }
public void addSlotRanges(List<ClusterSlotRange> ranges) { public void addSlotRanges(Set<ClusterSlotRange> ranges) {
slotRanges.addAll(ranges); slotRanges.addAll(ranges);
} }
public void removeSlotRanges(Set<ClusterSlotRange> ranges) {
slotRanges.removeAll(ranges);
}
public Set<ClusterSlotRange> getSlotRanges() { public Set<ClusterSlotRange> getSlotRanges() {
return slotRanges; return slotRanges;
} }
@ -80,8 +81,11 @@ public class ClusterPartition {
public void addSlaveAddress(URI address) { public void addSlaveAddress(URI address) {
slaveAddresses.add(address); slaveAddresses.add(address);
} }
public List<URI> getSlaveAddresses() { public Set<URI> getSlaveAddresses() {
return slaveAddresses; return slaveAddresses;
} }
public void removeSlaveAddress(URI uri) {
slaveAddresses.remove(uri);
}
} }

@ -45,6 +45,8 @@ import io.netty.util.concurrent.Promise;
//TODO ping support //TODO ping support
public interface ConnectionManager { public interface ConnectionManager {
void slaveDown(MasterSlaveEntry entry, String host, int port);
Collection<RedisClientEntry> getClients(); Collection<RedisClientEntry> getClients();
void shutdownAsync(RedisClient client); void shutdownAsync(RedisClient client);

@ -488,24 +488,24 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return null; return null;
} }
protected void slaveDown(ClusterSlotRange slotRange, String host, int port) { public void slaveDown(MasterSlaveEntry entry, String host, int port) {
Collection<RedisPubSubConnection> allPubSubConnections = getEntry(slotRange).slaveDown(host, port); Collection<RedisPubSubConnection> allPubSubConnections = entry.slaveDown(host, port);
// reattach listeners to other channels // reattach listeners to other channels
for (Entry<String, PubSubConnectionEntry> mapEntry : name2PubSubConnection.entrySet()) { for (Entry<String, PubSubConnectionEntry> mapEntry : name2PubSubConnection.entrySet()) {
for (RedisPubSubConnection redisPubSubConnection : allPubSubConnections) { for (RedisPubSubConnection redisPubSubConnection : allPubSubConnections) {
PubSubConnectionEntry entry = mapEntry.getValue(); PubSubConnectionEntry pubSubEntry = mapEntry.getValue();
final String channelName = mapEntry.getKey(); final String channelName = mapEntry.getKey();
if (!entry.getConnection().equals(redisPubSubConnection)) { if (!pubSubEntry.getConnection().equals(redisPubSubConnection)) {
continue; continue;
} }
synchronized (entry) { synchronized (pubSubEntry) {
entry.close(); pubSubEntry.close();
final Collection<RedisPubSubListener> listeners = entry.getListeners(channelName); final Collection<RedisPubSubListener> listeners = pubSubEntry.getListeners(channelName);
if (entry.getConnection().getPatternChannels().get(channelName) != null) { if (pubSubEntry.getConnection().getPatternChannels().get(channelName) != null) {
Codec subscribeCodec = punsubscribe(channelName); Codec subscribeCodec = punsubscribe(channelName);
if (!listeners.isEmpty()) { if (!listeners.isEmpty()) {
Future<PubSubConnectionEntry> future = psubscribe(channelName, subscribeCodec); Future<PubSubConnectionEntry> future = psubscribe(channelName, subscribeCodec);
@ -544,6 +544,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
} }
protected void slaveDown(ClusterSlotRange slotRange, String host, int port) {
MasterSlaveEntry entry = getEntry(slotRange);
slaveDown(entry, host, port);
}
protected void changeMaster(ClusterSlotRange slotRange, String host, int port) { protected void changeMaster(ClusterSlotRange slotRange, String host, int port) {
getEntry(slotRange).changeMaster(host, port); getEntry(slotRange).changeMaster(host, port);
} }

@ -71,7 +71,7 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
this.config.getSlaveConnectionPoolSize(), this.config.getSlaveConnectionPoolSize(),
this.config.getSlaveSubscriptionConnectionPoolSize())); this.config.getSlaveSubscriptionConnectionPoolSize()));
} }
if (config.getSlaveAddresses().size() > 1) { if (!config.getSlaveAddresses().isEmpty()) {
slaveDown(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); slaveDown(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
} }
@ -110,7 +110,7 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
public void slaveUp(String host, int port) { public void slaveUp(String host, int port) {
InetSocketAddress addr = masterEntry.getClient().getAddr(); InetSocketAddress addr = masterEntry.getClient().getAddr();
if (!addr.getHostName().equals(host) && port != addr.getPort()) { if (!addr.getHostName().equals(host) && port != addr.getPort()) {
slaveDown(addr.getHostName(), addr.getPort()); connectionManager.slaveDown(this, addr.getHostName(), addr.getPort());
} }
slaveBalancer.unfreeze(host, port); slaveBalancer.unfreeze(host, port);
} }
@ -126,7 +126,8 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
setupMasterEntry(host, port); setupMasterEntry(host, port);
writeConnectionHolder.remove(oldMaster); writeConnectionHolder.remove(oldMaster);
if (slaveBalancer.getAvailableClients() > 1) { if (slaveBalancer.getAvailableClients() > 1) {
slaveDown(host, port); // more than one slave avaliable, so master could be removed from slaves
connectionManager.slaveDown(this, host, port);
} }
connectionManager.shutdownAsync(oldMaster.getClient()); connectionManager.shutdownAsync(oldMaster.getClient());
} }

Loading…
Cancel
Save