Cluster slave nodes up/down state discovery fixed

pull/395/head
Nikita 9 years ago
parent f43f48f33f
commit e95b6d6a80

@ -153,8 +153,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private Future<Collection<Future<Void>>> addMasterEntry(final ClusterPartition partition, final ClusterServersConfig cfg) {
if (partition.isMasterFail()) {
RedisException e = new RedisException("Failed to add master: " +
partition.getMasterAddress() + " for slot ranges: " +
partition.getSlotRanges() + ". Reason - server has FAIL flag");
partition.getMasterAddress() + " for slot ranges: " +
partition.getSlotRanges() + ". Reason - server has FAIL flag");
if (partition.getSlotRanges().isEmpty()) {
e = new RedisException("Failed to add master: " +
partition.getMasterAddress() + ". Reason - server has FAIL flag");
}
return newFailedFuture(e);
}
@ -200,11 +205,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
config.setSlaveAddresses(partition.getSlaveAddresses());
e = new MasterSlaveEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config);
List<Future<Void>> fs = e.initSlaveBalancer();
futures.addAll(fs);
if (!partition.getSlaveAddresses().isEmpty()) {
List<Future<Void>> fs = e.initSlaveBalancer(partition.getFailedSlaveAddresses());
futures.addAll(fs);
log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges());
if (!partition.getFailedSlaveAddresses().isEmpty()) {
log.warn("slaves: {} is down for slot ranges: {}", partition.getFailedSlaveAddresses(), partition.getSlotRanges());
}
}
}
@ -242,8 +250,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
List<URI> slaves = new ArrayList<URI>();
AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
for (ClusterPartition partition : lastPartitions.values()) {
nodes.add(partition.getMasterAddress());
slaves.addAll(partition.getSlaveAddresses());
if (!partition.isMasterFail()) {
nodes.add(partition.getMasterAddress());
}
Set<URI> partitionSlaves = new HashSet<URI>(partition.getSlaveAddresses());
partitionSlaves.removeAll(partition.getFailedSlaveAddresses());
slaves.addAll(partitionSlaves);
}
// master nodes first
nodes.addAll(slaves);
@ -302,43 +315,69 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private void checkSlaveNodesChange(Collection<ClusterPartition> newPartitions) {
for (ClusterPartition newPart : newPartitions) {
for (final ClusterPartition currentPart : lastPartitions.values()) {
for (ClusterPartition currentPart : lastPartitions.values()) {
if (!newPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
continue;
}
final MasterSlaveEntry entry = getEntry(currentPart.getMasterAddr());
Set<URI> removedSlaves = new HashSet<URI>(currentPart.getSlaveAddresses());
removedSlaves.removeAll(newPart.getSlaveAddresses());
MasterSlaveEntry entry = getEntry(currentPart.getMasterAddr());
// should be invoked first in order to remove stale failedSlaveAddresses
addRemoveSlaves(entry, currentPart, newPart);
// Have some slaves changed state from failed to alive?
upDownSlaves(entry, currentPart, newPart);
for (URI uri : removedSlaves) {
currentPart.removeSlaveAddress(uri);
break;
}
}
}
slaveDown(entry, uri.getHost(), uri.getPort(), FreezeReason.MANAGER);
log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges());
}
private void upDownSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart) {
Set<URI> aliveSlaves = new HashSet<URI>(currentPart.getFailedSlaveAddresses());
aliveSlaves.removeAll(newPart.getFailedSlaveAddresses());
for (URI uri : aliveSlaves) {
currentPart.removeFailedSlaveAddress(uri);
if (entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
log.info("slave: {} has up for slot ranges: {}", uri, currentPart.getSlotRanges());
}
}
Set<URI> addedSlaves = new HashSet<URI>(newPart.getSlaveAddresses());
addedSlaves.removeAll(currentPart.getSlaveAddresses());
for (final URI uri : addedSlaves) {
Future<Void> future = entry.addSlave(uri.getHost(), uri.getPort());
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't add slave: " + uri, future.cause());
return;
}
Set<URI> failedSlaves = new HashSet<URI>(newPart.getFailedSlaveAddresses());
failedSlaves.removeAll(currentPart.getFailedSlaveAddresses());
for (URI uri : failedSlaves) {
currentPart.addFailedSlaveAddress(uri);
slaveDown(entry, uri.getHost(), uri.getPort(), FreezeReason.MANAGER);
log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges());
}
}
currentPart.addSlaveAddress(uri);
entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER);
log.info("slave {} added for slot ranges: {}", uri, currentPart.getSlotRanges());
}
});
}
private void addRemoveSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart) {
Set<URI> removedSlaves = new HashSet<URI>(currentPart.getSlaveAddresses());
removedSlaves.removeAll(newPart.getSlaveAddresses());
break;
}
for (URI uri : removedSlaves) {
currentPart.removeSlaveAddress(uri);
slaveDown(entry, uri.getHost(), uri.getPort(), FreezeReason.MANAGER);
log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges());
}
Set<URI> addedSlaves = new HashSet<URI>(newPart.getSlaveAddresses());
addedSlaves.removeAll(currentPart.getSlaveAddresses());
for (final URI uri : addedSlaves) {
Future<Void> future = entry.addSlave(uri.getHost(), uri.getPort());
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't add slave: " + uri, future.cause());
return;
}
currentPart.addSlaveAddress(uri);
entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER);
log.info("slave {} added for slot ranges: {}", uri, currentPart.getSlotRanges());
}
});
}
}
@ -511,7 +550,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
if (clusterNodeInfo.containsFlag(Flag.FAIL)) {
partition.setMasterFail(true);
if (clusterNodeInfo.containsFlag(Flag.SLAVE)) {
partition.addFailedSlaveAddress(clusterNodeInfo.getAddress());
} else {
partition.setMasterFail(true);
}
}
if (clusterNodeInfo.containsFlag(Flag.SLAVE)) {

@ -16,9 +16,7 @@
package org.redisson.cluster;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.redisson.misc.URIBuilder;
@ -29,7 +27,7 @@ public class ClusterNodeInfo {
private String nodeId;
private URI address;
private final List<Flag> flags = new ArrayList<Flag>();
private final Set<Flag> flags = new HashSet<Flag>();
private String slaveOf;
private final Set<ClusterSlotRange> slotRanges = new HashSet<ClusterSlotRange>();

@ -17,8 +17,8 @@ package org.redisson.cluster;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import org.redisson.misc.URIBuilder;
@ -28,7 +28,8 @@ public class ClusterPartition {
private final String nodeId;
private boolean masterFail;
private URI masterAddress;
private Set<URI> slaveAddresses = new HashSet<URI>();
private final Set<URI> slaveAddresses = new HashSet<URI>();
private final Set<URI> failedSlaves = new HashSet<URI>();
private final Set<ClusterSlotRange> slotRanges = new HashSet<ClusterSlotRange>();
public ClusterPartition(String nodeId) {
@ -71,21 +72,25 @@ public class ClusterPartition {
this.masterAddress = masterAddress;
}
public Set<URI> getAllAddresses() {
Set<URI> result = new LinkedHashSet<URI>();
result.add(masterAddress);
result.addAll(slaveAddresses);
return result;
/**
 * Records the given address in this partition's set of failed slaves.
 *
 * @param address address of the slave to mark as failed
 */
public void addFailedSlaveAddress(URI address) {
failedSlaves.add(address);
}
/**
 * Returns the addresses currently recorded as failed slaves of this partition.
 *
 * @return a read-only view of the failed-slave set; callers must use
 *         the add/remove methods of this class to change it
 */
public Set<URI> getFailedSlaveAddresses() {
return Collections.unmodifiableSet(failedSlaves);
}
/**
 * Removes the given address from this partition's set of failed slaves.
 *
 * @param uri address of the slave that should no longer be marked as failed
 */
public void removeFailedSlaveAddress(URI uri) {
failedSlaves.remove(uri);
}
/**
 * Adds the given address to this partition's set of slave addresses.
 *
 * @param address address of the slave to add
 */
public void addSlaveAddress(URI address) {
slaveAddresses.add(address);
}
public Set<URI> getSlaveAddresses() {
return slaveAddresses;
return Collections.unmodifiableSet(slaveAddresses);
}
/**
 * Removes the given address from this partition's slave addresses.
 *
 * @param uri address of the slave to remove
 */
public void removeSlaveAddress(URI uri) {
slaveAddresses.remove(uri);
// also drop it from the failed set so no stale failed entry is left for a removed slave
failedSlaves.remove(uri);
}
}

@ -16,6 +16,7 @@
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -213,7 +214,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config,
HashSet<ClusterSlotRange> slots) {
MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);
List<Future<Void>> fs = entry.initSlaveBalancer(java.util.Collections.emptyList());
List<Future<Void>> fs = entry.initSlaveBalancer(java.util.Collections.<URI>emptySet());
for (Future<Void> future : fs) {
future.syncUninterruptibly();
}

@ -135,6 +135,7 @@ public class MasterSlaveEntry {
if (config.getReadMode() == ReadMode.SLAVE
&& (!addr.getHostName().equals(host) || port != addr.getPort())) {
connectionManager.slaveDown(this, addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM);
log.info("master {}:{} excluded from slaves", addr.getHostName(), addr.getPort());
}
return true;
}

Loading…
Cancel
Save