sentinel up/down discovery. #221

pull/226/head
Nikita 10 years ago
parent 920c71d1f9
commit 96c9e5c118

@ -16,12 +16,9 @@
package org.redisson.connection;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.Config;
@ -31,19 +28,24 @@ import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.core.Node;
import org.redisson.misc.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.internal.PlatformDependent;
public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private final List<RedisClient> sentinels = new ArrayList<RedisClient>();
private final ConcurrentMap<String, RedisClient> sentinels = PlatformDependent.newConcurrentHashMap();
private final AtomicReference<String> currentMaster = new AtomicReference<String>();
private final ConcurrentMap<String, Boolean> freezeSlaves = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<String, Boolean> slaves = PlatformDependent.newConcurrentHashMap();
public SentinelConnectionManager(final SentinelServersConfig cfg, Config config) {
init(config);
@ -61,7 +63,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());
c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
final Set<String> addedSlaves = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
for (URI addr : cfg.getSentinelAddresses()) {
RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getTimeout());
try {
@ -71,12 +72,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
List<String> master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
String masterHost = master.get(0) + ":" + master.get(1);
c.setMasterAddress(masterHost);
currentMaster.set(masterHost);
log.info("master: {} added", masterHost);
// c.addSlaveAddress(masterHost);
// TODO async
List<Map<String, String>> slaves = connection.sync(RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
for (Map<String, String> map : slaves) {
List<Map<String, String>> sentinelSlaves = connection.sync(RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
for (Map<String, String> map : sentinelSlaves) {
String ip = map.get("ip");
String port = map.get("port");
String flags = map.get("flags");
@ -89,7 +91,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
log.info("slave: {}:{} added, params: {}", ip, port, map);
c.addSlaveAddress(ip + ":" + port);
String host = ip + ":" + port;
addedSlaves.add(host);
slaves.put(host, true);
}
break;
} finally {
@ -99,50 +101,66 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
init(c);
monitorMasterChange(cfg, addedSlaves);
for (URI addr : cfg.getSentinelAddresses()) {
registerSentinel(cfg, addr);
}
}
private void monitorMasterChange(final SentinelServersConfig cfg, final Set<String> addedSlaves) {
final AtomicReference<String> master = new AtomicReference<String>();
final Set<String> freezeSlaves = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
for (final URI addr : cfg.getSentinelAddresses()) {
RedisClient client = createClient(addr.getHost(), addr.getPort());
sentinels.add(client);
private void registerSentinel(final SentinelServersConfig cfg, final URI addr) {
RedisClient client = createClient(addr.getHost(), addr.getPort());
RedisClient oldClient = sentinels.putIfAbsent(addr.getHost() + ":" + addr.getPort(), client);
if (oldClient != null) {
return;
}
RedisPubSubConnection pubsub = client.connectPubSub();
pubsub.addListener(new BaseRedisPubSubListener<String>() {
RedisPubSubConnection pubsub = client.connectPubSub();
pubsub.addListener(new BaseRedisPubSubListener<String>() {
@Override
public void onMessage(String channel, String msg) {
if ("+slave".equals(channel)) {
onSlaveAdded(addedSlaves, addr, msg);
}
if ("+sdown".equals(channel)) {
onSlaveDown(freezeSlaves, addr, msg);
}
if ("-sdown".equals(channel)) {
onSlaveUp(freezeSlaves, addr, msg);
}
if ("+switch-master".equals(channel)) {
onMasterChange(cfg, master, addr, msg);
}
@Override
public void onMessage(String channel, String msg) {
if ("+sentinel".equals(channel)) {
onSentinelAdded(cfg, msg);
}
if ("+slave".equals(channel)) {
onSlaveAdded(addr, msg);
}
if ("+sdown".equals(channel)) {
onSlaveDown(addr, msg);
}
if ("-sdown".equals(channel)) {
onSlaveUp(addr, msg);
}
if ("+switch-master".equals(channel)) {
onMasterChange(cfg, addr, msg);
}
}
@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.SUBSCRIBE) {
log.info("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort());
}
return true;
@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.SUBSCRIBE) {
log.debug("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort());
}
});
return true;
}
});
pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave");
pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave", "+sentinel");
log.info("sentinel: {}:{} added", addr.getHost(), addr.getPort());
}
protected void onSentinelAdded(SentinelServersConfig cfg, String msg) {
String[] parts = msg.split(" ");
if ("sentinel".equals(parts[0])) {
String ip = parts[2];
String port = parts[3];
String addr = ip + ":" + port;
URI uri = URIBuilder.create(addr);
registerSentinel(cfg, uri);
}
}
protected void onSlaveAdded(Set<String> addedSlaves, URI addr, String msg) {
protected void onSlaveAdded(URI addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 4
@ -153,16 +171,16 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String slaveAddr = ip + ":" + port;
// to avoid addition twice
if (addedSlaves.add(slaveAddr)) {
log.debug("Slave has been added - {}", slaveAddr);
if (slaves.putIfAbsent(slaveAddr, true) == null) {
addSlave(ip, Integer.valueOf(port));
log.info("slave: {} added", slaveAddr);
}
} else {
log.warn("onSlaveAdded. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
}
}
private void onSlaveDown(final Set<String> freezeSlaves, final URI addr, String msg) {
private void onSlaveDown(URI sentinelAddr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 3) {
@ -170,24 +188,34 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = parts[2];
String port = parts[3];
String slaveAddr = ip + ":" + port;
String addr = ip + ":" + port;
// to avoid freeze twice
if (freezeSlaves.add(slaveAddr)) {
log.debug("Slave has down - {}", slaveAddr);
if (freezeSlaves.putIfAbsent(addr, true) == null) {
slaveDown(0, ip, Integer.valueOf(port));
log.info("slave: {} has down", addr);
}
} else if ("sentinel".equals(parts[0])) {
String ip = parts[2];
String port = parts[3];
String addr = ip + ":" + port;
RedisClient sentinel = sentinels.remove(addr);
if (sentinel != null) {
sentinel.shutdownAsync();
log.info("sentinel: {} has down", addr);
}
} else if ("sentinel".equals(parts[0]) || "master".equals(parts[0])) {
} else if ("master".equals(parts[0])) {
// skip
} else {
log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, sentinelAddr.getHost(), sentinelAddr.getPort());
}
} else {
log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, sentinelAddr.getHost(), sentinelAddr.getPort());
}
}
protected void onSlaveUp(Set<String> freezeSlaves, URI addr, String msg) {
protected void onSlaveUp(URI addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 4
@ -196,17 +224,16 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String port = parts[3];
String slaveAddr = ip + ":" + port;
if (freezeSlaves.remove(slaveAddr)) {
log.debug("Slave has up - {}", slaveAddr);
if (freezeSlaves.remove(slaveAddr) != null) {
slaveUp(ip, Integer.valueOf(port));
log.info("slave: {} has up", slaveAddr);
}
} else {
log.warn("onSlaveUp. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
}
}
private void onMasterChange(final SentinelServersConfig cfg,
final AtomicReference<String> master, final URI addr, String msg) {
private void onMasterChange(SentinelServersConfig cfg, URI addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 3) {
@ -214,12 +241,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = parts[3];
String port = parts[4];
String current = master.get();
String current = currentMaster.get();
String newMaster = ip + ":" + port;
if (!newMaster.equals(current)
&& master.compareAndSet(current, newMaster)) {
log.debug("changing master from {} to {}", current, newMaster);
&& currentMaster.compareAndSet(current, newMaster)) {
changeMaster(0, ip, Integer.valueOf(port));
log.info("master has changed from {} to {}", current, newMaster);
}
}
} else {
@ -239,7 +266,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
public void shutdown() {
super.shutdown();
for (RedisClient sentinel : sentinels) {
for (RedisClient sentinel : sentinels.values()) {
sentinel.shutdown();
}
}

Loading…
Cancel
Save