Cluster slot changes discovery fixed

pull/527/head^2
Nikita 9 years ago
parent b71e3dde05
commit cb8f53f9c6

@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -31,9 +32,9 @@ import org.redisson.client.RedisException;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.core.RKeys;
import org.redisson.core.RType;
import org.redisson.misc.CompositeIterable;
@ -78,11 +79,11 @@ public class RedissonKeys implements RKeys {
public Iterable<String> getKeysByPattern(final String pattern, final int count) {
List<Iterable<String>> iterables = new ArrayList<Iterable<String>>();
for (final ClusterSlotRange slot : commandExecutor.getConnectionManager().getEntries().keySet()) {
for (final MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) {
Iterable<String> iterable = new Iterable<String>() {
@Override
public Iterator<String> iterator() {
return createKeysIterator(slot.getStartSlot(), pattern, count);
return createKeysIterator(entry, pattern, count);
}
};
iterables.add(iterable);
@ -96,21 +97,21 @@ public class RedissonKeys implements RKeys {
return getKeysByPattern(null);
}
private ListScanResult<String> scanIterator(InetSocketAddress client, int slot, long startPos, String pattern, int count) {
private ListScanResult<String> scanIterator(InetSocketAddress client, MasterSlaveEntry entry, long startPos, String pattern, int count) {
if (pattern == null) {
Future<ListScanResult<String>> f = commandExecutor.readAsync(client, slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count);
Future<ListScanResult<String>> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count);
return commandExecutor.get(f);
}
Future<ListScanResult<String>> f = commandExecutor.readAsync(client, slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count);
Future<ListScanResult<String>> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count);
return commandExecutor.get(f);
}
private Iterator<String> createKeysIterator(final int slot, final String pattern, final int count) {
private Iterator<String> createKeysIterator(final MasterSlaveEntry entry, final String pattern, final int count) {
return new RedissonBaseIterator<String>() {
@Override
ListScanResult<String> iterator(InetSocketAddress client, long nextIterPos) {
return RedissonKeys.this.scanIterator(client, slot, nextIterPos, pattern, count);
return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count);
}
@Override
@ -160,7 +161,8 @@ public class RedissonKeys implements RKeys {
final Promise<Long> result = commandExecutor.getConnectionManager().newPromise();
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong();
final AtomicLong executed = new AtomicLong(commandExecutor.getConnectionManager().getEntries().size());
Set<MasterSlaveEntry> entries = commandExecutor.getConnectionManager().getEntrySet();
final AtomicLong executed = new AtomicLong(entries.size());
final FutureListener<Long> listener = new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
@ -174,8 +176,8 @@ public class RedissonKeys implements RKeys {
}
};
for (ClusterSlotRange slot : commandExecutor.getConnectionManager().getEntries().keySet()) {
Future<Collection<String>> findFuture = commandExecutor.readAsync(slot.getStartSlot(), null, RedisCommands.KEYS, pattern);
for (MasterSlaveEntry entry : entries) {
Future<Collection<String>> findFuture = commandExecutor.readAsync(entry, null, RedisCommands.KEYS, pattern);
findFuture.addListener(new FutureListener<Collection<String>>() {
@Override
public void operationComplete(Future<Collection<String>> future) throws Exception {
@ -211,18 +213,16 @@ public class RedissonKeys implements RKeys {
return commandExecutor.writeAsync(null, RedisCommands.DEL, keys);
}
Map<ClusterSlotRange, List<String>> range2key = new HashMap<ClusterSlotRange, List<String>>();
Map<MasterSlaveEntry, List<String>> range2key = new HashMap<MasterSlaveEntry, List<String>>();
for (String key : keys) {
int slot = commandExecutor.getConnectionManager().calcSlot(key);
for (ClusterSlotRange range : commandExecutor.getConnectionManager().getEntries().keySet()) {
if (range.isOwn(slot)) {
List<String> list = range2key.get(range);
if (list == null) {
list = new ArrayList<String>();
range2key.put(range, list);
}
list.add(key);
for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) {
List<String> list = range2key.get(entry);
if (list == null) {
list = new ArrayList<String>();
range2key.put(entry, list);
}
list.add(key);
}
}
@ -246,11 +246,11 @@ public class RedissonKeys implements RKeys {
}
};
for (Entry<ClusterSlotRange, List<String>> entry : range2key.entrySet()) {
for (Entry<MasterSlaveEntry, List<String>> entry : range2key.entrySet()) {
// executes in batch due to CROSSLOT error
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
for (String key : entry.getValue()) {
executorService.writeAsync(entry.getKey().getStartSlot(), null, RedisCommands.DEL, key);
executorService.writeAsync(entry.getKey(), null, RedisCommands.DEL, key);
}
Future<List<?>> future = executorService.executeAsync();

@ -267,9 +267,11 @@ public interface RedisCommands {
RedisStrictCommand<Void> CLUSTER_REPLICATE = new RedisStrictCommand<Void>("CLUSTER", "REPLICATE");
RedisStrictCommand<Void> CLUSTER_FORGET = new RedisStrictCommand<Void>("CLUSTER", "FORGET");
RedisStrictCommand<Void> CLUSTER_RESET = new RedisStrictCommand<Void>("CLUSTER", "RESET");
RedisStrictCommand<List<String>> CLUSTER_GETKEYSINSLOT = new RedisStrictCommand<List<String>>("CLUSTER", "GETKEYSINSLOT", new StringListReplayDecoder());
RedisStrictCommand<Void> CLUSTER_SETSLOT = new RedisStrictCommand<Void>("CLUSTER", "SETSLOT");
RedisStrictCommand<Void> CLUSTER_MEET = new RedisStrictCommand<Void>("CLUSTER", "MEET");
RedisStrictCommand<Map<String, String>> INFO_KEYSPACE = new RedisStrictCommand<Map<String, String>>("INFO", "KEYSPACE", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_CLUSTER = new RedisStrictCommand<Map<String, String>>("INFO", "CLUSTER", new StringMapDataDecoder());
RedisStrictCommand<String> INFO_REPLICATION = new RedisStrictCommand<String>("INFO", "replication", new StringDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_PERSISTENCE = new RedisStrictCommand<Map<String, String>>("INFO", "persistence", new StringMapDataDecoder());

@ -23,8 +23,11 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.ClusterServersConfig;
@ -50,14 +53,15 @@ import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.PlatformDependent;
public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Map<URI, RedisConnection> nodeConnections = new HashMap<URI, RedisConnection>();
private final Map<URI, RedisConnection> nodeConnections = PlatformDependent.newConcurrentHashMap();
private final Map<ClusterSlotRange, ClusterPartition> lastPartitions = new HashMap<ClusterSlotRange, ClusterPartition>();
private final Map<Integer, ClusterPartition> lastPartitions = PlatformDependent.newConcurrentHashMap();
private ScheduledFuture<?> monitorFuture;
@ -96,13 +100,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (Future<Collection<Future<Void>>> masterFuture : futures) {
masterFuture.awaitUninterruptibly();
if (!masterFuture.isSuccess()) {
log.error("Can't connect to master node.", masterFuture.cause());
continue;
}
for (Future<Void> future : masterFuture.getNow()) {
future.awaitUninterruptibly();
if (!future.isSuccess()) {
log.error("Can't add nodes.", masterFuture.cause());
continue;
}
}
@ -119,6 +121,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
throw new RedisConnectionException("Can't connect to servers!", lastException);
}
if (lastPartitions.size() != MAX_SLOT) {
stopThreads();
throw new RedisConnectionException("Not all slots are covered! Only " + lastPartitions.size() + " slots are avaliable", lastException);
}
scheduleClusterChangeCheck(cfg, null);
}
@ -195,6 +202,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't connect to master: {} with slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
result.setFailure(future.cause());
return;
}
@ -216,6 +224,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
RedisException e = new RedisException("Failed to add master: " +
partition.getMasterAddress() + " for slot ranges: " +
partition.getSlotRanges() + ". Reason - cluster_state:fail");
log.error("cluster_state:fail for " + connection.getRedisClient().getAddr());
result.setFailure(e);
return;
}
@ -243,21 +252,25 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
Future<Void> f = e.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
final Promise<Void> initFuture = newPromise();
futures.add(initFuture);
f.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't add master: {} for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
initFuture.setFailure(future.cause());
return;
}
for (ClusterSlotRange slotRange : partition.getSlotRanges()) {
addEntry(slotRange, e);
lastPartitions.put(slotRange, partition);
for (Integer slot : partition.getSlots()) {
addEntry(slot, e);
lastPartitions.put(slot, partition);
}
log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
initFuture.setSuccess(null);
}
});
futures.add(f);
result.setSuccess(futures);
}
});
@ -277,7 +290,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if (nodesIterator == null) {
List<URI> nodes = new ArrayList<URI>();
List<URI> slaves = new ArrayList<URI>();
for (ClusterPartition partition : lastPartitions.values()) {
if (lastPartitions.isEmpty()) {
System.out.println("lastPartitions.isEmpty()");
}
for (ClusterPartition partition : getLastPartitions()) {
if (!partition.isMasterFail()) {
nodes.add(partition.getMasterAddress());
}
@ -334,32 +351,38 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
List<ClusterNodeInfo> nodes = future.getNow();
final StringBuilder nodesValue = new StringBuilder();
if (log.isDebugEnabled()) {
StringBuilder nodesValue = new StringBuilder();
for (ClusterNodeInfo clusterNodeInfo : nodes) {
nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n");
}
log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
}
Collection<ClusterPartition> newPartitions = parsePartitions(nodes);
checkMasterNodesChange(newPartitions);
final Collection<ClusterPartition> newPartitions = parsePartitions(nodes);
Future<Void> masterFuture = checkMasterNodesChange(cfg, newPartitions);
checkSlaveNodesChange(newPartitions);
checkSlotsChange(cfg, newPartitions);
scheduleClusterChangeCheck(cfg, null);
masterFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
checkSlotsMigration(newPartitions, nodesValue.toString());
checkSlotsChange(cfg, newPartitions, nodesValue.toString());
scheduleClusterChangeCheck(cfg, null);
}
});
}
});
}
private void checkSlaveNodesChange(Collection<ClusterPartition> newPartitions) {
for (ClusterPartition newPart : newPartitions) {
for (ClusterPartition currentPart : lastPartitions.values()) {
for (ClusterPartition currentPart : getLastPartitions()) {
if (!newPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
continue;
}
MasterSlaveEntry entry = getEntry(currentPart.getMasterAddr());
// should be invoked first in order to removed stale failedSlaveAddresses
// should be invoked first in order to remove stale failedSlaveAddresses
addRemoveSlaves(entry, currentPart, newPart);
// Does some slaves change failed state to alive?
upDownSlaves(entry, currentPart, newPart);
@ -421,63 +444,109 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
}
private Collection<ClusterSlotRange> slots(Collection<ClusterPartition> partitions) {
List<ClusterSlotRange> result = new ArrayList<ClusterSlotRange>();
private Collection<Integer> slots(Collection<ClusterPartition> partitions) {
Set<Integer> result = new HashSet<Integer>(MAX_SLOT);
for (ClusterPartition clusterPartition : partitions) {
result.addAll(clusterPartition.getSlotRanges());
result.addAll(clusterPartition.getSlots());
}
return result;
}
private ClusterPartition find(Collection<ClusterPartition> partitions, ClusterSlotRange slotRange) {
private ClusterPartition find(Collection<ClusterPartition> partitions, Integer slot) {
for (ClusterPartition clusterPartition : partitions) {
if (clusterPartition.getSlotRanges().contains(slotRange)) {
return clusterPartition;
for (ClusterSlotRange slotRange : clusterPartition.getSlotRanges()) {
if (slotRange.isOwn(slot)) {
return clusterPartition;
}
}
}
return null;
}
private void checkMasterNodesChange(Collection<ClusterPartition> newPartitions) {
for (ClusterPartition newPart : newPartitions) {
for (ClusterPartition currentPart : lastPartitions.values()) {
private Future<Void> checkMasterNodesChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions) {
List<ClusterPartition> newMasters = new ArrayList<ClusterPartition>();
for (final ClusterPartition newPart : newPartitions) {
boolean masterFound = false;
for (ClusterPartition currentPart : getLastPartitions()) {
if (!newPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
continue;
}
masterFound = true;
// current master marked as failed
if (newPart.isMasterFail()) {
for (ClusterSlotRange currentSlotRange : currentPart.getSlotRanges()) {
ClusterPartition newMasterPart = find(newPartitions, currentSlotRange);
// does partition has a new master?
if (!newMasterPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
log.info("changing master from {} to {} for {}",
currentPart.getMasterAddress(), newMasterPart.getMasterAddress(), currentSlotRange);
URI newUri = newMasterPart.getMasterAddress();
URI oldUri = currentPart.getMasterAddress();
changeMaster(currentSlotRange, newUri.getHost(), newUri.getPort());
currentPart.setMasterAddress(newMasterPart.getMasterAddress());
}
if (!newPart.isMasterFail()) {
continue;
}
for (Integer slot : currentPart.getSlots()) {
ClusterPartition newMasterPart = find(newPartitions, slot);
// does partition has a new master?
if (!newMasterPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
log.info("changing master from {} to {} for {}",
currentPart.getMasterAddress(), newMasterPart.getMasterAddress(), slot);
URI newUri = newMasterPart.getMasterAddress();
URI oldUri = currentPart.getMasterAddress();
changeMaster(slot, newUri.getHost(), newUri.getPort());
currentPart.setMasterAddress(newMasterPart.getMasterAddress());
}
}
break;
}
if (!masterFound && !newPart.getSlotRanges().isEmpty()) {
newMasters.add(newPart);
}
}
if (newMasters.isEmpty()) {
return newSucceededFuture(null);
}
final Promise<Void> result = newPromise();
final AtomicInteger masters = new AtomicInteger(newMasters.size());
final Queue<Future<Void>> futures = new ConcurrentLinkedQueue<Future<Void>>();
for (ClusterPartition newPart : newMasters) {
Future<Collection<Future<Void>>> future = addMasterEntry(newPart, cfg);
future.addListener(new FutureListener<Collection<Future<Void>>>() {
@Override
public void operationComplete(Future<Collection<Future<Void>>> future) throws Exception {
if (future.isSuccess()) {
futures.addAll(future.getNow());
}
if (masters.decrementAndGet() == 0) {
final AtomicInteger nodes = new AtomicInteger(futures.size());
for (Future<Void> nodeFuture : futures) {
nodeFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (nodes.decrementAndGet() == 0) {
result.setSuccess(null);
}
}
});
}
}
}
});
}
return result;
}
private void checkSlotsChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions) {
checkSlotsMigration(newPartitions);
private void checkSlotsChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions, String nodes) {
Collection<Integer> newPartitionsSlots = slots(newPartitions);
if (newPartitionsSlots.size() == lastPartitions.size() && lastPartitions.size() == MAX_SLOT) {
return;
}
Collection<ClusterSlotRange> newPartitionsSlots = slots(newPartitions);
Set<ClusterSlotRange> removedSlots = new HashSet<ClusterSlotRange>(lastPartitions.keySet());
Set<Integer> removedSlots = new HashSet<Integer>(lastPartitions.keySet());
removedSlots.removeAll(newPartitionsSlots);
lastPartitions.keySet().removeAll(removedSlots);
if (!removedSlots.isEmpty()) {
log.info("{} slot ranges found to remove", removedSlots);
log.info("{} slots found to remove", removedSlots.size());
}
for (ClusterSlotRange slot : removedSlots) {
for (Integer slot : removedSlots) {
MasterSlaveEntry entry = removeMaster(slot);
entry.removeSlotRange(slot);
if (entry.getSlotRanges().isEmpty()) {
@ -487,38 +556,25 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
Set<ClusterSlotRange> addedSlots = new HashSet<ClusterSlotRange>(newPartitionsSlots);
Set<Integer> addedSlots = new HashSet<Integer>(newPartitionsSlots);
addedSlots.removeAll(lastPartitions.keySet());
if (!addedSlots.isEmpty()) {
log.info("{} slots found to add", addedSlots);
log.info("{} slots found to add", addedSlots.size());
}
for (final ClusterSlotRange slot : addedSlots) {
for (final Integer slot : addedSlots) {
ClusterPartition partition = find(newPartitions, slot);
boolean masterFound = false;
for (MasterSlaveEntry entry : getEntries().values()) {
for (MasterSlaveEntry entry : getEntrySet()) {
if (entry.getClient().getAddr().equals(partition.getMasterAddr())) {
addEntry(slot, entry);
lastPartitions.put(slot, partition);
masterFound = true;
break;
}
}
if (!masterFound) {
Future<Collection<Future<Void>>> future = addMasterEntry(partition, cfg);
future.addListener(new FutureListener<Collection<Future<Void>>>() {
@Override
public void operationComplete(Future<Collection<Future<Void>>> future) throws Exception {
if (!future.isSuccess()) {
log.error("New cluster slot range " + slot + " without master node detected", future.cause());
}
}
});
}
}
}
private void checkSlotsMigration(Collection<ClusterPartition> newPartitions) {
List<ClusterPartition> currentPartitions = new ArrayList<ClusterPartition>(lastPartitions.values());
private void checkSlotsMigration(Collection<ClusterPartition> newPartitions, String nodes) {
Set<ClusterPartition> currentPartitions = getLastPartitions();
for (ClusterPartition currentPartition : currentPartitions) {
for (ClusterPartition newPartition : newPartitions) {
if (!currentPartition.getNodeId().equals(newPartition.getNodeId())
@ -527,27 +583,35 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
continue;
}
Set<ClusterSlotRange> addedSlots = new HashSet<ClusterSlotRange>(newPartition.getSlotRanges());
addedSlots.removeAll(currentPartition.getSlotRanges());
MasterSlaveEntry entry = getEntry(currentPartition.getSlotRanges().iterator().next());
currentPartition.addSlotRanges(addedSlots);
for (ClusterSlotRange slot : addedSlots) {
Set<Integer> addedSlots = new HashSet<Integer>(newPartition.getSlots());
addedSlots.removeAll(currentPartition.getSlots());
currentPartition.addSlots(addedSlots);
MasterSlaveEntry entry = getEntry(currentPartition.getMasterAddr());
for (Integer slot : addedSlots) {
entry.addSlotRange(slot);
addEntry(slot, entry);
log.info("{} slot added for {}", slot, entry.getClient().getAddr());
lastPartitions.put(slot, currentPartition);
}
if (!addedSlots.isEmpty()) {
log.info("{} slots added to {}", addedSlots.size(), entry.getClient().getAddr());
}
Set<ClusterSlotRange> removedSlots = new HashSet<ClusterSlotRange>(currentPartition.getSlotRanges());
removedSlots.removeAll(newPartition.getSlotRanges());
lastPartitions.keySet().removeAll(removedSlots);
currentPartition.removeSlotRanges(removedSlots);
Set<Integer> removedSlots = new HashSet<Integer>(currentPartition.getSlots());
removedSlots.removeAll(newPartition.getSlots());
for (Integer removeSlot : removedSlots) {
if (lastPartitions.remove(removeSlot, currentPartition)) {
entry.removeSlotRange(removeSlot);
removeMaster(removeSlot);
}
}
currentPartition.removeSlots(removedSlots);
for (ClusterSlotRange slot : removedSlots) {
log.info("{} slot removed for {}", slot, entry.getClient().getAddr());
entry.removeSlotRange(slot);
removeMaster(slot);
if (!removedSlots.isEmpty()) {
log.info("{} slots removed from {}", removedSlots.size(), entry.getClient().getAddr());
}
break;
}
}
}
@ -616,5 +680,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
connection.getRedisClient().shutdown();
}
}
private HashSet<ClusterPartition> getLastPartitions() {
return new HashSet<ClusterPartition>(lastPartitions.values());
}
}

@ -30,6 +30,8 @@ public class ClusterPartition {
private URI masterAddress;
private final Set<URI> slaveAddresses = new HashSet<URI>();
private final Set<URI> failedSlaves = new HashSet<URI>();
private final Set<Integer> slots = new HashSet<Integer>();
private final Set<ClusterSlotRange> slotRanges = new HashSet<ClusterSlotRange>();
public ClusterPartition(String nodeId) {
@ -48,15 +50,36 @@ public class ClusterPartition {
return masterFail;
}
public void addSlots(Set<Integer> slots) {
this.slots.addAll(slots);
}
public void removeSlots(Set<Integer> slots) {
this.slots.removeAll(slots);
}
public void addSlotRanges(Set<ClusterSlotRange> ranges) {
for (ClusterSlotRange clusterSlotRange : ranges) {
for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) {
slots.add(i);
}
}
slotRanges.addAll(ranges);
}
public void removeSlotRanges(Set<ClusterSlotRange> ranges) {
for (ClusterSlotRange clusterSlotRange : ranges) {
for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) {
slots.remove(i);
}
}
slotRanges.removeAll(ranges);
}
public Set<ClusterSlotRange> getSlotRanges() {
return slotRanges;
}
public Set<Integer> getSlots() {
return slots;
}
public InetSocketAddress getMasterAddr() {
return new InetSocketAddress(masterAddress.getHost(), masterAddress.getPort());
@ -92,6 +115,31 @@ public class ClusterPartition {
slaveAddresses.remove(uri);
failedSlaves.remove(uri);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ClusterPartition other = (ClusterPartition) obj;
if (nodeId == null) {
if (other.nodeId != null)
return false;
} else if (!nodeId.equals(other.nodeId))
return false;
return true;
}
@Override
public String toString() {

@ -25,6 +25,7 @@ import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import io.netty.util.concurrent.Future;
@ -43,9 +44,11 @@ public interface CommandAsyncExecutor {
<V> V get(Future<V> future);
<T, R> Future<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Future<R> readAsync(InetSocketAddress client, int slot, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Future<R> readAsync(InetSocketAddress client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Future<R> readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params);
@ -59,10 +62,14 @@ public interface CommandAsyncExecutor {
<T, R> Future<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Future<R> evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Future<R> evalReadAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Future<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Future<R> evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Future<R> evalWriteAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params);
@ -77,6 +84,8 @@ public interface CommandAsyncExecutor {
<T, R> Future<R> readAsync(String key, RedisCommand<T> command, Object ... params);
<T, R> Future<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Future<R> readAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Future<R> readRandomAsync(RedisCommand<T> command, Object ... params);

@ -43,8 +43,8 @@ import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
import org.slf4j.Logger;
@ -116,9 +116,9 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public <T, R> Future<R> readAsync(InetSocketAddress client, int slot, Codec codec, RedisCommand<T> command, Object ... params) {
public <T, R> Future<R> readAsync(InetSocketAddress client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0);
async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0);
return mainPromise;
}
@ -133,9 +133,10 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public <T, R> Future<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params) {
final Promise<Collection<R>> mainPromise = connectionManager.newPromise();
final Set<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
Promise<R> promise = new DefaultPromise<R>() {
List<R> results = new ArrayList<R>();
AtomicInteger counter = new AtomicInteger(connectionManager.getEntries().keySet().size());
AtomicInteger counter = new AtomicInteger(nodes.size());
@Override
public Promise<R> setSuccess(R result) {
if (result instanceof Collection) {
@ -163,8 +164,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
};
for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) {
async(true, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, params, promise, 0);
for (MasterSlaveEntry entry : nodes) {
async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0);
}
return mainPromise;
}
@ -172,25 +173,25 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public <T, R> Future<R> readRandomAsync(RedisCommand<T> command, Object ... params) {
final Promise<R> mainPromise = connectionManager.newPromise();
final List<ClusterSlotRange> slots = new ArrayList<ClusterSlotRange>(connectionManager.getEntries().keySet());
Collections.shuffle(slots);
final List<MasterSlaveEntry> nodes = new ArrayList<MasterSlaveEntry>(connectionManager.getEntrySet());
Collections.shuffle(nodes);
retryReadRandomAsync(command, mainPromise, slots, params);
retryReadRandomAsync(command, mainPromise, nodes, params);
return mainPromise;
}
private <R, T> void retryReadRandomAsync(final RedisCommand<T> command, final Promise<R> mainPromise,
final List<ClusterSlotRange> slots, final Object... params) {
final List<MasterSlaveEntry> nodes, final Object... params) {
final Promise<R> attemptPromise = connectionManager.newPromise();
attemptPromise.addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
if (future.isSuccess()) {
if (future.getNow() == null) {
if (slots.isEmpty()) {
if (nodes.isEmpty()) {
mainPromise.setSuccess(null);
} else {
retryReadRandomAsync(command, mainPromise, slots, params);
retryReadRandomAsync(command, mainPromise, nodes, params);
}
} else {
mainPromise.setSuccess(future.getNow());
@ -201,8 +202,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
});
ClusterSlotRange slot = slots.remove(0);
async(true, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, params, attemptPromise, 0);
MasterSlaveEntry entry = nodes.remove(0);
async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, attemptPromise, 0);
}
@Override
@ -222,9 +223,9 @@ public class CommandAsyncService implements CommandAsyncExecutor {
private <T, R> Future<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object ... params) {
final Promise<R> mainPromise = connectionManager.newPromise();
final Set<ClusterSlotRange> slots = connectionManager.getEntries().keySet();
final Set<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
Promise<T> promise = new DefaultPromise<T>() {
AtomicInteger counter = new AtomicInteger(slots.size());
AtomicInteger counter = new AtomicInteger(nodes.size());
@Override
public Promise<T> setSuccess(T result) {
if (callback != null) {
@ -246,8 +247,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return this;
}
};
for (ClusterSlotRange slot : slots) {
async(readOnlyMode, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, params, promise, 0);
for (MasterSlaveEntry entry : nodes) {
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0);
}
return mainPromise;
}
@ -260,10 +261,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
private NodeSource getNodeSource(String key) {
int slot = connectionManager.calcSlot(key);
if (slot != 0) {
return new NodeSource(slot);
}
return NodeSource.ZERO;
MasterSlaveEntry entry = connectionManager.getEntry(slot);
return new NodeSource(entry);
}
@Override
@ -274,12 +273,26 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return mainPromise;
}
public <T, R> Future<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
async(true, new NodeSource(entry), codec, command, params, mainPromise, 0);
return mainPromise;
}
public <T, R> Future<R> readAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
async(true, new NodeSource(slot), codec, command, params, mainPromise, 0);
return mainPromise;
}
@Override
public <T, R> Future<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
async(false, new NodeSource(entry), codec, command, params, mainPromise, 0);
return mainPromise;
}
@Override
public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
@ -298,6 +311,11 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return evalAsync(source, true, codec, evalCommandType, script, keys, params);
}
@Override
public <T, R> Future<R> evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalAsync(new NodeSource(entry), true, codec, evalCommandType, script, keys, params);
}
@Override
public <T, R> Future<R> evalReadAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalAsync(new NodeSource(slot), true, codec, evalCommandType, script, keys, params);
@ -315,6 +333,10 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return evalAsync(source, false, codec, evalCommandType, script, keys, params);
}
public <T, R> Future<R> evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalAsync(new NodeSource(entry), false, codec, evalCommandType, script, keys, params);
}
public <T, R> Future<R> evalWriteAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalAsync(new NodeSource(slot), false, codec, evalCommandType, script, keys, params);
}
@ -327,8 +349,9 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public <T, R> Future<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) {
final Promise<R> mainPromise = connectionManager.newPromise();
final Set<MasterSlaveEntry> entries = connectionManager.getEntrySet();
Promise<T> promise = new DefaultPromise<T>() {
AtomicInteger counter = new AtomicInteger(connectionManager.getEntries().keySet().size());
AtomicInteger counter = new AtomicInteger(entries.size());
@Override
public Promise<T> setSuccess(T result) {
callback.onSlotResult(result);
@ -351,8 +374,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) {
async(readOnlyMode, new NodeSource(slot.getStartSlot()), connectionManager.getCodec(), command, args.toArray(), promise, 0);
for (MasterSlaveEntry entry : entries) {
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0);
}
return mainPromise;
}

@ -37,6 +37,7 @@ import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
@ -79,7 +80,7 @@ public class CommandBatchService extends CommandReactiveService {
private final AtomicInteger index = new AtomicInteger();
private ConcurrentMap<Integer, Entry> commands = PlatformDependent.newConcurrentHashMap();
private ConcurrentMap<MasterSlaveEntry, Entry> commands = PlatformDependent.newConcurrentHashMap();
private volatile boolean executed;
@ -93,10 +94,10 @@ public class CommandBatchService extends CommandReactiveService {
if (executed) {
throw new IllegalStateException("Batch already has been executed!");
}
Entry entry = commands.get(nodeSource.getSlot());
Entry entry = commands.get(nodeSource.getEntry());
if (entry == null) {
entry = new Entry();
Entry oldEntry = commands.putIfAbsent(nodeSource.getSlot(), entry);
Entry oldEntry = commands.putIfAbsent(nodeSource.getEntry(), entry);
if (oldEntry != null) {
entry = oldEntry;
}
@ -133,7 +134,7 @@ public class CommandBatchService extends CommandReactiveService {
});
AtomicInteger slots = new AtomicInteger(commands.size());
for (java.util.Map.Entry<Integer, Entry> e : commands.entrySet()) {
for (java.util.Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0);
}
return voidPromise;
@ -175,7 +176,7 @@ public class CommandBatchService extends CommandReactiveService {
});
AtomicInteger slots = new AtomicInteger(commands.size());
for (java.util.Map.Entry<Integer, Entry> e : commands.entrySet()) {
for (java.util.Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0);
}
return promise;

@ -24,6 +24,7 @@ import org.redisson.SlotCallback;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import io.netty.util.concurrent.Future;
@ -47,7 +48,7 @@ public interface CommandReactiveExecutor extends CommandAsyncExecutor {
<T, R> Publisher<R> readRandomReactive(RedisCommand<T> command, Object ... params);
<T, R> Publisher<R> writeReactive(Integer slot, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Publisher<R> writeReactive(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);
<T> Publisher<Void> writeAllReactive(RedisCommand<T> command, Object ... params);

@ -24,6 +24,7 @@ import org.redisson.SlotCallback;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.reactive.NettyFuturePublisher;
import io.netty.util.concurrent.Future;
@ -79,8 +80,8 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
}
@Override
public <T, R> Publisher<R> writeReactive(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
Future<R> f = writeAsync(slot, codec, command, params);
public <T, R> Publisher<R> writeReactive(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
Future<R> f = writeAsync(entry, codec, command, params);
return new NettyFuturePublisher<R>(f);
}

@ -17,7 +17,7 @@ package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.redisson.MasterSlaveServersConfig;
@ -26,7 +26,6 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.core.NodeType;
import org.redisson.misc.InfinitySemaphoreLatch;
@ -71,8 +70,10 @@ public interface ConnectionManager {
Codec getCodec();
Map<ClusterSlotRange, MasterSlaveEntry> getEntries();
Set<MasterSlaveEntry> getEntrySet();
MasterSlaveEntry getEntry(int slot);
<R> Promise<R> newPromise();
void releaseRead(NodeSource source, RedisConnection connection);

@ -141,7 +141,7 @@ public class ElasticacheConnectionManager extends MasterSlaveConnectionManager {
log.debug("Current master {} unchanged", master);
} else if (Role.master.equals(role) && !master.equals(addr) && currentMaster.compareAndSet(master, addr)) {
log.info("Master has changed from {} to {}", master, addr);
changeMaster(singleSlotRange, addr.getHost(), addr.getPort());
changeMaster(singleSlotRange.getStartSlot(), addr.getHost(), addr.getPort());
break;
}
}

@ -102,7 +102,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
public static final int MAX_SLOT = 16384;
protected final ClusterSlotRange singleSlotRange = new ClusterSlotRange(0, MAX_SLOT);
protected final ClusterSlotRange singleSlotRange = new ClusterSlotRange(0, MAX_SLOT-1);
private final Logger log = LoggerFactory.getLogger(getClass());
@ -122,7 +122,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected boolean isClusterMode;
private final Map<ClusterSlotRange, MasterSlaveEntry> entries = PlatformDependent.newConcurrentHashMap();
private final Map<Integer, MasterSlaveEntry> entries = PlatformDependent.newConcurrentHashMap();
private final Promise<Boolean> shutdownPromise;
@ -182,11 +182,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return codec;
}
@Override
public Map<ClusterSlotRange, MasterSlaveEntry> getEntries() {
return entries;
public Set<MasterSlaveEntry> getEntrySet() {
return new HashSet<MasterSlaveEntry>(entries.values());
}
protected void init(MasterSlaveServersConfig config) {
this.config = config;
@ -228,7 +227,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} else {
entry = createMasterSlaveEntry(config, slots);
}
addEntry(singleSlotRange, entry);
for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) {
addEntry(slot, entry);
}
}
protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config,
@ -288,7 +290,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public int calcSlot(String key) {
return 0;
return singleSlotRange.getStartSlot();
}
@Override
@ -556,7 +558,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public MasterSlaveEntry getEntry(InetSocketAddress addr) {
// TODO optimize
for (Entry<ClusterSlotRange, MasterSlaveEntry> entry : entries.entrySet()) {
for (Entry<Integer, MasterSlaveEntry> entry : entries.entrySet()) {
if (entry.getValue().getClient().getAddr().equals(addr)) {
return entry.getValue();
}
@ -564,65 +566,62 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return null;
}
protected MasterSlaveEntry getEntry(ClusterSlotRange slotRange) {
return entries.get(slotRange);
public MasterSlaveEntry getEntry(int slot) {
return entries.get(slot);
}
protected MasterSlaveEntry getEntry(int slot) {
// TODO optimize
for (Entry<ClusterSlotRange, MasterSlaveEntry> entry : entries.entrySet()) {
if (entry.getKey().isOwn(slot)) {
return entry.getValue();
}
}
return null;
}
protected void slaveDown(ClusterSlotRange slotRange, String host, int port, FreezeReason freezeReason) {
getEntry(slotRange).slaveDown(host, port, freezeReason);
getEntry(slotRange.getStartSlot()).slaveDown(host, port, freezeReason);
}
protected void changeMaster(ClusterSlotRange slotRange, String host, int port) {
getEntry(slotRange).changeMaster(host, port);
protected void changeMaster(int slot, String host, int port) {
getEntry(slot).changeMaster(host, port);
}
protected void addEntry(ClusterSlotRange slotRange, MasterSlaveEntry entry) {
entries.put(slotRange, entry);
protected void addEntry(Integer slot, MasterSlaveEntry entry) {
entries.put(slot, entry);
}
protected MasterSlaveEntry removeMaster(ClusterSlotRange slotRange) {
return entries.remove(slotRange);
protected MasterSlaveEntry removeMaster(Integer slot) {
return entries.remove(slot);
}
@Override
public Future<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command) {
MasterSlaveEntry e = getEntry(source, command);
return e.connectionWriteOp();
MasterSlaveEntry entry = source.getEntry();
if (entry == null) {
entry = getEntry(source);
}
return entry.connectionWriteOp();
}
private MasterSlaveEntry getEntry(NodeSource source) {
MasterSlaveEntry e = getEntry(source.getSlot());
if (e == null) {
throw new RedisNodeNotFoundException("No node with slot: " + source.getSlot());
// workaround for slots in migration state
if (source.getRedirect() != null) {
MasterSlaveEntry e = getEntry(source.getAddr());
if (e == null) {
throw new RedisNodeNotFoundException("No node for slot: " + source.getAddr());
}
return e;
}
return e;
}
private MasterSlaveEntry getEntry(NodeSource source, RedisCommand<?> command) {
MasterSlaveEntry e = getEntry(source.getSlot());
if (e == null) {
throw new RedisNodeNotFoundException("No node for slot: " + source.getSlot() + " and command " + command);
throw new RedisNodeNotFoundException("No node with slot: " + source.getSlot());
}
return e;
}
@Override
public Future<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command) {
MasterSlaveEntry e = getEntry(source, command);
MasterSlaveEntry entry = source.getEntry();
if (entry == null && source.getSlot() != null) {
entry = getEntry(source.getSlot());
}
if (source.getAddr() != null) {
return e.connectionReadOp(source.getAddr());
return entry.connectionReadOp(source.getAddr());
}
return e.connectionReadOp();
return entry.connectionReadOp();
}
Future<RedisPubSubConnection> nextPubSubConnection(int slot) {
@ -635,12 +634,20 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public void releaseWrite(NodeSource source, RedisConnection connection) {
getEntry(source).releaseWrite(connection);
MasterSlaveEntry entry = source.getEntry();
if (entry == null) {
entry = getEntry(source);
}
entry.releaseWrite(connection);
}
@Override
public void releaseRead(NodeSource source, RedisConnection connection) {
getEntry(source).releaseRead(connection);
MasterSlaveEntry entry = source.getEntry();
if (entry == null) {
entry = getEntry(source);
}
entry.releaseRead(connection);
}
@Override

@ -18,6 +18,7 @@ package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
@ -61,12 +62,16 @@ public class MasterSlaveEntry {
final ConnectionManager connectionManager;
final MasterConnectionPool writeConnectionHolder;
final Set<ClusterSlotRange> slotRanges;
final Set<Integer> slots = new HashSet<Integer>();
final AtomicBoolean active = new AtomicBoolean(true);
public MasterSlaveEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
this.slotRanges = slotRanges;
for (ClusterSlotRange clusterSlotRange : slotRanges) {
for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) {
slots.add(i);
}
}
this.connectionManager = connectionManager;
this.config = config;
@ -405,16 +410,16 @@ public class MasterSlaveEntry {
slaveBalancer.shutdown();
}
public void addSlotRange(ClusterSlotRange range) {
slotRanges.add(range);
public void addSlotRange(Integer range) {
slots.add(range);
}
public void removeSlotRange(ClusterSlotRange range) {
slotRanges.remove(range);
public void removeSlotRange(Integer range) {
slots.remove(range);
}
public Set<ClusterSlotRange> getSlotRanges() {
return slotRanges;
public Set<Integer> getSlotRanges() {
return slots;
}
}

@ -26,7 +26,18 @@ public class NodeSource {
private final Integer slot;
private final InetSocketAddress addr;
private final Redirect redirect;
private MasterSlaveEntry entry;
public NodeSource(MasterSlaveEntry entry) {
this(null, null, null);
this.entry = entry;
}
public NodeSource(MasterSlaveEntry entry, InetSocketAddress addr) {
this(null, addr, null);
this.entry = entry;
}
public NodeSource(Integer slot) {
this(slot, null, null);
}
@ -41,6 +52,10 @@ public class NodeSource {
this.redirect = redirect;
}
public MasterSlaveEntry getEntry() {
return entry;
}
public Redirect getRedirect() {
return redirect;
}

@ -218,7 +218,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
// to avoid addition twice
if (slaves.putIfAbsent(slaveAddr, true) == null && config.getReadMode() == ReadMode.SLAVE) {
Future<Void> future = getEntry(singleSlotRange).addSlave(ip, Integer.valueOf(port));
Future<Void> future = getEntry(singleSlotRange.getStartSlot()).addSlave(ip, Integer.valueOf(port));
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
@ -228,7 +228,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return;
}
if (getEntry(singleSlotRange).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) {
if (getEntry(singleSlotRange.getStartSlot()).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) {
String slaveAddr = ip + ":" + port;
log.info("slave: {} added", slaveAddr);
}
@ -265,7 +265,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = parts[2];
String port = parts[3];
MasterSlaveEntry entry = getEntry(singleSlotRange);
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
if (entry.getFreezeReason() != FreezeReason.MANAGER) {
entry.freeze();
String addr = ip + ":" + port;
@ -279,7 +279,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private void slaveDown(String ip, String port) {
if (config.getReadMode() == ReadMode.SLAVE) {
slaveDown(singleSlotRange, ip, Integer.valueOf(port), FreezeReason.MANAGER);
getEntry(singleSlotRange.getStartSlot()).slaveDown(ip, Integer.valueOf(port), FreezeReason.MANAGER);
}
log.warn("slave: {}:{} has down", ip, port);
@ -299,7 +299,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String port = parts[3];
String masterAddr = ip + ":" + port;
MasterSlaveEntry entry = getEntry(singleSlotRange);
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
if (entry.isFreezed()
&& entry.getClient().getAddr().equals(new InetSocketAddress(ip, Integer.valueOf(port)))) {
entry.unfreeze();
@ -318,7 +318,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return;
}
if (getEntry(singleSlotRange).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) {
if (getEntry(singleSlotRange.getStartSlot()).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) {
String slaveAddr = ip + ":" + port;
log.info("slave: {} has up", slaveAddr);
}
@ -336,8 +336,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String newMaster = ip + ":" + port;
if (!newMaster.equals(current)
&& currentMaster.compareAndSet(current, newMaster)) {
changeMaster(singleSlotRange, ip, Integer.valueOf(port));
log.info("master has changed from {} to {}", current, newMaster);
changeMaster(singleSlotRange.getStartSlot(), ip, Integer.valueOf(port));
log.info("master {} changed to {}", current, newMaster);
}
}
} else {

@ -88,7 +88,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
if (!now.getHostAddress().equals(master.getHostAddress())) {
log.info("Detected DNS change. {} has changed from {} to {}", cfg.getAddress().getHost(), master.getHostAddress(), now.getHostAddress());
if (currentMaster.compareAndSet(master, now)) {
changeMaster(singleSlotRange, cfg.getAddress().getHost(), cfg.getAddress().getPort());
changeMaster(singleSlotRange.getStartSlot(), cfg.getAddress().getHost(), cfg.getAddress().getPort());
log.info("Master has been changed");
}
}

@ -18,6 +18,7 @@ package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import org.reactivestreams.Publisher;
@ -30,6 +31,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.command.CommandReactiveService;
import org.redisson.connection.MasterSlaveEntry;
import reactor.rx.Stream;
import reactor.rx.Streams;
@ -55,8 +57,8 @@ public class RedissonKeysReactive implements RKeysReactive {
@Override
public Publisher<String> getKeysByPattern(final String pattern) {
List<Publisher<String>> publishers = new ArrayList<Publisher<String>>();
for (ClusterSlotRange slot : commandExecutor.getConnectionManager().getEntries().keySet()) {
publishers.add(createKeysIterator(slot.getStartSlot(), pattern));
for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) {
publishers.add(createKeysIterator(entry, pattern));
}
return Streams.merge(publishers);
}
@ -66,14 +68,14 @@ public class RedissonKeysReactive implements RKeysReactive {
return getKeysByPattern(null);
}
private Publisher<ListScanResult<String>> scanIterator(int slot, long startPos, String pattern) {
private Publisher<ListScanResult<String>> scanIterator(MasterSlaveEntry entry, long startPos, String pattern) {
if (pattern == null) {
return commandExecutor.writeReactive(slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos);
return commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos);
}
return commandExecutor.writeReactive(slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern);
return commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern);
}
private Publisher<String> createKeysIterator(final int slot, final String pattern) {
private Publisher<String> createKeysIterator(final MasterSlaveEntry entry, final String pattern) {
return new Stream<String>() {
@Override
@ -94,7 +96,7 @@ public class RedissonKeysReactive implements RKeysReactive {
protected void nextValues() {
final ReactiveSubscription<String> m = this;
scanIterator(slot, nextIterPos, pattern).subscribe(new Subscriber<ListScanResult<String>>() {
scanIterator(entry, nextIterPos, pattern).subscribe(new Subscriber<ListScanResult<String>>() {
@Override
public void onSubscribe(Subscription s) {

Loading…
Cancel
Save