Slot changes discovery optimization. #264

pull/282/head
Nikita 9 years ago
parent 2323e9bc02
commit 070fff876a

@ -85,7 +85,7 @@ public class RedisConnection implements RedisCommands {
public <R> R await(Future<R> cmd) {
if (!cmd.awaitUninterruptibly(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) {
Promise<R> promise = (Promise<R>)cmd;
RedisTimeoutException ex = new RedisTimeoutException();
RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr());
promise.setFailure(ex);
throw ex;
}

@ -19,4 +19,11 @@ public class RedisTimeoutException extends RedisException {
private static final long serialVersionUID = -8418769175260962404L;
public RedisTimeoutException() {
}
public RedisTimeoutException(String message) {
super(message);
}
}

@ -135,13 +135,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
try {
for (URI addr : cfg.getNodeAddresses()) {
RedisConnection connection = connect(cfg, addr);
if (connection == null) {
if (connection == null || !connection.isActive()) {
continue;
}
String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES);
log.debug("cluster nodes state: {}", nodesValue);
log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
Collection<ClusterPartition> newPartitions = parsePartitions(nodesValue);
checkMasterNodesChange(newPartitions);
@ -217,8 +217,15 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
Map<ClusterSlotRange, MasterSlaveEntry> removeAddrs = new HashMap<ClusterSlotRange, MasterSlaveEntry>();
for (ClusterSlotRange slot : removedSlots) {
MasterSlaveEntry entry = removeMaster(slot);
entry.shutdownMasterAsync();
removeAddrs.put(slot, entry);
entry.removeSlotRange(slot);
if (entry.getSlotRanges().isEmpty()) {
entry.shutdownMasterAsync();
removeAddrs.put(slot, entry);
}
}
for (Entry<ClusterSlotRange, MasterSlaveEntry> entry : removeAddrs.entrySet()) {
InetSocketAddress url = entry.getValue().getClient().getAddr();
slaveDown(entry.getKey(), url.getHostName(), url.getPort());
}
Set<ClusterSlotRange> addedSlots = new HashSet<ClusterSlotRange>(partitionsSlots);
@ -228,13 +235,20 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
for (ClusterSlotRange slot : addedSlots) {
ClusterPartition partition = find(partitions, slot);
addMasterEntry(partition, cfg);
boolean masterFound = false;
for (MasterSlaveEntry entry : getEntries().values()) {
if (entry.getClient().getAddr().equals(partition.getMasterAddr())) {
addMaster(slot, entry);
lastPartitions.put(slot, partition);
masterFound = true;
break;
}
}
if (!masterFound) {
addMasterEntry(partition, cfg);
}
}
for (Entry<ClusterSlotRange, MasterSlaveEntry> entry : removeAddrs.entrySet()) {
InetSocketAddress url = entry.getValue().getClient().getAddr();
slaveDown(entry.getKey(), url.getHostName(), url.getPort());
}
}
private Collection<ClusterPartition> parsePartitions(String nodesValue) {

@ -15,9 +15,12 @@
*/
package org.redisson.cluster;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.redisson.misc.URIBuilder;
@ -26,7 +29,7 @@ public class ClusterPartition {
private boolean masterFail;
private URI masterAddress;
private List<URI> slaveAddresses = new ArrayList<URI>();
private final List<ClusterSlotRange> slotRanges = new ArrayList<ClusterSlotRange>();
private final Set<ClusterSlotRange> slotRanges = new HashSet<ClusterSlotRange>();
public void setMasterFail(boolean masterFail) {
this.masterFail = masterFail;
@ -38,10 +41,14 @@ public class ClusterPartition {
public void addSlotRanges(List<ClusterSlotRange> ranges) {
slotRanges.addAll(ranges);
}
public List<ClusterSlotRange> getSlotRanges() {
public Set<ClusterSlotRange> getSlotRanges() {
return slotRanges;
}
public InetSocketAddress getMasterAddr() {
return new InetSocketAddress(masterAddress.getHost(), masterAddress.getPort());
}
public URI getMasterAddress() {
return masterAddress;
}

@ -17,7 +17,6 @@ package org.redisson.connection;
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.TimeUnit;
import org.redisson.MasterSlaveServersConfig;

@ -131,10 +131,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
protected void initEntry(MasterSlaveServersConfig config) {
ClusterSlotRange range = new ClusterSlotRange(0, MAX_SLOT);
MasterSlaveEntry entry = new MasterSlaveEntry(Collections.singletonList(range), this, config);
HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
slots.add(singleSlotRange);
MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
addMaster(range, entry);
addMaster(singleSlotRange, entry);
}
protected void init(Config cfg) {

@ -20,6 +20,8 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
@ -49,9 +51,11 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
final ConnectionManager connectionManager;
final ConnectionPool<RedisConnection> writeConnectionHolder;
final List<ClusterSlotRange> slotRanges;
final Set<ClusterSlotRange> slotRanges;
public MasterSlaveEntry(List<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
final AtomicBoolean active = new AtomicBoolean(true);
public MasterSlaveEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
this.slotRanges = slotRanges;
this.connectionManager = connectionManager;
this.config = config;
@ -127,6 +131,10 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
}
public void shutdownMasterAsync() {
if (!active.compareAndSet(true, false)) {
return;
}
connectionManager.shutdownAsync(masterEntry.getClient());
slaveBalancer.shutdownAsync();
}
@ -161,11 +169,19 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
}
public void shutdown() {
if (!active.compareAndSet(true, false)) {
return;
}
masterEntry.getClient().shutdown();
slaveBalancer.shutdown();
}
public List<ClusterSlotRange> getSlotRanges() {
public void removeSlotRange(ClusterSlotRange range) {
slotRanges.remove(range);
}
public Set<ClusterSlotRange> getSlotRanges() {
return slotRanges;
}

@ -18,6 +18,7 @@ package org.redisson.connection;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -71,7 +72,9 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
@Override
protected void initEntry(MasterSlaveServersConfig config) {
SingleEntry entry = new SingleEntry(Collections.singletonList(singleSlotRange), this, config);
HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
slots.add(singleSlotRange);
SingleEntry entry = new SingleEntry(slots, this, config);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
addMaster(singleSlotRange, entry);
}

@ -15,7 +15,7 @@
*/
package org.redisson.connection;
import java.util.List;
import java.util.Set;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
@ -31,7 +31,7 @@ public class SingleEntry extends MasterSlaveEntry<SubscribesConnectionEntry> {
final ConnectionPool<RedisPubSubConnection> pubSubConnectionHolder;
public SingleEntry(List<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
public SingleEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super(slotRanges, connectionManager, config);
pubSubConnectionHolder = new PubSubConnectionPoll(config, null, connectionManager.getGroup());
}

Loading…
Cancel
Save