From 7d9eb607cff7547b57f8854e753b9bf5cce2ccd7 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 2 Jul 2014 17:01:12 +0400 Subject: [PATCH] Slave up/down detection via sentinel. --- .../redisson/connection/BaseLoadBalancer.java | 1 + .../connection/SentinelConnectionManager.java | 132 ++++++++++++------ 2 files changed, 88 insertions(+), 45 deletions(-) diff --git a/src/main/java/org/redisson/connection/BaseLoadBalancer.java b/src/main/java/org/redisson/connection/BaseLoadBalancer.java index 524cb710e..882851b39 100644 --- a/src/main/java/org/redisson/connection/BaseLoadBalancer.java +++ b/src/main/java/org/redisson/connection/BaseLoadBalancer.java @@ -59,6 +59,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { continue; } connectionEntry.setFreezed(false); + return; } throw new IllegalStateException("Can't find " + addr + " in slaves!"); } diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 9ddffecf0..bf18deaa7 100644 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -15,14 +15,13 @@ */ package org.redisson.connection; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; - import java.net.URI; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import org.redisson.Config; @@ -79,56 +78,99 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { init(c); + monitorMasterChange(cfg); + } + + private void monitorMasterChange(final SentinelServersConfig cfg) { final AtomicReference master = new AtomicReference(); + final Set freezeSlaves = Collections.newSetFromMap(new ConcurrentHashMap()); for (final URI addr : cfg.getSentinelAddresses()) { RedisClient client = new RedisClient(group, addr.getHost(), addr.getPort()); sentinels.add(client); RedisPubSubConnection pubsub = client.connectPubSub(); -// Future> pubsubFuture = client.connectAsyncPubSub(); -// pubsubFuture.addListener(new FutureListener>() { -// @Override -// public void operationComplete(Future> future) -// throws Exception { -// if (!future.isSuccess()) { -// log.error("Can't connect to Sentinel {}:{}", addr.getHost(), addr.getPort()); -// return; -// } -// RedisPubSubConnection pubsub = future.get(); - pubsub.addListener(new RedisPubSubAdapter() { - @Override - public void subscribed(String channel, long count) { - log.info("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort()); - } - - @Override - public void message(String channel, String msg) { - String[] parts = msg.split(" "); - - if (parts.length > 3) { - if (cfg.getMasterName().equals(parts[0])) { - String ip = parts[3]; - String port = parts[4]; - - String current = master.get(); - String newMaster = ip + ":" + port; - if (!newMaster.equals(current) - && master.compareAndSet(current, newMaster)) { - log.debug("changing master to {}:{}", ip, port); - changeMaster(ip, Integer.valueOf(port)); - } - } - } else { - log.error("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); - } - } - }); - pubsub.subscribe("+switch-master"); -// } -// }); + pubsub.addListener(new RedisPubSubAdapter() { + @Override + public void subscribed(String channel, long count) { + log.info("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort()); + } + + @Override + public void message(String channel, String msg) { + if ("+sdown".equals(channel)) { + onSlaveDown(freezeSlaves, addr, msg); + } + if ("-sdown".equals(channel)) { + onSlaveUp(freezeSlaves, addr, msg); + } + if ("+switch-master".equals(channel)) { + onMasterChange(cfg, master, addr, msg); + } + } + + }); + pubsub.subscribe("+switch-master", "+sdown", "-sdown"); + } + } + + private void onSlaveDown(final Set freezeSlaves, final URI addr, String msg) { + String[] parts = msg.split(" "); + + if (parts.length > 4 + && "slave".equals(parts[0])) { + String ip = parts[2]; + String port = parts[3]; + + String slaveAddr = ip + ":" + port; + if (freezeSlaves.add(slaveAddr)) { + log.debug("Slave has down - {}", slaveAddr); + slaveDown(ip, Integer.valueOf(port)); + } + } else { + log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); } } + protected void onSlaveUp(Set freezeSlaves, URI addr, String msg) { + String[] parts = msg.split(" "); + + if (parts.length > 4 + && "slave".equals(parts[0])) { + String ip = parts[2]; + String port = parts[3]; + + String slaveAddr = ip + ":" + port; + if (freezeSlaves.remove(slaveAddr)) { + log.debug("Slave has up - {}", slaveAddr); + slaveUp(ip, Integer.valueOf(port)); + } + } else { + log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); + } + } + + private void onMasterChange(final SentinelServersConfig cfg, + final AtomicReference master, final URI addr, String msg) { + String[] parts = msg.split(" "); + + if (parts.length > 3) { + if (cfg.getMasterName().equals(parts[0])) { + String ip = parts[3]; + String port = parts[4]; + + String current = master.get(); + String newMaster = ip + ":" + port; + if (!newMaster.equals(current) + && master.compareAndSet(current, newMaster)) { + log.debug("changing master from {} to {}", current, newMaster); + changeMaster(ip, Integer.valueOf(port)); + } + } + } else { + log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); + } + } + @Override public void shutdown() { for (RedisClient sentinel : sentinels) {