refactoring

pull/4031/head
Nikita Koksharov 3 years ago
parent be55c7b122
commit 03129d5c8e

@ -138,15 +138,18 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
private void checkFailedSlaves(Set<InetSocketAddress> slaveIPs) { private void checkFailedSlaves(Set<InetSocketAddress> slaveIPs) {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
Set<InetSocketAddress> failedSlaves = entry.getAllEntries().stream() Set<RedisClient> failedSlaves = entry.getAllEntries().stream()
.filter(e -> e.getNodeType() == NodeType.SLAVE .filter(e -> e.getNodeType() == NodeType.SLAVE
&& !slaveIPs.contains(e.getClient().getAddr())) && !slaveIPs.contains(e.getClient().getAddr()))
.map(e -> e.getClient().getAddr()) .map(e -> e.getClient())
.collect(Collectors.toSet()); .collect(Collectors.toSet());
for (InetSocketAddress slave : failedSlaves) { for (RedisClient slave : failedSlaves) {
if (entry.slaveDown(slave, FreezeReason.MANAGER)) { if (entry.slaveDown(slave.getAddr(), FreezeReason.MANAGER)) {
log.info("slave: {} is down", slave); log.info("slave: {} is down", slave);
disconnectNode(new RedisURI(slave.getConfig().getAddress().getScheme(),
slave.getAddr().getAddress().getHostAddress(),
slave.getAddr().getPort()));
} }
} }
} }
@ -183,6 +186,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
RFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), uri); RFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), uri);
changeFuture.onComplete((res, e) -> { changeFuture.onComplete((res, e) -> {
if (e != null) { if (e != null) {
log.error("Unable to change master to " + addr, e);
currentMaster.compareAndSet(addr, master); currentMaster.compareAndSet(addr, master);
} }
}); });

Loading…
Cancel
Save