From 070fff876af82fc3ad7d3cf7503933fa89a5561c Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 26 Oct 2015 20:45:42 +0300 Subject: [PATCH] Slot changes discovery optimization. #264 --- .../org/redisson/client/RedisConnection.java | 2 +- .../client/RedisTimeoutException.java | 7 ++++ .../cluster/ClusterConnectionManager.java | 32 +++++++++++++------ .../redisson/cluster/ClusterPartition.java | 11 +++++-- .../connection/ConnectionManager.java | 1 - .../MasterSlaveConnectionManager.java | 7 ++-- .../redisson/connection/MasterSlaveEntry.java | 22 +++++++++++-- .../connection/SingleConnectionManager.java | 5 ++- .../org/redisson/connection/SingleEntry.java | 4 +-- 9 files changed, 69 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index 464290254..5cd9ac5a4 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -85,7 +85,7 @@ public class RedisConnection implements RedisCommands { public R await(Future cmd) { if (!cmd.awaitUninterruptibly(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) { Promise promise = (Promise)cmd; - RedisTimeoutException ex = new RedisTimeoutException(); + RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr()); promise.setFailure(ex); throw ex; } diff --git a/src/main/java/org/redisson/client/RedisTimeoutException.java b/src/main/java/org/redisson/client/RedisTimeoutException.java index 3e69d5f60..f91f446cb 100644 --- a/src/main/java/org/redisson/client/RedisTimeoutException.java +++ b/src/main/java/org/redisson/client/RedisTimeoutException.java @@ -19,4 +19,11 @@ public class RedisTimeoutException extends RedisException { private static final long serialVersionUID = -8418769175260962404L; + public RedisTimeoutException() { + } + + public RedisTimeoutException(String message) { + super(message); + } + } diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 239c526b2..458a1071c 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -135,13 +135,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { try { for (URI addr : cfg.getNodeAddresses()) { RedisConnection connection = connect(cfg, addr); - if (connection == null) { + if (connection == null || !connection.isActive()) { continue; } String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES); - log.debug("cluster nodes state: {}", nodesValue); + log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue); Collection newPartitions = parsePartitions(nodesValue); checkMasterNodesChange(newPartitions); @@ -217,8 +217,15 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { Map removeAddrs = new HashMap(); for (ClusterSlotRange slot : removedSlots) { MasterSlaveEntry entry = removeMaster(slot); - entry.shutdownMasterAsync(); - removeAddrs.put(slot, entry); + entry.removeSlotRange(slot); + if (entry.getSlotRanges().isEmpty()) { + entry.shutdownMasterAsync(); + removeAddrs.put(slot, entry); + } + } + for (Entry entry : removeAddrs.entrySet()) { + InetSocketAddress url = entry.getValue().getClient().getAddr(); + slaveDown(entry.getKey(), url.getHostName(), url.getPort()); } Set addedSlots = new HashSet(partitionsSlots); @@ -228,13 +235,20 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } for (ClusterSlotRange slot : addedSlots) { ClusterPartition partition = find(partitions, slot); - addMasterEntry(partition, cfg); + boolean masterFound = false; + for (MasterSlaveEntry entry : getEntries().values()) { + if (entry.getClient().getAddr().equals(partition.getMasterAddr())) { + addMaster(slot, entry); + lastPartitions.put(slot, partition); + masterFound = true; + break; + } + } + if (!masterFound) { + addMasterEntry(partition, cfg); + } } - for (Entry entry : removeAddrs.entrySet()) { - InetSocketAddress url = entry.getValue().getClient().getAddr(); - slaveDown(entry.getKey(), url.getHostName(), url.getPort()); - } } private Collection parsePartitions(String nodesValue) { diff --git a/src/main/java/org/redisson/cluster/ClusterPartition.java b/src/main/java/org/redisson/cluster/ClusterPartition.java index 390fd29df..7e711f0be 100644 --- a/src/main/java/org/redisson/cluster/ClusterPartition.java +++ b/src/main/java/org/redisson/cluster/ClusterPartition.java @@ -15,9 +15,12 @@ */ package org.redisson.cluster; +import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.redisson.misc.URIBuilder; @@ -26,7 +29,7 @@ public class ClusterPartition { private boolean masterFail; private URI masterAddress; private List slaveAddresses = new ArrayList(); - private final List slotRanges = new ArrayList(); + private final Set slotRanges = new HashSet(); public void setMasterFail(boolean masterFail) { this.masterFail = masterFail; @@ -38,10 +41,14 @@ public class ClusterPartition { public void addSlotRanges(List ranges) { slotRanges.addAll(ranges); } - public List getSlotRanges() { + public Set getSlotRanges() { return slotRanges; } + public InetSocketAddress getMasterAddr() { + return new InetSocketAddress(masterAddress.getHost(), masterAddress.getPort()); + } + public URI getMasterAddress() { return masterAddress; } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 30de50b68..cd6417a38 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -17,7 +17,6 @@ package org.redisson.connection; import java.util.Collection; import java.util.Map; -import java.util.NavigableMap; import java.util.concurrent.TimeUnit; import org.redisson.MasterSlaveServersConfig; diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 288a9adf5..51364005d 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -131,10 +131,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } protected void initEntry(MasterSlaveServersConfig config) { - ClusterSlotRange range = new ClusterSlotRange(0, MAX_SLOT); - MasterSlaveEntry entry = new MasterSlaveEntry(Collections.singletonList(range), this, config); + HashSet slots = new HashSet(); + slots.add(singleSlotRange); + MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config); entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); - addMaster(range, entry); + addMaster(singleSlotRange, entry); } protected void init(Config cfg) { diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 0eb686bbe..c79311dcf 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -20,6 +20,8 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; @@ -49,9 +51,11 @@ public class MasterSlaveEntry { final ConnectionManager connectionManager; final ConnectionPool writeConnectionHolder; - final List slotRanges; + final Set slotRanges; - public MasterSlaveEntry(List slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { + final AtomicBoolean active = new AtomicBoolean(true); + + public MasterSlaveEntry(Set slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { this.slotRanges = slotRanges; this.connectionManager = connectionManager; this.config = config; @@ -127,6 +131,10 @@ public class MasterSlaveEntry { } public void shutdownMasterAsync() { + if (!active.compareAndSet(true, false)) { + return; + } + connectionManager.shutdownAsync(masterEntry.getClient()); slaveBalancer.shutdownAsync(); } @@ -161,11 +169,19 @@ public class MasterSlaveEntry { } public void shutdown() { + if (!active.compareAndSet(true, false)) { + return; + } + masterEntry.getClient().shutdown(); slaveBalancer.shutdown(); } - public List getSlotRanges() { + public void removeSlotRange(ClusterSlotRange range) { + slotRanges.remove(range); + } + + public Set getSlotRanges() { return slotRanges; } diff --git a/src/main/java/org/redisson/connection/SingleConnectionManager.java b/src/main/java/org/redisson/connection/SingleConnectionManager.java index c71ae34f8..3c8e49407 100644 --- a/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -18,6 +18,7 @@ package org.redisson.connection; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collections; +import java.util.HashSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -71,7 +72,9 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { @Override protected void initEntry(MasterSlaveServersConfig config) { - SingleEntry entry = new SingleEntry(Collections.singletonList(singleSlotRange), this, config); + HashSet slots = new HashSet(); + slots.add(singleSlotRange); + SingleEntry entry = new SingleEntry(slots, this, config); entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); addMaster(singleSlotRange, entry); } diff --git a/src/main/java/org/redisson/connection/SingleEntry.java b/src/main/java/org/redisson/connection/SingleEntry.java index 924777480..b5095f6f3 100644 --- a/src/main/java/org/redisson/connection/SingleEntry.java +++ b/src/main/java/org/redisson/connection/SingleEntry.java @@ -15,7 +15,7 @@ */ package org.redisson.connection; -import java.util.List; +import java.util.Set; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; @@ -31,7 +31,7 @@ public class SingleEntry extends MasterSlaveEntry { final ConnectionPool pubSubConnectionHolder; - public SingleEntry(List slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { + public SingleEntry(Set slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { super(slotRanges, connectionManager, config); pubSubConnectionHolder = new PubSubConnectionPoll(config, null, connectionManager.getGroup()); }