Fixed - IP address instead of hostname is used in sentinel mode with SSL connection. #5463

pull/5477/merge
Nikita Koksharov 1 year ago
parent 2892b39c57
commit ed6d21e4c2

@ -179,7 +179,7 @@ public class RedisChannelInitializer extends ChannelInitializer<Channel> {
SslContext sslContext = sslContextBuilder.build();
String hostname = config.getSslHostname();
if (hostname == null || NetUtil.createByteArrayFromIpAddressString(hostname) != null) {
hostname = config.getAddress().getHost();
hostname = redisClient.getAddr().getHostName();
}
SSLEngine sslEngine = sslContext.newEngine(ch.alloc(), hostname, config.getAddress().getPort());

@ -552,6 +552,11 @@ public class MasterSlaveEntry {
return slaveBalancer.unfreezeAsync(address, freezeReason);
}
public CompletableFuture<Boolean> slaveUpNoMasterExclusionAsync(InetSocketAddress address, FreezeReason freezeReason) {
noPubSubSlaves.set(false);
return slaveBalancer.unfreezeAsync(address, freezeReason);
}
public CompletableFuture<Boolean> slaveUpAsync(InetSocketAddress address, FreezeReason freezeReason) {
noPubSubSlaves.set(false);
CompletableFuture<Boolean> f = slaveBalancer.unfreezeAsync(address, freezeReason);

@ -111,9 +111,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
throw new RedisConnectionException("Master node is undefined! SENTINEL GET-MASTER-ADDR-BY-NAME command returns empty result!");
}
RedisURI masterHost = serviceManager.resolveIP(scheme, master).join();
this.config.setMasterAddress(masterHost.toString());
currentMaster.set(masterHost);
this.config.setMasterAddress(master.toString());
InetSocketAddress masterHost = serviceManager.resolve(master).join();
currentMaster.set(toURI(masterHost));
log.info("master: {} added", masterHost);
List<Map<String, String>> sentinelSlaves = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
@ -127,7 +127,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String flags = map.getOrDefault("flags", "");
String masterLinkStatus = map.getOrDefault("master-link-status", "");
RedisURI uri = resolveIP(host, port).join();
RedisURI uri = serviceManager.toURI(scheme, host, port);
this.config.addSlaveAddress(uri.toString());
log.debug("slave {} state: {}", uri, map);
@ -149,13 +149,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = map.get("ip");
String port = map.get("port");
RedisURI uri = resolveIP(ip, port).join();
CompletionStage<Void> future = registerSentinel(uri, this.config, null);
InetSocketAddress sentinelAddr = resolveIP(ip, port).join();
CompletionStage<Void> future = registerSentinel(sentinelAddr);
connectionFutures.add(future.toCompletableFuture());
}
RedisURI sentinelIp = toURI(connection.getRedisClient().getAddr());
CompletionStage<Void> f = registerSentinel(sentinelIp, this.config, null);
CompletionStage<Void> f = registerSentinel(connection.getRedisClient().getAddr());
connectionFutures.add(f.toCompletableFuture());
CompletableFuture<Void> future = CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[0]));
@ -203,6 +202,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
scheduleChangeCheck(cfg, null);
}
private static boolean isHostname(String host) {
return NetUtil.createByteArrayFromIpAddressString(host) == null;
}
private void checkAuth(SentinelServersConfig cfg) {
if (cfg.getPassword() == null) {
return;
@ -288,9 +291,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
future.getNow().stream()
.map(addr -> toURI(addr))
.filter(uri -> !sentinels.containsKey(uri) && !disconnectedSentinels.contains(uri))
.forEach(uri -> registerSentinel(uri, serviceManager.getConfig(), host.getHost()));
.filter(addr -> {
RedisURI uri = toURI(addr);
return !sentinels.containsKey(uri) && !disconnectedSentinels.contains(uri);
})
.forEach(addr -> registerSentinel(addr));
});
if (commonListener != null) {
allNodes.addListener(commonListener);
@ -331,7 +336,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
RedisClient client = iterator.next();
RedisURI addr = toURI(client.getAddr());
CompletionStage<RedisConnection> connectionFuture = connectToNode(NodeType.SENTINEL, cfg, addr, null);
String hostname = null;
if (isHostname(client.getAddr().getHostName())) {
hostname = client.getAddr().getHostName();
}
CompletionStage<RedisConnection> connectionFuture = connectToNode(NodeType.SENTINEL, cfg, addr, hostname);
connectionFuture.whenComplete((connection, e) -> {
if (e != null) {
lastException.set(e);
@ -384,7 +393,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return CompletableFuture.completedFuture(null);
}
List<CompletableFuture<RedisURI>> newUris = list.stream().filter(m -> {
List<CompletableFuture<InetSocketAddress>> newUris = list.stream().filter(m -> {
String flags = m.getOrDefault("flags", "");
String masterLinkStatus = m.getOrDefault("master-link-status", "");
if (!m.isEmpty() && !isSlaveDown(flags, masterLinkStatus)) {
@ -394,9 +403,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}).map(m -> {
String ip = m.get("ip");
String port = m.get("port");
return serviceManager.toURI(scheme, ip, port);
}).map(addr -> {
CompletionStage<RedisURI> f = serviceManager.resolveIP(addr);
CompletionStage<InetSocketAddress> f = resolveIP(ip, port);
return f.exceptionally(ex -> {
log.error("unable to resolve hostname", ex);
return null;
@ -405,7 +412,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
CompletableFuture<Void> futures = CompletableFuture.allOf(newUris.toArray(new CompletableFuture[0]));
return futures.whenComplete((r, ex) -> {
List<RedisURI> uris = newUris.stream().map(u -> {
List<InetSocketAddress> uris = newUris.stream().map(u -> {
try {
return u.getNow(null);
} catch (Exception exc) {
@ -414,8 +421,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}).filter(u -> u != null).collect(Collectors.toList());
InetSocketAddress addr = connection.getRedisClient().getAddr();
RedisURI currentAddr = toURI(addr);
uris.add(currentAddr);
uris.add(addr);
updateSentinels(uris);
});
@ -439,8 +445,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String masterHost = map.get("master-host");
String masterPort = map.get("master-port");
CompletableFuture<RedisURI> slaveAddrFuture = resolveIP(host, port);
CompletableFuture<RedisURI> masterAddrFuture;
CompletableFuture<InetSocketAddress> slaveAddrFuture = resolveIP(host, port);
CompletableFuture<InetSocketAddress> masterAddrFuture;
if ("?".equals(masterHost)) {
masterAddrFuture = CompletableFuture.completedFuture(null);
} else {
@ -456,8 +462,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
})
.thenCompose(res -> {
RedisURI slaveAddr = slaveAddrFuture.getNow(null);
RedisURI masterAddr = masterAddrFuture.getNow(null);
InetSocketAddress slaveAddr = slaveAddrFuture.getNow(null);
InetSocketAddress masterAddr = masterAddrFuture.getNow(null);
if (isSlaveDown(flags, masterLinkStatus)) {
slaveDown(slaveAddr);
return CompletableFuture.completedFuture(res);
@ -466,7 +472,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return CompletableFuture.completedFuture(res);
}
currentSlaves.add(slaveAddr);
RedisURI uri = toURI(slaveAddr);
currentSlaves.add(uri);
return addSlave(slaveAddr).whenComplete((r, e) -> {
if (e != null) {
log.error("Unable to add slave {}", slaveAddr, e);
@ -480,8 +487,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
entry.getAllEntries().stream()
.map(e -> e.getClient().getAddr())
.map(a -> serviceManager.toURI(scheme, a.getAddress().getHostAddress(), String.valueOf(a.getPort())))
.filter(a -> !currentSlaves.contains(a) && !a.equals(currentMaster.get()))
.filter(a -> {
RedisURI uri = toURI(a);
return !currentSlaves.contains(uri) && !uri.equals(currentMaster.get());
})
.forEach(a -> slaveDown(a));
});
});
@ -495,7 +504,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
RedisURI current = currentMaster.get();
if (!newMaster.equals(current)
&& currentMaster.compareAndSet(current, newMaster)) {
CompletableFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), newMaster);
RedisURI host = newMaster;
if (newMaster.isSsl()) {
RedisURI h = masterFuture.toCompletableFuture().join();
if (!h.isIP()) {
host = new RedisURI(scheme, h.getHost(), h.getPort());
}
}
CompletableFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), host);
return changeFuture.exceptionally(ex -> {
currentMaster.compareAndSet(newMaster, current);
return null;
@ -505,16 +521,27 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
});
}
private void updateSentinels(Collection<RedisURI> newUris) {
newUris.stream()
.filter(uri -> !sentinels.containsKey(uri))
.forEach(uri -> {
private void updateSentinels(Collection<InetSocketAddress> newAddrs) {
newAddrs.stream()
.filter(addr -> {
RedisURI uri = toURI(addr);
return !sentinels.containsKey(uri);
})
.forEach(addr -> {
RedisURI uri = toURI(addr);
disconnectedSentinels.remove(uri);
registerSentinel(uri, serviceManager.getConfig(), null);
registerSentinel(addr);
});
sentinels.keySet().stream()
.filter(uri -> !newUris.contains(uri))
.filter(uri -> {
for (InetSocketAddress addr : newAddrs) {
if (uri.equals(addr)) {
return false;
}
}
return true;
})
.forEach(uri -> {
RedisClient sentinel = sentinels.remove(uri);
if (sentinel != null) {
@ -531,24 +558,21 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return disconnectedSlaves;
}
private CompletionStage<Void> registerSentinel(RedisURI addr, MasterSlaveServersConfig c, String sslHostname) {
boolean isHostname = NetUtil.createByteArrayFromIpAddressString(addr.getHost()) == null;
if (!isHostname) {
RedisClient sentinel = sentinels.get(addr);
if (sentinel != null) {
return CompletableFuture.completedFuture(null);
}
private CompletionStage<Void> registerSentinel(InetSocketAddress addr) {
RedisURI uri = toURI(addr);
RedisClient sentinel = sentinels.get(uri);
if (sentinel != null) {
return CompletableFuture.completedFuture(null);
}
RedisClient client = createClient(NodeType.SENTINEL, addr, c.getConnectTimeout(), c.getTimeout(), sslHostname);
RedisURI hostname = serviceManager.toURI(scheme, addr.getAddress().getHostName(), "" + addr.getPort());
RedisClient client = createClient(NodeType.SENTINEL, addr, hostname, null);
CompletableFuture<InetSocketAddress> future = client.resolveAddr();
return future.thenCompose(res -> {
RedisURI ipAddr = toURI(client.getAddr());
if (isHostname) {
RedisClient sentinel = sentinels.get(ipAddr);
if (sentinel != null) {
return CompletableFuture.completedFuture(null);
}
RedisURI ipAddr = toURI(res);
RedisClient s = sentinels.get(ipAddr);
if (s != null) {
return CompletableFuture.completedFuture(null);
}
CompletionStage<RedisConnection> f = client.connectAsync();
@ -559,55 +583,58 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
if (sentinels.putIfAbsent(ipAddr, client) == null) {
log.info("sentinel: {} added", ipAddr);
} else {
client.shutdownAsync();
}
return null;
});
});
}
private CompletableFuture<RedisURI> resolveIP(String host, String port) {
private CompletableFuture<InetSocketAddress> resolveIP(String host, String port) {
RedisURI uri = serviceManager.toURI(scheme, host, port);
return serviceManager.resolveIP(uri);
return serviceManager.resolve(uri);
}
private RedisURI toURI(InetSocketAddress addr) {
return serviceManager.toURI(scheme, addr.getAddress().getHostAddress(), "" + addr.getPort());
}
private CompletableFuture<Void> addSlave(RedisURI uri) {
private CompletableFuture<Void> addSlave(InetSocketAddress addr) {
if (config.isSlaveNotUsed()) {
log.info("slave: {} is up", uri);
log.info("slave: {} is up", addr);
return CompletableFuture.completedFuture(null);
}
// to avoid addition twice
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
if (!entry.hasSlave(uri)) {
CompletableFuture<Void> future = entry.addSlave(uri);
if (!entry.hasSlave(addr)) {
RedisURI uri = serviceManager.toURI(scheme, addr.getHostName(), "" + addr.getPort());
CompletableFuture<Void> future = entry.addSlave(addr, uri);
return future.thenApply(res -> {
log.info("slave: {} added", uri);
log.info("slave: {} added", addr);
return null;
});
}
CompletableFuture<Boolean> f = entry.slaveUpNoMasterExclusionAsync(uri, FreezeReason.MANAGER);
CompletableFuture<Boolean> f = entry.slaveUpNoMasterExclusionAsync(addr, FreezeReason.MANAGER);
return f.thenCompose(e -> {
if (e) {
log.info("slave: {} is up", uri);
return entry.excludeMasterFromSlaves(uri);
log.info("slave: {} is up", addr);
return entry.excludeMasterFromSlaves(addr);
}
return CompletableFuture.completedFuture(e);
}).thenApply(e -> null);
}
private void slaveDown(RedisURI uri) {
private void slaveDown(InetSocketAddress addr) {
if (config.isSlaveNotUsed()) {
log.warn("slave: {} is down", uri);
log.warn("slave: {} is down", addr);
} else {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
entry.slaveDownAsync(uri, FreezeReason.MANAGER).thenAccept(r -> {
entry.slaveDownAsync(addr, FreezeReason.MANAGER).thenAccept(r -> {
if (r) {
log.warn("slave: {} is down", uri);
log.warn("slave: {} is down", addr);
}
});
}
@ -621,9 +648,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return baseStatus;
}
private boolean isUseSameMaster(RedisURI slaveAddr, RedisURI slaveMasterAddr) {
private boolean isUseSameMaster(InetSocketAddress slaveAddr, InetSocketAddress slaveMasterAddr) {
RedisURI master = currentMaster.get();
if (!master.equals(slaveMasterAddr) && !slaveAddr.equals(master)) {
if (!master.equals(slaveMasterAddr) && !master.equals(slaveAddr)) {
log.warn("Skipped slave up {} for master {} differs from current {}", slaveAddr, slaveMasterAddr, master);
return false;
}

@ -356,6 +356,34 @@ public class ServiceManager {
return result;
}
public CompletableFuture<InetSocketAddress> resolve(RedisURI address) {
if (address.isIP()) {
try {
InetAddress ip = InetAddress.getByName(address.getHost());
InetSocketAddress addr = new InetSocketAddress(InetAddress.getByAddress(address.getHost(), ip.getAddress()), address.getPort());
return CompletableFuture.completedFuture(addr);
} catch (UnknownHostException e) {
throw new IllegalArgumentException(e);
}
}
CompletableFuture<InetSocketAddress> result = new CompletableFuture<>();
AddressResolver<InetSocketAddress> resolver = resolverGroup.getResolver(group.next());
InetSocketAddress addr = InetSocketAddress.createUnresolved(address.getHost(), address.getPort());
Future<InetSocketAddress> future = resolver.resolve(addr);
future.addListener((FutureListener<InetSocketAddress>) f -> {
if (!f.isSuccess()) {
log.error("Unable to resolve {}", address, f.cause());
result.completeExceptionally(f.cause());
return;
}
InetSocketAddress s = f.getNow();
result.complete(s);
});
return result;
}
public RedisURI toURI(String scheme, String host, String port) {
// convert IPv6 address to unified compressed format
if (NetUtil.isValidIpV6Address(host)) {

Loading…
Cancel
Save