From e3da65ad87bd554a5d0adf4a8c6a23610253915b Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 2 Jul 2014 18:49:53 +0400 Subject: [PATCH] new-slave automatic discovery --- .../redisson/connection/BaseLoadBalancer.java | 62 +++++++++---------- .../org/redisson/connection/LoadBalancer.java | 8 +-- .../MasterSlaveConnectionManager.java | 34 +++++----- .../connection/SentinelConnectionManager.java | 34 ++++++++-- 4 files changed, 82 insertions(+), 56 deletions(-) diff --git a/src/main/java/org/redisson/connection/BaseLoadBalancer.java b/src/main/java/org/redisson/connection/BaseLoadBalancer.java index 882851b39..8fed00f28 100644 --- a/src/main/java/org/redisson/connection/BaseLoadBalancer.java +++ b/src/main/java/org/redisson/connection/BaseLoadBalancer.java @@ -17,7 +17,8 @@ package org.redisson.connection; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Iterator; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -47,64 +48,56 @@ abstract class BaseLoadBalancer implements LoadBalancer { this.password = password; } - public void add(ConnectionEntry entry) { + public synchronized void add(ConnectionEntry entry) { clients.add(entry); clientsEmpty.open(); } - public void unfreeze(String host, int port) { + public synchronized void unfreeze(String host, int port) { InetSocketAddress addr = new InetSocketAddress(host, port); for (ConnectionEntry connectionEntry : clients) { if (!connectionEntry.getClient().getAddr().equals(addr)) { continue; } connectionEntry.setFreezed(false); + clientsEmpty.open(); return; } throw new IllegalStateException("Can't find " + addr + " in slaves!"); } - public Queue freeze(String host, int port) { + public synchronized Collection freeze(String host, int port) { InetSocketAddress addr = new InetSocketAddress(host, port); for (ConnectionEntry connectionEntry : clients) { - if (!connectionEntry.getClient().getAddr().equals(addr)) { + if (connectionEntry.isFreezed() + || !connectionEntry.getClient().getAddr().equals(addr)) { continue; } + log.debug("{} freezed", addr); connectionEntry.setFreezed(true); - return connectionEntry.getSubscribeConnections(); - } - throw new IllegalStateException("Can't find " + addr + " in slaves!"); - } - - public Queue remove(String host, int port) { - InetSocketAddress addr = new InetSocketAddress(host, port); - for (Iterator iterator = clients.iterator(); iterator.hasNext();) { - ConnectionEntry entry = iterator.next(); - if (!entry.getClient().getAddr().equals(addr)) { - continue; + // TODO shutdown watchdog + + boolean allFreezed = true; + for (ConnectionEntry entry : clients) { + if (!entry.isFreezed()) { + allFreezed = false; + break; + } } - - iterator.remove(); - log.info("slave {} removed", entry.getClient().getAddr()); - if (clients.isEmpty()) { + if (allFreezed) { clientsEmpty.close(); } - entry.shutdown(); - log.info("slave {} shutdown", entry.getClient().getAddr()); - - return entry.getSubscribeConnections(); + return connectionEntry.getSubscribeConnections(); } - throw new IllegalStateException("Can't find " + addr + " in slaves!"); + + return Collections.emptyList(); } @SuppressWarnings("unchecked") public RedisPubSubConnection nextPubSubConnection() { + clientsEmpty.awaitUninterruptibly(); List clientsCopy = new ArrayList(clients); - if (clientsCopy.isEmpty()) { - clientsEmpty.awaitUninterruptibly(); - return nextPubSubConnection(); - } while (true) { if (clientsCopy.isEmpty()) { // TODO refactor @@ -142,11 +135,8 @@ abstract class BaseLoadBalancer implements LoadBalancer { } public RedisConnection nextConnection() { + clientsEmpty.awaitUninterruptibly(); List clientsCopy = new ArrayList(clients); - if (clientsCopy.isEmpty()) { - clientsEmpty.awaitUninterruptibly(); - return nextConnection(); - } while (true) { if (clientsCopy.isEmpty()) { // TODO refactor @@ -205,4 +195,10 @@ abstract class BaseLoadBalancer implements LoadBalancer { } } + public void shutdown() { + for (ConnectionEntry entry : clients) { + entry.getClient().shutdown(); + } + } + } diff --git a/src/main/java/org/redisson/connection/LoadBalancer.java b/src/main/java/org/redisson/connection/LoadBalancer.java index 829c6cfc7..2f4da9ada 100644 --- a/src/main/java/org/redisson/connection/LoadBalancer.java +++ b/src/main/java/org/redisson/connection/LoadBalancer.java @@ -15,7 +15,7 @@ */ package org.redisson.connection; -import java.util.Queue; +import java.util.Collection; import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.codec.RedisCodec; @@ -23,16 +23,16 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection; public interface LoadBalancer { + void shutdown(); + void unfreeze(String host, int port); - Queue freeze(String host, int port); + Collection freeze(String host, int port); void init(RedisCodec codec, String password); void add(ConnectionEntry entry); - Queue remove(String host, int port); - RedisConnection nextConnection(); RedisPubSubConnection nextPubSubConnection(); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 4d8e77613..7116127e0 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -20,10 +20,8 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.FutureListener; import java.net.URI; -import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; -import java.util.List; import java.util.Map.Entry; import java.util.Queue; import java.util.Set; @@ -63,7 +61,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { private final ConcurrentMap name2PubSubConnection = new ConcurrentHashMap(); protected LoadBalancer balancer; - private final List slaveClients = new ArrayList(); protected volatile RedisClient masterClient; private Semaphore masterConnectionsSemaphore; @@ -89,10 +86,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager { balancer.init(codec, config.getPassword()); for (URI address : this.config.getSlaveAddresses()) { RedisClient client = new RedisClient(group, address.getHost(), address.getPort()); - slaveClients.add(client); - balancer.add(new ConnectionEntry(client, + ConnectionEntry entry = new ConnectionEntry(client, this.config.getSlaveConnectionPoolSize(), - this.config.getSlaveSubscriptionConnectionPoolSize())); + this.config.getSlaveSubscriptionConnectionPoolSize()); + balancer.add(entry); + } + if (this.config.getSlaveAddresses().size() > 1) { + slaveDown(this.config.getMasterAddress().getHost(), this.config.getMasterAddress().getPort()); } masterClient = new RedisClient(group, this.config.getMasterAddress().getHost(), this.config.getMasterAddress().getPort()); @@ -105,16 +105,25 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } protected void slaveDown(String host, int port) { - Queue connections = balancer.freeze(host, port); + Collection connections = balancer.freeze(host, port); reattachListeners(connections); } + + protected void addSlave(String host, int port) { + slaveDown(masterClient.getAddr().getHostName(), port); + + RedisClient client = new RedisClient(group, host, port); + balancer.add(new ConnectionEntry(client, + this.config.getSlaveConnectionPoolSize(), + this.config.getSlaveSubscriptionConnectionPoolSize())); + } protected void slaveUp(String host, int port) { balancer.unfreeze(host, port); } /** - * Remove slave with host:port from slaves list. + * Freeze slave with host:port from slaves list. * Re-attach pub/sub listeners from it to other slave. * Shutdown old master client. * @@ -122,12 +131,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected void changeMaster(String host, int port) { RedisClient oldMaster = masterClient; masterClient = new RedisClient(group, host, port); - Queue connections = balancer.remove(host, port); - reattachListeners(connections); + slaveDown(host, port); oldMaster.shutdown(); } - private void reattachListeners(Queue connections) { + private void reattachListeners(Collection connections) { for (Entry mapEntry : name2PubSubConnection.entrySet()) { for (RedisPubSubConnection redisPubSubConnection : connections) { PubSubConnectionEntry entry = mapEntry.getValue(); @@ -310,9 +318,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public void shutdown() { masterClient.shutdown(); - for (RedisClient client : slaveClients) { - client.shutdown(); - } + balancer.shutdown(); group.shutdownGracefully().syncUninterruptibly(); } diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index bf18deaa7..aa28a79e6 100644 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -58,6 +58,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String masterHost = master.get(0) + ":" + master.get(1); c.setMasterAddress(masterHost); log.info("master: {}", masterHost); + c.addSlaveAddress(masterHost); // TODO async List> slaves = connection.slaves(cfg.getMasterName()).awaitUninterruptibly().getNow(); @@ -67,10 +68,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { log.info("slave: {}:{}", ip, port); c.addSlaveAddress(ip + ":" + port); } - if (slaves.isEmpty()) { - log.info("master added as slave"); - c.addSlaveAddress(masterHost); - } client.shutdown(); break; @@ -84,6 +81,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { private void monitorMasterChange(final SentinelServersConfig cfg) { final AtomicReference master = new AtomicReference(); final Set freezeSlaves = Collections.newSetFromMap(new ConcurrentHashMap()); + final Set addedSlaves = Collections.newSetFromMap(new ConcurrentHashMap()); + for (final URI addr : cfg.getSentinelAddresses()) { RedisClient client = new RedisClient(group, addr.getHost(), addr.getPort()); sentinels.add(client); @@ -97,6 +96,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { @Override public void message(String channel, String msg) { + if ("+slave".equals(channel)) { + onSlaveAdded(addedSlaves, addr, msg); + } if ("+sdown".equals(channel)) { onSlaveDown(freezeSlaves, addr, msg); } @@ -109,7 +111,27 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } }); - pubsub.subscribe("+switch-master", "+sdown", "-sdown"); + pubsub.subscribe("+switch-master", "+sdown", "-sdown", "+slave"); + } + } + + protected void onSlaveAdded(Set addedSlaves, URI addr, String msg) { + String[] parts = msg.split(" "); + + if (parts.length > 4 + && "slave".equals(parts[0])) { + String ip = parts[2]; + String port = parts[3]; + + String slaveAddr = ip + ":" + port; + + // to avoid addition twice + if (addedSlaves.add(slaveAddr)) { + log.debug("Slave has been added - {}", slaveAddr); + addSlave(ip, Integer.valueOf(port)); + } + } else { + log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); } } @@ -122,6 +144,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String port = parts[3]; String slaveAddr = ip + ":" + port; + + // to avoid freeze twice if (freezeSlaves.add(slaveAddr)) { log.debug("Slave has down - {}", slaveAddr); slaveDown(ip, Integer.valueOf(port));