|
|
|
@ -345,6 +345,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
scheduleClusterChangeCheck(cfg, null);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (!getShutdownLatch().acquire()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
final URI uri = iterator.next();
|
|
|
|
|
RFuture<RedisConnection> connectionFuture = connect(cfg, uri);
|
|
|
|
|
connectionFuture.addListener(new FutureListener<RedisConnection>() {
|
|
|
|
@ -352,6 +355,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
public void operationComplete(Future<RedisConnection> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
lastException.set(future.cause());
|
|
|
|
|
getShutdownLatch().release();
|
|
|
|
|
checkClusterState(cfg, iterator, lastException);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -370,6 +374,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
log.error("Can't execute CLUSTER_NODES with " + connection.getRedisClient().getAddr(), future.cause());
|
|
|
|
|
close(connection);
|
|
|
|
|
getShutdownLatch().release();
|
|
|
|
|
scheduleClusterChangeCheck(cfg, iterator);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -393,6 +398,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
public void operationComplete(Future<Void> future) throws Exception {
|
|
|
|
|
checkSlotsMigration(newPartitions, nodesValue.toString());
|
|
|
|
|
checkSlotsChange(cfg, newPartitions, nodesValue.toString());
|
|
|
|
|
getShutdownLatch().release();
|
|
|
|
|
scheduleClusterChangeCheck(cfg, null);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|