refactoring

pull/5840/head
Nikita Koksharov 9 months ago
parent 450bfbcdbe
commit 7baad67849

@ -79,7 +79,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
@Override
public void doConnect(Set<RedisURI> disconnectedSlaves, Function<RedisURI, String> hostnameMapper) {
public void doConnect(Function<RedisURI, String> hostnameMapper) {
if (cfg.getNodeAddresses().isEmpty()) {
throw new IllegalArgumentException("At least one cluster node should be defined!");
}
@ -323,7 +323,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if (config.isSlaveNotUsed()) {
entry = new SingleEntry(this, config);
} else {
Set<String> slaveAddresses = partition.getSlaveAddresses().stream().map(r -> r.toString()).collect(Collectors.toSet());
Set<String> slaveAddresses = partition.getSlaveAddresses().stream()
.filter(r -> !partition.getFailedSlaveAddresses().contains(r))
.map(r -> r.toString())
.collect(Collectors.toSet());
config.setSlaveAddresses(slaveAddresses);
entry = new MasterSlaveEntry(ClusterConnectionManager.this, config);
@ -340,7 +343,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
if (!config.isSlaveNotUsed()) {
CompletableFuture<Void> fs = entry.initSlaveBalancer(partition.getFailedSlaveAddresses(), r -> configEndpointHostName);
CompletableFuture<Void> fs = entry.initSlaveBalancer(r -> configEndpointHostName);
return fs.thenAccept(r -> {
if (!partition.getSlaveAddresses().isEmpty()) {
log.info("slaves: {} added for master: {} slot ranges: {}",

@ -190,7 +190,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if (i == attempts - 1) {
lastAttempt = true;
}
doConnect(new HashSet<>(), u -> null);
doConnect(u -> null);
return;
} catch (IllegalArgumentException e) {
shutdown();
@ -210,7 +210,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
protected void doConnect(Set<RedisURI> disconnectedSlaves, Function<RedisURI, String> hostnameMapper) {
protected void doConnect(Function<RedisURI, String> hostnameMapper) {
try {
if (config.isSlaveNotUsed()) {
masterSlaveEntry = new SingleEntry(this, config);
@ -234,7 +234,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
if (!config.isSlaveNotUsed()) {
CompletableFuture<Void> fs = masterSlaveEntry.initSlaveBalancer(disconnectedSlaves, hostnameMapper);
CompletableFuture<Void> fs = masterSlaveEntry.initSlaveBalancer(hostnameMapper);
try {
if (config.getSlaveConnectionMinimumIdleSize() == 0) {
fs.join();

@ -90,12 +90,12 @@ public class MasterSlaveEntry {
return config;
}
public CompletableFuture<Void> initSlaveBalancer(Collection<RedisURI> disconnectedNodes, Function<RedisURI, String> hostnameMapper) {
public CompletableFuture<Void> initSlaveBalancer(Function<RedisURI, String> hostnameMapper) {
List<CompletableFuture<Void>> result = new ArrayList<>(config.getSlaveAddresses().size());
for (String address : config.getSlaveAddresses()) {
RedisURI uri = new RedisURI(address);
String hostname = hostnameMapper.apply(uri);
CompletableFuture<Void> f = addSlave(uri, disconnectedNodes.contains(uri), hostname);
CompletableFuture<Void> f = addSlave(uri, false, hostname);
result.add(f);
}

@ -70,7 +70,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
}
@Override
public void doConnect(Set<RedisURI> disconnectedSlaves, Function<RedisURI, String> hostnameMapper) {
public void doConnect(Function<RedisURI, String> hostnameMapper) {
if (cfg.getNodeAddresses().isEmpty()) {
throw new IllegalArgumentException("At least one Redis node should be defined!");
}
@ -107,7 +107,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
log.warn("ReadMode = {}, but slave nodes are not found! Please specify all nodes in replicated mode.", this.config.getReadMode());
}
super.doConnect(disconnectedSlaves, hostnameMapper);
super.doConnect(hostnameMapper);
scheduleMasterChangeCheck(cfg);
}

@ -76,7 +76,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
@Override
public void doConnect(Set<RedisURI> disconnectedSlaves, Function<RedisURI, String> hostnameMapper) {
public void doConnect(Function<RedisURI, String> hostnameMapper) {
checkAuth(cfg);
if ("redis".equals(scheme)) {
@ -133,13 +133,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
if (isHostname(host)) {
uri2hostname.put(uri, host);
}
this.config.addSlaveAddress(uri.toString());
log.debug("slave {} state: {}", slaveAddr, map);
log.info("slave: {} added", slaveAddr);
if (isSlaveDown(flags, masterLinkStatus)) {
disconnectedSlaves.add(uri);
log.warn("slave: {} is down", slaveAddr);
} else {
this.config.addSlaveAddress(uri.toString());
log.info("slave: {} added", slaveAddr);
}
}
@ -201,7 +202,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
log.warn("ReadMode = {}, but slave nodes are not found!", this.config.getReadMode());
}
super.doConnect(disconnectedSlaves, uri2hostname::get);
super.doConnect(uri2hostname::get);
scheduleChangeCheck(cfg, null);
}

Loading…
Cancel
Save