|
|
|
@ -64,7 +64,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
init(this.config);
|
|
|
|
|
|
|
|
|
|
for (URI addr : cfg.getNodeAddresses()) {
|
|
|
|
|
RedisConnection connection = connect(cfg, addr);
|
|
|
|
|
RedisConnection connection = connect(cfg, addr, true);
|
|
|
|
|
if (connection == null) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
@ -73,7 +73,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
|
|
|
|
|
Collection<ClusterPartition> partitions = parsePartitions(nodesValue);
|
|
|
|
|
for (ClusterPartition partition : partitions) {
|
|
|
|
|
Collection<Future<Void>> s = addMasterEntry(partition, cfg);
|
|
|
|
|
Collection<Future<Void>> s = addMasterEntry(partition, cfg, true);
|
|
|
|
|
for (Future<Void> future : s) {
|
|
|
|
|
future.syncUninterruptibly();
|
|
|
|
|
}
|
|
|
|
@ -82,10 +82,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (lastPartitions.isEmpty()) {
|
|
|
|
|
throw new RedisConnectionException("Can't connect to servers!");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
monitorClusterChange(cfg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private RedisConnection connect(ClusterServersConfig cfg, URI addr) {
|
|
|
|
|
private RedisConnection connect(ClusterServersConfig cfg, URI addr, boolean skipLogging) {
|
|
|
|
|
RedisConnection connection = nodeConnections.get(addr);
|
|
|
|
|
if (connection != null) {
|
|
|
|
|
return connection;
|
|
|
|
@ -95,12 +99,18 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
connection = client.connect();
|
|
|
|
|
nodeConnections.put(addr, connection);
|
|
|
|
|
} catch (RedisConnectionException e) {
|
|
|
|
|
log.warn(e.getMessage(), e);
|
|
|
|
|
if (!skipLogging) {
|
|
|
|
|
log.warn(e.getMessage(), e);
|
|
|
|
|
}
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error(e.getMessage(), e);
|
|
|
|
|
if (!skipLogging) {
|
|
|
|
|
log.error(e.getMessage(), e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (connection != null && !connection.isActive()) {
|
|
|
|
|
log.warn("connection for {} is not active!", connection.getRedisClient().getAddr());
|
|
|
|
|
if (!skipLogging) {
|
|
|
|
|
log.warn("connection to {} is not active!", connection.getRedisClient().getAddr());
|
|
|
|
|
}
|
|
|
|
|
connection.closeAsync();
|
|
|
|
|
connection = null;
|
|
|
|
|
}
|
|
|
|
@ -114,14 +124,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
protected void initEntry(MasterSlaveServersConfig config) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Collection<Future<Void>> addMasterEntry(final ClusterPartition partition, ClusterServersConfig cfg) {
|
|
|
|
|
private Collection<Future<Void>> addMasterEntry(final ClusterPartition partition, ClusterServersConfig cfg, boolean skipLogging) {
|
|
|
|
|
if (partition.isMasterFail()) {
|
|
|
|
|
log.warn("add master: {} for slot ranges: {} failed. Reason - server has FAIL flag", partition.getMasterAddress(), partition.getSlotRanges());
|
|
|
|
|
Future<Void> f = newSucceededFuture(null);
|
|
|
|
|
return Collections.singletonList(f);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RedisConnection connection = connect(cfg, partition.getMasterAddress());
|
|
|
|
|
RedisConnection connection = connect(cfg, partition.getMasterAddress(), skipLogging);
|
|
|
|
|
if (connection == null) {
|
|
|
|
|
Future<Void> f = newSucceededFuture(null);
|
|
|
|
|
return Collections.singletonList(f);
|
|
|
|
@ -163,7 +173,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
try {
|
|
|
|
|
for (ClusterPartition partition : lastPartitions.values()) {
|
|
|
|
|
for (URI uri : partition.getAllAddresses()) {
|
|
|
|
|
RedisConnection connection = connect(cfg, uri);
|
|
|
|
|
RedisConnection connection = connect(cfg, uri, false);
|
|
|
|
|
if (connection == null) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
@ -306,7 +316,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (!masterFound) {
|
|
|
|
|
addMasterEntry(partition, cfg);
|
|
|
|
|
addMasterEntry(partition, cfg, false);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|