Fixed - cluster state update manager can't try next node if current node fails. #476

pull/482/head
Nikita 9 years ago
parent 13664c3153
commit 16c5df230f

@ -222,7 +222,7 @@ public class RedisConnection implements RedisCommands {
@Override
public String toString() {
return getClass().getSimpleName() + " [redisClient=" + redisClient + ", channel=" + channel + "]";
return getClass().getSimpleName() + "@" + System.identityHashCode(this) + " [redisClient=" + redisClient + ", channel=" + channel + "]";
}
public Future<?> getAcquireFuture() {

@ -113,7 +113,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
throw new RedisConnectionException("Can't connect to servers!", lastException);
}
scheduleClusterChangeCheck(cfg);
scheduleClusterChangeCheck(cfg, null);
}
private void close(RedisConnection conn) {
if (nodeConnections.values().remove(conn)) {
conn.closeAsync();
}
}
private Future<RedisConnection> connect(ClusterServersConfig cfg, final URI addr) {
@ -188,7 +194,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
final RedisConnection connection = future.getNow();
Future<Map<String, String>> clusterFuture = connection.async(RedisCommands.CLUSTER_INFO);
Future<Map<String, String>> clusterFuture = connection.asyncWithTimeout(null, RedisCommands.CLUSTER_INFO);
clusterFuture.addListener(new FutureListener<Map<String, String>>() {
@Override
@ -256,26 +262,31 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return result;
}
private void scheduleClusterChangeCheck(final ClusterServersConfig cfg) {
private void scheduleClusterChangeCheck(final ClusterServersConfig cfg, final Iterator<URI> iterator) {
monitorFuture = GlobalEventExecutor.INSTANCE.schedule(new Runnable() {
@Override
public void run() {
List<URI> nodes = new ArrayList<URI>();
List<URI> slaves = new ArrayList<URI>();
AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
for (ClusterPartition partition : lastPartitions.values()) {
if (!partition.isMasterFail()) {
nodes.add(partition.getMasterAddress());
Iterator<URI> nodesIterator = iterator;
if (nodesIterator == null) {
List<URI> nodes = new ArrayList<URI>();
List<URI> slaves = new ArrayList<URI>();
for (ClusterPartition partition : lastPartitions.values()) {
if (!partition.isMasterFail()) {
nodes.add(partition.getMasterAddress());
}
Set<URI> partitionSlaves = new HashSet<URI>(partition.getSlaveAddresses());
partitionSlaves.removeAll(partition.getFailedSlaveAddresses());
slaves.addAll(partitionSlaves);
}
Set<URI> partitionSlaves = new HashSet<URI>(partition.getSlaveAddresses());
partitionSlaves.removeAll(partition.getFailedSlaveAddresses());
slaves.addAll(partitionSlaves);
// master nodes first
nodes.addAll(slaves);
nodesIterator = nodes.iterator();
}
// master nodes first
nodes.addAll(slaves);
checkClusterState(cfg, nodes.iterator(), lastException);
checkClusterState(cfg, nodesIterator, lastException);
}
}, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
@ -284,7 +295,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private void checkClusterState(final ClusterServersConfig cfg, final Iterator<URI> iterator, final AtomicReference<Throwable> lastException) {
if (!iterator.hasNext()) {
log.error("Can't update cluster state", lastException.get());
scheduleClusterChangeCheck(cfg);
scheduleClusterChangeCheck(cfg, null);
return;
}
URI uri = iterator.next();
@ -299,19 +310,20 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
RedisConnection connection = future.getNow();
updateClusterState(cfg, connection);
updateClusterState(cfg, connection, iterator);
}
});
}
private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection) {
Future<String> future = connection.async(RedisCommands.CLUSTER_NODES);
private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, Iterator<URI> iterator) {
Future<String> future = connection.asyncWithTimeout(null, RedisCommands.CLUSTER_NODES);
future.addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<String> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't execute CLUSTER_NODES with " + connection.getRedisClient().getAddr(), future.cause());
scheduleClusterChangeCheck(cfg);
close(connection);
scheduleClusterChangeCheck(cfg, iterator);
return;
}
@ -322,7 +334,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
checkMasterNodesChange(newPartitions);
checkSlaveNodesChange(newPartitions);
checkSlotsChange(cfg, newPartitions);
scheduleClusterChangeCheck(cfg);
scheduleClusterChangeCheck(cfg, null);
}
});
}
@ -432,7 +444,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
URI oldUri = currentPart.getMasterAddress();
changeMaster(currentSlotRange, newUri.getHost(), newUri.getPort());
slaveDown(currentSlotRange, oldUri.getHost(), oldUri.getPort(), FreezeReason.MANAGER);
currentPart.setMasterAddress(newMasterPart.getMasterAddress());
}

