From 91e4d7e4e411a2c9724404882f69e626ae13f6f0 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 25 Jan 2016 14:21:28 +0300 Subject: [PATCH] Cluster slave discovery regression since 2.1.5 fixed --- .../cluster/ClusterConnectionManager.java | 34 ++++++++++++++----- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 25e4a95e6..cbace80ab 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -99,7 +99,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { throw new RedisConnectionException("Can't connect to servers!", lastException); } - monitorClusterChange(cfg); + scheduleClusterChangeCheck(cfg); } private Future connect(ClusterServersConfig cfg, final URI addr) { @@ -203,7 +203,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { List> fs = e.initSlaveBalancer(config); futures.addAll(fs); - log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges()); + if (!partition.getSlaveAddresses().isEmpty()) { + log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges()); + } } Future f = e.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); @@ -232,8 +234,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return result; } - private void monitorClusterChange(final ClusterServersConfig cfg) { - monitorFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(new Runnable() { + private void scheduleClusterChangeCheck(final ClusterServersConfig cfg) { + monitorFuture = GlobalEventExecutor.INSTANCE.schedule(new Runnable() { @Override public void run() { List nodes = new ArrayList(); @@ -243,17 +245,19 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { nodes.add(partition.getMasterAddress()); slaves.addAll(partition.getSlaveAddresses()); } + // master nodes first nodes.addAll(slaves); checkClusterState(cfg, nodes.iterator(), lastException); } - }, cfg.getScanInterval(), cfg.getScanInterval(), TimeUnit.MILLISECONDS); + }, cfg.getScanInterval(), TimeUnit.MILLISECONDS); } private void checkClusterState(final ClusterServersConfig cfg, final Iterator iterator, final AtomicReference lastException) { if (!iterator.hasNext()) { log.error("Can't update cluster state", lastException.get()); + scheduleClusterChangeCheck(cfg); return; } URI uri = iterator.next(); @@ -263,6 +267,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { lastException.set(future.cause()); + System.out.println("Can't connect!!!!!"); checkClusterState(cfg, iterator, lastException); return; } @@ -280,6 +285,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { log.error("Can't execute CLUSTER_NODES with " + connection.getRedisClient().getAddr(), future.cause()); + scheduleClusterChangeCheck(cfg); return; } @@ -290,6 +296,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { checkMasterNodesChange(newPartitions); checkSlaveNodesChange(newPartitions); checkSlotsChange(cfg, newPartitions); + scheduleClusterChangeCheck(cfg); } }); } @@ -315,11 +322,20 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { Set addedSlaves = new HashSet(newPart.getSlaveAddresses()); addedSlaves.removeAll(currentPart.getSlaveAddresses()); for (URI uri : addedSlaves) { - currentPart.addSlaveAddress(uri); + Future future = entry.addSlave(uri.getHost(), uri.getPort()); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + log.error("Can't add slave: " + uri, future.cause()); + return; + } - entry.addSlave(uri.getHost(), uri.getPort()); - entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER); - log.info("slave {} added for slot ranges: {}", uri, currentPart.getSlotRanges()); + currentPart.addSlaveAddress(uri); + entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER); + log.info("slave {} added for slot ranges: {}", uri, currentPart.getSlotRanges()); + } + }); } break;