|
|
@ -55,7 +55,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
|
|
|
|
|
|
|
private final Set<RedisURI> disconnectedSlaves = new HashSet<>();
|
|
|
|
private final Set<RedisURI> disconnectedSlaves = new HashSet<>();
|
|
|
|
private ScheduledFuture<?> monitorFuture;
|
|
|
|
private ScheduledFuture<?> monitorFuture;
|
|
|
|
private AddressResolver<InetSocketAddress> sentinelResolver;
|
|
|
|
private final AddressResolver<InetSocketAddress> sentinelResolver;
|
|
|
|
|
|
|
|
private final Set<RedisURI> disconnectedSentinels = Collections.newSetFromMap(new ConcurrentHashMap<>());
|
|
|
|
|
|
|
|
|
|
|
|
private final RedisStrictCommand<RedisURI> masterHostCommand;
|
|
|
|
private final RedisStrictCommand<RedisURI> masterHostCommand;
|
|
|
|
|
|
|
|
|
|
|
@ -296,7 +297,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
|
|
|
|
|
|
|
future.getNow().stream()
|
|
|
|
future.getNow().stream()
|
|
|
|
.map(addr -> toURI(addr))
|
|
|
|
.map(addr -> toURI(addr))
|
|
|
|
.filter(uri -> !sentinels.containsKey(uri))
|
|
|
|
.filter(uri -> !sentinels.containsKey(uri) && !disconnectedSentinels.contains(uri))
|
|
|
|
.forEach(uri -> registerSentinel(uri, getConfig(), host.getHost()));
|
|
|
|
.forEach(uri -> registerSentinel(uri, getConfig(), host.getHost()));
|
|
|
|
});
|
|
|
|
});
|
|
|
|
if (commonListener != null) {
|
|
|
|
if (commonListener != null) {
|
|
|
@ -327,6 +328,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
if (lastException.get() != null) {
|
|
|
|
if (lastException.get() != null) {
|
|
|
|
log.error("Can't update cluster state", lastException.get());
|
|
|
|
log.error("Can't update cluster state", lastException.get());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
disconnectedSentinels.clear();
|
|
|
|
performSentinelDNSCheck(null);
|
|
|
|
performSentinelDNSCheck(null);
|
|
|
|
scheduleChangeCheck(cfg, null);
|
|
|
|
scheduleChangeCheck(cfg, null);
|
|
|
|
return;
|
|
|
|
return;
|
|
|
@ -516,7 +518,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
private void updateSentinels(Collection<RedisURI> newUris) {
|
|
|
|
private void updateSentinels(Collection<RedisURI> newUris) {
|
|
|
|
newUris.stream()
|
|
|
|
newUris.stream()
|
|
|
|
.filter(uri -> !sentinels.containsKey(uri))
|
|
|
|
.filter(uri -> !sentinels.containsKey(uri))
|
|
|
|
.forEach(uri -> registerSentinel(uri, getConfig(), null));
|
|
|
|
.forEach(uri -> {
|
|
|
|
|
|
|
|
disconnectedSentinels.remove(uri);
|
|
|
|
|
|
|
|
registerSentinel(uri, getConfig(), null);
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
sentinels.keySet().stream()
|
|
|
|
sentinels.keySet().stream()
|
|
|
|
.filter(uri -> !newUris.contains(uri))
|
|
|
|
.filter(uri -> !newUris.contains(uri))
|
|
|
@ -525,6 +530,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
if (sentinel != null) {
|
|
|
|
if (sentinel != null) {
|
|
|
|
disconnectNode(uri);
|
|
|
|
disconnectNode(uri);
|
|
|
|
sentinel.shutdownAsync();
|
|
|
|
sentinel.shutdownAsync();
|
|
|
|
|
|
|
|
disconnectedSentinels.add(uri);
|
|
|
|
log.warn("sentinel: {} is down", uri);
|
|
|
|
log.warn("sentinel: {} is down", uri);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
});
|
|
|
|