diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index d6f043f0d..96081f34c 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -195,9 +195,9 @@ public class CommandBatchExecutorService extends CommandExecutorService { Future connectionFuture; if (entry.isReadOnlyMode()) { - connectionFuture = connectionManager.connectionReadOp(slot); + connectionFuture = connectionManager.connectionReadOp(slot, null); } else { - connectionFuture = connectionManager.connectionWriteOp(slot); + connectionFuture = connectionManager.connectionWriteOp(slot, null); } connectionFuture.addListener(new FutureListener() { @@ -214,7 +214,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { RedisConnection connection = connFuture.getNow(); - ArrayList> list = new ArrayList>(entry.getCommands().size()); + List> list = new ArrayList>(entry.getCommands().size()); for (CommandEntry c : entry.getCommands()) { list.add(c.getCommand()); } diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index 040b8bf4c..ec0c63dd6 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -259,9 +259,9 @@ public class CommandExecutorService implements CommandExecutor { try { Future connectionFuture; if (readOnlyMode) { - connectionFuture = connectionManager.connectionReadOp(slot); + connectionFuture = connectionManager.connectionReadOp(slot, null); } else { - connectionFuture = connectionManager.connectionWriteOp(slot); + connectionFuture = connectionManager.connectionWriteOp(slot, null); } connectionFuture.syncUninterruptibly(); @@ -437,12 +437,12 @@ public class CommandExecutorService implements CommandExecutor { Future connectionFuture; if (readOnlyMode) { if (client != null) { - connectionFuture = connectionManager.connectionReadOp(slot, client); + connectionFuture = connectionManager.connectionReadOp(slot, command, client); } else { - connectionFuture = connectionManager.connectionReadOp(slot); + connectionFuture = connectionManager.connectionReadOp(slot, command); } } else { - connectionFuture = connectionManager.connectionWriteOp(slot); + connectionFuture = connectionManager.connectionWriteOp(slot, command); } connectionFuture.addListener(new FutureListener() { diff --git a/src/main/java/org/redisson/client/protocol/RedisCommand.java b/src/main/java/org/redisson/client/protocol/RedisCommand.java index 7223dc8d3..d465e1f7d 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommand.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommand.java @@ -221,7 +221,7 @@ public class RedisCommand { @Override public String toString() { - return "RedisCommand [name=" + name + ", subName=" + subName + "]"; + return "RedisCommand [" + name + " " + subName != null ? subName : "" + "]"; } } diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 458a1071c..76167546f 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -15,7 +15,6 @@ */ package org.redisson.cluster; -import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; import java.util.Collection; @@ -23,7 +22,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -35,6 +33,7 @@ import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.protocol.RedisCommands; import org.redisson.cluster.ClusterNodeInfo.Flag; +import org.redisson.connection.CRC16; import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.SingleEntry; @@ -123,7 +122,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { SingleEntry entry = new SingleEntry(partition.getSlotRanges(), this, config); entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); for (ClusterSlotRange slotRange : partition.getSlotRanges()) { - addMaster(slotRange, entry); + addEntry(slotRange, entry); lastPartitions.put(slotRange, partition); } } @@ -205,40 +204,38 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } } - private void checkSlotsChange(ClusterServersConfig cfg, Collection partitions) { - Collection partitionsSlots = slots(partitions); + private void checkSlotsChange(ClusterServersConfig cfg, Collection newPartitions) { + checkSlotsMigration(newPartitions); + + Collection newPartitionsSlots = slots(newPartitions); Set removedSlots = new HashSet(lastPartitions.keySet()); - removedSlots.removeAll(partitionsSlots); + removedSlots.removeAll(newPartitionsSlots); lastPartitions.keySet().removeAll(removedSlots); if (!removedSlots.isEmpty()) { log.info("{} slot ranges found to remove", removedSlots.size()); } - Map removeAddrs = new HashMap(); for (ClusterSlotRange slot : removedSlots) { MasterSlaveEntry entry = removeMaster(slot); entry.removeSlotRange(slot); if (entry.getSlotRanges().isEmpty()) { entry.shutdownMasterAsync(); - removeAddrs.put(slot, entry); + log.info("{} master and slaves for it removed", entry.getClient().getAddr()); } } - for (Entry entry : removeAddrs.entrySet()) { - InetSocketAddress url = entry.getValue().getClient().getAddr(); - slaveDown(entry.getKey(), url.getHostName(), url.getPort()); - } - Set addedSlots = new HashSet(partitionsSlots); + + Set addedSlots = new HashSet(newPartitionsSlots); addedSlots.removeAll(lastPartitions.keySet()); if (!addedSlots.isEmpty()) { log.info("{} slots found to add", addedSlots.size()); } for (ClusterSlotRange slot : addedSlots) { - ClusterPartition partition = find(partitions, slot); + ClusterPartition partition = find(newPartitions, slot); boolean masterFound = false; for (MasterSlaveEntry entry : getEntries().values()) { if (entry.getClient().getAddr().equals(partition.getMasterAddr())) { - addMaster(slot, entry); + addEntry(slot, entry); lastPartitions.put(slot, partition); masterFound = true; break; @@ -248,7 +245,54 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { addMasterEntry(partition, cfg); } } + } + private void checkSlotsMigration(Collection newPartitions) { + List currentPartitions = new ArrayList(lastPartitions.values()); + for (ClusterPartition currentPartition : currentPartitions) { + for (ClusterPartition newPartition : newPartitions) { + if (!currentPartition.getNodeId().equals(newPartition.getNodeId())) { + continue; + } + + Set addedSlots = new HashSet(newPartition.getSlotRanges()); + addedSlots.removeAll(currentPartition.getSlotRanges()); + MasterSlaveEntry entry = getEntry(currentPartition.getSlotRanges().iterator().next()); + for (ClusterSlotRange slot : addedSlots) { + entry.addSlotRange(slot); + addEntry(slot, entry); + log.info("slot {} added for {}", slot, entry.getClient().getAddr()); + lastPartitions.put(slot, currentPartition); + } + + Set removedSlots = new HashSet(currentPartition.getSlotRanges()); + removedSlots.removeAll(newPartition.getSlotRanges()); + lastPartitions.keySet().removeAll(removedSlots); + + for (ClusterSlotRange slot : removedSlots) { + log.info("slot {} removed for {}", slot, entry.getClient().getAddr()); + entry.removeSlotRange(slot); + removeMaster(slot); + } + } + } + } + + @Override + public int calcSlot(String key) { + if (key == null) { + return 0; + } + + int start = key.indexOf('{'); + if (start != -1) { + int end = key.indexOf('}'); + key = key.substring(start+1, end); + } + + int result = CRC16.crc16(key.getBytes()) % MAX_SLOT; + log.debug("slot {} for {}", result, key); + return result; } private Collection parsePartitions(String nodesValue) { @@ -268,7 +312,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { ClusterPartition partition = partitions.get(id); if (partition == null) { - partition = new ClusterPartition(); + partition = new ClusterPartition(id); partitions.put(id, partition); } diff --git a/src/main/java/org/redisson/cluster/ClusterPartition.java b/src/main/java/org/redisson/cluster/ClusterPartition.java index 7e711f0be..94faab052 100644 --- a/src/main/java/org/redisson/cluster/ClusterPartition.java +++ b/src/main/java/org/redisson/cluster/ClusterPartition.java @@ -26,11 +26,21 @@ import org.redisson.misc.URIBuilder; public class ClusterPartition { + private final String nodeId; private boolean masterFail; private URI masterAddress; private List slaveAddresses = new ArrayList(); private final Set slotRanges = new HashSet(); + public ClusterPartition(String nodeId) { + super(); + this.nodeId = nodeId; + } + + public String getNodeId() { + return nodeId; + } + public void setMasterFail(boolean masterFail) { this.masterFail = masterFail; } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index cd6417a38..3d67e1b8b 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -24,6 +24,7 @@ import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommand; import org.redisson.cluster.ClusterSlotRange; import org.redisson.misc.InfinitySemaphoreLatch; @@ -63,11 +64,11 @@ public interface ConnectionManager { void releaseWrite(int slot, RedisConnection connection); - Future connectionReadOp(int slot); + Future connectionReadOp(int slot, RedisCommand command); - Future connectionReadOp(int slot, RedisClient client); + Future connectionReadOp(int slot, RedisCommand command, RedisClient client); - Future connectionWriteOp(int slot); + Future connectionWriteOp(int slot, RedisCommand command); FutureListener createReleaseReadListener(int slot, RedisConnection conn, Timeout timeout); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 51364005d..9aa3e8fce 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -33,6 +33,7 @@ import org.redisson.client.RedisEmptySlotException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.cluster.ClusterSlotRange; import org.redisson.misc.InfinitySemaphoreLatch; @@ -60,7 +61,7 @@ import io.netty.util.internal.PlatformDependent; */ public class MasterSlaveConnectionManager implements ConnectionManager { - static final int MAX_SLOT = 16384; + protected static final int MAX_SLOT = 16384; protected final ClusterSlotRange singleSlotRange = new ClusterSlotRange(0, MAX_SLOT); @@ -135,7 +136,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { slots.add(singleSlotRange); MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config); entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); - addMaster(singleSlotRange, entry); + addEntry(singleSlotRange, entry); } protected void init(Config cfg) { @@ -206,19 +207,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public int calcSlot(String key) { - if (entries.size() == 1 || key == null) { - return 0; - } - - int start = key.indexOf('{'); - if (start != -1) { - int end = key.indexOf('}'); - key = key.substring(start+1, end); - } - - int result = CRC16.crc16(key.getBytes()) % MAX_SLOT; - log.debug("slot {} for {}", result, key); - return result; + return 0; } @Override @@ -547,7 +536,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { getEntry(slotRange).changeMaster(host, port); } - protected void addMaster(ClusterSlotRange slotRange, MasterSlaveEntry entry) { + protected void addEntry(ClusterSlotRange slotRange, MasterSlaveEntry entry) { entries.put(slotRange, entry); } @@ -556,28 +545,28 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public Future connectionWriteOp(int slot) { + public Future connectionWriteOp(int slot, RedisCommand command) { MasterSlaveEntry e = getEntry(slot); if (e == null) { - throw new RedisEmptySlotException("No node for slot: " + slot, slot); + throw new RedisEmptySlotException("No node for slot: " + slot + " and command " + command, slot); } return e.connectionWriteOp(); } @Override - public Future connectionReadOp(int slot) { + public Future connectionReadOp(int slot, RedisCommand command) { MasterSlaveEntry e = getEntry(slot); if (e == null) { - throw new RedisEmptySlotException("No node for slot: " + slot, slot); + throw new RedisEmptySlotException("No node for slot: " + slot + " and command " + command, slot); } return e.connectionReadOp(); } @Override - public Future connectionReadOp(int slot, RedisClient client) { + public Future connectionReadOp(int slot, RedisCommand command, RedisClient client) { MasterSlaveEntry e = getEntry(slot); if (e == null) { - throw new RedisEmptySlotException("No node for slot: " + slot, slot); + throw new RedisEmptySlotException("No node for slot: " + slot + " and command " + command, slot); } return e.connectionReadOp(client); } diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index c79311dcf..acc495ab7 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -86,6 +86,7 @@ public class MasterSlaveEntry { public Collection slaveDown(String host, int port) { Collection conns = slaveBalancer.freeze(host, port); + // add master as slave if no more slaves available if (slaveBalancer.getAvailableClients() == 0) { InetSocketAddress addr = masterEntry.getClient().getAddr(); slaveUp(addr.getHostName(), addr.getPort()); @@ -177,6 +178,10 @@ public class MasterSlaveEntry { slaveBalancer.shutdown(); } + public void addSlotRange(ClusterSlotRange range) { + slotRanges.add(range); + } + public void removeSlotRange(ClusterSlotRange range) { slotRanges.remove(range); } diff --git a/src/main/java/org/redisson/connection/SingleConnectionManager.java b/src/main/java/org/redisson/connection/SingleConnectionManager.java index 3c8e49407..26f995a3b 100644 --- a/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -76,7 +76,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { slots.add(singleSlotRange); SingleEntry entry = new SingleEntry(slots, this, config); entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); - addMaster(singleSlotRange, entry); + addEntry(singleSlotRange, entry); } private void monitorDnsChange(final SingleServerConfig cfg) {