From cb8f53f9c64c3284cef1f8b6621f6db848a0db81 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 7 Jul 2016 13:25:57 +0300 Subject: [PATCH] Cluster slot changes discovery fixed --- src/main/java/org/redisson/RedissonKeys.java | 44 ++-- .../client/protocol/RedisCommands.java | 2 + .../cluster/ClusterConnectionManager.java | 224 ++++++++++++------ .../redisson/cluster/ClusterPartition.java | 48 ++++ .../command/CommandAsyncExecutor.java | 11 +- .../redisson/command/CommandAsyncService.java | 73 ++++-- .../redisson/command/CommandBatchService.java | 11 +- .../command/CommandReactiveExecutor.java | 3 +- .../command/CommandReactiveService.java | 5 +- .../connection/ConnectionManager.java | 9 +- .../ElasticacheConnectionManager.java | 2 +- .../MasterSlaveConnectionManager.java | 95 ++++---- .../redisson/connection/MasterSlaveEntry.java | 21 +- .../org/redisson/connection/NodeSource.java | 15 ++ .../connection/SentinelConnectionManager.java | 16 +- .../connection/SingleConnectionManager.java | 2 +- .../reactive/RedissonKeysReactive.java | 16 +- 17 files changed, 390 insertions(+), 207 deletions(-) diff --git a/src/main/java/org/redisson/RedissonKeys.java b/src/main/java/org/redisson/RedissonKeys.java index d208533b5..3feeb9d9c 100644 --- a/src/main/java/org/redisson/RedissonKeys.java +++ b/src/main/java/org/redisson/RedissonKeys.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -31,9 +32,9 @@ import org.redisson.client.RedisException; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.cluster.ClusterSlotRange; import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandBatchService; +import org.redisson.connection.MasterSlaveEntry; import org.redisson.core.RKeys; import org.redisson.core.RType; import org.redisson.misc.CompositeIterable; @@ -78,11 +79,11 @@ public class RedissonKeys implements RKeys { public Iterable getKeysByPattern(final String pattern, final int count) { List> iterables = new ArrayList>(); - for (final ClusterSlotRange slot : commandExecutor.getConnectionManager().getEntries().keySet()) { + for (final MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) { Iterable iterable = new Iterable() { @Override public Iterator iterator() { - return createKeysIterator(slot.getStartSlot(), pattern, count); + return createKeysIterator(entry, pattern, count); } }; iterables.add(iterable); @@ -96,21 +97,21 @@ public class RedissonKeys implements RKeys { return getKeysByPattern(null); } - private ListScanResult scanIterator(InetSocketAddress client, int slot, long startPos, String pattern, int count) { + private ListScanResult scanIterator(InetSocketAddress client, MasterSlaveEntry entry, long startPos, String pattern, int count) { if (pattern == null) { - Future> f = commandExecutor.readAsync(client, slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count); + Future> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count); return commandExecutor.get(f); } - Future> f = commandExecutor.readAsync(client, slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count); + Future> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count); return commandExecutor.get(f); } - private Iterator createKeysIterator(final int slot, final String pattern, final int count) { + private Iterator createKeysIterator(final MasterSlaveEntry entry, final String pattern, final int count) { return new RedissonBaseIterator() { @Override ListScanResult iterator(InetSocketAddress client, long nextIterPos) { - return RedissonKeys.this.scanIterator(client, slot, nextIterPos, pattern, count); + return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count); } @Override @@ -160,7 +161,8 @@ public class RedissonKeys implements RKeys { final Promise result = commandExecutor.getConnectionManager().newPromise(); final AtomicReference failed = new AtomicReference(); final AtomicLong count = new AtomicLong(); - final AtomicLong executed = new AtomicLong(commandExecutor.getConnectionManager().getEntries().size()); + Set entries = commandExecutor.getConnectionManager().getEntrySet(); + final AtomicLong executed = new AtomicLong(entries.size()); final FutureListener listener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -174,8 +176,8 @@ public class RedissonKeys implements RKeys { } }; - for (ClusterSlotRange slot : commandExecutor.getConnectionManager().getEntries().keySet()) { - Future> findFuture = commandExecutor.readAsync(slot.getStartSlot(), null, RedisCommands.KEYS, pattern); + for (MasterSlaveEntry entry : entries) { + Future> findFuture = commandExecutor.readAsync(entry, null, RedisCommands.KEYS, pattern); findFuture.addListener(new FutureListener>() { @Override public void operationComplete(Future> future) throws Exception { @@ -211,18 +213,16 @@ public class RedissonKeys implements RKeys { return commandExecutor.writeAsync(null, RedisCommands.DEL, keys); } - Map> range2key = new HashMap>(); + Map> range2key = new HashMap>(); for (String key : keys) { int slot = commandExecutor.getConnectionManager().calcSlot(key); - for (ClusterSlotRange range : commandExecutor.getConnectionManager().getEntries().keySet()) { - if (range.isOwn(slot)) { - List list = range2key.get(range); - if (list == null) { - list = new ArrayList(); - range2key.put(range, list); - } - list.add(key); + for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) { + List list = range2key.get(entry); + if (list == null) { + list = new ArrayList(); + range2key.put(entry, list); } + list.add(key); } } @@ -246,11 +246,11 @@ public class RedissonKeys implements RKeys { } }; - for (Entry> entry : range2key.entrySet()) { + for (Entry> entry : range2key.entrySet()) { // executes in batch due to CROSSLOT error CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); for (String key : entry.getValue()) { - executorService.writeAsync(entry.getKey().getStartSlot(), null, RedisCommands.DEL, key); + executorService.writeAsync(entry.getKey(), null, RedisCommands.DEL, key); } Future> future = executorService.executeAsync(); diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index ef28fb048..3ebcd4b1d 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -267,9 +267,11 @@ public interface RedisCommands { RedisStrictCommand CLUSTER_REPLICATE = new RedisStrictCommand("CLUSTER", "REPLICATE"); RedisStrictCommand CLUSTER_FORGET = new RedisStrictCommand("CLUSTER", "FORGET"); + RedisStrictCommand CLUSTER_RESET = new RedisStrictCommand("CLUSTER", "RESET"); RedisStrictCommand> CLUSTER_GETKEYSINSLOT = new RedisStrictCommand>("CLUSTER", "GETKEYSINSLOT", new StringListReplayDecoder()); RedisStrictCommand CLUSTER_SETSLOT = new RedisStrictCommand("CLUSTER", "SETSLOT"); RedisStrictCommand CLUSTER_MEET = new RedisStrictCommand("CLUSTER", "MEET"); + RedisStrictCommand> INFO_KEYSPACE = new RedisStrictCommand>("INFO", "KEYSPACE", new StringMapDataDecoder()); RedisStrictCommand> INFO_CLUSTER = new RedisStrictCommand>("INFO", "CLUSTER", new StringMapDataDecoder()); RedisStrictCommand INFO_REPLICATION = new RedisStrictCommand("INFO", "replication", new StringDataDecoder()); RedisStrictCommand> INFO_PERSISTENCE = new RedisStrictCommand>("INFO", "persistence", new StringMapDataDecoder()); diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 5fe6b97af..8bb82b33b 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -23,8 +23,11 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.redisson.ClusterServersConfig; @@ -50,14 +53,15 @@ import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.ScheduledFuture; +import io.netty.util.internal.PlatformDependent; public class ClusterConnectionManager extends MasterSlaveConnectionManager { private final Logger log = LoggerFactory.getLogger(getClass()); - private final Map nodeConnections = new HashMap(); + private final Map nodeConnections = PlatformDependent.newConcurrentHashMap(); - private final Map lastPartitions = new HashMap(); + private final Map lastPartitions = PlatformDependent.newConcurrentHashMap(); private ScheduledFuture monitorFuture; @@ -96,13 +100,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { for (Future>> masterFuture : futures) { masterFuture.awaitUninterruptibly(); if (!masterFuture.isSuccess()) { - log.error("Can't connect to master node.", masterFuture.cause()); continue; } for (Future future : masterFuture.getNow()) { future.awaitUninterruptibly(); if (!future.isSuccess()) { - log.error("Can't add nodes.", masterFuture.cause()); continue; } } @@ -119,6 +121,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { throw new RedisConnectionException("Can't connect to servers!", lastException); } + if (lastPartitions.size() != MAX_SLOT) { + stopThreads(); + throw new RedisConnectionException("Not all slots are covered! Only " + lastPartitions.size() + " slots are avaliable", lastException); + } + scheduleClusterChangeCheck(cfg, null); } @@ -195,6 +202,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { + log.error("Can't connect to master: {} with slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); result.setFailure(future.cause()); return; } @@ -216,6 +224,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { RedisException e = new RedisException("Failed to add master: " + partition.getMasterAddress() + " for slot ranges: " + partition.getSlotRanges() + ". Reason - cluster_state:fail"); + log.error("cluster_state:fail for " + connection.getRedisClient().getAddr()); result.setFailure(e); return; } @@ -243,21 +252,25 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } Future f = e.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); + final Promise initFuture = newPromise(); + futures.add(initFuture); f.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { + log.error("Can't add master: {} for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); + initFuture.setFailure(future.cause()); return; } - for (ClusterSlotRange slotRange : partition.getSlotRanges()) { - addEntry(slotRange, e); - lastPartitions.put(slotRange, partition); + for (Integer slot : partition.getSlots()) { + addEntry(slot, e); + lastPartitions.put(slot, partition); } log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); + initFuture.setSuccess(null); } }); - futures.add(f); result.setSuccess(futures); } }); @@ -277,7 +290,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { if (nodesIterator == null) { List nodes = new ArrayList(); List slaves = new ArrayList(); - for (ClusterPartition partition : lastPartitions.values()) { + if (lastPartitions.isEmpty()) { + System.out.println("lastPartitions.isEmpty()"); + } + + for (ClusterPartition partition : getLastPartitions()) { if (!partition.isMasterFail()) { nodes.add(partition.getMasterAddress()); } @@ -334,32 +351,38 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } List nodes = future.getNow(); + final StringBuilder nodesValue = new StringBuilder(); if (log.isDebugEnabled()) { - StringBuilder nodesValue = new StringBuilder(); for (ClusterNodeInfo clusterNodeInfo : nodes) { nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n"); } log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue); } - Collection newPartitions = parsePartitions(nodes); - checkMasterNodesChange(newPartitions); + final Collection newPartitions = parsePartitions(nodes); + Future masterFuture = checkMasterNodesChange(cfg, newPartitions); checkSlaveNodesChange(newPartitions); - checkSlotsChange(cfg, newPartitions); - scheduleClusterChangeCheck(cfg, null); + masterFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + checkSlotsMigration(newPartitions, nodesValue.toString()); + checkSlotsChange(cfg, newPartitions, nodesValue.toString()); + scheduleClusterChangeCheck(cfg, null); + } + }); } }); } private void checkSlaveNodesChange(Collection newPartitions) { for (ClusterPartition newPart : newPartitions) { - for (ClusterPartition currentPart : lastPartitions.values()) { + for (ClusterPartition currentPart : getLastPartitions()) { if (!newPart.getMasterAddress().equals(currentPart.getMasterAddress())) { continue; } MasterSlaveEntry entry = getEntry(currentPart.getMasterAddr()); - // should be invoked first in order to removed stale failedSlaveAddresses + // should be invoked first in order to remove stale failedSlaveAddresses addRemoveSlaves(entry, currentPart, newPart); // Does some slaves change failed state to alive? upDownSlaves(entry, currentPart, newPart); @@ -421,63 +444,109 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } } - private Collection slots(Collection partitions) { - List result = new ArrayList(); + private Collection slots(Collection partitions) { + Set result = new HashSet(MAX_SLOT); for (ClusterPartition clusterPartition : partitions) { - result.addAll(clusterPartition.getSlotRanges()); + result.addAll(clusterPartition.getSlots()); } return result; } - private ClusterPartition find(Collection partitions, ClusterSlotRange slotRange) { + private ClusterPartition find(Collection partitions, Integer slot) { for (ClusterPartition clusterPartition : partitions) { - if (clusterPartition.getSlotRanges().contains(slotRange)) { - return clusterPartition; + for (ClusterSlotRange slotRange : clusterPartition.getSlotRanges()) { + if (slotRange.isOwn(slot)) { + return clusterPartition; + } } } return null; } - private void checkMasterNodesChange(Collection newPartitions) { - for (ClusterPartition newPart : newPartitions) { - for (ClusterPartition currentPart : lastPartitions.values()) { + private Future checkMasterNodesChange(ClusterServersConfig cfg, Collection newPartitions) { + List newMasters = new ArrayList(); + for (final ClusterPartition newPart : newPartitions) { + boolean masterFound = false; + for (ClusterPartition currentPart : getLastPartitions()) { if (!newPart.getMasterAddress().equals(currentPart.getMasterAddress())) { continue; } + masterFound = true; // current master marked as failed - if (newPart.isMasterFail()) { - for (ClusterSlotRange currentSlotRange : currentPart.getSlotRanges()) { - ClusterPartition newMasterPart = find(newPartitions, currentSlotRange); - // does partition has a new master? - if (!newMasterPart.getMasterAddress().equals(currentPart.getMasterAddress())) { - log.info("changing master from {} to {} for {}", - currentPart.getMasterAddress(), newMasterPart.getMasterAddress(), currentSlotRange); - URI newUri = newMasterPart.getMasterAddress(); - URI oldUri = currentPart.getMasterAddress(); - - changeMaster(currentSlotRange, newUri.getHost(), newUri.getPort()); - - currentPart.setMasterAddress(newMasterPart.getMasterAddress()); - } + if (!newPart.isMasterFail()) { + continue; + } + for (Integer slot : currentPart.getSlots()) { + ClusterPartition newMasterPart = find(newPartitions, slot); + // does partition has a new master? + if (!newMasterPart.getMasterAddress().equals(currentPart.getMasterAddress())) { + log.info("changing master from {} to {} for {}", + currentPart.getMasterAddress(), newMasterPart.getMasterAddress(), slot); + URI newUri = newMasterPart.getMasterAddress(); + URI oldUri = currentPart.getMasterAddress(); + + changeMaster(slot, newUri.getHost(), newUri.getPort()); + + currentPart.setMasterAddress(newMasterPart.getMasterAddress()); } } break; } + + if (!masterFound && !newPart.getSlotRanges().isEmpty()) { + newMasters.add(newPart); + } + } + + if (newMasters.isEmpty()) { + return newSucceededFuture(null); } + + final Promise result = newPromise(); + final AtomicInteger masters = new AtomicInteger(newMasters.size()); + final Queue> futures = new ConcurrentLinkedQueue>(); + for (ClusterPartition newPart : newMasters) { + Future>> future = addMasterEntry(newPart, cfg); + future.addListener(new FutureListener>>() { + @Override + public void operationComplete(Future>> future) throws Exception { + if (future.isSuccess()) { + futures.addAll(future.getNow()); + } + + if (masters.decrementAndGet() == 0) { + final AtomicInteger nodes = new AtomicInteger(futures.size()); + for (Future nodeFuture : futures) { + nodeFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (nodes.decrementAndGet() == 0) { + result.setSuccess(null); + } + } + }); + } + } + } + }); + } + return result; } - private void checkSlotsChange(ClusterServersConfig cfg, Collection newPartitions) { - checkSlotsMigration(newPartitions); + private void checkSlotsChange(ClusterServersConfig cfg, Collection newPartitions, String nodes) { + Collection newPartitionsSlots = slots(newPartitions); + if (newPartitionsSlots.size() == lastPartitions.size() && lastPartitions.size() == MAX_SLOT) { + return; + } - Collection newPartitionsSlots = slots(newPartitions); - Set removedSlots = new HashSet(lastPartitions.keySet()); + Set removedSlots = new HashSet(lastPartitions.keySet()); removedSlots.removeAll(newPartitionsSlots); lastPartitions.keySet().removeAll(removedSlots); if (!removedSlots.isEmpty()) { - log.info("{} slot ranges found to remove", removedSlots); + log.info("{} slots found to remove", removedSlots.size()); } - for (ClusterSlotRange slot : removedSlots) { + for (Integer slot : removedSlots) { MasterSlaveEntry entry = removeMaster(slot); entry.removeSlotRange(slot); if (entry.getSlotRanges().isEmpty()) { @@ -487,38 +556,25 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } - Set addedSlots = new HashSet(newPartitionsSlots); + Set addedSlots = new HashSet(newPartitionsSlots); addedSlots.removeAll(lastPartitions.keySet()); if (!addedSlots.isEmpty()) { - log.info("{} slots found to add", addedSlots); + log.info("{} slots found to add", addedSlots.size()); } - for (final ClusterSlotRange slot : addedSlots) { + for (final Integer slot : addedSlots) { ClusterPartition partition = find(newPartitions, slot); - boolean masterFound = false; - for (MasterSlaveEntry entry : getEntries().values()) { + for (MasterSlaveEntry entry : getEntrySet()) { if (entry.getClient().getAddr().equals(partition.getMasterAddr())) { addEntry(slot, entry); lastPartitions.put(slot, partition); - masterFound = true; break; } } - if (!masterFound) { - Future>> future = addMasterEntry(partition, cfg); - future.addListener(new FutureListener>>() { - @Override - public void operationComplete(Future>> future) throws Exception { - if (!future.isSuccess()) { - log.error("New cluster slot range " + slot + " without master node detected", future.cause()); - } - } - }); - } } } - private void checkSlotsMigration(Collection newPartitions) { - List currentPartitions = new ArrayList(lastPartitions.values()); + private void checkSlotsMigration(Collection newPartitions, String nodes) { + Set currentPartitions = getLastPartitions(); for (ClusterPartition currentPartition : currentPartitions) { for (ClusterPartition newPartition : newPartitions) { if (!currentPartition.getNodeId().equals(newPartition.getNodeId()) @@ -527,27 +583,35 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { continue; } - Set addedSlots = new HashSet(newPartition.getSlotRanges()); - addedSlots.removeAll(currentPartition.getSlotRanges()); - MasterSlaveEntry entry = getEntry(currentPartition.getSlotRanges().iterator().next()); - currentPartition.addSlotRanges(addedSlots); - for (ClusterSlotRange slot : addedSlots) { + Set addedSlots = new HashSet(newPartition.getSlots()); + addedSlots.removeAll(currentPartition.getSlots()); + currentPartition.addSlots(addedSlots); + + MasterSlaveEntry entry = getEntry(currentPartition.getMasterAddr()); + + for (Integer slot : addedSlots) { entry.addSlotRange(slot); addEntry(slot, entry); - log.info("{} slot added for {}", slot, entry.getClient().getAddr()); lastPartitions.put(slot, currentPartition); } + if (!addedSlots.isEmpty()) { + log.info("{} slots added to {}", addedSlots.size(), entry.getClient().getAddr()); + } - Set removedSlots = new HashSet(currentPartition.getSlotRanges()); - removedSlots.removeAll(newPartition.getSlotRanges()); - lastPartitions.keySet().removeAll(removedSlots); - currentPartition.removeSlotRanges(removedSlots); + Set removedSlots = new HashSet(currentPartition.getSlots()); + removedSlots.removeAll(newPartition.getSlots()); + for (Integer removeSlot : removedSlots) { + if (lastPartitions.remove(removeSlot, currentPartition)) { + entry.removeSlotRange(removeSlot); + removeMaster(removeSlot); + } + } + currentPartition.removeSlots(removedSlots); - for (ClusterSlotRange slot : removedSlots) { - log.info("{} slot removed for {}", slot, entry.getClient().getAddr()); - entry.removeSlotRange(slot); - removeMaster(slot); + if (!removedSlots.isEmpty()) { + log.info("{} slots removed from {}", removedSlots.size(), entry.getClient().getAddr()); } + break; } } } @@ -616,5 +680,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { connection.getRedisClient().shutdown(); } } + + private HashSet getLastPartitions() { + return new HashSet(lastPartitions.values()); + } } diff --git a/src/main/java/org/redisson/cluster/ClusterPartition.java b/src/main/java/org/redisson/cluster/ClusterPartition.java index 5af6037ff..ddfa42b7b 100644 --- a/src/main/java/org/redisson/cluster/ClusterPartition.java +++ b/src/main/java/org/redisson/cluster/ClusterPartition.java @@ -30,6 +30,8 @@ public class ClusterPartition { private URI masterAddress; private final Set slaveAddresses = new HashSet(); private final Set failedSlaves = new HashSet(); + + private final Set slots = new HashSet(); private final Set slotRanges = new HashSet(); public ClusterPartition(String nodeId) { @@ -48,15 +50,36 @@ public class ClusterPartition { return masterFail; } + public void addSlots(Set slots) { + this.slots.addAll(slots); + } + + public void removeSlots(Set slots) { + this.slots.removeAll(slots); + } + public void addSlotRanges(Set ranges) { + for (ClusterSlotRange clusterSlotRange : ranges) { + for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) { + slots.add(i); + } + } slotRanges.addAll(ranges); } public void removeSlotRanges(Set ranges) { + for (ClusterSlotRange clusterSlotRange : ranges) { + for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) { + slots.remove(i); + } + } slotRanges.removeAll(ranges); } public Set getSlotRanges() { return slotRanges; } + public Set getSlots() { + return slots; + } public InetSocketAddress getMasterAddr() { return new InetSocketAddress(masterAddress.getHost(), masterAddress.getPort()); @@ -92,6 +115,31 @@ public class ClusterPartition { slaveAddresses.remove(uri); failedSlaves.remove(uri); } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ClusterPartition other = (ClusterPartition) obj; + if (nodeId == null) { + if (other.nodeId != null) + return false; + } else if (!nodeId.equals(other.nodeId)) + return false; + return true; + } @Override public String toString() { diff --git a/src/main/java/org/redisson/command/CommandAsyncExecutor.java b/src/main/java/org/redisson/command/CommandAsyncExecutor.java index 8c3b42f5f..e8914035f 100644 --- a/src/main/java/org/redisson/command/CommandAsyncExecutor.java +++ b/src/main/java/org/redisson/command/CommandAsyncExecutor.java @@ -25,6 +25,7 @@ import org.redisson.client.RedisException; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; +import org.redisson.connection.MasterSlaveEntry; import io.netty.util.concurrent.Future; @@ -43,9 +44,11 @@ public interface CommandAsyncExecutor { V get(Future future); + Future writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params); + Future writeAsync(Integer slot, Codec codec, RedisCommand command, Object ... params); - Future readAsync(InetSocketAddress client, int slot, Codec codec, RedisCommand command, Object ... params); + Future readAsync(InetSocketAddress client, MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params); Future readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand command, Object ... params); @@ -59,10 +62,14 @@ public interface CommandAsyncExecutor { Future evalReadAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); + Future evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); + Future evalReadAsync(Integer slot, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); Future evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); + Future evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); + Future evalWriteAsync(Integer slot, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); Future readAsync(String key, Codec codec, RedisCommand command, Object ... params); @@ -77,6 +84,8 @@ public interface CommandAsyncExecutor { Future readAsync(String key, RedisCommand command, Object ... params); + Future readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params); + Future readAsync(Integer slot, Codec codec, RedisCommand command, Object ... params); Future readRandomAsync(RedisCommand command, Object ... params); diff --git a/src/main/java/org/redisson/command/CommandAsyncService.java b/src/main/java/org/redisson/command/CommandAsyncService.java index e75a6d6c2..eae849358 100644 --- a/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/src/main/java/org/redisson/command/CommandAsyncService.java @@ -43,8 +43,8 @@ import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; -import org.redisson.cluster.ClusterSlotRange; import org.redisson.connection.ConnectionManager; +import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource.Redirect; import org.slf4j.Logger; @@ -116,9 +116,9 @@ public class CommandAsyncService implements CommandAsyncExecutor { } @Override - public Future readAsync(InetSocketAddress client, int slot, Codec codec, RedisCommand command, Object ... params) { + public Future readAsync(InetSocketAddress client, MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); - async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0); + async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0); return mainPromise; } @@ -133,9 +133,10 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public Future> readAllAsync(RedisCommand command, Object ... params) { final Promise> mainPromise = connectionManager.newPromise(); + final Set nodes = connectionManager.getEntrySet(); Promise promise = new DefaultPromise() { List results = new ArrayList(); - AtomicInteger counter = new AtomicInteger(connectionManager.getEntries().keySet().size()); + AtomicInteger counter = new AtomicInteger(nodes.size()); @Override public Promise setSuccess(R result) { if (result instanceof Collection) { @@ -163,8 +164,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { }; - for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { - async(true, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, params, promise, 0); + for (MasterSlaveEntry entry : nodes) { + async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0); } return mainPromise; } @@ -172,25 +173,25 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public Future readRandomAsync(RedisCommand command, Object ... params) { final Promise mainPromise = connectionManager.newPromise(); - final List slots = new ArrayList(connectionManager.getEntries().keySet()); - Collections.shuffle(slots); + final List nodes = new ArrayList(connectionManager.getEntrySet()); + Collections.shuffle(nodes); - retryReadRandomAsync(command, mainPromise, slots, params); + retryReadRandomAsync(command, mainPromise, nodes, params); return mainPromise; } private void retryReadRandomAsync(final RedisCommand command, final Promise mainPromise, - final List slots, final Object... params) { + final List nodes, final Object... params) { final Promise attemptPromise = connectionManager.newPromise(); attemptPromise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { if (future.getNow() == null) { - if (slots.isEmpty()) { + if (nodes.isEmpty()) { mainPromise.setSuccess(null); } else { - retryReadRandomAsync(command, mainPromise, slots, params); + retryReadRandomAsync(command, mainPromise, nodes, params); } } else { mainPromise.setSuccess(future.getNow()); @@ -201,8 +202,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { } }); - ClusterSlotRange slot = slots.remove(0); - async(true, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, params, attemptPromise, 0); + MasterSlaveEntry entry = nodes.remove(0); + async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, attemptPromise, 0); } @Override @@ -222,9 +223,9 @@ public class CommandAsyncService implements CommandAsyncExecutor { private Future allAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, Object ... params) { final Promise mainPromise = connectionManager.newPromise(); - final Set slots = connectionManager.getEntries().keySet(); + final Set nodes = connectionManager.getEntrySet(); Promise promise = new DefaultPromise() { - AtomicInteger counter = new AtomicInteger(slots.size()); + AtomicInteger counter = new AtomicInteger(nodes.size()); @Override public Promise setSuccess(T result) { if (callback != null) { @@ -246,8 +247,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { return this; } }; - for (ClusterSlotRange slot : slots) { - async(readOnlyMode, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, params, promise, 0); + for (MasterSlaveEntry entry : nodes) { + async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0); } return mainPromise; } @@ -260,10 +261,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { private NodeSource getNodeSource(String key) { int slot = connectionManager.calcSlot(key); - if (slot != 0) { - return new NodeSource(slot); - } - return NodeSource.ZERO; + MasterSlaveEntry entry = connectionManager.getEntry(slot); + return new NodeSource(entry); } @Override @@ -274,12 +273,26 @@ public class CommandAsyncService implements CommandAsyncExecutor { return mainPromise; } + public Future readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params) { + Promise mainPromise = connectionManager.newPromise(); + async(true, new NodeSource(entry), codec, command, params, mainPromise, 0); + return mainPromise; + } + public Future readAsync(Integer slot, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); async(true, new NodeSource(slot), codec, command, params, mainPromise, 0); return mainPromise; } + @Override + public Future writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params) { + Promise mainPromise = connectionManager.newPromise(); + async(false, new NodeSource(entry), codec, command, params, mainPromise, 0); + return mainPromise; + } + + @Override public Future writeAsync(Integer slot, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); @@ -298,6 +311,11 @@ public class CommandAsyncService implements CommandAsyncExecutor { return evalAsync(source, true, codec, evalCommandType, script, keys, params); } + @Override + public Future evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { + return evalAsync(new NodeSource(entry), true, codec, evalCommandType, script, keys, params); + } + @Override public Future evalReadAsync(Integer slot, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { return evalAsync(new NodeSource(slot), true, codec, evalCommandType, script, keys, params); @@ -315,6 +333,10 @@ public class CommandAsyncService implements CommandAsyncExecutor { return evalAsync(source, false, codec, evalCommandType, script, keys, params); } + public Future evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { + return evalAsync(new NodeSource(entry), false, codec, evalCommandType, script, keys, params); + } + public Future evalWriteAsync(Integer slot, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { return evalAsync(new NodeSource(slot), false, codec, evalCommandType, script, keys, params); } @@ -327,8 +349,9 @@ public class CommandAsyncService implements CommandAsyncExecutor { public Future evalAllAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, String script, List keys, Object ... params) { final Promise mainPromise = connectionManager.newPromise(); + final Set entries = connectionManager.getEntrySet(); Promise promise = new DefaultPromise() { - AtomicInteger counter = new AtomicInteger(connectionManager.getEntries().keySet().size()); + AtomicInteger counter = new AtomicInteger(entries.size()); @Override public Promise setSuccess(T result) { callback.onSlotResult(result); @@ -351,8 +374,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { args.add(keys.size()); args.addAll(keys); args.addAll(Arrays.asList(params)); - for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) { - async(readOnlyMode, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, args.toArray(), promise, 0); + for (MasterSlaveEntry entry : entries) { + async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0); } return mainPromise; } diff --git a/src/main/java/org/redisson/command/CommandBatchService.java b/src/main/java/org/redisson/command/CommandBatchService.java index a672944fe..835cfb2db 100644 --- a/src/main/java/org/redisson/command/CommandBatchService.java +++ b/src/main/java/org/redisson/command/CommandBatchService.java @@ -37,6 +37,7 @@ import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.connection.ConnectionManager; +import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource.Redirect; @@ -79,7 +80,7 @@ public class CommandBatchService extends CommandReactiveService { private final AtomicInteger index = new AtomicInteger(); - private ConcurrentMap commands = PlatformDependent.newConcurrentHashMap(); + private ConcurrentMap commands = PlatformDependent.newConcurrentHashMap(); private volatile boolean executed; @@ -93,10 +94,10 @@ public class CommandBatchService extends CommandReactiveService { if (executed) { throw new IllegalStateException("Batch already has been executed!"); } - Entry entry = commands.get(nodeSource.getSlot()); + Entry entry = commands.get(nodeSource.getEntry()); if (entry == null) { entry = new Entry(); - Entry oldEntry = commands.putIfAbsent(nodeSource.getSlot(), entry); + Entry oldEntry = commands.putIfAbsent(nodeSource.getEntry(), entry); if (oldEntry != null) { entry = oldEntry; } @@ -133,7 +134,7 @@ public class CommandBatchService extends CommandReactiveService { }); AtomicInteger slots = new AtomicInteger(commands.size()); - for (java.util.Map.Entry e : commands.entrySet()) { + for (java.util.Map.Entry e : commands.entrySet()) { execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0); } return voidPromise; @@ -175,7 +176,7 @@ public class CommandBatchService extends CommandReactiveService { }); AtomicInteger slots = new AtomicInteger(commands.size()); - for (java.util.Map.Entry e : commands.entrySet()) { + for (java.util.Map.Entry e : commands.entrySet()) { execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0); } return promise; diff --git a/src/main/java/org/redisson/command/CommandReactiveExecutor.java b/src/main/java/org/redisson/command/CommandReactiveExecutor.java index 772ebac88..05e205a4e 100644 --- a/src/main/java/org/redisson/command/CommandReactiveExecutor.java +++ b/src/main/java/org/redisson/command/CommandReactiveExecutor.java @@ -24,6 +24,7 @@ import org.redisson.SlotCallback; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; +import org.redisson.connection.MasterSlaveEntry; import io.netty.util.concurrent.Future; @@ -47,7 +48,7 @@ public interface CommandReactiveExecutor extends CommandAsyncExecutor { Publisher readRandomReactive(RedisCommand command, Object ... params); - Publisher writeReactive(Integer slot, Codec codec, RedisCommand command, Object ... params); + Publisher writeReactive(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params); Publisher writeAllReactive(RedisCommand command, Object ... params); diff --git a/src/main/java/org/redisson/command/CommandReactiveService.java b/src/main/java/org/redisson/command/CommandReactiveService.java index e7d630c5e..236a4da64 100644 --- a/src/main/java/org/redisson/command/CommandReactiveService.java +++ b/src/main/java/org/redisson/command/CommandReactiveService.java @@ -24,6 +24,7 @@ import org.redisson.SlotCallback; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; +import org.redisson.connection.MasterSlaveEntry; import org.redisson.reactive.NettyFuturePublisher; import io.netty.util.concurrent.Future; @@ -79,8 +80,8 @@ public class CommandReactiveService extends CommandAsyncService implements Comma } @Override - public Publisher writeReactive(Integer slot, Codec codec, RedisCommand command, Object ... params) { - Future f = writeAsync(slot, codec, command, params); + public Publisher writeReactive(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params) { + Future f = writeAsync(entry, codec, command, params); return new NettyFuturePublisher(f); } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 4750e4fb6..b3dd91900 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -17,7 +17,7 @@ package org.redisson.connection; import java.net.InetSocketAddress; import java.util.Collection; -import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.redisson.MasterSlaveServersConfig; @@ -26,7 +26,6 @@ import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; -import org.redisson.cluster.ClusterSlotRange; import org.redisson.core.NodeType; import org.redisson.misc.InfinitySemaphoreLatch; @@ -71,8 +70,10 @@ public interface ConnectionManager { Codec getCodec(); - Map getEntries(); - + Set getEntrySet(); + + MasterSlaveEntry getEntry(int slot); + Promise newPromise(); void releaseRead(NodeSource source, RedisConnection connection); diff --git a/src/main/java/org/redisson/connection/ElasticacheConnectionManager.java b/src/main/java/org/redisson/connection/ElasticacheConnectionManager.java index 91eafb6c6..4844a97df 100644 --- a/src/main/java/org/redisson/connection/ElasticacheConnectionManager.java +++ b/src/main/java/org/redisson/connection/ElasticacheConnectionManager.java @@ -141,7 +141,7 @@ public class ElasticacheConnectionManager extends MasterSlaveConnectionManager { log.debug("Current master {} unchanged", master); } else if (Role.master.equals(role) && !master.equals(addr) && currentMaster.compareAndSet(master, addr)) { log.info("Master has changed from {} to {}", master, addr); - changeMaster(singleSlotRange, addr.getHost(), addr.getPort()); + changeMaster(singleSlotRange.getStartSlot(), addr.getHost(), addr.getPort()); break; } } diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 3f0912e22..c3b55ee69 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -102,7 +102,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { public static final int MAX_SLOT = 16384; - protected final ClusterSlotRange singleSlotRange = new ClusterSlotRange(0, MAX_SLOT); + protected final ClusterSlotRange singleSlotRange = new ClusterSlotRange(0, MAX_SLOT-1); private final Logger log = LoggerFactory.getLogger(getClass()); @@ -122,7 +122,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected boolean isClusterMode; - private final Map entries = PlatformDependent.newConcurrentHashMap(); + private final Map entries = PlatformDependent.newConcurrentHashMap(); private final Promise shutdownPromise; @@ -182,11 +182,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return codec; } - @Override - public Map getEntries() { - return entries; + public Set getEntrySet() { + return new HashSet(entries.values()); } - + protected void init(MasterSlaveServersConfig config) { this.config = config; @@ -228,7 +227,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } else { entry = createMasterSlaveEntry(config, slots); } - addEntry(singleSlotRange, entry); + + for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) { + addEntry(slot, entry); + } } protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config, @@ -288,7 +290,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public int calcSlot(String key) { - return 0; + return singleSlotRange.getStartSlot(); } @Override @@ -556,7 +558,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public MasterSlaveEntry getEntry(InetSocketAddress addr) { // TODO optimize - for (Entry entry : entries.entrySet()) { + for (Entry entry : entries.entrySet()) { if (entry.getValue().getClient().getAddr().equals(addr)) { return entry.getValue(); } @@ -564,65 +566,62 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return null; } - protected MasterSlaveEntry getEntry(ClusterSlotRange slotRange) { - return entries.get(slotRange); + public MasterSlaveEntry getEntry(int slot) { + return entries.get(slot); } - - protected MasterSlaveEntry getEntry(int slot) { - // TODO optimize - for (Entry entry : entries.entrySet()) { - if (entry.getKey().isOwn(slot)) { - return entry.getValue(); - } - } - return null; - } - + protected void slaveDown(ClusterSlotRange slotRange, String host, int port, FreezeReason freezeReason) { - getEntry(slotRange).slaveDown(host, port, freezeReason); + getEntry(slotRange.getStartSlot()).slaveDown(host, port, freezeReason); } - protected void changeMaster(ClusterSlotRange slotRange, String host, int port) { - getEntry(slotRange).changeMaster(host, port); + protected void changeMaster(int slot, String host, int port) { + getEntry(slot).changeMaster(host, port); } - protected void addEntry(ClusterSlotRange slotRange, MasterSlaveEntry entry) { - entries.put(slotRange, entry); + protected void addEntry(Integer slot, MasterSlaveEntry entry) { + entries.put(slot, entry); } - protected MasterSlaveEntry removeMaster(ClusterSlotRange slotRange) { - return entries.remove(slotRange); + protected MasterSlaveEntry removeMaster(Integer slot) { + return entries.remove(slot); } @Override public Future connectionWriteOp(NodeSource source, RedisCommand command) { - MasterSlaveEntry e = getEntry(source, command); - return e.connectionWriteOp(); + MasterSlaveEntry entry = source.getEntry(); + if (entry == null) { + entry = getEntry(source); + } + return entry.connectionWriteOp(); } private MasterSlaveEntry getEntry(NodeSource source) { - MasterSlaveEntry e = getEntry(source.getSlot()); - if (e == null) { - throw new RedisNodeNotFoundException("No node with slot: " + source.getSlot()); + // workaround for slots in migration state + if (source.getRedirect() != null) { + MasterSlaveEntry e = getEntry(source.getAddr()); + if (e == null) { + throw new RedisNodeNotFoundException("No node for slot: " + source.getAddr()); + } + return e; } - return e; - } - - private MasterSlaveEntry getEntry(NodeSource source, RedisCommand command) { + MasterSlaveEntry e = getEntry(source.getSlot()); if (e == null) { - throw new RedisNodeNotFoundException("No node for slot: " + source.getSlot() + " and command " + command); + throw new RedisNodeNotFoundException("No node with slot: " + source.getSlot()); } return e; } @Override public Future connectionReadOp(NodeSource source, RedisCommand command) { - MasterSlaveEntry e = getEntry(source, command); + MasterSlaveEntry entry = source.getEntry(); + if (entry == null && source.getSlot() != null) { + entry = getEntry(source.getSlot()); + } if (source.getAddr() != null) { - return e.connectionReadOp(source.getAddr()); + return entry.connectionReadOp(source.getAddr()); } - return e.connectionReadOp(); + return entry.connectionReadOp(); } Future nextPubSubConnection(int slot) { @@ -635,12 +634,20 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public void releaseWrite(NodeSource source, RedisConnection connection) { - getEntry(source).releaseWrite(connection); + MasterSlaveEntry entry = source.getEntry(); + if (entry == null) { + entry = getEntry(source); + } + entry.releaseWrite(connection); } @Override public void releaseRead(NodeSource source, RedisConnection connection) { - getEntry(source).releaseRead(connection); + MasterSlaveEntry entry = source.getEntry(); + if (entry == null) { + entry = getEntry(source); + } + entry.releaseRead(connection); } @Override diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index a17389dec..fdd7e9378 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -18,6 +18,7 @@ package org.redisson.connection; import java.net.InetSocketAddress; import java.net.URI; import java.util.Collection; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -61,12 +62,16 @@ public class MasterSlaveEntry { final ConnectionManager connectionManager; final MasterConnectionPool writeConnectionHolder; - final Set slotRanges; + final Set slots = new HashSet(); final AtomicBoolean active = new AtomicBoolean(true); public MasterSlaveEntry(Set slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { - this.slotRanges = slotRanges; + for (ClusterSlotRange clusterSlotRange : slotRanges) { + for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) { + slots.add(i); + } + } this.connectionManager = connectionManager; this.config = config; @@ -405,16 +410,16 @@ public class MasterSlaveEntry { slaveBalancer.shutdown(); } - public void addSlotRange(ClusterSlotRange range) { - slotRanges.add(range); + public void addSlotRange(Integer range) { + slots.add(range); } - public void removeSlotRange(ClusterSlotRange range) { - slotRanges.remove(range); + public void removeSlotRange(Integer range) { + slots.remove(range); } - public Set getSlotRanges() { - return slotRanges; + public Set getSlotRanges() { + return slots; } } diff --git a/src/main/java/org/redisson/connection/NodeSource.java b/src/main/java/org/redisson/connection/NodeSource.java index 63cfbaeb3..3fa173650 100644 --- a/src/main/java/org/redisson/connection/NodeSource.java +++ b/src/main/java/org/redisson/connection/NodeSource.java @@ -26,7 +26,18 @@ public class NodeSource { private final Integer slot; private final InetSocketAddress addr; private final Redirect redirect; + private MasterSlaveEntry entry; + public NodeSource(MasterSlaveEntry entry) { + this(null, null, null); + this.entry = entry; + } + + public NodeSource(MasterSlaveEntry entry, InetSocketAddress addr) { + this(null, addr, null); + this.entry = entry; + } + public NodeSource(Integer slot) { this(slot, null, null); } @@ -41,6 +52,10 @@ public class NodeSource { this.redirect = redirect; } + public MasterSlaveEntry getEntry() { + return entry; + } + public Redirect getRedirect() { return redirect; } diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 5d93776bf..2dfbced92 100755 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -218,7 +218,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { // to avoid addition twice if (slaves.putIfAbsent(slaveAddr, true) == null && config.getReadMode() == ReadMode.SLAVE) { - Future future = getEntry(singleSlotRange).addSlave(ip, Integer.valueOf(port)); + Future future = getEntry(singleSlotRange.getStartSlot()).addSlave(ip, Integer.valueOf(port)); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -228,7 +228,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return; } - if (getEntry(singleSlotRange).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { + if (getEntry(singleSlotRange.getStartSlot()).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { String slaveAddr = ip + ":" + port; log.info("slave: {} added", slaveAddr); } @@ -265,7 +265,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String ip = parts[2]; String port = parts[3]; - MasterSlaveEntry entry = getEntry(singleSlotRange); + MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); if (entry.getFreezeReason() != FreezeReason.MANAGER) { entry.freeze(); String addr = ip + ":" + port; @@ -279,7 +279,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { private void slaveDown(String ip, String port) { if (config.getReadMode() == ReadMode.SLAVE) { - slaveDown(singleSlotRange, ip, Integer.valueOf(port), FreezeReason.MANAGER); + getEntry(singleSlotRange.getStartSlot()).slaveDown(ip, Integer.valueOf(port), FreezeReason.MANAGER); } log.warn("slave: {}:{} has down", ip, port); @@ -299,7 +299,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String port = parts[3]; String masterAddr = ip + ":" + port; - MasterSlaveEntry entry = getEntry(singleSlotRange); + MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); if (entry.isFreezed() && entry.getClient().getAddr().equals(new InetSocketAddress(ip, Integer.valueOf(port)))) { entry.unfreeze(); @@ -318,7 +318,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return; } - if (getEntry(singleSlotRange).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { + if (getEntry(singleSlotRange.getStartSlot()).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { String slaveAddr = ip + ":" + port; log.info("slave: {} has up", slaveAddr); } @@ -336,8 +336,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String newMaster = ip + ":" + port; if (!newMaster.equals(current) && currentMaster.compareAndSet(current, newMaster)) { - changeMaster(singleSlotRange, ip, Integer.valueOf(port)); - log.info("master has changed from {} to {}", current, newMaster); + changeMaster(singleSlotRange.getStartSlot(), ip, Integer.valueOf(port)); + log.info("master {} changed to {}", current, newMaster); } } } else { diff --git a/src/main/java/org/redisson/connection/SingleConnectionManager.java b/src/main/java/org/redisson/connection/SingleConnectionManager.java index fc24163a4..d7a97afd2 100644 --- a/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -88,7 +88,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { if (!now.getHostAddress().equals(master.getHostAddress())) { log.info("Detected DNS change. {} has changed from {} to {}", cfg.getAddress().getHost(), master.getHostAddress(), now.getHostAddress()); if (currentMaster.compareAndSet(master, now)) { - changeMaster(singleSlotRange, cfg.getAddress().getHost(), cfg.getAddress().getPort()); + changeMaster(singleSlotRange.getStartSlot(), cfg.getAddress().getHost(), cfg.getAddress().getPort()); log.info("Master has been changed"); } } diff --git a/src/main/java/org/redisson/reactive/RedissonKeysReactive.java b/src/main/java/org/redisson/reactive/RedissonKeysReactive.java index 74c9b9764..fe2cc94c8 100644 --- a/src/main/java/org/redisson/reactive/RedissonKeysReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonKeysReactive.java @@ -18,6 +18,7 @@ package org.redisson.reactive; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; import org.reactivestreams.Publisher; @@ -30,6 +31,7 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.cluster.ClusterSlotRange; import org.redisson.command.CommandReactiveService; +import org.redisson.connection.MasterSlaveEntry; import reactor.rx.Stream; import reactor.rx.Streams; @@ -55,8 +57,8 @@ public class RedissonKeysReactive implements RKeysReactive { @Override public Publisher getKeysByPattern(final String pattern) { List> publishers = new ArrayList>(); - for (ClusterSlotRange slot : commandExecutor.getConnectionManager().getEntries().keySet()) { - publishers.add(createKeysIterator(slot.getStartSlot(), pattern)); + for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) { + publishers.add(createKeysIterator(entry, pattern)); } return Streams.merge(publishers); } @@ -66,14 +68,14 @@ public class RedissonKeysReactive implements RKeysReactive { return getKeysByPattern(null); } - private Publisher> scanIterator(int slot, long startPos, String pattern) { + private Publisher> scanIterator(MasterSlaveEntry entry, long startPos, String pattern) { if (pattern == null) { - return commandExecutor.writeReactive(slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos); + return commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos); } - return commandExecutor.writeReactive(slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern); + return commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern); } - private Publisher createKeysIterator(final int slot, final String pattern) { + private Publisher createKeysIterator(final MasterSlaveEntry entry, final String pattern) { return new Stream() { @Override @@ -94,7 +96,7 @@ public class RedissonKeysReactive implements RKeysReactive { protected void nextValues() { final ReactiveSubscription m = this; - scanIterator(slot, nextIterPos, pattern).subscribe(new Subscriber>() { + scanIterator(entry, nextIterPos, pattern).subscribe(new Subscriber>() { @Override public void onSubscribe(Subscription s) {