refactoring

pull/5170/head
Nikita Koksharov 2 years ago
parent b937d44efb
commit 15994fd3da

@ -527,7 +527,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
.collect(Collectors.toList());
nonFailedSlaves.forEach(uri -> {
if (entry.hasSlave(uri)) {
CompletableFuture<Boolean> f = entry.slaveUpAsync(uri, FreezeReason.MANAGER);
CompletableFuture<Boolean> f = entry.slaveUpNoMasterExclusionAsync(uri, FreezeReason.MANAGER);
f = f.thenCompose(v -> {
if (v) {
log.info("slave: {} is up for slot ranges: {}", uri, currentPart.getSlotRanges());
@ -582,7 +582,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (RedisURI uri : addedSlaves) {
ClientConnectionsEntry slaveEntry = entry.getEntry(uri);
if (slaveEntry != null) {
CompletableFuture<Boolean> slaveUpFuture = entry.slaveUpAsync(uri, FreezeReason.MANAGER);
CompletableFuture<Boolean> slaveUpFuture = entry.slaveUpNoMasterExclusionAsync(uri, FreezeReason.MANAGER);
slaveUpFuture = slaveUpFuture.thenCompose(v -> {
if (v) {
currentPart.addSlaveAddress(uri);

@ -415,38 +415,26 @@ public class MasterSlaveEntry {
return masterEntry.getClient();
}
public boolean slaveUp(ClientConnectionsEntry entry, FreezeReason freezeReason) {
if (!slaveBalancer.unfreeze(entry, freezeReason)) {
return false;
}
InetSocketAddress addr = masterEntry.getClient().getAddr();
// exclude master from slaves
if (!config.isSlaveNotUsed()
&& !addr.equals(entry.getClient().getAddr())) {
if (slaveDown(addr, FreezeReason.SYSTEM)) {
log.info("master {} excluded from slaves", addr);
}
}
public CompletableFuture<Boolean> slaveUpAsync(ClientConnectionsEntry entry, FreezeReason freezeReason) {
noPubSubSlaves.set(false);
return true;
CompletableFuture<Boolean> f = slaveBalancer.unfreezeAsync(entry, freezeReason);
return f.thenCompose(r -> {
if (r) {
return excludeMasterFromSlaves(entry.getClient().getAddr());
}
return CompletableFuture.completedFuture(r);
});
}
public boolean slaveUp(RedisURI address, FreezeReason freezeReason) {
if (!slaveBalancer.unfreeze(address, freezeReason)) {
return false;
}
InetSocketAddress addr = masterEntry.getClient().getAddr();
// exclude master from slaves
if (!config.isSlaveNotUsed()
&& !address.equals(addr)) {
if (slaveDown(addr, FreezeReason.SYSTEM)) {
log.info("master {} excluded from slaves", addr);
}
}
public CompletableFuture<Boolean> slaveUpAsync(RedisURI address, FreezeReason freezeReason) {
noPubSubSlaves.set(false);
return true;
CompletableFuture<Boolean> f = slaveBalancer.unfreezeAsync(address, freezeReason);
return f.thenCompose(r -> {
if (r) {
return excludeMasterFromSlaves(address);
}
return CompletableFuture.completedFuture(r);
});
}
public CompletableFuture<Boolean> excludeMasterFromSlaves(RedisURI address) {
@ -463,7 +451,7 @@ public class MasterSlaveEntry {
});
}
public CompletableFuture<Boolean> excludeMasterFromSlaves(InetSocketAddress address) {
private CompletableFuture<Boolean> excludeMasterFromSlaves(InetSocketAddress address) {
InetSocketAddress addr = masterEntry.getClient().getAddr();
if (config.isSlaveNotUsed() || addr.equals(address)) {
return CompletableFuture.completedFuture(false);
@ -477,7 +465,7 @@ public class MasterSlaveEntry {
});
}
public CompletableFuture<Boolean> slaveUpAsync(RedisURI address, FreezeReason freezeReason) {
public CompletableFuture<Boolean> slaveUpNoMasterExclusionAsync(RedisURI address, FreezeReason freezeReason) {
noPubSubSlaves.set(false);
return slaveBalancer.unfreezeAsync(address, freezeReason);
}
@ -539,7 +527,7 @@ public class MasterSlaveEntry {
slaveBalancer.changeType(oldMaster.getClient().getAddr(), NodeType.SLAVE);
slaveBalancer.changeType(newMasterClient.getAddr(), NodeType.MASTER);
// freeze in slaveBalancer
slaveDown(oldMaster.getClient().getAddr(), FreezeReason.MANAGER);
slaveDownAsync(oldMaster.getClient().getAddr(), FreezeReason.MANAGER);
// check if at least one slave is available, use master as slave if false
if (!config.isSlaveNotUsed()) {

@ -594,7 +594,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
});
}
CompletableFuture<Boolean> f = entry.slaveUpAsync(uri, FreezeReason.MANAGER);
CompletableFuture<Boolean> f = entry.slaveUpNoMasterExclusionAsync(uri, FreezeReason.MANAGER);
return f.thenCompose(e -> {
if (e) {
log.info("slave: {} is up", uri);

@ -104,16 +104,6 @@ public class LoadBalancerManager {
return count;
}
public boolean unfreeze(RedisURI address, FreezeReason freezeReason) {
ClientConnectionsEntry entry = getEntry(address);
if (entry == null) {
log.error("Can't find {} in slaves! Available slaves: {}", address, client2Entry.keySet());
return false;
}
return unfreeze(entry, freezeReason);
}
public CompletableFuture<Boolean> unfreezeAsync(RedisURI address, FreezeReason freezeReason) {
ClientConnectionsEntry entry = getEntry(address);
if (entry == null) {
@ -181,7 +171,7 @@ public class LoadBalancerManager {
return false;
}
private CompletableFuture<Boolean> unfreezeAsync(ClientConnectionsEntry entry, FreezeReason freezeReason) {
public CompletableFuture<Boolean> unfreezeAsync(ClientConnectionsEntry entry, FreezeReason freezeReason) {
return unfreezeAsync(entry, freezeReason, 0);
}

@ -361,9 +361,12 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
if ("PONG".equals(t)) {
if (masterSlaveEntry.slaveUp(entry, FreezeReason.RECONNECT)) {
log.info("slave {} has been successfully reconnected", entry.getClient().getAddr());
}
CompletableFuture<Boolean> ff = masterSlaveEntry.slaveUpAsync(entry, FreezeReason.RECONNECT);
ff.thenAccept(r -> {
if (r) {
log.info("slave {} has been successfully reconnected", entry.getClient().getAddr());
}
});
} else {
scheduleCheck(entry);
}

Loading…
Cancel
Save