From 186357ac6c2da1a5a12c0287a08408ac5ec6683b Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 22 Jun 2015 20:24:28 +0300 Subject: [PATCH] slave status handling in Sentinel mode. #175 --- .../lambdaworks/redis/protocol/CommandHandler.java | 10 +++++----- .../org/redisson/connection/ConnectionManager.java | 2 ++ .../connection/MasterSlaveConnectionManager.java | 6 +++++- .../connection/SentinelConnectionManager.java | 13 ++++++++++--- 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java index be7fff5d5..077051b69 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java @@ -46,8 +46,8 @@ public class CommandHandler extends ChannelDuplexHandler { try { if (!input.isReadable()) return; - System.out.println("in: " + input.toString(CharsetUtil.UTF_8)); - +// System.out.println("in: " + input.toString(CharsetUtil.UTF_8)); + buffer.discardReadBytes(); buffer.writeBytes(input); @@ -62,8 +62,8 @@ public class CommandHandler extends ChannelDuplexHandler { Command cmd = (Command) msg; ByteBuf buf = ctx.alloc().heapBuffer(); cmd.encode(buf); - System.out.println("out: " + buf.toString(CharsetUtil.UTF_8)); - +// System.out.println("out: " + buf.toString(CharsetUtil.UTF_8)); + ctx.write(buf, promise); } @@ -74,7 +74,7 @@ public class CommandHandler extends ChannelDuplexHandler { || !rsm.decode(buffer, cmd.getOutput())) { break; } - + cmd = queue.take(); cmd.complete(); } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index cf3f99de4..5e89d2ebb 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -34,6 +34,8 @@ import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; //TODO ping support public interface ConnectionManager { + RedisClient createClient(String host, int port, int timeout); + RedisClient createClient(String host, int port); V get(Future future); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 9e2845370..aa5e88713 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -115,7 +115,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } public RedisClient createClient(String host, int port) { - return new RedisClient(group, socketChannelClass, host, port, config.getTimeout()); + return createClient(host, port, config.getTimeout()); + } + + public RedisClient createClient(String host, int port, int timeout) { + return new RedisClient(group, socketChannelClass, host, port, timeout); } public FutureListener createReleaseWriteListener(final int slot, diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 7b41bb716..3421d210c 100644 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -58,14 +58,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { final Set addedSlaves = Collections.newSetFromMap(new ConcurrentHashMap()); for (URI addr : cfg.getSentinelAddresses()) { - RedisClient client = createClient(addr.getHost(), addr.getPort()); + RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getTimeout()); RedisAsyncConnection connection = client.connectAsync(); // TODO async List master = get(connection.getMasterAddrByKey(cfg.getMasterName())); String masterHost = master.get(0) + ":" + master.get(1); c.setMasterAddress(masterHost); - log.info("master: {}", masterHost); + log.info("master: {} added", masterHost); // c.addSlaveAddress(masterHost); // TODO async @@ -73,7 +73,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { for (Map map : slaves) { String ip = map.get("ip"); String port = map.get("port"); - log.info("slave: {}:{}", ip, port); + String flags = map.get("flags"); + + if (flags.contains("s_down") || flags.contains("disconnected")) { + log.info("slave: {}:{} is disconnected. skipped, params: {}", ip, port, map); + continue; + } + + log.info("slave: {}:{} added, params: {}", ip, port, map); c.addSlaveAddress(ip + ":" + port); String host = ip + ":" + port; addedSlaves.add(host);