@ -96,12 +96,25 @@ public class MasterSlaveEntry {
return writeConnectionHolder.add(masterEntry);
}
private boolean slaveDown(ClientConnectionsEntry entry, FreezeReason freezeReason) {
ClientConnectionsEntry e = slaveBalancer.freeze(entry, freezeReason);
if (e == null) {
return false;
}
return slaveDown(e);
}
public boolean slaveDown(String host, int port, FreezeReason freezeReason) {
ClientConnectionsEntry entry = slaveBalancer.freeze(host, port, freezeReason);
if (entry == null) {
return false;
}
return slaveDown(entry);
}
private boolean slaveDown(ClientConnectionsEntry entry) {
// add master as slave if no more slaves available
if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) {
InetSocketAddress addr = masterEntry.getClient().getAddr();
@ -305,7 +318,7 @@ public class MasterSlaveEntry {
ClientConnectionsEntry oldMaster = masterEntry;
setupMasterEntry(host, port);
writeConnectionHolder.remove(oldMaster);
oldMaster.freezeMaster(FreezeReason.MANAGER);
slaveDown(oldMaster, FreezeReason.MANAGER);
// more than one slave available, so master can be removed from slaves
if (config.getReadMode() == ReadMode.SLAVE

@ -36,6 +36,8 @@ public interface LoadBalancerManager {
boolean unfreeze(String host, int port, FreezeReason freezeReason);
ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason);
ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason);
Future<Void> add(ClientConnectionsEntry entry);

@ -94,25 +94,28 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager {
}
return false;
}
public ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason) {
InetSocketAddress addr = new InetSocketAddress(host, port);
ClientConnectionsEntry connectionEntry = addr2Entry.get(addr);
return freeze(connectionEntry, freezeReason);
}
public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) {
if (connectionEntry == null) {
return null;
}
synchronized (connectionEntry) {
if (connectionEntry.isFreezed()) {
return null;
}
connectionEntry.setFreezed(true);
// only RECONNECT freeze reason could be replaced
if (connectionEntry.getFreezeReason() == null
|| connectionEntry.getFreezeReason() == FreezeReason.RECONNECT) {
connectionEntry.setFreezed(true);
connectionEntry.setFreezeReason(freezeReason);
return connectionEntry;
}
if (connectionEntry.isFreezed()) {
return null;
}
}

@ -153,10 +153,15 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
}
StringBuilder errorMsg = new StringBuilder("Connection pool exhausted! All connections are busy. Increase connection pool size.");
// if (!freezed.isEmpty()) {
// errorMsg.append(" Disconnected hosts: " + freezed);
// }
StringBuilder errorMsg;
if (connectionManager.isClusterMode()) {
errorMsg = new StringBuilder("Connection pool exhausted! for slots: " + masterSlaveEntry.getSlotRanges());
} else {
errorMsg = new StringBuilder("Connection pool exhausted! ");
}
if (!freezed.isEmpty()) {
errorMsg.append(" Disconnected hosts: " + freezed);
}
if (!zeroConnectionsAmount.isEmpty()) {
errorMsg.append(" Hosts with fully busy connections: " + zeroConnectionsAmount);
}

Loading…
Cancel
Save