Slot change discovery in cluster mode. #27

pull/110/head
Nikita 10 years ago
parent b0822fbc8c
commit 8fd1c6f26e

@ -18,12 +18,15 @@ package org.redisson.connection;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.redisson.ClusterServersConfig;
@ -43,7 +46,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private final List<RedisClient> nodeClients = new ArrayList<RedisClient>();
private Collection<ClusterPartition> lastPartitions;
private final Map<Integer, ClusterPartition> lastPartitions = new HashMap<Integer, ClusterPartition>();
private ScheduledFuture<?> monitorFuture;
@ -56,21 +59,15 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
RedisAsyncConnection<String, String> connection = client.connectAsync();
String nodesValue = connection.clusterNodes().awaitUninterruptibly().getNow();
Collection<ClusterPartition> partitions = extractPartitions(nodesValue);
for (ClusterPartition partition : partitions) {
Map<Integer, ClusterPartition> partitions = extractPartitions(nodesValue);
for (ClusterPartition partition : partitions.values()) {
if (partition.isMasterFail()) {
continue;
}
MasterSlaveServersConfig c = create(cfg);
log.info("master: {}", partition.getMasterAddress());
c.setMasterAddress(partition.getMasterAddress());
SingleEntry entry = new SingleEntry(codec, group, c);
entries.put(partition.getEndSlot(), entry);
addMasterEntry(partition, cfg);
}
lastPartitions = partitions;
break;
} catch (RedisConnectionException e) {
@ -85,6 +82,16 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
monitorClusterChange(cfg);
}
private void addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) {
MasterSlaveServersConfig c = create(cfg);
log.info("master: {} for slot range: {}-{} added", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot());
c.setMasterAddress(partition.getMasterAddress());
SingleEntry entry = new SingleEntry(codec, group, c);
entries.put(partition.getEndSlot(), entry);
lastPartitions.put(partition.getEndSlot(), partition);
}
private void monitorClusterChange(final ClusterServersConfig cfg) {
monitorFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(new Runnable() {
@Override
@ -96,40 +103,32 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
RedisAsyncConnection<String, String> connection = client.connectAsync();
String nodesValue = connection.clusterNodes().awaitUninterruptibly().getNow();
Collection<ClusterPartition> partitions = extractPartitions(nodesValue);
for (ClusterPartition newPart : partitions) {
boolean found = false;
for (ClusterPartition part : lastPartitions) {
Map<Integer, ClusterPartition> partitions = extractPartitions(nodesValue);
for (ClusterPartition newPart : partitions.values()) {
for (ClusterPartition part : lastPartitions.values()) {
if (newPart.getMasterAddress().equals(part.getMasterAddress())) {
log.debug("found endslot {} for {} fail {}", newPart.getEndSlot(), newPart.getMasterAddress(), newPart.isMasterFail());
found = true;
if (newPart.isMasterFail() && !part.isMasterFail()) {
for (ClusterPartition newMasterPart : partitions) {
if (!newMasterPart.getMasterAddress().equals(part.getMasterAddress())
&& newMasterPart.getEndSlot() == part.getEndSlot()) {
log.debug("changing master from {} to {} for {}",
part.getMasterAddress(), newMasterPart.getMasterAddress(), newMasterPart.getEndSlot());
URI newUri = toURI(newMasterPart.getMasterAddress());
URI oldUri = toURI(part.getMasterAddress());
changeMaster(newMasterPart.getEndSlot(), newUri.getHost(), newUri.getPort());
slaveDown(newMasterPart.getEndSlot(), oldUri.getHost(), oldUri.getPort());
part.setMasterFail(true);
monitorFuture.cancel(true);
}
log.debug("found endslot {} for {} fail {}", part.getEndSlot(), part.getMasterAddress(), newPart.isMasterFail());
if (newPart.isMasterFail()) {
ClusterPartition newMasterPart = partitions.get(part.getEndSlot());
if (!newMasterPart.getMasterAddress().equals(part.getMasterAddress())) {
log.debug("changing master from {} to {} for {}",
part.getMasterAddress(), newMasterPart.getMasterAddress(), newMasterPart.getEndSlot());
URI newUri = toURI(newMasterPart.getMasterAddress());
URI oldUri = toURI(part.getMasterAddress());
changeMaster(newMasterPart.getEndSlot(), newUri.getHost(), newUri.getPort());
slaveDown(newMasterPart.getEndSlot(), oldUri.getHost(), oldUri.getPort());
part.setMasterAddress(newMasterPart.getMasterAddress());
}
}
break;
}
}
if (!found) {
// TODO slot changed
}
}
checkSlotsChange(cfg, partitions);
break;
} catch (RedisConnectionException e) {
@ -144,11 +143,44 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
}
}, cfg.getScanInterval(), cfg.getScanInterval(), TimeUnit.MILLISECONDS);
}
private Collection<ClusterPartition> extractPartitions(String nodesValue) {
private void checkSlotsChange(ClusterServersConfig cfg, Map<Integer, ClusterPartition> partitions) {
Set<Integer> removeSlots = new HashSet<Integer>(lastPartitions.keySet());
removeSlots.removeAll(partitions.keySet());
lastPartitions.keySet().removeAll(removeSlots);
if (!removeSlots.isEmpty()) {
log.info("{} slots found to remove", removeSlots.size());
}
Map<Integer, MasterSlaveEntry> removeAddrs = new HashMap<Integer, MasterSlaveEntry>();
for (Integer slot : removeSlots) {
MasterSlaveEntry entry = removeMaster(slot);
entry.shutdownMasterAsync();
removeAddrs.put(slot, entry);
}
Set<Integer> addSlots = new HashSet<Integer>(partitions.keySet());
addSlots.removeAll(lastPartitions.keySet());
if (!addSlots.isEmpty()) {
log.info("{} slots found to add", addSlots.size());
}
for (Integer slot : addSlots) {
ClusterPartition partition = partitions.get(slot);
addMasterEntry(partition, cfg);
}
for (Entry<Integer, MasterSlaveEntry> entry : removeAddrs.entrySet()) {
InetSocketAddress url = entry.getValue().getClient().getAddr();
slaveDown(entry.getKey(), url.getHostName(), url.getPort());
}
}
private Map<Integer, ClusterPartition> extractPartitions(String nodesValue) {
Map<String, ClusterPartition> partitions = new HashMap<String, ClusterPartition>();
Map<Integer, ClusterPartition> result = new HashMap<Integer, ClusterPartition>();
List<ClusterNodeInfo> nodes = parse(nodesValue);
for (ClusterNodeInfo clusterNodeInfo : nodes) {
String id = clusterNodeInfo.getNodeId();
@ -168,11 +200,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if (clusterNodeInfo.getFlags().contains(Flag.SLAVE)) {
partition.addSlaveAddress(clusterNodeInfo.getAddress());
} else {
partition.setStartSlot(clusterNodeInfo.getStartSlot());
partition.setEndSlot(clusterNodeInfo.getEndSlot());
result.put(clusterNodeInfo.getEndSlot(), partition);
partition.setMasterAddress(clusterNodeInfo.getAddress());
}
}
return partitions.values();
return result;
}
private MasterSlaveServersConfig create(ClusterServersConfig cfg) {
@ -187,11 +221,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return c;
}
public static void main(String[] args) {
String s = "FAIL?".replaceAll("\\?", "");
System.out.println(s);
}
private List<ClusterNodeInfo> parse(String nodesResponse) {
List<ClusterNodeInfo> nodes = new ArrayList<ClusterNodeInfo>();
for (String nodeInfo : nodesResponse.split("\n")) {

@ -20,6 +20,7 @@ import java.util.List;
public class ClusterPartition {
private int startSlot;
private int endSlot;
private boolean masterFail;
private String masterAddress;
@ -32,6 +33,13 @@ public class ClusterPartition {
return masterFail;
}
public int getStartSlot() {
return startSlot;
}
public void setStartSlot(int startSlot) {
this.startSlot = startSlot;
}
public int getEndSlot() {
return endSlot;
}

@ -625,6 +625,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
getEntry(endSlot).changeMaster(host, port);
}
protected MasterSlaveEntry removeMaster(int endSlot) {
return entries.remove(endSlot);
}
protected <K, V> RedisConnection<K, V> connectionWriteOp(int slot) {
return getEntry(slot).connectionWriteOp();
}

@ -91,6 +91,10 @@ public class MasterSlaveEntry {
this.config.getSlaveSubscriptionConnectionPoolSize()));
}
public RedisClient getClient() {
return masterEntry.getClient();
}
public void slaveUp(String host, int port) {
slaveBalancer.unfreeze(host, port);
}
@ -108,6 +112,11 @@ public class MasterSlaveEntry {
oldMaster.getClient().shutdown();
}
public void shutdownMasterAsync() {
masterEntry.getClient().shutdownAsync();
slaveBalancer.shutdown();
}
public <K, V> RedisConnection<K, V> connectionWriteOp() {
acquireMasterConnection();

Loading…
Cancel
Save