From 6563a03c4ed601ff32e1d777e11a4c0f23fbd031 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 30 Oct 2015 21:15:44 +0300 Subject: [PATCH] Connect to nodes during cluster scan from Cluster Nodes info, not from initial config. #268 --- .../cluster/ClusterConnectionManager.java | 41 +++++++++++-------- .../redisson/cluster/ClusterPartition.java | 8 ++++ 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 76167546f..1b22012b9 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -92,6 +92,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } catch (Exception e) { log.error(e.getMessage(), e); } + if (!connection.isActive()) { + log.warn("connection for {} is not active!", connection.getRedisClient().getAddr()); + connection.closeAsync(); + connection = null; + } + if (connection == null) { + nodeConnections.remove(addr); + } return connection; } @@ -132,30 +140,31 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { @Override public void run() { try { - for (URI addr : cfg.getNodeAddresses()) { - RedisConnection connection = connect(cfg, addr); - if (connection == null || !connection.isActive()) { - continue; + for (ClusterPartition partition : lastPartitions.values()) { + for (URI uri : partition.getAllAddresses()) { + RedisConnection connection = connect(cfg, uri); + if (connection == null) { + continue; + } + + updateClusterState(cfg, connection); + return; } - - String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES); - - log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue); - - Collection newPartitions = parsePartitions(nodesValue); - checkMasterNodesChange(newPartitions); - checkSlotsChange(cfg, newPartitions); - - break; } } catch (Exception e) { log.error(e.getMessage(), e); } - } + }, cfg.getScanInterval(), cfg.getScanInterval(), TimeUnit.MILLISECONDS); + } + private void updateClusterState(ClusterServersConfig cfg, RedisConnection connection) { + String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES); + log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue); - }, cfg.getScanInterval(), cfg.getScanInterval(), TimeUnit.MILLISECONDS); + Collection newPartitions = parsePartitions(nodesValue); + checkMasterNodesChange(newPartitions); + checkSlotsChange(cfg, newPartitions); } private Collection slots(Collection partitions) { diff --git a/src/main/java/org/redisson/cluster/ClusterPartition.java b/src/main/java/org/redisson/cluster/ClusterPartition.java index 94faab052..e6418ec19 100644 --- a/src/main/java/org/redisson/cluster/ClusterPartition.java +++ b/src/main/java/org/redisson/cluster/ClusterPartition.java @@ -19,6 +19,7 @@ import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; @@ -69,6 +70,13 @@ public class ClusterPartition { this.masterAddress = masterAddress; } + public Set getAllAddresses() { + Set result = new LinkedHashSet(); + result.add(masterAddress); + result.addAll(slaveAddresses); + return result; + } + public void addSlaveAddress(URI address) { slaveAddresses.add(address); }