Sentinel offline slaves handling during Redisson start. #391

pull/395/head
Nikita 9 years ago
parent 1723e68bed
commit 897f4cc524

@ -199,21 +199,27 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
slots.add(singleSlotRange);
MasterSlaveEntry entry;
if (config.getReadMode() == ReadMode.MASTER) {
SingleEntry entry = new SingleEntry(slots, this, config);
entry = new SingleEntry(slots, this, config);
Future<Void> f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
f.syncUninterruptibly();
addEntry(singleSlotRange, entry);
} else {
MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);
List<Future<Void>> fs = entry.initSlaveBalancer();
for (Future<Void> future : fs) {
future.syncUninterruptibly();
}
Future<Void> f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
f.syncUninterruptibly();
addEntry(singleSlotRange, entry);
entry = createMasterSlaveEntry(config, slots);
}
addEntry(singleSlotRange, entry);
}
protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config,
HashSet<ClusterSlotRange> slots) {
MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);
List<Future<Void>> fs = entry.initSlaveBalancer(java.util.Collections.emptyList());
for (Future<Void> future : fs) {
future.syncUninterruptibly();
}
Future<Void> f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
f.syncUninterruptibly();
return entry;
}
protected MasterSlaveServersConfig create(BaseMasterSlaveServersConfig<?> cfg) {

@ -68,14 +68,16 @@ public class MasterSlaveEntry {
writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this);
}
public List<Future<Void>> initSlaveBalancer() {
boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty() && config.getReadMode() == ReadMode.SLAVE;
public List<Future<Void>> initSlaveBalancer(Collection<URI> disconnectedNodes) {
boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty()
&& config.getReadMode() == ReadMode.SLAVE
&& disconnectedNodes.size() < config.getSlaveAddresses().size();
List<Future<Void>> result = new LinkedList<Future<Void>>();
Future<Void> f = addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, NodeType.MASTER);
result.add(f);
for (URI address : config.getSlaveAddresses()) {
f = addSlave(address.getHost(), address.getPort(), false, NodeType.SLAVE);
f = addSlave(address.getHost(), address.getPort(), disconnectedNodes.contains(address), NodeType.SLAVE);
result.add(f);
}
return result;

@ -18,8 +18,10 @@ package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
@ -36,6 +38,7 @@ import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.misc.URIBuilder;
@ -55,13 +58,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private final AtomicReference<String> currentMaster = new AtomicReference<String>();
private final ConcurrentMap<String, Boolean> slaves = PlatformDependent.newConcurrentHashMap();
private final Set<URI> disconnectedSlaves = new HashSet<URI>();
public SentinelConnectionManager(SentinelServersConfig cfg, Config config) {
super(config);
final MasterSlaveServersConfig c = create(cfg);
List<String> disconnectedSlaves = new ArrayList<String>();
for (URI addr : cfg.getSentinelAddresses()) {
RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout());
try {
@ -95,10 +98,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
c.addSlaveAddress(host);
slaves.put(host, true);
log.info("slave: {} added, params: {}", host, map);
log.debug("slave {} state: {}", host, map);
if (flags.contains("s_down") || flags.contains("disconnected")) {
disconnectedSlaves.add(host);
URI url = URIBuilder.create(host);
disconnectedSlaves.add(url);
log.info("slave: {} down", host);
} else {
log.info("slave: {} added", host);
}
}
break;
@ -114,11 +121,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
init(c);
for (String host : disconnectedSlaves) {
String[] parts = host.split(":");
slaveDown(parts[0], parts[1]);
}
List<Future<RedisPubSubConnection>> connectionFutures = new ArrayList<Future<RedisPubSubConnection>>(cfg.getSentinelAddresses().size());
for (URI addr : cfg.getSentinelAddresses()) {
Future<RedisPubSubConnection> future = registerSentinel(cfg, addr, c);
@ -130,6 +132,19 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
@Override
protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config,
HashSet<ClusterSlotRange> slots) {
MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);
List<Future<Void>> fs = entry.initSlaveBalancer(disconnectedSlaves);
for (Future<Void> future : fs) {
future.syncUninterruptibly();
}
Future<Void> f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
f.syncUninterruptibly();
return entry;
}
private Future<RedisPubSubConnection> registerSentinel(final SentinelServersConfig cfg, final URI addr, final MasterSlaveServersConfig c) {
RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout());
RedisClient oldClient = sentinels.putIfAbsent(addr.getHost() + ":" + addr.getPort(), client);

Loading…
Cancel
Save