From 8fd1c6f26e42f678b10c6ae1b6b8e51440fea4a6 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 8 Jan 2015 11:56:19 +0300 Subject: [PATCH] Slot change discovery in cluster mode. #27 --- .../connection/ClusterConnectionManager.java | 117 +++++++++++------- .../redisson/connection/ClusterPartition.java | 8 ++ .../MasterSlaveConnectionManager.java | 4 + .../redisson/connection/MasterSlaveEntry.java | 9 ++ 4 files changed, 94 insertions(+), 44 deletions(-) diff --git a/src/main/java/org/redisson/connection/ClusterConnectionManager.java b/src/main/java/org/redisson/connection/ClusterConnectionManager.java index 95534cf78..46a68fd1c 100644 --- a/src/main/java/org/redisson/connection/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/connection/ClusterConnectionManager.java @@ -18,12 +18,15 @@ package org.redisson.connection; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.ScheduledFuture; +import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.redisson.ClusterServersConfig; @@ -43,7 +46,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private final List nodeClients = new ArrayList(); - private Collection lastPartitions; + private final Map lastPartitions = new HashMap(); private ScheduledFuture monitorFuture; @@ -56,21 +59,15 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { RedisAsyncConnection connection = client.connectAsync(); String nodesValue = connection.clusterNodes().awaitUninterruptibly().getNow(); - Collection partitions = extractPartitions(nodesValue); - for (ClusterPartition partition : partitions) { + Map partitions = extractPartitions(nodesValue); + for (ClusterPartition partition : partitions.values()) { if (partition.isMasterFail()) { continue; } - MasterSlaveServersConfig c = create(cfg); - log.info("master: {}", partition.getMasterAddress()); - c.setMasterAddress(partition.getMasterAddress()); - - SingleEntry entry = new SingleEntry(codec, group, c); - entries.put(partition.getEndSlot(), entry); + addMasterEntry(partition, cfg); } - lastPartitions = partitions; break; } catch (RedisConnectionException e) { @@ -85,6 +82,16 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { monitorClusterChange(cfg); } + private void addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) { + MasterSlaveServersConfig c = create(cfg); + log.info("master: {} for slot range: {}-{} added", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot()); + c.setMasterAddress(partition.getMasterAddress()); + + SingleEntry entry = new SingleEntry(codec, group, c); + entries.put(partition.getEndSlot(), entry); + lastPartitions.put(partition.getEndSlot(), partition); + } + private void monitorClusterChange(final ClusterServersConfig cfg) { monitorFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(new Runnable() { @Override @@ -96,40 +103,32 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { RedisAsyncConnection connection = client.connectAsync(); String nodesValue = connection.clusterNodes().awaitUninterruptibly().getNow(); - Collection partitions = extractPartitions(nodesValue); - for (ClusterPartition newPart : partitions) { - boolean found = false; - for (ClusterPartition part : lastPartitions) { + Map partitions = extractPartitions(nodesValue); + for (ClusterPartition newPart : partitions.values()) { + for (ClusterPartition part : lastPartitions.values()) { if (newPart.getMasterAddress().equals(part.getMasterAddress())) { - log.debug("found endslot {} for {} fail {}", newPart.getEndSlot(), newPart.getMasterAddress(), newPart.isMasterFail()); - found = true; - if (newPart.isMasterFail() && !part.isMasterFail()) { - for (ClusterPartition newMasterPart : partitions) { - if (!newMasterPart.getMasterAddress().equals(part.getMasterAddress()) - && newMasterPart.getEndSlot() == part.getEndSlot()) { - - log.debug("changing master from {} to {} for {}", - part.getMasterAddress(), newMasterPart.getMasterAddress(), newMasterPart.getEndSlot()); - URI newUri = toURI(newMasterPart.getMasterAddress()); - URI oldUri = toURI(part.getMasterAddress()); - - changeMaster(newMasterPart.getEndSlot(), newUri.getHost(), newUri.getPort()); - slaveDown(newMasterPart.getEndSlot(), oldUri.getHost(), oldUri.getPort()); - part.setMasterFail(true); - - monitorFuture.cancel(true); - } + log.debug("found endslot {} for {} fail {}", part.getEndSlot(), part.getMasterAddress(), newPart.isMasterFail()); + if (newPart.isMasterFail()) { + ClusterPartition newMasterPart = partitions.get(part.getEndSlot()); + if (!newMasterPart.getMasterAddress().equals(part.getMasterAddress())) { + log.debug("changing master from {} to {} for {}", + part.getMasterAddress(), newMasterPart.getMasterAddress(), newMasterPart.getEndSlot()); + URI newUri = toURI(newMasterPart.getMasterAddress()); + URI oldUri = toURI(part.getMasterAddress()); + + changeMaster(newMasterPart.getEndSlot(), newUri.getHost(), newUri.getPort()); + slaveDown(newMasterPart.getEndSlot(), oldUri.getHost(), oldUri.getPort()); + + part.setMasterAddress(newMasterPart.getMasterAddress()); } - } break; } } - if (!found) { - // TODO slot changed - } } + checkSlotsChange(cfg, partitions); + break; } catch (RedisConnectionException e) { @@ -144,11 +143,44 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } } + }, cfg.getScanInterval(), cfg.getScanInterval(), TimeUnit.MILLISECONDS); } - private Collection extractPartitions(String nodesValue) { + private void checkSlotsChange(ClusterServersConfig cfg, Map partitions) { + Set removeSlots = new HashSet(lastPartitions.keySet()); + removeSlots.removeAll(partitions.keySet()); + lastPartitions.keySet().removeAll(removeSlots); + if (!removeSlots.isEmpty()) { + log.info("{} slots found to remove", removeSlots.size()); + } + + Map removeAddrs = new HashMap(); + for (Integer slot : removeSlots) { + MasterSlaveEntry entry = removeMaster(slot); + entry.shutdownMasterAsync(); + removeAddrs.put(slot, entry); + } + + Set addSlots = new HashSet(partitions.keySet()); + addSlots.removeAll(lastPartitions.keySet()); + if (!addSlots.isEmpty()) { + log.info("{} slots found to add", addSlots.size()); + } + for (Integer slot : addSlots) { + ClusterPartition partition = partitions.get(slot); + addMasterEntry(partition, cfg); + } + + for (Entry entry : removeAddrs.entrySet()) { + InetSocketAddress url = entry.getValue().getClient().getAddr(); + slaveDown(entry.getKey(), url.getHostName(), url.getPort()); + } + } + + private Map extractPartitions(String nodesValue) { Map partitions = new HashMap(); + Map result = new HashMap(); List nodes = parse(nodesValue); for (ClusterNodeInfo clusterNodeInfo : nodes) { String id = clusterNodeInfo.getNodeId(); @@ -168,11 +200,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { if (clusterNodeInfo.getFlags().contains(Flag.SLAVE)) { partition.addSlaveAddress(clusterNodeInfo.getAddress()); } else { + partition.setStartSlot(clusterNodeInfo.getStartSlot()); partition.setEndSlot(clusterNodeInfo.getEndSlot()); + result.put(clusterNodeInfo.getEndSlot(), partition); partition.setMasterAddress(clusterNodeInfo.getAddress()); } } - return partitions.values(); + return result; } private MasterSlaveServersConfig create(ClusterServersConfig cfg) { @@ -187,11 +221,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return c; } - public static void main(String[] args) { - String s = "FAIL?".replaceAll("\\?", ""); - System.out.println(s); - } - private List parse(String nodesResponse) { List nodes = new ArrayList(); for (String nodeInfo : nodesResponse.split("\n")) { diff --git a/src/main/java/org/redisson/connection/ClusterPartition.java b/src/main/java/org/redisson/connection/ClusterPartition.java index daca45c8f..73bbe091c 100644 --- a/src/main/java/org/redisson/connection/ClusterPartition.java +++ b/src/main/java/org/redisson/connection/ClusterPartition.java @@ -20,6 +20,7 @@ import java.util.List; public class ClusterPartition { + private int startSlot; private int endSlot; private boolean masterFail; private String masterAddress; @@ -32,6 +33,13 @@ public class ClusterPartition { return masterFail; } + public int getStartSlot() { + return startSlot; + } + public void setStartSlot(int startSlot) { + this.startSlot = startSlot; + } + public int getEndSlot() { return endSlot; } diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index b40296057..60f476881 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -625,6 +625,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { getEntry(endSlot).changeMaster(host, port); } + protected MasterSlaveEntry removeMaster(int endSlot) { + return entries.remove(endSlot); + } + protected RedisConnection connectionWriteOp(int slot) { return getEntry(slot).connectionWriteOp(); } diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index e19aa4c9e..57ca25f9b 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -91,6 +91,10 @@ public class MasterSlaveEntry { this.config.getSlaveSubscriptionConnectionPoolSize())); } + public RedisClient getClient() { + return masterEntry.getClient(); + } + public void slaveUp(String host, int port) { slaveBalancer.unfreeze(host, port); } @@ -108,6 +112,11 @@ public class MasterSlaveEntry { oldMaster.getClient().shutdown(); } + public void shutdownMasterAsync() { + masterEntry.getClient().shutdownAsync(); + slaveBalancer.shutdown(); + } + public RedisConnection connectionWriteOp() { acquireMasterConnection();