Cluster slave discovery regression since 2.1.5 fixed

pull/382/head
Nikita 9 years ago
parent e3cd3755b9
commit 91e4d7e4e4

@ -99,7 +99,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
throw new RedisConnectionException("Can't connect to servers!", lastException);
}
monitorClusterChange(cfg);
scheduleClusterChangeCheck(cfg);
}
private Future<RedisConnection> connect(ClusterServersConfig cfg, final URI addr) {
@ -203,7 +203,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
List<Future<Void>> fs = e.initSlaveBalancer(config);
futures.addAll(fs);
log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges());
if (!partition.getSlaveAddresses().isEmpty()) {
log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges());
}
}
Future<Void> f = e.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
@ -232,8 +234,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return result;
}
private void monitorClusterChange(final ClusterServersConfig cfg) {
monitorFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(new Runnable() {
private void scheduleClusterChangeCheck(final ClusterServersConfig cfg) {
monitorFuture = GlobalEventExecutor.INSTANCE.schedule(new Runnable() {
@Override
public void run() {
List<URI> nodes = new ArrayList<URI>();
@ -243,17 +245,19 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
nodes.add(partition.getMasterAddress());
slaves.addAll(partition.getSlaveAddresses());
}
// master nodes first
nodes.addAll(slaves);
checkClusterState(cfg, nodes.iterator(), lastException);
}
}, cfg.getScanInterval(), cfg.getScanInterval(), TimeUnit.MILLISECONDS);
}, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
}
private void checkClusterState(final ClusterServersConfig cfg, final Iterator<URI> iterator, final AtomicReference<Throwable> lastException) {
if (!iterator.hasNext()) {
log.error("Can't update cluster state", lastException.get());
scheduleClusterChangeCheck(cfg);
return;
}
URI uri = iterator.next();
@ -263,6 +267,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
public void operationComplete(Future<RedisConnection> future) throws Exception {
if (!future.isSuccess()) {
lastException.set(future.cause());
System.out.println("Can't connect!!!!!");
checkClusterState(cfg, iterator, lastException);
return;
}
@ -280,6 +285,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
public void operationComplete(Future<String> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't execute CLUSTER_NODES with " + connection.getRedisClient().getAddr(), future.cause());
scheduleClusterChangeCheck(cfg);
return;
}
@ -290,6 +296,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
checkMasterNodesChange(newPartitions);
checkSlaveNodesChange(newPartitions);
checkSlotsChange(cfg, newPartitions);
scheduleClusterChangeCheck(cfg);
}
});
}
@ -315,11 +322,20 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
Set<URI> addedSlaves = new HashSet<URI>(newPart.getSlaveAddresses());
addedSlaves.removeAll(currentPart.getSlaveAddresses());
for (URI uri : addedSlaves) {
currentPart.addSlaveAddress(uri);
Future<Void> future = entry.addSlave(uri.getHost(), uri.getPort());
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't add slave: " + uri, future.cause());
return;
}
entry.addSlave(uri.getHost(), uri.getPort());
entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER);
log.info("slave {} added for slot ranges: {}", uri, currentPart.getSlotRanges());
currentPart.addSlaveAddress(uri);
entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER);
log.info("slave {} added for slot ranges: {}", uri, currentPart.getSlotRanges());
}
});
}
break;

Loading…
Cancel
Save