|
|
|
@ -74,18 +74,31 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
try {
|
|
|
|
|
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();
|
|
|
|
|
String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES);
|
|
|
|
|
|
|
|
|
|
log.debug("cluster nodes state from {} during startup:\n{}", connection.getRedisClient().getAddr(), nodesValue);
|
|
|
|
|
|
|
|
|
|
Collection<ClusterPartition> partitions = parsePartitions(nodesValue);
|
|
|
|
|
List<Future<Collection<Future<Void>>>> futures = new ArrayList<Future<Collection<Future<Void>>>>();
|
|
|
|
|
for (ClusterPartition partition : partitions) {
|
|
|
|
|
if (partition.isMasterFail()) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
Future<Collection<Future<Void>>> masterFuture = addMasterEntry(partition, cfg);
|
|
|
|
|
futures.add(masterFuture);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (Future<Collection<Future<Void>>> masterFuture : futures) {
|
|
|
|
|
masterFuture.syncUninterruptibly();
|
|
|
|
|
masterFuture.awaitUninterruptibly();
|
|
|
|
|
if (!masterFuture.isSuccess()) {
|
|
|
|
|
log.error("Can't connect to master node.", masterFuture.cause());
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
for (Future<Void> future : masterFuture.getNow()) {
|
|
|
|
|
future.syncUninterruptibly();
|
|
|
|
|
future.awaitUninterruptibly();
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
log.error("Can't add nodes.", masterFuture.cause());
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
@ -102,7 +115,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
|
|
|
|
|
scheduleClusterChangeCheck(cfg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private Future<RedisConnection> connect(ClusterServersConfig cfg, final URI addr) {
|
|
|
|
|
RedisConnection connection = nodeConnections.get(addr);
|
|
|
|
|
if (connection != null) {
|
|
|
|
@ -181,7 +194,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Map<String, String>> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
log.error("Can't execute CLUSTER_INFO with " + connection.getRedisClient().getAddr(), future.cause());
|
|
|
|
|
log.error("Can't execute CLUSTER_INFO for " + connection.getRedisClient().getAddr(), future.cause());
|
|
|
|
|
result.setFailure(future.cause());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|