new-slave automatic discovery

pull/38/head
Nikita 11 years ago
parent 7d9eb607cf
commit e3da65ad87

@ -17,7 +17,8 @@ package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -47,64 +48,56 @@ abstract class BaseLoadBalancer implements LoadBalancer {
this.password = password;
}
public void add(ConnectionEntry entry) {
public synchronized void add(ConnectionEntry entry) {
clients.add(entry);
clientsEmpty.open();
}
public void unfreeze(String host, int port) {
public synchronized void unfreeze(String host, int port) {
InetSocketAddress addr = new InetSocketAddress(host, port);
for (ConnectionEntry connectionEntry : clients) {
if (!connectionEntry.getClient().getAddr().equals(addr)) {
continue;
}
connectionEntry.setFreezed(false);
clientsEmpty.open();
return;
}
throw new IllegalStateException("Can't find " + addr + " in slaves!");
}
public Queue<RedisPubSubConnection> freeze(String host, int port) {
public synchronized Collection<RedisPubSubConnection> freeze(String host, int port) {
InetSocketAddress addr = new InetSocketAddress(host, port);
for (ConnectionEntry connectionEntry : clients) {
if (!connectionEntry.getClient().getAddr().equals(addr)) {
if (connectionEntry.isFreezed()
|| !connectionEntry.getClient().getAddr().equals(addr)) {
continue;
}
log.debug("{} freezed", addr);
connectionEntry.setFreezed(true);
return connectionEntry.getSubscribeConnections();
}
throw new IllegalStateException("Can't find " + addr + " in slaves!");
}
public Queue<RedisPubSubConnection> remove(String host, int port) {
InetSocketAddress addr = new InetSocketAddress(host, port);
for (Iterator<ConnectionEntry> iterator = clients.iterator(); iterator.hasNext();) {
ConnectionEntry entry = iterator.next();
if (!entry.getClient().getAddr().equals(addr)) {
continue;
// TODO shutdown watchdog
boolean allFreezed = true;
for (ConnectionEntry entry : clients) {
if (!entry.isFreezed()) {
allFreezed = false;
break;
}
}
iterator.remove();
log.info("slave {} removed", entry.getClient().getAddr());
if (clients.isEmpty()) {
if (allFreezed) {
clientsEmpty.close();
}
entry.shutdown();
log.info("slave {} shutdown", entry.getClient().getAddr());
return entry.getSubscribeConnections();
return connectionEntry.getSubscribeConnections();
}
throw new IllegalStateException("Can't find " + addr + " in slaves!");
return Collections.emptyList();
}
@SuppressWarnings("unchecked")
public RedisPubSubConnection nextPubSubConnection() {
clientsEmpty.awaitUninterruptibly();
List<ConnectionEntry> clientsCopy = new ArrayList<ConnectionEntry>(clients);
if (clientsCopy.isEmpty()) {
clientsEmpty.awaitUninterruptibly();
return nextPubSubConnection();
}
while (true) {
if (clientsCopy.isEmpty()) {
// TODO refactor
@ -142,11 +135,8 @@ abstract class BaseLoadBalancer implements LoadBalancer {
}
public RedisConnection nextConnection() {
clientsEmpty.awaitUninterruptibly();
List<ConnectionEntry> clientsCopy = new ArrayList<ConnectionEntry>(clients);
if (clientsCopy.isEmpty()) {
clientsEmpty.awaitUninterruptibly();
return nextConnection();
}
while (true) {
if (clientsCopy.isEmpty()) {
// TODO refactor
@ -205,4 +195,10 @@ abstract class BaseLoadBalancer implements LoadBalancer {
}
}
public void shutdown() {
for (ConnectionEntry entry : clients) {
entry.getClient().shutdown();
}
}
}

@ -15,7 +15,7 @@
*/
package org.redisson.connection;
import java.util.Queue;
import java.util.Collection;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.codec.RedisCodec;
@ -23,16 +23,16 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
public interface LoadBalancer {
void shutdown();
void unfreeze(String host, int port);
Queue<RedisPubSubConnection> freeze(String host, int port);
Collection<RedisPubSubConnection> freeze(String host, int port);
void init(RedisCodec codec, String password);
void add(ConnectionEntry entry);
Queue<RedisPubSubConnection> remove(String host, int port);
RedisConnection nextConnection();
RedisPubSubConnection nextPubSubConnection();

@ -20,10 +20,8 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.FutureListener;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
@ -63,7 +61,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<String, PubSubConnectionEntry>();
protected LoadBalancer balancer;
private final List<RedisClient> slaveClients = new ArrayList<RedisClient>();
protected volatile RedisClient masterClient;
private Semaphore masterConnectionsSemaphore;
@ -89,10 +86,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
balancer.init(codec, config.getPassword());
for (URI address : this.config.getSlaveAddresses()) {
RedisClient client = new RedisClient(group, address.getHost(), address.getPort());
slaveClients.add(client);
balancer.add(new ConnectionEntry(client,
ConnectionEntry entry = new ConnectionEntry(client,
this.config.getSlaveConnectionPoolSize(),
this.config.getSlaveSubscriptionConnectionPoolSize()));
this.config.getSlaveSubscriptionConnectionPoolSize());
balancer.add(entry);
}
if (this.config.getSlaveAddresses().size() > 1) {
slaveDown(this.config.getMasterAddress().getHost(), this.config.getMasterAddress().getPort());
}
masterClient = new RedisClient(group, this.config.getMasterAddress().getHost(), this.config.getMasterAddress().getPort());
@ -105,16 +105,25 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
protected void slaveDown(String host, int port) {
Queue<RedisPubSubConnection> connections = balancer.freeze(host, port);
Collection<RedisPubSubConnection> connections = balancer.freeze(host, port);
reattachListeners(connections);
}
protected void addSlave(String host, int port) {
slaveDown(masterClient.getAddr().getHostName(), port);
RedisClient client = new RedisClient(group, host, port);
balancer.add(new ConnectionEntry(client,
this.config.getSlaveConnectionPoolSize(),
this.config.getSlaveSubscriptionConnectionPoolSize()));
}
protected void slaveUp(String host, int port) {
balancer.unfreeze(host, port);
}
/**
* Remove slave with <code>host:port</code> from slaves list.
* Freeze slave with <code>host:port</code> from slaves list.
* Re-attach pub/sub listeners from it to other slave.
* Shutdown old master client.
*
@ -122,12 +131,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected void changeMaster(String host, int port) {
RedisClient oldMaster = masterClient;
masterClient = new RedisClient(group, host, port);
Queue<RedisPubSubConnection> connections = balancer.remove(host, port);
reattachListeners(connections);
slaveDown(host, port);
oldMaster.shutdown();
}
private void reattachListeners(Queue<RedisPubSubConnection> connections) {
private void reattachListeners(Collection<RedisPubSubConnection> connections) {
for (Entry<String, PubSubConnectionEntry> mapEntry : name2PubSubConnection.entrySet()) {
for (RedisPubSubConnection redisPubSubConnection : connections) {
PubSubConnectionEntry entry = mapEntry.getValue();
@ -310,9 +318,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public void shutdown() {
masterClient.shutdown();
for (RedisClient client : slaveClients) {
client.shutdown();
}
balancer.shutdown();
group.shutdownGracefully().syncUninterruptibly();
}

@ -58,6 +58,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String masterHost = master.get(0) + ":" + master.get(1);
c.setMasterAddress(masterHost);
log.info("master: {}", masterHost);
c.addSlaveAddress(masterHost);
// TODO async
List<Map<String, String>> slaves = connection.slaves(cfg.getMasterName()).awaitUninterruptibly().getNow();
@ -67,10 +68,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
log.info("slave: {}:{}", ip, port);
c.addSlaveAddress(ip + ":" + port);
}
if (slaves.isEmpty()) {
log.info("master added as slave");
c.addSlaveAddress(masterHost);
}
client.shutdown();
break;
@ -84,6 +81,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private void monitorMasterChange(final SentinelServersConfig cfg) {
final AtomicReference<String> master = new AtomicReference<String>();
final Set<String> freezeSlaves = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
final Set<String> addedSlaves = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
for (final URI addr : cfg.getSentinelAddresses()) {
RedisClient client = new RedisClient(group, addr.getHost(), addr.getPort());
sentinels.add(client);
@ -97,6 +96,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
@Override
public void message(String channel, String msg) {
if ("+slave".equals(channel)) {
onSlaveAdded(addedSlaves, addr, msg);
}
if ("+sdown".equals(channel)) {
onSlaveDown(freezeSlaves, addr, msg);
}
@ -109,7 +111,27 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
});
pubsub.subscribe("+switch-master", "+sdown", "-sdown");
pubsub.subscribe("+switch-master", "+sdown", "-sdown", "+slave");
}
}
protected void onSlaveAdded(Set<String> addedSlaves, URI addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 4
&& "slave".equals(parts[0])) {
String ip = parts[2];
String port = parts[3];
String slaveAddr = ip + ":" + port;
// to avoid addition twice
if (addedSlaves.add(slaveAddr)) {
log.debug("Slave has been added - {}", slaveAddr);
addSlave(ip, Integer.valueOf(port));
}
} else {
log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
}
}
@ -122,6 +144,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String port = parts[3];
String slaveAddr = ip + ":" + port;
// to avoid freeze twice
if (freezeSlaves.add(slaveAddr)) {
log.debug("Slave has down - {}", slaveAddr);
slaveDown(ip, Integer.valueOf(port));

Loading…
Cancel
Save