RedisConnectionException handling during cluster init. #65

pull/110/head
Nikita 10 years ago
parent 8bf61cb665
commit 7855cd9a34

@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import com.lambdaworks.redis.RedisAsyncConnection; import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisClient; import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnectionException;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection; import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
@ -47,57 +48,61 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
public ClusterConnectionManager(ClusterServersConfig cfg, Config config) { public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {
init(config); init(config);
Map<String, ClusterPartition> partitions = new HashMap<String, ClusterPartition>();
for (URI addr : cfg.getNodeAddresses()) { for (URI addr : cfg.getNodeAddresses()) {
RedisClient client = new RedisClient(group, addr.getHost(), addr.getPort(), cfg.getTimeout()); RedisClient client = new RedisClient(group, addr.getHost(), addr.getPort(), cfg.getTimeout());
RedisAsyncConnection<String, String> connection = client.connectAsync(); try {
String nodesValue = connection.clusterNodes().awaitUninterruptibly().getNow(); RedisAsyncConnection<String, String> connection = client.connectAsync();
System.out.println(nodesValue); String nodesValue = connection.clusterNodes().awaitUninterruptibly().getNow();
List<ClusterNodeInfo> nodes = parse(nodesValue);
List<ClusterNodeInfo> nodes = parse(nodesValue);
for (ClusterNodeInfo clusterNodeInfo : nodes) { Map<String, ClusterPartition> partitions = new HashMap<String, ClusterPartition>();
String id = clusterNodeInfo.getNodeId(); for (ClusterNodeInfo clusterNodeInfo : nodes) {
if (clusterNodeInfo.getFlags().contains(Flag.SLAVE)) { String id = clusterNodeInfo.getNodeId();
id = clusterNodeInfo.getSlaveOf(); if (clusterNodeInfo.getFlags().contains(Flag.SLAVE)) {
} id = clusterNodeInfo.getSlaveOf();
ClusterPartition partition = partitions.get(id); }
if (partition == null) { ClusterPartition partition = partitions.get(id);
partition = new ClusterPartition(); if (partition == null) {
partitions.put(id, partition); partition = new ClusterPartition();
} partitions.put(id, partition);
}
if (clusterNodeInfo.getFlags().contains(Flag.FAIL)) { if (clusterNodeInfo.getFlags().contains(Flag.FAIL)) {
partition.setMasterFail(true); partition.setMasterFail(true);
} }
if (clusterNodeInfo.getFlags().contains(Flag.SLAVE)) { if (clusterNodeInfo.getFlags().contains(Flag.SLAVE)) {
partition.addSlaveAddress(clusterNodeInfo.getAddress()); partition.addSlaveAddress(clusterNodeInfo.getAddress());
} else { } else {
partition.setEndSlot(clusterNodeInfo.getEndSlot()); partition.setEndSlot(clusterNodeInfo.getEndSlot());
partition.setMasterAddress(clusterNodeInfo.getAddress()); partition.setMasterAddress(clusterNodeInfo.getAddress());
}
} }
}
for (ClusterPartition partition : partitions.values()) { for (ClusterPartition partition : partitions.values()) {
if (partition.isMasterFail()) { if (partition.isMasterFail()) {
continue; continue;
} }
MasterSlaveServersConfig c = create(cfg); MasterSlaveServersConfig c = create(cfg);
log.info("master: {}", partition.getMasterAddress()); log.info("master: {}", partition.getMasterAddress());
c.setMasterAddress(partition.getMasterAddress()); c.setMasterAddress(partition.getMasterAddress());
// for (String slaveAddress : partition.getSlaveAddresses()) { // for (String slaveAddress : partition.getSlaveAddresses()) {
// log.info("slave: {}", slaveAddress); // log.info("slave: {}", slaveAddress);
// c.addSlaveAddress(slaveAddress); // c.addSlaveAddress(slaveAddress);
// } // }
SingleEntry entry = new SingleEntry(codec, group, c); SingleEntry entry = new SingleEntry(codec, group, c);
entries.put(partition.getEndSlot(), entry); entries.put(partition.getEndSlot(), entry);
} }
break;
client.shutdown(); } catch (RedisConnectionException e) {
break; log.warn(e.getMessage(), e);
} finally {
client.shutdown();
}
} }
this.config = create(cfg); this.config = create(cfg);

Loading…
Cancel
Save