Fixed - connections to slave are doubled after redis cluster failover. #2672

pull/2760/head
Nikita Koksharov 5 years ago
parent 53fbbf7d1f
commit f4ad9382e1

@ -55,9 +55,8 @@ public class ClusterNodesDecoder implements Decoder<List<ClusterNodeInfo>> {
String flags = params[2];
for (String flag : flags.split(",")) {
String flagValue = flag.toUpperCase().replaceAll("\\?", "");
for (Flag nodeInfoFlag : ClusterNodeInfo.Flag.values()) {
if (nodeInfoFlag.name().equals(flagValue)) {
if (nodeInfoFlag.getValue().equalsIgnoreCase(flag)) {
node.addFlag(nodeInfoFlag);
break;
}

@ -562,11 +562,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
for (Integer slot : removedSlots) {
MasterSlaveEntry entry = removeEntry(slot);
if (entry.getReferences() == 0) {
entry.shutdownAsync();
log.info("{} master and slaves for it removed", entry.getClient().getAddr());
}
removeEntry(slot);
}
Integer addedSlots = 0;
@ -595,7 +591,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if (!currentPartition.getNodeId().equals(newPartition.getNodeId())) {
continue;
}
MasterSlaveEntry entry = getEntry(currentPartition.slots().nextSetBit(0));
BitSet addedSlots = newPartition.copySlots();
addedSlots.andNot(currentPartition.slots());

@ -28,7 +28,20 @@ import org.redisson.misc.RedisURI;
*/
public class ClusterNodeInfo {
public enum Flag {NOFLAGS, SLAVE, MASTER, MYSELF, FAIL, HANDSHAKE, NOADDR};
public enum Flag {
NOFLAGS("noflags"), SLAVE("slave"), MASTER("master"), MYSELF("myself"),
FAIL("fail"), EVENTUAL_FAIL("fail?"), HANDSHAKE("handshake"), NOADDR("noaddr");
private final String value;
Flag(String value) {
this.value = value;
}
public String getValue() {
return value;
}
};
private final String nodeInfo;

@ -571,15 +571,25 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if (oldEntry != entry) {
entry.incReference();
}
shutdownEntry(oldEntry);
client2entry.put(entry.getClient(), entry);
}
protected final MasterSlaveEntry removeEntry(Integer slot) {
protected final void removeEntry(Integer slot) {
MasterSlaveEntry entry = slot2entry.getAndSet(slot, null);
if (entry.decReference() == 0) {
shutdownEntry(entry);
}
private void shutdownEntry(MasterSlaveEntry entry) {
if (entry != null && entry.decReference() == 0) {
client2entry.remove(entry.getClient());
entry.shutdownAsync();
String slaves = entry.getAllEntries().stream()
.filter(e -> !e.getClient().getAddr().equals(entry.getClient().getAddr()))
.map(e -> e.getClient().toString())
.collect(Collectors.joining(","));
log.info("{} master and related slaves: {} removed", entry.getClient().getAddr(), slaves);
}
return entry;
}
@Override

Loading…
Cancel
Save