slave status handling in Sentinel mode. #175

pull/139/merge
Nikita 10 years ago
parent 80c6e07fa6
commit 186357ac6c

@ -46,7 +46,7 @@ public class CommandHandler<K, V> extends ChannelDuplexHandler {
try { try {
if (!input.isReadable()) return; if (!input.isReadable()) return;
System.out.println("in: " + input.toString(CharsetUtil.UTF_8)); // System.out.println("in: " + input.toString(CharsetUtil.UTF_8));
buffer.discardReadBytes(); buffer.discardReadBytes();
buffer.writeBytes(input); buffer.writeBytes(input);
@ -62,7 +62,7 @@ public class CommandHandler<K, V> extends ChannelDuplexHandler {
Command<?, ?, ?> cmd = (Command<?, ?, ?>) msg; Command<?, ?, ?> cmd = (Command<?, ?, ?>) msg;
ByteBuf buf = ctx.alloc().heapBuffer(); ByteBuf buf = ctx.alloc().heapBuffer();
cmd.encode(buf); cmd.encode(buf);
System.out.println("out: " + buf.toString(CharsetUtil.UTF_8)); // System.out.println("out: " + buf.toString(CharsetUtil.UTF_8));
ctx.write(buf, promise); ctx.write(buf, promise);
} }

@ -34,6 +34,8 @@ import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
//TODO ping support //TODO ping support
public interface ConnectionManager { public interface ConnectionManager {
RedisClient createClient(String host, int port, int timeout);
RedisClient createClient(String host, int port); RedisClient createClient(String host, int port);
<V> V get(Future<V> future); <V> V get(Future<V> future);

@ -115,7 +115,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
public RedisClient createClient(String host, int port) { public RedisClient createClient(String host, int port) {
return new RedisClient(group, socketChannelClass, host, port, config.getTimeout()); return createClient(host, port, config.getTimeout());
}
public RedisClient createClient(String host, int port, int timeout) {
return new RedisClient(group, socketChannelClass, host, port, timeout);
} }
public <T> FutureListener<T> createReleaseWriteListener(final int slot, public <T> FutureListener<T> createReleaseWriteListener(final int slot,

@ -58,14 +58,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
final Set<String> addedSlaves = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); final Set<String> addedSlaves = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
for (URI addr : cfg.getSentinelAddresses()) { for (URI addr : cfg.getSentinelAddresses()) {
RedisClient client = createClient(addr.getHost(), addr.getPort()); RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getTimeout());
RedisAsyncConnection<String, String> connection = client.connectAsync(); RedisAsyncConnection<String, String> connection = client.connectAsync();
// TODO async // TODO async
List<String> master = get(connection.getMasterAddrByKey(cfg.getMasterName())); List<String> master = get(connection.getMasterAddrByKey(cfg.getMasterName()));
String masterHost = master.get(0) + ":" + master.get(1); String masterHost = master.get(0) + ":" + master.get(1);
c.setMasterAddress(masterHost); c.setMasterAddress(masterHost);
log.info("master: {}", masterHost); log.info("master: {} added", masterHost);
// c.addSlaveAddress(masterHost); // c.addSlaveAddress(masterHost);
// TODO async // TODO async
@ -73,7 +73,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
for (Map<String, String> map : slaves) { for (Map<String, String> map : slaves) {
String ip = map.get("ip"); String ip = map.get("ip");
String port = map.get("port"); String port = map.get("port");
log.info("slave: {}:{}", ip, port); String flags = map.get("flags");
if (flags.contains("s_down") || flags.contains("disconnected")) {
log.info("slave: {}:{} is disconnected. skipped, params: {}", ip, port, map);
continue;
}
log.info("slave: {}:{} added, params: {}", ip, port, map);
c.addSlaveAddress(ip + ":" + port); c.addSlaveAddress(ip + ":" + port);
String host = ip + ":" + port; String host = ip + ":" + port;
addedSlaves.add(host); addedSlaves.add(host);

Loading…
Cancel
Save