diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index 7c52315f3..040b8bf4c 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -36,6 +36,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.decoder.MultiDecoder; +import org.redisson.cluster.ClusterSlotRange; import org.redisson.connection.ConnectionManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,15 +98,15 @@ public class CommandExecutorService implements CommandExecutor { }; - for (Integer slot : connectionManager.getEntries().keySet()) { - async(true, slot, null, connectionManager.getCodec(), command, params, promise, null, 0); + for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { + async(true, slot.getStartSlot(), null, connectionManager.getCodec(), command, params, promise, null, 0); } return mainPromise; } public Future readRandomAsync(final RedisCommand command, final Object ... params) { final Promise mainPromise = connectionManager.newPromise(); - final List slots = new ArrayList(connectionManager.getEntries().keySet()); + final List slots = new ArrayList(connectionManager.getEntries().keySet()); Collections.shuffle(slots); retryReadRandomAsync(command, mainPromise, slots, params); @@ -113,7 +114,7 @@ public class CommandExecutorService implements CommandExecutor { } private void retryReadRandomAsync(final RedisCommand command, final Promise mainPromise, - final List slots, final Object... params) { + final List slots, final Object... params) { final Promise attemptPromise = connectionManager.newPromise(); attemptPromise.addListener(new FutureListener() { @Override @@ -134,8 +135,8 @@ public class CommandExecutorService implements CommandExecutor { } }); - Integer slot = slots.remove(0); - async(true, slot, null, connectionManager.getCodec(), command, params, attemptPromise, null, 0); + ClusterSlotRange slot = slots.remove(0); + async(true, slot.getStartSlot(), null, connectionManager.getCodec(), command, params, attemptPromise, null, 0); } public Future writeAllAsync(RedisCommand command, Object ... params) { @@ -171,8 +172,8 @@ public class CommandExecutorService implements CommandExecutor { return this; } }; - for (Integer slot : connectionManager.getEntries().keySet()) { - async(readOnlyMode, slot, null, connectionManager.getCodec(), command, params, promise, null, 0); + for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { + async(readOnlyMode, slot.getStartSlot(), null, connectionManager.getCodec(), command, params, promise, null, 0); } return mainPromise; } @@ -182,7 +183,11 @@ public class CommandExecutorService implements CommandExecutor { if (future.isSuccess()) { return future.getNow(); } - throw future.cause() instanceof RedisException ? + throw convertException(future); + } + + private RedisException convertException(Future future) { + return future.cause() instanceof RedisException ? (RedisException) future.cause() : new RedisException("Unexpected exception while processing command", future.cause()); } @@ -349,8 +354,8 @@ public class CommandExecutorService implements CommandExecutor { args.add(keys.size()); args.addAll(keys); args.addAll(Arrays.asList(params)); - for (Integer slot : connectionManager.getEntries().keySet()) { - async(readOnlyMode, slot, null, connectionManager.getCodec(), command, args.toArray(), promise, null, 0); + for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { + async(readOnlyMode, slot.getStartSlot(), null, connectionManager.getCodec(), command, args.toArray(), promise, null, 0); } return mainPromise; } @@ -451,7 +456,7 @@ public class CommandExecutorService implements CommandExecutor { if (!connectionManager.getShutdownLatch().acquire()) { return; } - ex.set((RedisException)connFuture.cause()); + ex.set(convertException(connFuture)); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); return; } diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index bad467b4d..3fabae061 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; -import org.redisson.connection.ClusterConnectionManager; +import org.redisson.cluster.ClusterConnectionManager; import org.redisson.connection.ConnectionManager; import org.redisson.connection.ElasticacheConnectionManager; import org.redisson.connection.MasterSlaveConnectionManager; diff --git a/src/main/java/org/redisson/RedissonKeys.java b/src/main/java/org/redisson/RedissonKeys.java index 63bb821d0..bca7a7c9b 100644 --- a/src/main/java/org/redisson/RedissonKeys.java +++ b/src/main/java/org/redisson/RedissonKeys.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.cluster.ClusterSlotRange; import org.redisson.core.RKeys; import org.redisson.misc.CompositeIterable; @@ -43,11 +44,11 @@ public class RedissonKeys implements RKeys { @Override public Iterable getKeysByPattern(final String pattern) { List> iterables = new ArrayList>(); - for (final Integer slot : commandExecutor.getConnectionManager().getEntries().keySet()) { + for (final ClusterSlotRange slot : commandExecutor.getConnectionManager().getEntries().keySet()) { Iterable iterable = new Iterable() { @Override public Iterator iterator() { - return createKeysIterator(slot, pattern); + return createKeysIterator(slot.getStartSlot(), pattern); } }; iterables.add(iterable); @@ -58,11 +59,11 @@ public class RedissonKeys implements RKeys { @Override public Iterable getKeys() { List> iterables = new ArrayList>(); - for (final Integer slot : commandExecutor.getConnectionManager().getEntries().keySet()) { + for (final ClusterSlotRange slot : commandExecutor.getConnectionManager().getEntries().keySet()) { Iterable iterable = new Iterable() { @Override public Iterator iterator() { - return createKeysIterator(slot, null); + return createKeysIterator(slot.getStartSlot(), null); } }; iterables.add(iterable); diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index 2ccb9412c..464290254 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -68,6 +68,10 @@ public class RedisConnection implements RedisCommands { return failAttempts; } + public boolean isActive() { + return channel.isActive(); + } + public void updateChannel(Channel channel) { this.channel = channel; channel.attr(CONNECTION).set(this); diff --git a/src/main/java/org/redisson/connection/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java similarity index 57% rename from src/main/java/org/redisson/connection/ClusterConnectionManager.java rename to src/main/java/org/redisson/cluster/ClusterConnectionManager.java index eeed60926..239c526b2 100644 --- a/src/main/java/org/redisson/connection/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -13,14 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.redisson.connection; - -import io.netty.util.concurrent.GlobalEventExecutor; -import io.netty.util.concurrent.ScheduledFuture; +package org.redisson.cluster; import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -36,17 +34,23 @@ import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.protocol.RedisCommands; -import org.redisson.connection.ClusterNodeInfo.Flag; +import org.redisson.cluster.ClusterNodeInfo.Flag; +import org.redisson.connection.MasterSlaveConnectionManager; +import org.redisson.connection.MasterSlaveEntry; +import org.redisson.connection.SingleEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.ScheduledFuture; + public class ClusterConnectionManager extends MasterSlaveConnectionManager { private final Logger log = LoggerFactory.getLogger(getClass()); private final Map nodeConnections = new HashMap(); - private final Map lastPartitions = new HashMap(); + private final Map lastPartitions = new HashMap(); private ScheduledFuture monitorFuture; @@ -64,8 +68,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES); - Map partitions = parsePartitions(nodesValue); - for (ClusterPartition partition : partitions.values()) { + Collection partitions = parsePartitions(nodesValue); + for (ClusterPartition partition : partitions) { addMasterEntry(partition, cfg); } @@ -98,7 +102,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private void addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) { if (partition.isMasterFail()) { - log.warn("master: {} for slot range: {}-{} add failed. Reason - server has FAIL flag", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot()); + log.warn("add master: {} for slot ranges: {} failed. Reason - server has FAIL flag", partition.getMasterAddress(), partition.getSlotRanges()); return; } @@ -108,18 +112,20 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } Map params = connection.sync(RedisCommands.CLUSTER_INFO); if ("fail".equals(params.get("cluster_state"))) { - log.warn("master: {} for slot range: {}-{} add failed. Reason - cluster_state:fail", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot()); + log.warn("add master: {} for slot ranges: {} failed. Reason - cluster_state:fail", partition.getMasterAddress(), partition.getSlotRanges()); return; } MasterSlaveServersConfig config = create(cfg); - log.info("master: {} for slot range: {}-{} added", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot()); + log.info("added master: {} for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); config.setMasterAddress(partition.getMasterAddress()); - SingleEntry entry = new SingleEntry(partition.getStartSlot(), partition.getEndSlot(), this, config); + SingleEntry entry = new SingleEntry(partition.getSlotRanges(), this, config); entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); - entries.put(partition.getEndSlot(), entry); - lastPartitions.put(partition.getEndSlot(), partition); + for (ClusterSlotRange slotRange : partition.getSlotRanges()) { + addMaster(slotRange, entry); + lastPartitions.put(slotRange, partition); + } } private void monitorClusterChange(final ClusterServersConfig cfg) { @@ -129,115 +135,141 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { try { for (URI addr : cfg.getNodeAddresses()) { RedisConnection connection = connect(cfg, addr); - String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES); - - log.debug("cluster nodes state: {}", nodesValue); - - Map partitions = parsePartitions(nodesValue); - for (ClusterPartition newPart : partitions.values()) { - for (ClusterPartition part : lastPartitions.values()) { - if (newPart.getMasterAddress().equals(part.getMasterAddress())) { + if (connection == null) { + continue; + } - log.debug("found endslot {} for {} fail {}", part.getEndSlot(), part.getMasterAddress(), newPart.isMasterFail()); + String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES); - if (newPart.isMasterFail()) { - ClusterPartition newMasterPart = partitions.get(part.getEndSlot()); - if (!newMasterPart.getMasterAddress().equals(part.getMasterAddress())) { - log.info("changing master from {} to {} for {}", - part.getMasterAddress(), newMasterPart.getMasterAddress(), newMasterPart.getEndSlot()); - URI newUri = newMasterPart.getMasterAddress(); - URI oldUri = part.getMasterAddress(); + log.debug("cluster nodes state: {}", nodesValue); - changeMaster(newMasterPart.getEndSlot(), newUri.getHost(), newUri.getPort()); - slaveDown(newMasterPart.getEndSlot(), oldUri.getHost(), oldUri.getPort()); + Collection newPartitions = parsePartitions(nodesValue); + checkMasterNodesChange(newPartitions); + checkSlotsChange(cfg, newPartitions); - part.setMasterAddress(newMasterPart.getMasterAddress()); - } - } - break; - } - } - } - - checkSlotsChange(cfg, partitions); - - break; + break; } - } catch (Exception e) { log.error(e.getMessage(), e); } } + }, cfg.getScanInterval(), cfg.getScanInterval(), TimeUnit.MILLISECONDS); } - private void checkSlotsChange(ClusterServersConfig cfg, Map partitions) { - Set removeSlots = new HashSet(lastPartitions.keySet()); - removeSlots.removeAll(partitions.keySet()); - lastPartitions.keySet().removeAll(removeSlots); - if (!removeSlots.isEmpty()) { - log.info("{} slots found to remove", removeSlots.size()); + private Collection slots(Collection partitions) { + List result = new ArrayList(); + for (ClusterPartition clusterPartition : partitions) { + result.addAll(clusterPartition.getSlotRanges()); } + return result; + } - Map removeAddrs = new HashMap(); - for (Integer slot : removeSlots) { + private ClusterPartition find(Collection partitions, ClusterSlotRange slotRange) { + for (ClusterPartition clusterPartition : partitions) { + if (clusterPartition.getSlotRanges().contains(slotRange)) { + return clusterPartition; + } + } + return null; + } + + private void checkMasterNodesChange(Collection newPartitions) { + for (ClusterPartition newPart : newPartitions) { + for (ClusterPartition currentPart : lastPartitions.values()) { + if (!newPart.getMasterAddress().equals(currentPart.getMasterAddress())) { + continue; + } + // current master marked as failed + if (newPart.isMasterFail()) { + for (ClusterSlotRange currentSlotRange : currentPart.getSlotRanges()) { + ClusterPartition newMasterPart = find(newPartitions, currentSlotRange); + // does partition has a new master? + if (!newMasterPart.getMasterAddress().equals(currentPart.getMasterAddress())) { + log.info("changing master from {} to {} for {}", + currentPart.getMasterAddress(), newMasterPart.getMasterAddress(), currentSlotRange); + URI newUri = newMasterPart.getMasterAddress(); + URI oldUri = currentPart.getMasterAddress(); + + changeMaster(currentSlotRange, newUri.getHost(), newUri.getPort()); + slaveDown(currentSlotRange, oldUri.getHost(), oldUri.getPort()); + + currentPart.setMasterAddress(newMasterPart.getMasterAddress()); + } + } + } + break; + } + } + } + + private void checkSlotsChange(ClusterServersConfig cfg, Collection partitions) { + Collection partitionsSlots = slots(partitions); + Set removedSlots = new HashSet(lastPartitions.keySet()); + removedSlots.removeAll(partitionsSlots); + lastPartitions.keySet().removeAll(removedSlots); + if (!removedSlots.isEmpty()) { + log.info("{} slot ranges found to remove", removedSlots.size()); + } + + Map removeAddrs = new HashMap(); + for (ClusterSlotRange slot : removedSlots) { MasterSlaveEntry entry = removeMaster(slot); entry.shutdownMasterAsync(); removeAddrs.put(slot, entry); } - Set addSlots = new HashSet(partitions.keySet()); - addSlots.removeAll(lastPartitions.keySet()); - if (!addSlots.isEmpty()) { - log.info("{} slots found to add", addSlots.size()); + Set addedSlots = new HashSet(partitionsSlots); + addedSlots.removeAll(lastPartitions.keySet()); + if (!addedSlots.isEmpty()) { + log.info("{} slots found to add", addedSlots.size()); } - for (Integer slot : addSlots) { - ClusterPartition partition = partitions.get(slot); + for (ClusterSlotRange slot : addedSlots) { + ClusterPartition partition = find(partitions, slot); addMasterEntry(partition, cfg); } - for (Entry entry : removeAddrs.entrySet()) { + for (Entry entry : removeAddrs.entrySet()) { InetSocketAddress url = entry.getValue().getClient().getAddr(); slaveDown(entry.getKey(), url.getHostName(), url.getPort()); } } - private Map parsePartitions(String nodesValue) { + private Collection parsePartitions(String nodesValue) { Map partitions = new HashMap(); - Map result = new HashMap(); List nodes = parse(nodesValue); for (ClusterNodeInfo clusterNodeInfo : nodes) { - if (clusterNodeInfo.getFlags().contains(Flag.NOADDR)) { + if (clusterNodeInfo.containsFlag(Flag.NOADDR)) { // skip it continue; } String id = clusterNodeInfo.getNodeId(); - if (clusterNodeInfo.getFlags().contains(Flag.SLAVE)) { + if (clusterNodeInfo.containsFlag(Flag.SLAVE)) { id = clusterNodeInfo.getSlaveOf(); } + + ClusterPartition partition = partitions.get(id); if (partition == null) { partition = new ClusterPartition(); partitions.put(id, partition); } - if (clusterNodeInfo.getFlags().contains(Flag.FAIL)) { + if (clusterNodeInfo.containsFlag(Flag.FAIL)) { partition.setMasterFail(true); } - if (clusterNodeInfo.getFlags().contains(Flag.SLAVE)) { + if (clusterNodeInfo.containsFlag(Flag.SLAVE)) { partition.addSlaveAddress(clusterNodeInfo.getAddress()); } else { - partition.setStartSlot(clusterNodeInfo.getStartSlot()); - partition.setEndSlot(clusterNodeInfo.getEndSlot()); - result.put(clusterNodeInfo.getEndSlot(), partition); + partition.addSlotRanges(clusterNodeInfo.getSlotRanges()); partition.setMasterAddress(clusterNodeInfo.getAddress()); } } - return result; + return partitions.values(); } private MasterSlaveServersConfig create(ClusterServersConfig cfg) { @@ -285,15 +317,15 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { for (int i = 0; i < params.length - 8; i++) { String slots = params[i + 8]; String[] parts = slots.split("-"); - node = new ClusterNodeInfo(node); - node.setStartSlot(Integer.valueOf(parts[0])); - node.setEndSlot(Integer.valueOf(parts[1])); - nodes.add(node); + + if(parts.length == 1) { + node.addSlotRange(new ClusterSlotRange(Integer.valueOf(parts[0]), Integer.valueOf(parts[0]))); + } else if(parts.length == 2) { + node.addSlotRange(new ClusterSlotRange(Integer.valueOf(parts[0]), Integer.valueOf(parts[1]))); + } } - } else { - nodes.add(node); } - + nodes.add(node); } return nodes; } diff --git a/src/main/java/org/redisson/connection/ClusterNodeInfo.java b/src/main/java/org/redisson/cluster/ClusterNodeInfo.java similarity index 64% rename from src/main/java/org/redisson/connection/ClusterNodeInfo.java rename to src/main/java/org/redisson/cluster/ClusterNodeInfo.java index 78ef12322..0f14390d1 100644 --- a/src/main/java/org/redisson/connection/ClusterNodeInfo.java +++ b/src/main/java/org/redisson/cluster/ClusterNodeInfo.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.redisson.connection; +package org.redisson.cluster; import java.net.URI; import java.util.ArrayList; @@ -27,23 +27,11 @@ public class ClusterNodeInfo { private String nodeId; private URI address; - private List flags = new ArrayList(); + private final List flags = new ArrayList(); private String slaveOf; - private int startSlot; - private int endSlot; + private final List slotRanges = new ArrayList(); - public ClusterNodeInfo() { - } - - public ClusterNodeInfo(ClusterNodeInfo info) { - this.nodeId = info.nodeId; - this.address = info.address; - this.flags = info.flags; - this.slaveOf = info.slaveOf; - this.startSlot = info.startSlot; - this.endSlot = info.endSlot; - } public String getNodeId() { return nodeId; } @@ -58,8 +46,15 @@ public class ClusterNodeInfo { this.address = URIBuilder.create(address); } - public List getFlags() { - return flags; + public void addSlotRange(ClusterSlotRange range) { + slotRanges.add(range); + } + public List getSlotRanges() { + return slotRanges; + } + + public boolean containsFlag(Flag flag) { + return flags.contains(flag); } public void addFlag(Flag flag) { this.flags.add(flag); @@ -72,25 +67,10 @@ public class ClusterNodeInfo { this.slaveOf = slaveOf; } - public int getStartSlot() { - return startSlot; - } - public void setStartSlot(int startSlot) { - this.startSlot = startSlot; - } - - public int getEndSlot() { - return endSlot; - } - public void setEndSlot(int endSlot) { - this.endSlot = endSlot; - } @Override public String toString() { return "ClusterNodeInfo [nodeId=" + nodeId + ", address=" + address + ", flags=" + flags - + ", slaveOf=" + slaveOf + ", startSlot=" + startSlot + ", endSlot=" + endSlot + "]"; + + ", slaveOf=" + slaveOf + ", slotRanges=" + slotRanges + "]"; } - - } diff --git a/src/main/java/org/redisson/connection/ClusterPartition.java b/src/main/java/org/redisson/cluster/ClusterPartition.java similarity index 80% rename from src/main/java/org/redisson/connection/ClusterPartition.java rename to src/main/java/org/redisson/cluster/ClusterPartition.java index 6618243dd..390fd29df 100644 --- a/src/main/java/org/redisson/connection/ClusterPartition.java +++ b/src/main/java/org/redisson/cluster/ClusterPartition.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.redisson.connection; +package org.redisson.cluster; import java.net.URI; import java.util.ArrayList; @@ -23,11 +23,10 @@ import org.redisson.misc.URIBuilder; public class ClusterPartition { - private int startSlot; - private int endSlot; private boolean masterFail; private URI masterAddress; private List slaveAddresses = new ArrayList(); + private final List slotRanges = new ArrayList(); public void setMasterFail(boolean masterFail) { this.masterFail = masterFail; @@ -36,18 +35,11 @@ public class ClusterPartition { return masterFail; } - public int getStartSlot() { - return startSlot; + public void addSlotRanges(List ranges) { + slotRanges.addAll(ranges); } - public void setStartSlot(int startSlot) { - this.startSlot = startSlot; - } - - public int getEndSlot() { - return endSlot; - } - public void setEndSlot(int endSlot) { - this.endSlot = endSlot; + public List getSlotRanges() { + return slotRanges; } public URI getMasterAddress() { diff --git a/src/main/java/org/redisson/cluster/ClusterSlotRange.java b/src/main/java/org/redisson/cluster/ClusterSlotRange.java new file mode 100644 index 000000000..b1198b780 --- /dev/null +++ b/src/main/java/org/redisson/cluster/ClusterSlotRange.java @@ -0,0 +1,73 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.cluster; + +public class ClusterSlotRange { + + private final int startSlot; + private final int endSlot; + + public ClusterSlotRange(int startSlot, int endSlot) { + super(); + this.startSlot = startSlot; + this.endSlot = endSlot; + } + + public int getStartSlot() { + return startSlot; + } + + public int getEndSlot() { + return endSlot; + } + + public boolean isOwn(int slot) { + return slot >= startSlot && slot <= endSlot; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + endSlot; + result = prime * result + startSlot; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ClusterSlotRange other = (ClusterSlotRange) obj; + if (endSlot != other.endSlot) + return false; + if (startSlot != other.startSlot) + return false; + return true; + } + + @Override + public String toString() { + return "[" + startSlot + "-" + endSlot + "]"; + } + + + +} diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 734f95c9b..30de50b68 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -16,6 +16,7 @@ package org.redisson.connection; import java.util.Collection; +import java.util.Map; import java.util.NavigableMap; import java.util.concurrent.TimeUnit; @@ -24,6 +25,7 @@ import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; +import org.redisson.cluster.ClusterSlotRange; import org.redisson.misc.InfinitySemaphoreLatch; import io.netty.channel.EventLoopGroup; @@ -54,7 +56,7 @@ public interface ConnectionManager { Codec getCodec(); - NavigableMap getEntries(); + Map getEntries(); Promise newPromise(); diff --git a/src/main/java/org/redisson/connection/ElasticacheConnectionManager.java b/src/main/java/org/redisson/connection/ElasticacheConnectionManager.java index c7a732733..4e52ed638 100644 --- a/src/main/java/org/redisson/connection/ElasticacheConnectionManager.java +++ b/src/main/java/org/redisson/connection/ElasticacheConnectionManager.java @@ -37,7 +37,7 @@ import io.netty.util.concurrent.ScheduledFuture; /** * {@link ConnectionManager} for AWS ElastiCache Replication Groups. By providing all nodes - * of the replication group to this manager, the role of each node can be polled to determine + * of the replication group to this manager, the role of each node can be polled to determine * if a failover has occurred resulting in a new master. * * @author Steve Ungerer @@ -45,15 +45,15 @@ import io.netty.util.concurrent.ScheduledFuture; public class ElasticacheConnectionManager extends MasterSlaveConnectionManager { private static final String ROLE_KEY = "role:"; - + private final Logger log = LoggerFactory.getLogger(getClass()); private AtomicReference currentMaster = new AtomicReference(); - + private final Map nodeConnections = new HashMap(); private ScheduledFuture monitorFuture; - + private enum Role { master, slave @@ -119,12 +119,12 @@ public class ElasticacheConnectionManager extends MasterSlaveConnectionManager { Role role = determineRole(replInfo); log.debug("node {} is {}", addr, role); - + if (Role.master.equals(role) && master.equals(addr)) { log.debug("Current master {} unchanged", master); } else if (Role.master.equals(role) && !master.equals(addr) && currentMaster.compareAndSet(master, addr)) { log.info("Master has changed from {} to {}", master, addr); - changeMaster(MAX_SLOT, addr.getHost(), addr.getPort()); + changeMaster(singleSlotRange, addr.getHost(), addr.getPort()); break; } } diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 9f61bf53f..288a9adf5 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -18,12 +18,10 @@ package org.redisson.connection; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Map.Entry; -import java.util.NavigableMap; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; import org.redisson.Config; @@ -36,6 +34,7 @@ import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.pubsub.PubSubType; +import org.redisson.cluster.ClusterSlotRange; import org.redisson.misc.InfinitySemaphoreLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +62,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { static final int MAX_SLOT = 16384; + protected final ClusterSlotRange singleSlotRange = new ClusterSlotRange(0, MAX_SLOT); + private final Logger log = LoggerFactory.getLogger(getClass()); private HashedWheelTimer timer; @@ -78,13 +79,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected MasterSlaveServersConfig config; - protected final NavigableMap entries = new ConcurrentSkipListMap(); + protected final Map entries = PlatformDependent.newConcurrentHashMap(); private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch(); - private final Set clients = Collections.newSetFromMap(new ConcurrentHashMap()); + private final Set clients = Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); - MasterSlaveConnectionManager() { + protected MasterSlaveConnectionManager() { } @Override @@ -103,7 +104,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public NavigableMap getEntries() { + public Map getEntries() { return entries; } @@ -130,9 +131,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } protected void initEntry(MasterSlaveServersConfig config) { - MasterSlaveEntry entry = new MasterSlaveEntry(0, MAX_SLOT, this, config); + ClusterSlotRange range = new ClusterSlotRange(0, MAX_SLOT); + MasterSlaveEntry entry = new MasterSlaveEntry(Collections.singletonList(range), this, config); entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); - entries.put(MAX_SLOT, entry); + addMaster(range, entry); } protected void init(Config cfg) { @@ -471,12 +473,21 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return entryCodec; } + protected MasterSlaveEntry getEntry(ClusterSlotRange slotRange) { + return entries.get(slotRange); + } + protected MasterSlaveEntry getEntry(int slot) { - return entries.ceilingEntry(slot).getValue(); + for (Entry entry : entries.entrySet()) { + if (entry.getKey().isOwn(slot)) { + return entry.getValue(); + } + } + return null; } - protected void slaveDown(int slot, String host, int port) { - Collection allPubSubConnections = getEntry(slot).slaveDown(host, port); + protected void slaveDown(ClusterSlotRange slotRange, String host, int port) { + Collection allPubSubConnections = getEntry(slotRange).slaveDown(host, port); // reattach listeners to other channels for (Entry mapEntry : name2PubSubConnection.entrySet()) { @@ -531,18 +542,22 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - protected void changeMaster(int endSlot, String host, int port) { - getEntry(endSlot).changeMaster(host, port); + protected void changeMaster(ClusterSlotRange slotRange, String host, int port) { + getEntry(slotRange).changeMaster(host, port); + } + + protected void addMaster(ClusterSlotRange slotRange, MasterSlaveEntry entry) { + entries.put(slotRange, entry); } - protected MasterSlaveEntry removeMaster(int endSlot) { - return entries.remove(endSlot); + protected MasterSlaveEntry removeMaster(ClusterSlotRange slotRange) { + return entries.remove(slotRange); } @Override public Future connectionWriteOp(int slot) { MasterSlaveEntry e = getEntry(slot); - if (!e.isOwn(slot)) { + if (e == null) { throw new RedisEmptySlotException("No node for slot: " + slot, slot); } return e.connectionWriteOp(); @@ -551,7 +566,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public Future connectionReadOp(int slot) { MasterSlaveEntry e = getEntry(slot); - if (!e.isOwn(slot)) { + if (e == null) { throw new RedisEmptySlotException("No node for slot: " + slot, slot); } return e.connectionReadOp(); @@ -560,7 +575,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public Future connectionReadOp(int slot, RedisClient client) { MasterSlaveEntry e = getEntry(slot); - if (!e.isOwn(slot)) { + if (e == null) { throw new RedisEmptySlotException("No node for slot: " + slot, slot); } return e.connectionReadOp(client); diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 9a0e67b0f..0eb686bbe 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -25,6 +25,7 @@ import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; +import org.redisson.cluster.ClusterSlotRange; import org.redisson.misc.ConnectionPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,14 +48,11 @@ public class MasterSlaveEntry { final MasterSlaveServersConfig config; final ConnectionManager connectionManager; - final int startSlot; - final int endSlot; - final ConnectionPool writeConnectionHolder; + final List slotRanges; - public MasterSlaveEntry(int startSlot, int endSlot, ConnectionManager connectionManager, MasterSlaveServersConfig config) { - this.startSlot = startSlot; - this.endSlot = endSlot; + public MasterSlaveEntry(List slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { + this.slotRanges = slotRanges; this.connectionManager = connectionManager; this.config = config; @@ -167,16 +165,8 @@ public class MasterSlaveEntry { slaveBalancer.shutdown(); } - public int getEndSlot() { - return endSlot; - } - - public int getStartSlot() { - return startSlot; - } - - public boolean isOwn(int slot) { - return slot >= startSlot && slot <= endSlot; + public List getSlotRanges() { + return slotRanges; } } diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 55b727d76..74dddf378 100755 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -234,7 +234,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { // to avoid freeze twice String addr = ip + ":" + port; if (freezeSlaves.putIfAbsent(addr, true) == null) { - slaveDown(0, ip, Integer.valueOf(port)); + slaveDown(singleSlotRange, ip, Integer.valueOf(port)); log.info("slave: {} has down", addr); } } @@ -269,7 +269,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String newMaster = ip + ":" + port; if (!newMaster.equals(current) && currentMaster.compareAndSet(current, newMaster)) { - changeMaster(0, ip, Integer.valueOf(port)); + changeMaster(singleSlotRange, ip, Integer.valueOf(port)); log.info("master has changed from {} to {}", current, newMaster); } } diff --git a/src/main/java/org/redisson/connection/SingleConnectionManager.java b/src/main/java/org/redisson/connection/SingleConnectionManager.java index ae7ff578c..c71ae34f8 100644 --- a/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -17,6 +17,7 @@ package org.redisson.connection; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Collections; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -24,6 +25,7 @@ import org.redisson.Config; import org.redisson.MasterSlaveServersConfig; import org.redisson.SingleServerConfig; import org.redisson.client.RedisConnectionException; +import org.redisson.cluster.ClusterSlotRange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,9 +71,9 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { @Override protected void initEntry(MasterSlaveServersConfig config) { - SingleEntry entry = new SingleEntry(0, MAX_SLOT, this, config); + SingleEntry entry = new SingleEntry(Collections.singletonList(singleSlotRange), this, config); entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); - entries.put(MAX_SLOT, entry); + addMaster(singleSlotRange, entry); } private void monitorDnsChange(final SingleServerConfig cfg) { @@ -84,7 +86,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { if (!now.getHostAddress().equals(master.getHostAddress())) { log.info("Detected DNS change. {} has changed from {} to {}", cfg.getAddress().getHost(), master.getHostAddress(), now.getHostAddress()); if (currentMaster.compareAndSet(master, now)) { - changeMaster(MAX_SLOT,cfg.getAddress().getHost(), cfg.getAddress().getPort()); + changeMaster(singleSlotRange, cfg.getAddress().getHost(), cfg.getAddress().getPort()); log.info("Master has been changed"); } } diff --git a/src/main/java/org/redisson/connection/SingleEntry.java b/src/main/java/org/redisson/connection/SingleEntry.java index 8b257ea98..924777480 100644 --- a/src/main/java/org/redisson/connection/SingleEntry.java +++ b/src/main/java/org/redisson/connection/SingleEntry.java @@ -15,10 +15,13 @@ */ package org.redisson.connection; +import java.util.List; + import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; +import org.redisson.cluster.ClusterSlotRange; import org.redisson.misc.ConnectionPool; import org.redisson.misc.PubSubConnectionPoll; @@ -28,8 +31,8 @@ public class SingleEntry extends MasterSlaveEntry { final ConnectionPool pubSubConnectionHolder; - public SingleEntry(int startSlot, int endSlot, ConnectionManager connectionManager, MasterSlaveServersConfig config) { - super(startSlot, endSlot, connectionManager, config); + public SingleEntry(List slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { + super(slotRanges, connectionManager, config); pubSubConnectionHolder = new PubSubConnectionPoll(config, null, connectionManager.getGroup()); }