|
|
|
@ -19,7 +19,6 @@ import java.net.InetSocketAddress;
|
|
|
|
|
import java.net.URI;
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.Collection;
|
|
|
|
|
import java.util.Collections;
|
|
|
|
|
import java.util.HashSet;
|
|
|
|
|
import java.util.Iterator;
|
|
|
|
|
import java.util.List;
|
|
|
|
@ -39,7 +38,6 @@ import org.redisson.client.RedisConnection;
|
|
|
|
|
import org.redisson.client.RedisConnectionException;
|
|
|
|
|
import org.redisson.client.codec.StringCodec;
|
|
|
|
|
import org.redisson.client.protocol.RedisCommands;
|
|
|
|
|
import org.redisson.cluster.ClusterSlotRange;
|
|
|
|
|
import org.redisson.config.BaseMasterSlaveServersConfig;
|
|
|
|
|
import org.redisson.config.Config;
|
|
|
|
|
import org.redisson.config.MasterSlaveServersConfig;
|
|
|
|
@ -69,7 +67,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
|
|
|
|
|
private final ConcurrentMap<String, RedisClient> sentinels = PlatformDependent.newConcurrentHashMap();
|
|
|
|
|
private final AtomicReference<String> currentMaster = new AtomicReference<String>();
|
|
|
|
|
private final Set<String> slaves = Collections.newSetFromMap(PlatformDependent.<String, Boolean>newConcurrentHashMap());
|
|
|
|
|
|
|
|
|
|
private final Set<URI> disconnectedSlaves = new HashSet<URI>();
|
|
|
|
|
private ScheduledFuture<?> monitorFuture;
|
|
|
|
@ -100,7 +97,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
this.config.setMasterAddress(masterHost);
|
|
|
|
|
currentMaster.set(masterHost);
|
|
|
|
|
log.info("master: {} added", masterHost);
|
|
|
|
|
slaves.add(masterHost);
|
|
|
|
|
|
|
|
|
|
List<Map<String, String>> sentinelSlaves = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
|
|
|
|
|
for (Map<String, String> map : sentinelSlaves) {
|
|
|
|
@ -115,7 +111,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
String host = createAddress(ip, port);
|
|
|
|
|
|
|
|
|
|
this.config.addSlaveAddress(host);
|
|
|
|
|
slaves.add(host);
|
|
|
|
|
log.debug("slave {} state: {}", host, map);
|
|
|
|
|
log.info("slave: {} added", host);
|
|
|
|
|
|
|
|
|
@ -368,9 +363,19 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
CountableListener<Void> listener = new CountableListener<Void>() {
|
|
|
|
|
@Override
|
|
|
|
|
protected void onSuccess(Void value) {
|
|
|
|
|
Set<String> removedSlaves = new HashSet<String>(slaves);
|
|
|
|
|
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
|
|
|
|
|
Set<String> removedSlaves = new HashSet<String>();
|
|
|
|
|
for (ClientConnectionsEntry e : entry.getAllEntries()) {
|
|
|
|
|
InetSocketAddress addr = e.getClient().getAddr();
|
|
|
|
|
String slaveAddr = createAddress(addr.getAddress().getHostAddress(), addr.getPort());
|
|
|
|
|
removedSlaves.add(slaveAddr);
|
|
|
|
|
}
|
|
|
|
|
removedSlaves.removeAll(currentSlaves);
|
|
|
|
|
|
|
|
|
|
for (String slave : removedSlaves) {
|
|
|
|
|
if (slave.equals(currentMaster.get())) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
String hostPort = slave.replace("redis://", "");
|
|
|
|
|
int lastColonIdx = hostPort.lastIndexOf(":");
|
|
|
|
|
String host = hostPort.substring(0, lastColonIdx);
|
|
|
|
@ -462,13 +467,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
// to avoid addition twice
|
|
|
|
|
final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
|
|
|
|
|
final URI uri = convert(ip, port);
|
|
|
|
|
if (slaves.add(slaveAddr) && !config.checkSkipSlavesInit()) {
|
|
|
|
|
if (!entry.hasSlave(uri) && !config.checkSkipSlavesInit()) {
|
|
|
|
|
RFuture<Void> future = entry.addSlave(URIBuilder.create(slaveAddr));
|
|
|
|
|
future.addListener(new FutureListener<Void>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Void> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
slaves.remove(slaveAddr);
|
|
|
|
|
result.tryFailure(future.cause());
|
|
|
|
|
log.error("Can't add slave: " + slaveAddr, future.cause());
|
|
|
|
|
return;
|
|
|
|
|