Skip disconnected sentinels during startup. Slave discovery which has been down since start fixed. #41

pull/226/head
Nikita 10 years ago
parent 96c9e5c118
commit 265cc94f50

@ -16,6 +16,7 @@
package org.redisson.connection; package org.redisson.connection;
import java.net.URI; import java.net.URI;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -27,6 +28,7 @@ import org.redisson.SentinelServersConfig;
import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
@ -47,7 +49,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private final ConcurrentMap<String, Boolean> slaves = PlatformDependent.newConcurrentHashMap(); private final ConcurrentMap<String, Boolean> slaves = PlatformDependent.newConcurrentHashMap();
public SentinelConnectionManager(final SentinelServersConfig cfg, Config config) { public SentinelConnectionManager(SentinelServersConfig cfg, Config config) {
init(config); init(config);
final MasterSlaveServersConfig c = new MasterSlaveServersConfig(); final MasterSlaveServersConfig c = new MasterSlaveServersConfig();
@ -63,6 +65,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());
c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
List<String> disconnectedSlaves = new ArrayList<String>();
for (URI addr : cfg.getSentinelAddresses()) { for (URI addr : cfg.getSentinelAddresses()) {
RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getTimeout()); RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getTimeout());
try { try {
@ -74,7 +77,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
c.setMasterAddress(masterHost); c.setMasterAddress(masterHost);
currentMaster.set(masterHost); currentMaster.set(masterHost);
log.info("master: {} added", masterHost); log.info("master: {} added", masterHost);
// c.addSlaveAddress(masterHost);
// TODO async // TODO async
List<Map<String, String>> sentinelSlaves = connection.sync(RedisCommands.SENTINEL_SLAVES, cfg.getMasterName()); List<Map<String, String>> sentinelSlaves = connection.sync(RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
@ -83,24 +85,34 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String port = map.get("port"); String port = map.get("port");
String flags = map.get("flags"); String flags = map.get("flags");
if (flags.contains("s_down") || flags.contains("disconnected")) {
log.info("slave: {}:{} is disconnected. skipped, params: {}", ip, port, map);
continue;
}
log.info("slave: {}:{} added, params: {}", ip, port, map);
c.addSlaveAddress(ip + ":" + port);
String host = ip + ":" + port; String host = ip + ":" + port;
c.addSlaveAddress(host);
slaves.put(host, true); slaves.put(host, true);
log.info("slave: {} added, params: {}", host, map);
if (flags.contains("s_down") || flags.contains("disconnected")) {
disconnectedSlaves.add(host);
}
} }
break; break;
} catch (RedisConnectionException e) {
// skip
} finally { } finally {
client.shutdownAsync(); client.shutdownAsync();
} }
} }
if (currentMaster.get() == null) {
throw new IllegalStateException("Can't connect to servers!");
}
init(c); init(c);
for (String host : disconnectedSlaves) {
String[] parts = host.split(":");
slaveDown(parts[0], parts[1]);
}
for (URI addr : cfg.getSentinelAddresses()) { for (URI addr : cfg.getSentinelAddresses()) {
registerSentinel(cfg, addr); registerSentinel(cfg, addr);
} }
@ -113,39 +125,43 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return; return;
} }
RedisPubSubConnection pubsub = client.connectPubSub(); try {
pubsub.addListener(new BaseRedisPubSubListener<String>() { RedisPubSubConnection pubsub = client.connectPubSub();
pubsub.addListener(new BaseRedisPubSubListener<String>() {
@Override @Override
public void onMessage(String channel, String msg) { public void onMessage(String channel, String msg) {
if ("+sentinel".equals(channel)) { if ("+sentinel".equals(channel)) {
onSentinelAdded(cfg, msg); onSentinelAdded(cfg, msg);
} }
if ("+slave".equals(channel)) { if ("+slave".equals(channel)) {
onSlaveAdded(addr, msg); onSlaveAdded(addr, msg);
} }
if ("+sdown".equals(channel)) { if ("+sdown".equals(channel)) {
onSlaveDown(addr, msg); onSlaveDown(addr, msg);
} }
if ("-sdown".equals(channel)) { if ("-sdown".equals(channel)) {
onSlaveUp(addr, msg); onSlaveUp(addr, msg);
} }
if ("+switch-master".equals(channel)) { if ("+switch-master".equals(channel)) {
onMasterChange(cfg, addr, msg); onMasterChange(cfg, addr, msg);
}
} }
}
@Override @Override
public boolean onStatus(PubSubType type, String channel) { public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.SUBSCRIBE) { if (type == PubSubType.SUBSCRIBE) {
log.debug("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort()); log.debug("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort());
}
return true;
} }
return true; });
}
});
pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave", "+sentinel"); pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave", "+sentinel");
log.info("sentinel: {}:{} added", addr.getHost(), addr.getPort()); log.info("sentinel: {}:{} added", addr.getHost(), addr.getPort());
} catch (RedisConnectionException e) {
log.warn("can't connect to sentinel: {}:{}", addr.getHost(), addr.getPort());
}
} }
protected void onSentinelAdded(SentinelServersConfig cfg, String msg) { protected void onSentinelAdded(SentinelServersConfig cfg, String msg) {
@ -188,13 +204,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = parts[2]; String ip = parts[2];
String port = parts[3]; String port = parts[3];
String addr = ip + ":" + port; slaveDown(ip, port);
// to avoid freeze twice
if (freezeSlaves.putIfAbsent(addr, true) == null) {
slaveDown(0, ip, Integer.valueOf(port));
log.info("slave: {} has down", addr);
}
} else if ("sentinel".equals(parts[0])) { } else if ("sentinel".equals(parts[0])) {
String ip = parts[2]; String ip = parts[2];
String port = parts[3]; String port = parts[3];
@ -215,6 +225,15 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
} }
} }
private void slaveDown(String ip, String port) {
// to avoid freeze twice
String addr = ip + ":" + port;
if (freezeSlaves.putIfAbsent(addr, true) == null) {
slaveDown(0, ip, Integer.valueOf(port));
log.info("slave: {} has down", addr);
}
}
protected void onSlaveUp(URI addr, String msg) { protected void onSlaveUp(URI addr, String msg) {
String[] parts = msg.split(" "); String[] parts = msg.split(" ");

Loading…
Cancel
Save