|
|
|
@ -27,6 +27,7 @@ import org.redisson.MasterSlaveServersConfig;
|
|
|
|
|
import org.redisson.client.RedisClient;
|
|
|
|
|
import org.redisson.client.RedisConnection;
|
|
|
|
|
import org.redisson.client.RedisConnectionException;
|
|
|
|
|
import org.redisson.client.RedisException;
|
|
|
|
|
import org.redisson.client.protocol.RedisCommands;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
@ -34,6 +35,13 @@ import org.slf4j.LoggerFactory;
|
|
|
|
|
import io.netty.util.concurrent.GlobalEventExecutor;
|
|
|
|
|
import io.netty.util.concurrent.ScheduledFuture;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* {@link ConnectionManager} for AWS ElastiCache Replication Groups. By providing all nodes
|
|
|
|
|
* of the replication group to this manager, the role of each node can be polled to determine
|
|
|
|
|
* if a failover has occurred resulting in a new master.
|
|
|
|
|
*
|
|
|
|
|
* @author Steve Ungerer
|
|
|
|
|
*/
|
|
|
|
|
public class ElasticacheReplicationGroupConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
|
|
|
|
|
private static final String ROLE_KEY = "role:";
|
|
|
|
@ -64,10 +72,14 @@ public class ElasticacheReplicationGroupConnectionManager extends MasterSlaveCon
|
|
|
|
|
|
|
|
|
|
Role role = determineRole(connection.sync(RedisCommands.INFO_REPLICATION));
|
|
|
|
|
if (Role.master.equals(role)) {
|
|
|
|
|
if (currentMaster.get() != null) {
|
|
|
|
|
throw new RedisException("Multiple masters detected");
|
|
|
|
|
}
|
|
|
|
|
currentMaster.set(addr);
|
|
|
|
|
log.info("{} is the master", addr);
|
|
|
|
|
this.config.setMasterAddress(addr);
|
|
|
|
|
} else {
|
|
|
|
|
log.info("{} is a slave", addr);
|
|
|
|
|
this.config.addSlaveAddress(addr);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -99,20 +111,20 @@ public class ElasticacheReplicationGroupConnectionManager extends MasterSlaveCon
|
|
|
|
|
public void run() {
|
|
|
|
|
try {
|
|
|
|
|
URI master = currentMaster.get();
|
|
|
|
|
log.debug("Current master: {}", master);
|
|
|
|
|
for (URI addr : cfg.getNodeAddresses()) {
|
|
|
|
|
RedisConnection connection = connect(cfg, addr);
|
|
|
|
|
String replInfo = connection.sync(RedisCommands.INFO_REPLICATION);
|
|
|
|
|
log.trace("{} repl info: {}", addr, replInfo);
|
|
|
|
|
|
|
|
|
|
Role role = determineRole(replInfo);
|
|
|
|
|
log.debug("Current master: {} / node {} is {}", master, addr, role);
|
|
|
|
|
log.debug("node {} is {}", addr, role);
|
|
|
|
|
|
|
|
|
|
if (Role.master.equals(role) && master.equals(addr)) {
|
|
|
|
|
log.debug("Current master {} unchanged", master);
|
|
|
|
|
} else if (Role.master.equals(role) && !master.equals(addr) && currentMaster.compareAndSet(master, addr)) {
|
|
|
|
|
log.info("Master has changed from {} to {}", master, addr);
|
|
|
|
|
changeMaster(MAX_SLOT, addr.getHost(), addr.getPort());
|
|
|
|
|
slaveDown(MAX_SLOT, master.getHost(), master.getPort());
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -125,13 +137,12 @@ public class ElasticacheReplicationGroupConnectionManager extends MasterSlaveCon
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Role determineRole(String data) {
|
|
|
|
|
String[] lines = data.split("\\r\\n");
|
|
|
|
|
for (String s : lines) {
|
|
|
|
|
for (String s : data.split("\\r\\n")) {
|
|
|
|
|
if (s.startsWith(ROLE_KEY)) {
|
|
|
|
|
return Role.valueOf(s.substring(ROLE_KEY.length()));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
throw new RuntimeException("Cannot determine role from " + data);
|
|
|
|
|
throw new RedisException("Cannot determine node role from provided 'INFO replication' data");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private MasterSlaveServersConfig create(ElasticacheReplicationGroupServersConfig cfg) {
|
|
|
|
|