refactoring

pull/2979/head
Nikita Koksharov 5 years ago
parent 8c1a5aa91e
commit fcfec4534d

@ -160,7 +160,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
}
scheduleClusterChangeCheck(cfg, null);
scheduleClusterChangeCheck(cfg);
}
@Override
@ -350,7 +350,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return result;
}
private void scheduleClusterChangeCheck(ClusterServersConfig cfg, Iterator<RedisURI> iterator) {
private void scheduleClusterChangeCheck(ClusterServersConfig cfg) {
monitorFuture = group.schedule(new Runnable() {
@Override
public void run() {
@ -364,7 +364,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
public void operationComplete(Future<List<InetSocketAddress>> future) throws Exception {
AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(future.cause());
if (!future.isSuccess()) {
checkClusterState(cfg, Collections.<RedisURI>emptyList().iterator(), lastException);
checkClusterState(cfg, Collections.emptyIterator(), lastException);
return;
}
@ -381,26 +381,23 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
});
} else {
AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
Iterator<RedisURI> nodesIterator = iterator;
if (nodesIterator == null) {
List<RedisURI> nodes = new ArrayList<>();
List<RedisURI> slaves = new ArrayList<>();
for (ClusterPartition partition : getLastPartitions()) {
if (!partition.isMasterFail()) {
nodes.add(partition.getMasterAddress());
}
Set<RedisURI> partitionSlaves = new HashSet<>(partition.getSlaveAddresses());
partitionSlaves.removeAll(partition.getFailedSlaveAddresses());
slaves.addAll(partitionSlaves);
List<RedisURI> nodes = new ArrayList<>();
List<RedisURI> slaves = new ArrayList<>();
for (ClusterPartition partition : getLastPartitions()) {
if (!partition.isMasterFail()) {
nodes.add(partition.getMasterAddress());
}
// master nodes first
nodes.addAll(slaves);
nodesIterator = nodes.iterator();
Set<RedisURI> partitionSlaves = new HashSet<>(partition.getSlaveAddresses());
partitionSlaves.removeAll(partition.getFailedSlaveAddresses());
slaves.addAll(partitionSlaves);
}
// master nodes first
nodes.addAll(slaves);
Iterator<RedisURI> nodesIterator = nodes.iterator();
checkClusterState(cfg, nodesIterator, lastException);
}
}
@ -413,7 +410,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if (lastException.get() != null) {
log.error("Can't update cluster state", lastException.get());
}
scheduleClusterChangeCheck(cfg, null);
scheduleClusterChangeCheck(cfg);
return;
}
if (!getShutdownLatch().acquire()) {
@ -461,9 +458,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
checkSlaveNodesChange(newPartitions);
masterFuture.onComplete((res, ex) -> {
checkSlotsMigration(newPartitions);
checkSlotsChange(cfg, newPartitions);
checkSlotsChange(newPartitions);
getShutdownLatch().release();
scheduleClusterChangeCheck(cfg, null);
scheduleClusterChangeCheck(cfg);
});
});
}
@ -617,7 +614,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return result;
}
private void checkSlotsChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions) {
private void checkSlotsChange(Collection<ClusterPartition> newPartitions) {
int newSlotsAmount = slotsAmount(newPartitions);
if (newSlotsAmount == lastPartitions.size() && lastPartitions.size() == MAX_SLOT) {
return;

Loading…
Cancel
Save