|
|
@ -48,7 +48,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
|
|
|
|
|
|
|
private final Logger log = LoggerFactory.getLogger(getClass());
|
|
|
|
private final Logger log = LoggerFactory.getLogger(getClass());
|
|
|
|
|
|
|
|
|
|
|
|
private final AtomicReference<RedisURI> currentMaster = new AtomicReference<>();
|
|
|
|
private final AtomicReference<InetSocketAddress> currentMaster = new AtomicReference<>();
|
|
|
|
|
|
|
|
|
|
|
|
private ScheduledFuture<?> monitorFuture;
|
|
|
|
private ScheduledFuture<?> monitorFuture;
|
|
|
|
|
|
|
|
|
|
|
@ -74,7 +74,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
|
|
|
|
|
|
|
Role role = Role.valueOf(connection.sync(RedisCommands.INFO_REPLICATION).get(ROLE_KEY));
|
|
|
|
Role role = Role.valueOf(connection.sync(RedisCommands.INFO_REPLICATION).get(ROLE_KEY));
|
|
|
|
if (Role.master.equals(role)) {
|
|
|
|
if (Role.master.equals(role)) {
|
|
|
|
currentMaster.set(addr);
|
|
|
|
currentMaster.set(connection.getRedisClient().getAddr());
|
|
|
|
log.info("{} is the master", addr);
|
|
|
|
log.info("{} is the master", addr);
|
|
|
|
this.config.setMasterAddress(addr.toString());
|
|
|
|
this.config.setMasterAddress(addr.toString());
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
@ -115,13 +115,13 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
RedisURI master = currentMaster.get();
|
|
|
|
InetSocketAddress master = currentMaster.get();
|
|
|
|
log.debug("Current master: {}", master);
|
|
|
|
log.debug("Current master: {}", master);
|
|
|
|
|
|
|
|
|
|
|
|
AtomicInteger count = new AtomicInteger(cfg.getNodeAddresses().size());
|
|
|
|
AtomicInteger count = new AtomicInteger(cfg.getNodeAddresses().size());
|
|
|
|
for (String address : cfg.getNodeAddresses()) {
|
|
|
|
for (String address : cfg.getNodeAddresses()) {
|
|
|
|
RedisURI addr = new RedisURI(address);
|
|
|
|
RedisURI uri = new RedisURI(address);
|
|
|
|
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
|
|
|
|
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, uri, uri.getHost());
|
|
|
|
connectionFuture.onComplete((connection, exc) -> {
|
|
|
|
connectionFuture.onComplete((connection, exc) -> {
|
|
|
|
if (exc != null) {
|
|
|
|
if (exc != null) {
|
|
|
|
log.error(exc.getMessage(), exc);
|
|
|
|
log.error(exc.getMessage(), exc);
|
|
|
@ -146,12 +146,13 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
InetSocketAddress addr = connection.getRedisClient().getAddr();
|
|
|
|
Role role = Role.valueOf(r.get(ROLE_KEY));
|
|
|
|
Role role = Role.valueOf(r.get(ROLE_KEY));
|
|
|
|
if (Role.master.equals(role)) {
|
|
|
|
if (Role.master.equals(role)) {
|
|
|
|
if (master.equals(addr)) {
|
|
|
|
if (master.equals(addr)) {
|
|
|
|
log.debug("Current master {} unchanged", master);
|
|
|
|
log.debug("Current master {} unchanged", master);
|
|
|
|
} else if (currentMaster.compareAndSet(master, addr)) {
|
|
|
|
} else if (currentMaster.compareAndSet(master, addr)) {
|
|
|
|
RFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), addr);
|
|
|
|
RFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), uri);
|
|
|
|
changeFuture.onComplete((res, e) -> {
|
|
|
|
changeFuture.onComplete((res, e) -> {
|
|
|
|
if (e != null) {
|
|
|
|
if (e != null) {
|
|
|
|
currentMaster.compareAndSet(addr, master);
|
|
|
|
currentMaster.compareAndSet(addr, master);
|
|
|
@ -159,7 +160,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else if (!config.checkSkipSlavesInit()) {
|
|
|
|
} else if (!config.checkSkipSlavesInit()) {
|
|
|
|
slaveUp(addr, connection.getRedisClient().getAddr());
|
|
|
|
slaveUp(uri, addr);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (count.decrementAndGet() == 0) {
|
|
|
|
if (count.decrementAndGet() == 0) {
|
|
|
|