Connect to nodes during cluster scan from Cluster Nodes info, not from initial config. #268

pull/282/head
Nikita 9 years ago
parent 8279398720
commit 6563a03c4e

@ -92,6 +92,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
} catch (Exception e) { } catch (Exception e) {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
} }
if (!connection.isActive()) {
log.warn("connection for {} is not active!", connection.getRedisClient().getAddr());
connection.closeAsync();
connection = null;
}
if (connection == null) {
nodeConnections.remove(addr);
}
return connection; return connection;
} }
@ -132,30 +140,31 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
@Override @Override
public void run() { public void run() {
try { try {
for (URI addr : cfg.getNodeAddresses()) { for (ClusterPartition partition : lastPartitions.values()) {
RedisConnection connection = connect(cfg, addr); for (URI uri : partition.getAllAddresses()) {
if (connection == null || !connection.isActive()) { RedisConnection connection = connect(cfg, uri);
if (connection == null) {
continue; continue;
} }
String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES); updateClusterState(cfg, connection);
return;
log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue); }
Collection<ClusterPartition> newPartitions = parsePartitions(nodesValue);
checkMasterNodesChange(newPartitions);
checkSlotsChange(cfg, newPartitions);
break;
} }
} catch (Exception e) { } catch (Exception e) {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
} }
}
}, cfg.getScanInterval(), cfg.getScanInterval(), TimeUnit.MILLISECONDS);
} }
private void updateClusterState(ClusterServersConfig cfg, RedisConnection connection) {
String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES);
log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
}, cfg.getScanInterval(), cfg.getScanInterval(), TimeUnit.MILLISECONDS); Collection<ClusterPartition> newPartitions = parsePartitions(nodesValue);
checkMasterNodesChange(newPartitions);
checkSlotsChange(cfg, newPartitions);
} }
private Collection<ClusterSlotRange> slots(Collection<ClusterPartition> partitions) { private Collection<ClusterSlotRange> slots(Collection<ClusterPartition> partitions) {

@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -69,6 +70,13 @@ public class ClusterPartition {
this.masterAddress = masterAddress; this.masterAddress = masterAddress;
} }
public Set<URI> getAllAddresses() {
Set<URI> result = new LinkedHashSet<URI>();
result.add(masterAddress);
result.addAll(slaveAddresses);
return result;
}
public void addSlaveAddress(URI address) { public void addSlaveAddress(URI address) {
slaveAddresses.add(address); slaveAddresses.add(address);
} }

Loading…
Cancel
Save