|
|
|
@ -147,12 +147,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
String ip = map.get("ip");
|
|
|
|
|
String port = map.get("port");
|
|
|
|
|
|
|
|
|
|
RedisURI sentinelAddr = convert(ip, port);
|
|
|
|
|
RedisURI sentinelAddr = toURI(ip, port);
|
|
|
|
|
RFuture<Void> future = registerSentinel(sentinelAddr, this.config);
|
|
|
|
|
connectionFutures.add(future);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RedisURI currentAddr = convert(client.getAddr().getAddress().getHostAddress(), "" + client.getAddr().getPort());
|
|
|
|
|
RedisURI currentAddr = toURI(client.getAddr().getAddress().getHostAddress(), "" + client.getAddr().getPort());
|
|
|
|
|
RFuture<Void> f = registerSentinel(currentAddr, this.config);
|
|
|
|
|
connectionFutures.add(f);
|
|
|
|
|
|
|
|
|
@ -263,7 +263,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Set<RedisURI> newUris = future.getNow().stream()
|
|
|
|
|
.map(addr -> convert(addr.getAddress().getHostAddress(), "" + addr.getPort()))
|
|
|
|
|
.map(addr -> toURI(addr.getAddress().getHostAddress(), "" + addr.getPort()))
|
|
|
|
|
.collect(Collectors.toSet());
|
|
|
|
|
|
|
|
|
|
for (RedisURI uri : newUris) {
|
|
|
|
@ -449,11 +449,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}).map(m -> {
|
|
|
|
|
String ip = m.get("ip");
|
|
|
|
|
String port = m.get("port");
|
|
|
|
|
return convert(ip, port);
|
|
|
|
|
return toURI(ip, port);
|
|
|
|
|
}).collect(Collectors.toSet());
|
|
|
|
|
|
|
|
|
|
InetSocketAddress addr = connection.getRedisClient().getAddr();
|
|
|
|
|
RedisURI currentAddr = convert(addr.getAddress().getHostAddress(), "" + addr.getPort());
|
|
|
|
|
RedisURI currentAddr = toURI(addr.getAddress().getHostAddress(), "" + addr.getPort());
|
|
|
|
|
newUris.add(currentAddr);
|
|
|
|
|
|
|
|
|
|
updateSentinels(newUris);
|
|
|
|
@ -480,8 +480,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private RedisURI toURI(String host, String port) {
|
|
|
|
|
RedisURI uri = new RedisURI(scheme + "://" + host + ":" + port);
|
|
|
|
|
return applyNatMap(uri);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String createAddress(String host, String port) {
|
|
|
|
|
return scheme + "://" + applyNatMap(host, Integer.valueOf(port));
|
|
|
|
|
return toURI(host, port).toString();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -538,7 +543,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
RPromise<Void> result = new RedissonPromise<Void>();
|
|
|
|
|
// to avoid addition twice
|
|
|
|
|
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
|
|
|
|
|
RedisURI uri = convert(ip, port);
|
|
|
|
|
RedisURI uri = toURI(ip, port);
|
|
|
|
|
if (!entry.hasSlave(uri) && !config.checkSkipSlavesInit()) {
|
|
|
|
|
RFuture<Void> future = entry.addSlave(new RedisURI(addr));
|
|
|
|
|
future.onComplete((res, e) -> {
|
|
|
|
@ -563,18 +568,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private RedisURI convert(String ip, String port) {
|
|
|
|
|
String addr = createAddress(ip, port);
|
|
|
|
|
RedisURI uri = new RedisURI(addr);
|
|
|
|
|
return uri;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void slaveDown(String ip, String port) {
|
|
|
|
|
if (config.checkSkipSlavesInit()) {
|
|
|
|
|
log.warn("slave: {}:{} has down", ip, port);
|
|
|
|
|
} else {
|
|
|
|
|
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
|
|
|
|
|
RedisURI uri = convert(ip, port);
|
|
|
|
|
RedisURI uri = toURI(ip, port);
|
|
|
|
|
if (entry.slaveDown(uri, FreezeReason.MANAGER)) {
|
|
|
|
|
log.warn("slave: {}:{} has down", ip, port);
|
|
|
|
|
}
|
|
|
|
@ -598,7 +597,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RedisURI uri = convert(ip, port);
|
|
|
|
|
RedisURI uri = toURI(ip, port);
|
|
|
|
|
if (getEntry(singleSlotRange.getStartSlot()).slaveUp(uri, FreezeReason.MANAGER)) {
|
|
|
|
|
String slaveAddr = ip + ":" + port;
|
|
|
|
|
log.info("slave: {} has up", slaveAddr);
|
|
|
|
@ -635,15 +634,17 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
super.shutdown();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String applyNatMap(String ip, int port) {
|
|
|
|
|
String mappedAddress = natMap.get(ip + ":" + port);
|
|
|
|
|
if (mappedAddress == null && natMap.get(ip) != null) {
|
|
|
|
|
mappedAddress = natMap.get(ip) + ":" + port;
|
|
|
|
|
@Override
|
|
|
|
|
public RedisURI applyNatMap(RedisURI address) {
|
|
|
|
|
String mappedAddress = natMap.get(address.getHost() + ":" + address.getPort());
|
|
|
|
|
if (mappedAddress == null && natMap.get(address.getHost()) != null) {
|
|
|
|
|
mappedAddress = natMap.get(address.getHost()) + ":" + address.getPort();
|
|
|
|
|
}
|
|
|
|
|
if (mappedAddress != null) {
|
|
|
|
|
return mappedAddress;
|
|
|
|
|
return new RedisURI(address.getScheme() + "://" + mappedAddress);
|
|
|
|
|
}
|
|
|
|
|
return ip + ":" + port;
|
|
|
|
|
return address;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|