|
|
|
@ -24,6 +24,7 @@ import java.util.Iterator;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.Set;
|
|
|
|
|
import java.util.UUID;
|
|
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
@ -32,14 +33,11 @@ import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
|
|
|
|
import org.redisson.api.NodeType;
|
|
|
|
|
import org.redisson.api.RFuture;
|
|
|
|
|
import org.redisson.client.BaseRedisPubSubListener;
|
|
|
|
|
import org.redisson.client.RedisClient;
|
|
|
|
|
import org.redisson.client.RedisConnection;
|
|
|
|
|
import org.redisson.client.RedisConnectionException;
|
|
|
|
|
import org.redisson.client.RedisPubSubConnection;
|
|
|
|
|
import org.redisson.client.codec.StringCodec;
|
|
|
|
|
import org.redisson.client.protocol.RedisCommands;
|
|
|
|
|
import org.redisson.client.protocol.pubsub.PubSubType;
|
|
|
|
|
import org.redisson.cluster.ClusterSlotRange;
|
|
|
|
|
import org.redisson.config.BaseMasterSlaveServersConfig;
|
|
|
|
|
import org.redisson.config.Config;
|
|
|
|
@ -72,18 +70,16 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
private final Set<String> slaves = Collections.newSetFromMap(PlatformDependent.<String, Boolean>newConcurrentHashMap());
|
|
|
|
|
|
|
|
|
|
private final Set<URI> disconnectedSlaves = new HashSet<URI>();
|
|
|
|
|
private String masterName;
|
|
|
|
|
private ScheduledFuture<?> monitorFuture;
|
|
|
|
|
private AddressResolver<InetSocketAddress> sentinelResolver;
|
|
|
|
|
|
|
|
|
|
public SentinelConnectionManager(SentinelServersConfig cfg, Config config) {
|
|
|
|
|
super(config);
|
|
|
|
|
public SentinelConnectionManager(SentinelServersConfig cfg, Config config, UUID id) {
|
|
|
|
|
super(config, id);
|
|
|
|
|
|
|
|
|
|
if (cfg.getMasterName() == null) {
|
|
|
|
|
throw new IllegalArgumentException("masterName parameter is not defined!");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
this.masterName = cfg.getMasterName();
|
|
|
|
|
this.config = create(cfg);
|
|
|
|
|
initTimer(this.config);
|
|
|
|
|
|
|
|
|
@ -175,7 +171,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
scheduleSentinelDNSCheck();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected void scheduleSentinelDNSCheck() {
|
|
|
|
|
private void scheduleSentinelDNSCheck() {
|
|
|
|
|
monitorFuture = group.schedule(new Runnable() {
|
|
|
|
|
@Override
|
|
|
|
|
public void run() {
|
|
|
|
@ -250,7 +246,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected void checkState(final SentinelServersConfig cfg, final Iterator<RedisClient> iterator, final AtomicReference<Throwable> lastException) {
|
|
|
|
|
private void checkState(final SentinelServersConfig cfg, final Iterator<RedisClient> iterator, final AtomicReference<Throwable> lastException) {
|
|
|
|
|
if (!iterator.hasNext()) {
|
|
|
|
|
log.error("Can't update cluster state", lastException.get());
|
|
|
|
|
scheduleChangeCheck(cfg, null);
|
|
|
|
@ -279,7 +275,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected void updateState(final SentinelServersConfig cfg, final RedisConnection connection, final Iterator<RedisClient> iterator) {
|
|
|
|
|
private void updateState(final SentinelServersConfig cfg, final RedisConnection connection, final Iterator<RedisClient> iterator) {
|
|
|
|
|
final AtomicInteger commands = new AtomicInteger(2);
|
|
|
|
|
FutureListener<Object> commonListener = new FutureListener<Object>() {
|
|
|
|
|
|
|
|
|
@ -350,7 +346,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
slaveDown(ip, port);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
if (!isUseSameMaster(ip, port, masterHost, masterPort)) {
|
|
|
|
|
if (masterHost.equals("?") || !isUseSameMaster(ip, port, masterHost, masterPort)) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -366,7 +362,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
Set<String> removedSlaves = new HashSet<String>(slaves);
|
|
|
|
|
removedSlaves.removeAll(currentSlaves);
|
|
|
|
|
for (String slave : removedSlaves) {
|
|
|
|
|
slaves.remove(slave);
|
|
|
|
|
String[] parts = slave.replace("redis://", "").split(":");
|
|
|
|
|
slaveDown(parts[0], parts[1]);
|
|
|
|
|
}
|
|
|
|
@ -434,92 +429,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
client = createClient(NodeType.SENTINEL, addr, c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts(), null);
|
|
|
|
|
RedisClient oldClient = sentinels.putIfAbsent(key, client);
|
|
|
|
|
if (oldClient != null) {
|
|
|
|
|
return RedissonPromise.newSucceededFuture(null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RFuture<RedisPubSubConnection> pubsubFuture = client.connectPubSubAsync();
|
|
|
|
|
pubsubFuture.addListener(new FutureListener<RedisPubSubConnection>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
log.warn("Can't connect to sentinel: {}", addr);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RedisPubSubConnection pubsub = future.getNow();
|
|
|
|
|
pubsub.addListener(new BaseRedisPubSubListener() {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onMessage(String channel, Object msg) {
|
|
|
|
|
log.debug("message {} from {}", msg, channel);
|
|
|
|
|
|
|
|
|
|
if ("+sentinel".equals(channel)) {
|
|
|
|
|
onSentinelAdded((String) msg, c);
|
|
|
|
|
}
|
|
|
|
|
if ("+slave".equals(channel)) {
|
|
|
|
|
onSlaveAdded(addr, (String) msg);
|
|
|
|
|
}
|
|
|
|
|
if ("+sdown".equals(channel)) {
|
|
|
|
|
onNodeDown(addr, (String) msg);
|
|
|
|
|
}
|
|
|
|
|
if ("-sdown".equals(channel)) {
|
|
|
|
|
onNodeUp(addr, (String) msg);
|
|
|
|
|
}
|
|
|
|
|
if ("+switch-master".equals(channel)) {
|
|
|
|
|
onMasterChange(addr, (String) msg);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean onStatus(PubSubType type, String channel) {
|
|
|
|
|
if (type == PubSubType.SUBSCRIBE) {
|
|
|
|
|
log.debug("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort());
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave", "+sentinel");
|
|
|
|
|
log.info("sentinel: {}:{} added", addr.getHost(), addr.getPort());
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
sentinels.putIfAbsent(key, client);
|
|
|
|
|
return RedissonPromise.newSucceededFuture(null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected void onSentinelAdded(String msg, MasterSlaveServersConfig c) {
|
|
|
|
|
String[] parts = msg.split(" ");
|
|
|
|
|
if ("sentinel".equals(parts[0])) {
|
|
|
|
|
String ip = parts[2];
|
|
|
|
|
String port = parts[3];
|
|
|
|
|
|
|
|
|
|
URI uri = convert(ip, port);
|
|
|
|
|
registerSentinel(uri, c);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected void onSlaveAdded(URI addr, String msg) {
|
|
|
|
|
String[] parts = msg.split(" ");
|
|
|
|
|
|
|
|
|
|
if (parts.length > 4
|
|
|
|
|
&& "slave".equals(parts[0])) {
|
|
|
|
|
String ip = parts[2];
|
|
|
|
|
String port = parts[3];
|
|
|
|
|
|
|
|
|
|
if (!isUseSameMaster(parts)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String slaveAddr = createAddress(ip, port);
|
|
|
|
|
addSlave(ip, port, slaveAddr);
|
|
|
|
|
} else {
|
|
|
|
|
log.warn("onSlaveAdded. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected RFuture<Void> addSlave(final String ip, final String port, final String slaveAddr) {
|
|
|
|
|
private RFuture<Void> addSlave(final String ip, final String port, final String slaveAddr) {
|
|
|
|
|
final RPromise<Void> result = new RedissonPromise<Void>();
|
|
|
|
|
// to avoid addition twice
|
|
|
|
|
final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
|
|
|
|
@ -553,49 +467,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected URI convert(String ip, String port) {
|
|
|
|
|
private URI convert(String ip, String port) {
|
|
|
|
|
String addr = createAddress(ip, port);
|
|
|
|
|
URI uri = URIBuilder.create(addr);
|
|
|
|
|
return uri;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void onNodeDown(URI sentinelAddr, String msg) {
|
|
|
|
|
String[] parts = msg.split(" ");
|
|
|
|
|
|
|
|
|
|
if (parts.length > 3) {
|
|
|
|
|
if ("slave".equals(parts[0])) {
|
|
|
|
|
String ip = parts[2];
|
|
|
|
|
String port = parts[3];
|
|
|
|
|
|
|
|
|
|
slaveDown(ip, port);
|
|
|
|
|
} else if ("sentinel".equals(parts[0])) {
|
|
|
|
|
String ip = parts[2];
|
|
|
|
|
String port = parts[3];
|
|
|
|
|
|
|
|
|
|
String addr = ip + ":" + port;
|
|
|
|
|
RedisClient sentinel = sentinels.remove(addr);
|
|
|
|
|
if (sentinel != null) {
|
|
|
|
|
sentinel.shutdownAsync();
|
|
|
|
|
log.warn("sentinel: {} has down", addr);
|
|
|
|
|
}
|
|
|
|
|
} else if ("master".equals(parts[0])) {
|
|
|
|
|
String ip = parts[2];
|
|
|
|
|
String port = parts[3];
|
|
|
|
|
|
|
|
|
|
// should be resolved by master switch event
|
|
|
|
|
//
|
|
|
|
|
// MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
|
|
|
|
|
// if (entry.getFreezeReason() != FreezeReason.MANAGER) {
|
|
|
|
|
// entry.freeze();
|
|
|
|
|
// String addr = ip + ":" + port;
|
|
|
|
|
// log.warn("master: {} has down", addr);
|
|
|
|
|
// }
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, sentinelAddr.getHost(), sentinelAddr.getPort());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void slaveDown(String ip, String port) {
|
|
|
|
|
if (config.checkSkipSlavesInit()) {
|
|
|
|
|
log.warn("slave: {}:{} has down", ip, port);
|
|
|
|
@ -608,11 +485,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private boolean isUseSameMaster(String[] parts) {
|
|
|
|
|
return isUseSameMaster(parts[2], parts[3], parts[6], parts[7]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected boolean isUseSameMaster(String slaveIp, String slavePort, String slaveMasterHost, String slaveMasterPort) {
|
|
|
|
|
private boolean isUseSameMaster(String slaveIp, String slavePort, String slaveMasterHost, String slaveMasterPort) {
|
|
|
|
|
String master = currentMaster.get();
|
|
|
|
|
String slaveMaster = createAddress(slaveMasterHost, slaveMasterPort);
|
|
|
|
|
if (!master.equals(slaveMaster)) {
|
|
|
|
@ -622,37 +495,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void onNodeUp(URI addr, String msg) {
|
|
|
|
|
String[] parts = msg.split(" ");
|
|
|
|
|
|
|
|
|
|
if (parts.length > 3) {
|
|
|
|
|
if ("slave".equals(parts[0])) {
|
|
|
|
|
String ip = parts[2];
|
|
|
|
|
String port = parts[3];
|
|
|
|
|
|
|
|
|
|
if (!isUseSameMaster(parts)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
slaveUp(ip, port);
|
|
|
|
|
} else if ("master".equals(parts[0])) {
|
|
|
|
|
String ip = parts[2];
|
|
|
|
|
String port = parts[3];
|
|
|
|
|
|
|
|
|
|
URI uri = convert(ip, port);
|
|
|
|
|
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
|
|
|
|
|
if (entry.isFreezed()
|
|
|
|
|
&& URIBuilder.compare(entry.getClient().getAddr(), uri)) {
|
|
|
|
|
entry.unfreeze();
|
|
|
|
|
String masterAddr = ip + ":" + port;
|
|
|
|
|
log.info("master: {} has up", masterAddr);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
log.warn("onSlaveUp. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void slaveUp(String ip, String port) {
|
|
|
|
|
if (config.checkSkipSlavesInit()) {
|
|
|
|
|
String slaveAddr = ip + ":" + port;
|
|
|
|
@ -667,26 +509,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void onMasterChange(URI addr, String msg) {
|
|
|
|
|
String[] parts = msg.split(" ");
|
|
|
|
|
|
|
|
|
|
if (parts.length > 3) {
|
|
|
|
|
if (masterName.equals(parts[0])) {
|
|
|
|
|
String ip = parts[3];
|
|
|
|
|
String port = parts[4];
|
|
|
|
|
|
|
|
|
|
String current = currentMaster.get();
|
|
|
|
|
String newMaster = createAddress(ip, port);
|
|
|
|
|
if (!newMaster.equals(current)
|
|
|
|
|
&& currentMaster.compareAndSet(current, newMaster)) {
|
|
|
|
|
changeMaster(singleSlotRange.getStartSlot(), URIBuilder.create(newMaster));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected MasterSlaveServersConfig create(BaseMasterSlaveServersConfig<?> cfg) {
|
|
|
|
|
MasterSlaveServersConfig res = super.create(cfg);
|
|
|
|
|