Seamless slots migration across nodes. #264

pull/282/head
Nikita 9 years ago
parent 070fff876a
commit f8d3b5612e

@ -195,9 +195,9 @@ public class CommandBatchExecutorService extends CommandExecutorService {
Future<RedisConnection> connectionFuture;
if (entry.isReadOnlyMode()) {
connectionFuture = connectionManager.connectionReadOp(slot);
connectionFuture = connectionManager.connectionReadOp(slot, null);
} else {
connectionFuture = connectionManager.connectionWriteOp(slot);
connectionFuture = connectionManager.connectionWriteOp(slot, null);
}
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@ -214,7 +214,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
RedisConnection connection = connFuture.getNow();
ArrayList<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size());
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size());
for (CommandEntry c : entry.getCommands()) {
list.add(c.getCommand());
}

@ -259,9 +259,9 @@ public class CommandExecutorService implements CommandExecutor {
try {
Future<RedisConnection> connectionFuture;
if (readOnlyMode) {
connectionFuture = connectionManager.connectionReadOp(slot);
connectionFuture = connectionManager.connectionReadOp(slot, null);
} else {
connectionFuture = connectionManager.connectionWriteOp(slot);
connectionFuture = connectionManager.connectionWriteOp(slot, null);
}
connectionFuture.syncUninterruptibly();
@ -437,12 +437,12 @@ public class CommandExecutorService implements CommandExecutor {
Future<RedisConnection> connectionFuture;
if (readOnlyMode) {
if (client != null) {
connectionFuture = connectionManager.connectionReadOp(slot, client);
connectionFuture = connectionManager.connectionReadOp(slot, command, client);
} else {
connectionFuture = connectionManager.connectionReadOp(slot);
connectionFuture = connectionManager.connectionReadOp(slot, command);
}
} else {
connectionFuture = connectionManager.connectionWriteOp(slot);
connectionFuture = connectionManager.connectionWriteOp(slot, command);
}
connectionFuture.addListener(new FutureListener<RedisConnection>() {

@ -221,7 +221,7 @@ public class RedisCommand<R> {
@Override
public String toString() {
return "RedisCommand [name=" + name + ", subName=" + subName + "]";
return "RedisCommand [" + name + " " + subName != null ? subName : "" + "]";
}
}

@ -15,7 +15,6 @@
*/
package org.redisson.cluster;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
@ -23,7 +22,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -35,6 +33,7 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.cluster.ClusterNodeInfo.Flag;
import org.redisson.connection.CRC16;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.SingleEntry;
@ -123,7 +122,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
SingleEntry entry = new SingleEntry(partition.getSlotRanges(), this, config);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
for (ClusterSlotRange slotRange : partition.getSlotRanges()) {
addMaster(slotRange, entry);
addEntry(slotRange, entry);
lastPartitions.put(slotRange, partition);
}
}
@ -205,40 +204,38 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
}
private void checkSlotsChange(ClusterServersConfig cfg, Collection<ClusterPartition> partitions) {
Collection<ClusterSlotRange> partitionsSlots = slots(partitions);
private void checkSlotsChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions) {
checkSlotsMigration(newPartitions);
Collection<ClusterSlotRange> newPartitionsSlots = slots(newPartitions);
Set<ClusterSlotRange> removedSlots = new HashSet<ClusterSlotRange>(lastPartitions.keySet());
removedSlots.removeAll(partitionsSlots);
removedSlots.removeAll(newPartitionsSlots);
lastPartitions.keySet().removeAll(removedSlots);
if (!removedSlots.isEmpty()) {
log.info("{} slot ranges found to remove", removedSlots.size());
}
Map<ClusterSlotRange, MasterSlaveEntry> removeAddrs = new HashMap<ClusterSlotRange, MasterSlaveEntry>();
for (ClusterSlotRange slot : removedSlots) {
MasterSlaveEntry entry = removeMaster(slot);
entry.removeSlotRange(slot);
if (entry.getSlotRanges().isEmpty()) {
entry.shutdownMasterAsync();
removeAddrs.put(slot, entry);
log.info("{} master and slaves for it removed", entry.getClient().getAddr());
}
}
for (Entry<ClusterSlotRange, MasterSlaveEntry> entry : removeAddrs.entrySet()) {
InetSocketAddress url = entry.getValue().getClient().getAddr();
slaveDown(entry.getKey(), url.getHostName(), url.getPort());
}
Set<ClusterSlotRange> addedSlots = new HashSet<ClusterSlotRange>(partitionsSlots);
Set<ClusterSlotRange> addedSlots = new HashSet<ClusterSlotRange>(newPartitionsSlots);
addedSlots.removeAll(lastPartitions.keySet());
if (!addedSlots.isEmpty()) {
log.info("{} slots found to add", addedSlots.size());
}
for (ClusterSlotRange slot : addedSlots) {
ClusterPartition partition = find(partitions, slot);
ClusterPartition partition = find(newPartitions, slot);
boolean masterFound = false;
for (MasterSlaveEntry entry : getEntries().values()) {
if (entry.getClient().getAddr().equals(partition.getMasterAddr())) {
addMaster(slot, entry);
addEntry(slot, entry);
lastPartitions.put(slot, partition);
masterFound = true;
break;
@ -248,7 +245,54 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
addMasterEntry(partition, cfg);
}
}
}
private void checkSlotsMigration(Collection<ClusterPartition> newPartitions) {
List<ClusterPartition> currentPartitions = new ArrayList<ClusterPartition>(lastPartitions.values());
for (ClusterPartition currentPartition : currentPartitions) {
for (ClusterPartition newPartition : newPartitions) {
if (!currentPartition.getNodeId().equals(newPartition.getNodeId())) {
continue;
}
Set<ClusterSlotRange> addedSlots = new HashSet<ClusterSlotRange>(newPartition.getSlotRanges());
addedSlots.removeAll(currentPartition.getSlotRanges());
MasterSlaveEntry entry = getEntry(currentPartition.getSlotRanges().iterator().next());
for (ClusterSlotRange slot : addedSlots) {
entry.addSlotRange(slot);
addEntry(slot, entry);
log.info("slot {} added for {}", slot, entry.getClient().getAddr());
lastPartitions.put(slot, currentPartition);
}
Set<ClusterSlotRange> removedSlots = new HashSet<ClusterSlotRange>(currentPartition.getSlotRanges());
removedSlots.removeAll(newPartition.getSlotRanges());
lastPartitions.keySet().removeAll(removedSlots);
for (ClusterSlotRange slot : removedSlots) {
log.info("slot {} removed for {}", slot, entry.getClient().getAddr());
entry.removeSlotRange(slot);
removeMaster(slot);
}
}
}
}
@Override
public int calcSlot(String key) {
if (key == null) {
return 0;
}
int start = key.indexOf('{');
if (start != -1) {
int end = key.indexOf('}');
key = key.substring(start+1, end);
}
int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
log.debug("slot {} for {}", result, key);
return result;
}
private Collection<ClusterPartition> parsePartitions(String nodesValue) {
@ -268,7 +312,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
ClusterPartition partition = partitions.get(id);
if (partition == null) {
partition = new ClusterPartition();
partition = new ClusterPartition(id);
partitions.put(id, partition);
}

@ -26,11 +26,21 @@ import org.redisson.misc.URIBuilder;
public class ClusterPartition {
private final String nodeId;
private boolean masterFail;
private URI masterAddress;
private List<URI> slaveAddresses = new ArrayList<URI>();
private final Set<ClusterSlotRange> slotRanges = new HashSet<ClusterSlotRange>();
public ClusterPartition(String nodeId) {
super();
this.nodeId = nodeId;
}
public String getNodeId() {
return nodeId;
}
public void setMasterFail(boolean masterFail) {
this.masterFail = masterFail;
}

@ -24,6 +24,7 @@ import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.misc.InfinitySemaphoreLatch;
@ -63,11 +64,11 @@ public interface ConnectionManager {
void releaseWrite(int slot, RedisConnection connection);
Future<RedisConnection> connectionReadOp(int slot);
Future<RedisConnection> connectionReadOp(int slot, RedisCommand<?> command);
Future<RedisConnection> connectionReadOp(int slot, RedisClient client);
Future<RedisConnection> connectionReadOp(int slot, RedisCommand<?> command, RedisClient client);
Future<RedisConnection> connectionWriteOp(int slot);
Future<RedisConnection> connectionWriteOp(int slot, RedisCommand<?> command);
<T> FutureListener<T> createReleaseReadListener(int slot,
RedisConnection conn, Timeout timeout);

@ -33,6 +33,7 @@ import org.redisson.client.RedisEmptySlotException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.misc.InfinitySemaphoreLatch;
@ -60,7 +61,7 @@ import io.netty.util.internal.PlatformDependent;
*/
public class MasterSlaveConnectionManager implements ConnectionManager {
static final int MAX_SLOT = 16384;
protected static final int MAX_SLOT = 16384;
protected final ClusterSlotRange singleSlotRange = new ClusterSlotRange(0, MAX_SLOT);
@ -135,7 +136,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
slots.add(singleSlotRange);
MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
addMaster(singleSlotRange, entry);
addEntry(singleSlotRange, entry);
}
protected void init(Config cfg) {
@ -206,19 +207,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public int calcSlot(String key) {
if (entries.size() == 1 || key == null) {
return 0;
}
int start = key.indexOf('{');
if (start != -1) {
int end = key.indexOf('}');
key = key.substring(start+1, end);
}
int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
log.debug("slot {} for {}", result, key);
return result;
return 0;
}
@Override
@ -547,7 +536,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
getEntry(slotRange).changeMaster(host, port);
}
protected void addMaster(ClusterSlotRange slotRange, MasterSlaveEntry entry) {
protected void addEntry(ClusterSlotRange slotRange, MasterSlaveEntry entry) {
entries.put(slotRange, entry);
}
@ -556,28 +545,28 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public Future<RedisConnection> connectionWriteOp(int slot) {
public Future<RedisConnection> connectionWriteOp(int slot, RedisCommand<?> command) {
MasterSlaveEntry e = getEntry(slot);
if (e == null) {
throw new RedisEmptySlotException("No node for slot: " + slot, slot);
throw new RedisEmptySlotException("No node for slot: " + slot + " and command " + command, slot);
}
return e.connectionWriteOp();
}
@Override
public Future<RedisConnection> connectionReadOp(int slot) {
public Future<RedisConnection> connectionReadOp(int slot, RedisCommand<?> command) {
MasterSlaveEntry e = getEntry(slot);
if (e == null) {
throw new RedisEmptySlotException("No node for slot: " + slot, slot);
throw new RedisEmptySlotException("No node for slot: " + slot + " and command " + command, slot);
}
return e.connectionReadOp();
}
@Override
public Future<RedisConnection> connectionReadOp(int slot, RedisClient client) {
public Future<RedisConnection> connectionReadOp(int slot, RedisCommand<?> command, RedisClient client) {
MasterSlaveEntry e = getEntry(slot);
if (e == null) {
throw new RedisEmptySlotException("No node for slot: " + slot, slot);
throw new RedisEmptySlotException("No node for slot: " + slot + " and command " + command, slot);
}
return e.connectionReadOp(client);
}

@ -86,6 +86,7 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
public Collection<RedisPubSubConnection> slaveDown(String host, int port) {
Collection<RedisPubSubConnection> conns = slaveBalancer.freeze(host, port);
// add master as slave if no more slaves available
if (slaveBalancer.getAvailableClients() == 0) {
InetSocketAddress addr = masterEntry.getClient().getAddr();
slaveUp(addr.getHostName(), addr.getPort());
@ -177,6 +178,10 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
slaveBalancer.shutdown();
}
public void addSlotRange(ClusterSlotRange range) {
slotRanges.add(range);
}
public void removeSlotRange(ClusterSlotRange range) {
slotRanges.remove(range);
}

@ -76,7 +76,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
slots.add(singleSlotRange);
SingleEntry entry = new SingleEntry(slots, this, config);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
addMaster(singleSlotRange, entry);
addEntry(singleSlotRange, entry);
}
private void monitorDnsChange(final SingleServerConfig cfg) {

Loading…
Cancel
Save