Cluster configuration with password handling. ClusterConnectionManager optimization. #379

pull/382/head
Nikita 9 years ago
parent 2143f1eb79
commit 3e0f02cbf8

@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -33,14 +34,15 @@ import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.cluster.ClusterNodeInfo.Flag; import org.redisson.cluster.ClusterNodeInfo.Flag;
import org.redisson.connection.CRC16; import org.redisson.connection.CRC16;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.SingleEntry; import org.redisson.connection.SingleEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -70,155 +72,236 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
this.config = create(cfg); this.config = create(cfg);
init(this.config); init(this.config);
Exception lastException = null;
for (URI addr : cfg.getNodeAddresses()) { for (URI addr : cfg.getNodeAddresses()) {
RedisConnection connection = connect(cfg, addr, true); Future<RedisConnection> connectionFuture = connect(cfg, addr);
if (connection == null) { try {
continue; RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();
} String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES);
String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES); Collection<ClusterPartition> partitions = parsePartitions(nodesValue);
List<Future<Collection<Future<Void>>>> futures = new ArrayList<Future<Collection<Future<Void>>>>();
for (ClusterPartition partition : partitions) {
Future<Collection<Future<Void>>> masterFuture = addMasterEntry(partition, cfg);
futures.add(masterFuture);
}
Collection<ClusterPartition> partitions = parsePartitions(nodesValue); for (Future<Collection<Future<Void>>> masterFuture : futures) {
for (ClusterPartition partition : partitions) { masterFuture.syncUninterruptibly();
Collection<Future<Void>> s = addMasterEntry(partition, cfg, true); for (Future<Void> future : masterFuture.getNow()) {
for (Future<Void> future : s) { future.syncUninterruptibly();
future.syncUninterruptibly(); }
} }
break;
} catch (Exception e) {
lastException = e;
log.warn(e.getMessage());
} }
break;
} }
if (lastPartitions.isEmpty()) { if (lastPartitions.isEmpty()) {
throw new RedisConnectionException("Can't connect to servers!"); throw new RedisConnectionException("Can't connect to servers!", lastException);
} }
monitorClusterChange(cfg); monitorClusterChange(cfg);
} }
private RedisConnection connect(ClusterServersConfig cfg, URI addr, boolean skipLogging) { private Future<RedisConnection> connect(ClusterServersConfig cfg, final URI addr) {
RedisConnection connection = nodeConnections.get(addr); RedisConnection connection = nodeConnections.get(addr);
if (connection != null) { if (connection != null) {
return connection; return newSucceededFuture(connection);
} }
RedisClient client = createClient(addr.getHost(), addr.getPort(), cfg.getConnectTimeout()); RedisClient client = createClient(addr.getHost(), addr.getPort(), cfg.getConnectTimeout());
try { final Promise<RedisConnection> result = newPromise();
connection = client.connect(); Future<RedisConnection> future = client.connectAsync();
nodeConnections.put(addr, connection); future.addListener(new FutureListener<RedisConnection>() {
} catch (RedisConnectionException e) { @Override
if (!skipLogging) { public void operationComplete(Future<RedisConnection> future) throws Exception {
log.warn(e.getMessage(), e); if (!future.isSuccess()) {
} result.setFailure(future.cause());
} catch (Exception e) { return;
if (!skipLogging) { }
log.error(e.getMessage(), e);
} RedisConnection connection = future.getNow();
} Promise<RedisConnection> promise = newPromise();
if (connection != null && !connection.isActive()) { connectListener.onConnect(promise, connection, NodeType.MASTER, config);
if (!skipLogging) { promise.addListener(new FutureListener<RedisConnection>() {
log.warn("connection to {} is not active!", connection.getRedisClient().getAddr()); @Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
return;
}
RedisConnection connection = future.getNow();
if (connection.isActive()) {
nodeConnections.put(addr, connection);
result.setSuccess(connection);
} else {
connection.closeAsync();
result.setFailure(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!"));
}
}
});
} }
connection.closeAsync(); });
connection = null;
} return result;
if (connection == null) {
nodeConnections.remove(addr);
}
return connection;
} }
@Override @Override
protected void initEntry(MasterSlaveServersConfig config) { protected void initEntry(MasterSlaveServersConfig config) {
} }
private Collection<Future<Void>> addMasterEntry(final ClusterPartition partition, ClusterServersConfig cfg, boolean skipLogging) { private Future<Collection<Future<Void>>> addMasterEntry(final ClusterPartition partition, final ClusterServersConfig cfg) {
if (partition.isMasterFail()) { if (partition.isMasterFail()) {
log.warn("Failed to add master: {} for slot ranges: {}. Reason - server has FAIL flag", partition.getMasterAddress(), partition.getSlotRanges()); RedisException e = new RedisException("Failed to add master: " +
Future<Void> f = newSucceededFuture(null); partition.getMasterAddress() + " for slot ranges: " +
return Collections.singletonList(f); partition.getSlotRanges() + ". Reason - server has FAIL flag");
return newFailedFuture(e);
} }
RedisConnection connection = connect(cfg, partition.getMasterAddress(), skipLogging); final Promise<Collection<Future<Void>>> result = newPromise();
if (connection == null) { Future<RedisConnection> connectionFuture = connect(cfg, partition.getMasterAddress());
Future<Void> f = newSucceededFuture(null); connectionFuture.addListener(new FutureListener<RedisConnection>() {
return Collections.singletonList(f);
}
Map<String, String> params = connection.sync(RedisCommands.CLUSTER_INFO);
if ("fail".equals(params.get("cluster_state"))) {
log.warn("Failed to add master: {} for slot ranges: {}. Reason - cluster_state:fail", partition.getMasterAddress(), partition.getSlotRanges());
Future<Void> f = newSucceededFuture(null);
return Collections.singletonList(f);
}
MasterSlaveServersConfig config = create(cfg);
config.setMasterAddress(partition.getMasterAddress());
final AtomicReference<MasterSlaveEntry> entry = new AtomicReference<MasterSlaveEntry>();
List<Future<Void>> futures = new ArrayList<Future<Void>>();
if (isReadFromSlaves) {
config.setSlaveAddresses(partition.getSlaveAddresses());
MasterSlaveEntry e = new MasterSlaveEntry(partition.getSlotRanges(), this, config);
List<Future<Void>> fs = e.initSlaveBalancer(config);
futures.addAll(fs);
entry.set(e);
log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges());
} else {
SingleEntry e = new SingleEntry(partition.getSlotRanges(), this, config);
entry.set(e);
}
Future<Void> f = entry.get().setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
f.addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<RedisConnection> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
result.setFailure(future.cause());
return; return;
} }
for (ClusterSlotRange slotRange : partition.getSlotRanges()) {
addEntry(slotRange, entry.get()); final RedisConnection connection = future.getNow();
lastPartitions.put(slotRange, partition); if (connection == null) {
Collection<Future<Void>> f = Collections.<Future<Void>>emptyList();
result.setSuccess(f);
return;
} }
log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); Future<Map<String, String>> clusterFuture = connection.async(RedisCommands.CLUSTER_INFO);
clusterFuture.addListener(new FutureListener<Map<String, String>>() {
@Override
public void operationComplete(Future<Map<String, String>> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't execute CLUSTER_INFO with " + connection.getRedisClient().getAddr(), future.cause());
result.setFailure(future.cause());
return;
}
Map<String, String> params = future.getNow();
if ("fail".equals(params.get("cluster_state"))) {
RedisException e = new RedisException("Failed to add master: " +
partition.getMasterAddress() + " for slot ranges: " +
partition.getSlotRanges() + ". Reason - cluster_state:fail");
result.setFailure(e);
return;
}
MasterSlaveServersConfig config = create(cfg);
config.setMasterAddress(partition.getMasterAddress());
final MasterSlaveEntry e;
List<Future<Void>> futures = new ArrayList<Future<Void>>();
if (isReadFromSlaves) {
config.setSlaveAddresses(partition.getSlaveAddresses());
e = new MasterSlaveEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config);
List<Future<Void>> fs = e.initSlaveBalancer(config);
futures.addAll(fs);
log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges());
} else {
e = new SingleEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config);
}
Future<Void> f = e.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
f.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
return;
}
for (ClusterSlotRange slotRange : partition.getSlotRanges()) {
addEntry(slotRange, e);
lastPartitions.put(slotRange, partition);
}
log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
}
});
futures.add(f);
result.setSuccess(futures);
}
});
} }
}); });
futures.add(f);
return futures; return result;
} }
private void monitorClusterChange(final ClusterServersConfig cfg) { private void monitorClusterChange(final ClusterServersConfig cfg) {
monitorFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(new Runnable() { monitorFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(new Runnable() {
@Override @Override
public void run() { public void run() {
try { List<URI> nodes = new ArrayList<URI>();
for (ClusterPartition partition : lastPartitions.values()) { List<URI> slaves = new ArrayList<URI>();
for (URI uri : partition.getAllAddresses()) { AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
RedisConnection connection = connect(cfg, uri, false); for (ClusterPartition partition : lastPartitions.values()) {
if (connection == null) { nodes.add(partition.getMasterAddress());
continue; slaves.addAll(partition.getSlaveAddresses());
}
updateClusterState(cfg, connection);
return;
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} }
nodes.addAll(slaves);
checkClusterState(cfg, nodes.iterator(), lastException);
} }
}, cfg.getScanInterval(), cfg.getScanInterval(), TimeUnit.MILLISECONDS); }, cfg.getScanInterval(), cfg.getScanInterval(), TimeUnit.MILLISECONDS);
} }
private void updateClusterState(ClusterServersConfig cfg, RedisConnection connection) { private void checkClusterState(final ClusterServersConfig cfg, final Iterator<URI> iterator, final AtomicReference<Throwable> lastException) {
String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES); if (!iterator.hasNext()) {
log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue); log.error("Can't update cluster state", lastException.get());
return;
}
URI uri = iterator.next();
Future<RedisConnection> connectionFuture = connect(cfg, uri);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
if (!future.isSuccess()) {
lastException.set(future.cause());
checkClusterState(cfg, iterator, lastException);
return;
}
RedisConnection connection = future.getNow();
updateClusterState(cfg, connection);
}
});
}
private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection) {
Future<String> future = connection.async(RedisCommands.CLUSTER_NODES);
future.addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<String> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't execute CLUSTER_NODES with " + connection.getRedisClient().getAddr(), future.cause());
return;
}
String nodesValue = future.getNow();
log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
Collection<ClusterPartition> newPartitions = parsePartitions(nodesValue); Collection<ClusterPartition> newPartitions = parsePartitions(nodesValue);
checkMasterNodesChange(newPartitions); checkMasterNodesChange(newPartitions);
checkSlaveNodesChange(newPartitions); checkSlaveNodesChange(newPartitions);
checkSlotsChange(cfg, newPartitions); checkSlotsChange(cfg, newPartitions);
}
});
} }
private void checkSlaveNodesChange(Collection<ClusterPartition> newPartitions) { private void checkSlaveNodesChange(Collection<ClusterPartition> newPartitions) {
@ -326,7 +409,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if (!addedSlots.isEmpty()) { if (!addedSlots.isEmpty()) {
log.info("{} slots found to add", addedSlots); log.info("{} slots found to add", addedSlots);
} }
for (ClusterSlotRange slot : addedSlots) { for (final ClusterSlotRange slot : addedSlots) {
ClusterPartition partition = find(newPartitions, slot); ClusterPartition partition = find(newPartitions, slot);
boolean masterFound = false; boolean masterFound = false;
for (MasterSlaveEntry entry : getEntries().values()) { for (MasterSlaveEntry entry : getEntries().values()) {
@ -338,7 +421,15 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
} }
} }
if (!masterFound) { if (!masterFound) {
addMasterEntry(partition, cfg, false); Future<Collection<Future<Void>>> future = addMasterEntry(partition, cfg);
future.addListener(new FutureListener<Collection<Future<Void>>>() {
@Override
public void operationComplete(Future<Collection<Future<Void>>> future) throws Exception {
if (!future.isSuccess()) {
log.error("New cluster slot range " + slot + " without master node detected", future.cause());
}
}
});
} }
} }
} }

Loading…
Cancel
Save