DNS monitoring for Sentinel nodes. #1299

pull/1336/head
Nikita 7 years ago
parent d095634e6a
commit a99682f240

@ -366,17 +366,21 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
addEntry(slot, entry);
}
if (config.getDnsMonitoringInterval() != -1) {
dnsMonitor = new DNSMonitor(this, f.getNow(),
config.getSlaveAddresses(), config.getDnsMonitoringInterval(), resolverGroup);
dnsMonitor.start();
}
startDNSMonitoring(f.getNow());
} catch (RuntimeException e) {
stopThreads();
throw e;
}
}
protected void startDNSMonitoring(RedisClient masterHost) {
if (config.getDnsMonitoringInterval() != -1) {
dnsMonitor = new DNSMonitor(this, masterHost,
config.getSlaveAddresses(), config.getDnsMonitoringInterval(), resolverGroup);
dnsMonitor.start();
}
}
protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config,
HashSet<ClusterSlotRange> slots) {
MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);

@ -15,6 +15,7 @@
*/
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
@ -51,6 +52,7 @@ import org.redisson.misc.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.resolver.AddressResolver;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
@ -70,7 +72,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private final Set<String> slaves = Collections.newSetFromMap(PlatformDependent.<String, Boolean>newConcurrentHashMap());
private final Set<URI> disconnectedSlaves = new HashSet<URI>();
private String masterName;
private ScheduledFuture<?> monitorFuture;
private AddressResolver<InetSocketAddress> sentinelResolver;
public SentinelConnectionManager(SentinelServersConfig cfg, Config config) {
super(config);
@ -79,9 +83,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
throw new IllegalArgumentException("masterName parameter is not defined!");
}
this.masterName = cfg.getMasterName();
this.config = create(cfg);
initTimer(this.config);
this.sentinelResolver = resolverGroup.getResolver(getGroup().next());
for (URI addr : cfg.getSentinelAddresses()) {
RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts());
try {
@ -133,7 +140,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String host = createAddress(ip, port);
URI sentinelAddr = URIBuilder.create(host);
RFuture<Void> future = registerSentinel(cfg, sentinelAddr, this.config);
RFuture<Void> future = registerSentinel(sentinelAddr, this.config);
connectionFutures.add(future);
}
@ -159,6 +166,76 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
scheduleChangeCheck(cfg, null);
}
@Override
protected void startDNSMonitoring(RedisClient masterHost) {
if (config.getDnsMonitoringInterval() == -1) {
return;
}
scheduleSentinelDNSCheck();
}
protected void scheduleSentinelDNSCheck() {
monitorFuture = group.schedule(new Runnable() {
@Override
public void run() {
List<RedisClient> sentinels = new ArrayList<RedisClient>(SentinelConnectionManager.this.sentinels.values());
final AtomicInteger sentinelsCounter = new AtomicInteger(sentinels.size());
FutureListener<List<InetSocketAddress>> commonListener = new FutureListener<List<InetSocketAddress>>() {
@Override
public void operationComplete(Future<List<InetSocketAddress>> future) throws Exception {
if (sentinelsCounter.decrementAndGet() == 0) {
scheduleSentinelDNSCheck();
}
}
};
for (final RedisClient client : sentinels) {
Future<List<InetSocketAddress>> allNodes = sentinelResolver.resolveAll(InetSocketAddress.createUnresolved(client.getAddr().getHostName(), client.getAddr().getPort()));
allNodes.addListener(new FutureListener<List<InetSocketAddress>>() {
@Override
public void operationComplete(Future<List<InetSocketAddress>> future) throws Exception {
if (!future.isSuccess()) {
log.error("Unable to resolve " + client.getAddr().getHostName(), future.cause());
return;
}
boolean clientFound = false;
for (InetSocketAddress addr : future.getNow()) {
boolean found = false;
for (RedisClient currentSentinel : SentinelConnectionManager.this.sentinels.values()) {
if (currentSentinel.getAddr().getAddress().getHostAddress().equals(addr.getAddress().getHostAddress())
&& currentSentinel.getAddr().getPort() == addr.getPort()) {
found = true;
break;
}
}
if (!found) {
URI uri = convert(addr.getAddress().getHostAddress(), "" + addr.getPort());
registerSentinel(uri, getConfig());
}
if (client.getAddr().getAddress().getHostAddress().equals(addr.getAddress().getHostAddress())
&& client.getAddr().getPort() == addr.getPort()) {
clientFound = true;
}
}
if (!clientFound) {
String addr = client.getAddr().getAddress().getHostAddress() + ":" + client.getAddr().getPort();
RedisClient sentinel = SentinelConnectionManager.this.sentinels.remove(addr);
if (sentinel != null) {
sentinel.shutdownAsync();
log.warn("sentinel: {} has down", addr);
}
}
}
});
allNodes.addListener(commonListener);
}
}
}, config.getDnsMonitoringInterval(), TimeUnit.MILLISECONDS);
}
private void scheduleChangeCheck(final SentinelServersConfig cfg, final Iterator<RedisClient> iterator) {
monitorFuture = group.schedule(new Runnable() {
@Override
@ -323,9 +400,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = map.get("ip");
String port = map.get("port");
String host = createAddress(ip, port);
URI sentinelAddr = URIBuilder.create(host);
registerSentinel(cfg, sentinelAddr, getConfig());
URI sentinelAddr = convert(ip, port);
registerSentinel(sentinelAddr, getConfig());
}
}
});
@ -350,7 +426,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return entry;
}
private RFuture<Void> registerSentinel(final SentinelServersConfig cfg, final URI addr, final MasterSlaveServersConfig c) {
private RFuture<Void> registerSentinel(final URI addr, final MasterSlaveServersConfig c) {
String key = addr.getHost() + ":" + addr.getPort();
RedisClient client = sentinels.get(key);
if (client != null) {
@ -380,7 +456,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
log.debug("message {} from {}", msg, channel);
if ("+sentinel".equals(channel)) {
onSentinelAdded(cfg, (String) msg, c);
onSentinelAdded((String) msg, c);
}
if ("+slave".equals(channel)) {
onSlaveAdded(addr, (String) msg);
@ -392,7 +468,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
onNodeUp(addr, (String) msg);
}
if ("+switch-master".equals(channel)) {
onMasterChange(cfg, addr, (String) msg);
onMasterChange(addr, (String) msg);
}
}
@ -413,14 +489,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return RedissonPromise.newSucceededFuture(null);
}
protected void onSentinelAdded(SentinelServersConfig cfg, String msg, MasterSlaveServersConfig c) {
protected void onSentinelAdded(String msg, MasterSlaveServersConfig c) {
String[] parts = msg.split(" ");
if ("sentinel".equals(parts[0])) {
String ip = parts[2];
String port = parts[3];
URI uri = convert(ip, port);
registerSentinel(cfg, uri, c);
registerSentinel(uri, c);
}
}
@ -589,11 +665,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
private void onMasterChange(SentinelServersConfig cfg, URI addr, String msg) {
private void onMasterChange(URI addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 3) {
if (cfg.getMasterName().equals(parts[0])) {
if (masterName.equals(parts[0])) {
String ip = parts[3];
String port = parts[4];

@ -222,10 +222,6 @@ public class LoadBalancerManager {
return client2Entry.get(redisClient);
}
protected String convert(InetSocketAddress addr) {
return addr.getAddress().getHostAddress() + ":" + addr.getPort();
}
public RFuture<RedisConnection> getConnection(RedisCommand<?> command, URI addr) {
ClientConnectionsEntry entry = getEntry(addr);
if (entry != null) {

Loading…
Cancel
Save