refactoring

pull/4597/head
Nikita Koksharov 2 years ago
parent 065313beb8
commit 801ad8510d

@ -166,22 +166,30 @@ public class DNSMonitor {
slaveFound = true;
if (masterSlaveEntry.hasSlave(newSlaveAddr)) {
masterSlaveEntry.slaveUp(newSlaveAddr, FreezeReason.MANAGER);
masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER);
slaves.put(entry.getKey(), newSlaveAddr);
promise.complete(null);
CompletableFuture<Boolean> slaveUpFuture = masterSlaveEntry.slaveUpAsync(newSlaveAddr, FreezeReason.MANAGER);
slaveUpFuture.whenComplete((r, e) -> {
if (e != null) {
promise.complete(null);
return;
}
if (r) {
slaves.put(entry.getKey(), newSlaveAddr);
masterSlaveEntry.slaveDownAsync(currentSlaveAddr, FreezeReason.MANAGER);
}
promise.complete(null);
});
} else {
CompletableFuture<Void> addFuture = masterSlaveEntry.addSlave(newSlaveAddr, entry.getKey());
addFuture.whenComplete((res, e) -> {
promise.complete(null);
if (e != null) {
log.error("Can't add slave: " + newSlaveAddr, e);
promise.complete(null);
return;
}
masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER);
slaves.put(entry.getKey(), newSlaveAddr);
masterSlaveEntry.slaveDownAsync(currentSlaveAddr, FreezeReason.MANAGER);
promise.complete(null);
});
}
break;

@ -454,10 +454,34 @@ public class MasterSlaveEntry {
});
}
public CompletableFuture<Boolean> excludeMasterFromSlaves(InetSocketAddress address) {
InetSocketAddress addr = masterEntry.getClient().getAddr();
if (config.checkSkipSlavesInit() || addr.equals(address)) {
return CompletableFuture.completedFuture(false);
}
CompletableFuture<Boolean> downFuture = slaveDownAsync(addr, FreezeReason.SYSTEM);
return downFuture.thenApply(r -> {
if (r) {
log.info("master {} excluded from slaves", addr);
}
return r;
});
}
public CompletableFuture<Boolean> slaveUpAsync(RedisURI address, FreezeReason freezeReason) {
return slaveBalancer.unfreezeAsync(address, freezeReason);
}
public CompletableFuture<Boolean> slaveUpAsync(InetSocketAddress address, FreezeReason freezeReason) {
CompletableFuture<Boolean> f = slaveBalancer.unfreezeAsync(address, freezeReason);
return f.thenCompose(r -> {
if (r) {
return excludeMasterFromSlaves(address);
}
return CompletableFuture.completedFuture(r);
});
}
public boolean slaveUp(InetSocketAddress address, FreezeReason freezeReason) {
if (!slaveBalancer.unfreeze(address, freezeReason)) {
return false;

Loading…
Cancel
Save