Slave up/down detection via sentinel.

pull/38/head
Nikita 11 years ago
parent c9ed11750c
commit 7d9eb607cf

@ -59,6 +59,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
continue;
}
connectionEntry.setFreezed(false);
return;
}
throw new IllegalStateException("Can't find " + addr + " in slaves!");
}

@ -15,14 +15,13 @@
*/
package org.redisson.connection;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.Config;
@ -79,56 +78,99 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
init(c);
monitorMasterChange(cfg);
}
private void monitorMasterChange(final SentinelServersConfig cfg) {
final AtomicReference<String> master = new AtomicReference<String>();
final Set<String> freezeSlaves = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
for (final URI addr : cfg.getSentinelAddresses()) {
RedisClient client = new RedisClient(group, addr.getHost(), addr.getPort());
sentinels.add(client);
RedisPubSubConnection<String, String> pubsub = client.connectPubSub();
// Future<RedisPubSubConnection<String, String>> pubsubFuture = client.connectAsyncPubSub();
// pubsubFuture.addListener(new FutureListener<RedisPubSubConnection<String, String>>() {
// @Override
// public void operationComplete(Future<RedisPubSubConnection<String, String>> future)
// throws Exception {
// if (!future.isSuccess()) {
// log.error("Can't connect to Sentinel {}:{}", addr.getHost(), addr.getPort());
// return;
// }
// RedisPubSubConnection<String, String> pubsub = future.get();
pubsub.addListener(new RedisPubSubAdapter<String, String>() {
@Override
public void subscribed(String channel, long count) {
log.info("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort());
}
@Override
public void message(String channel, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 3) {
if (cfg.getMasterName().equals(parts[0])) {
String ip = parts[3];
String port = parts[4];
String current = master.get();
String newMaster = ip + ":" + port;
if (!newMaster.equals(current)
&& master.compareAndSet(current, newMaster)) {
log.debug("changing master to {}:{}", ip, port);
changeMaster(ip, Integer.valueOf(port));
}
}
} else {
log.error("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
}
}
});
pubsub.subscribe("+switch-master");
// }
// });
pubsub.addListener(new RedisPubSubAdapter<String>() {
@Override
public void subscribed(String channel, long count) {
log.info("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort());
}
@Override
public void message(String channel, String msg) {
if ("+sdown".equals(channel)) {
onSlaveDown(freezeSlaves, addr, msg);
}
if ("-sdown".equals(channel)) {
onSlaveUp(freezeSlaves, addr, msg);
}
if ("+switch-master".equals(channel)) {
onMasterChange(cfg, master, addr, msg);
}
}
});
pubsub.subscribe("+switch-master", "+sdown", "-sdown");
}
}
private void onSlaveDown(final Set<String> freezeSlaves, final URI addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 4
&& "slave".equals(parts[0])) {
String ip = parts[2];
String port = parts[3];
String slaveAddr = ip + ":" + port;
if (freezeSlaves.add(slaveAddr)) {
log.debug("Slave has down - {}", slaveAddr);
slaveDown(ip, Integer.valueOf(port));
}
} else {
log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
}
}
protected void onSlaveUp(Set<String> freezeSlaves, URI addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 4
&& "slave".equals(parts[0])) {
String ip = parts[2];
String port = parts[3];
String slaveAddr = ip + ":" + port;
if (freezeSlaves.remove(slaveAddr)) {
log.debug("Slave has up - {}", slaveAddr);
slaveUp(ip, Integer.valueOf(port));
}
} else {
log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
}
}
private void onMasterChange(final SentinelServersConfig cfg,
final AtomicReference<String> master, final URI addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 3) {
if (cfg.getMasterName().equals(parts[0])) {
String ip = parts[3];
String port = parts[4];
String current = master.get();
String newMaster = ip + ":" + port;
if (!newMaster.equals(current)
&& master.compareAndSet(current, newMaster)) {
log.debug("changing master from {} to {}", current, newMaster);
changeMaster(ip, Integer.valueOf(port));
}
}
} else {
log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
}
}
@Override
public void shutdown() {
for (RedisClient sentinel : sentinels) {

Loading…
Cancel
Save