Connections restore with Sentinel servers fixed.

pull/365/head
Nikita 9 years ago
parent c20e550dae
commit 76eebdd45a

@ -545,7 +545,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected void slaveDown(ClusterSlotRange slotRange, String host, int port, FreezeReason freezeReason) {
MasterSlaveEntry entry = getEntry(slotRange);
slaveDown(entry, host, port, freezeReason);
log.info("slave: {}:{} has down", host, port);
}
protected void changeMaster(ClusterSlotRange slotRange, String host, int port) {

@ -92,9 +92,10 @@ public class MasterSlaveEntry {
// add master as slave if no more slaves available
if (slaveBalancer.getAvailableClients() == 0) {
InetSocketAddress addr = masterEntry.getClient().getAddr();
slaveUp(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM);
if (slaveUp(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM)) {
log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort());
}
}
return conns;
}
@ -120,15 +121,17 @@ public class MasterSlaveEntry {
return masterEntry.getClient();
}
public void slaveUp(String host, int port, FreezeReason freezeReason) {
public boolean slaveUp(String host, int port, FreezeReason freezeReason) {
if (!slaveBalancer.unfreeze(host, port, freezeReason)) {
return;
return false;
}
InetSocketAddress addr = masterEntry.getClient().getAddr();
// exclude master from slaves
if (!addr.getHostName().equals(host) || port != addr.getPort()) {
connectionManager.slaveDown(this, addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM);
}
return true;
}
/**

@ -49,7 +49,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private final ConcurrentMap<String, RedisClient> sentinels = PlatformDependent.newConcurrentHashMap();
private final AtomicReference<String> currentMaster = new AtomicReference<String>();
private final ConcurrentMap<String, Boolean> freezeSlaves = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<String, Boolean> slaves = PlatformDependent.newConcurrentHashMap();
@ -222,8 +221,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
// to avoid addition twice
if (slaves.putIfAbsent(slaveAddr, true) == null) {
addSlave(ip, Integer.valueOf(port));
getEntry(singleSlotRange).addSlave(ip, Integer.valueOf(port));
log.info("slave: {} added", slaveAddr);
} else {
slaveUp(ip, port);
}
} else {
log.warn("onSlaveAdded. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
@ -266,11 +267,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
private void slaveDown(String ip, String port) {
// to avoid freeze twice
String addr = ip + ":" + port;
if (freezeSlaves.putIfAbsent(addr, true) == null) {
slaveDown(singleSlotRange, ip, Integer.valueOf(port), FreezeReason.MANAGER);
}
log.info("slave: {}:{} has down", ip, port);
}
private void onNodeUp(URI addr, String msg) {
@ -281,11 +279,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = parts[2];
String port = parts[3];
String slaveAddr = ip + ":" + port;
if (freezeSlaves.remove(slaveAddr) != null) {
slaveUp(ip, Integer.valueOf(port));
log.info("slave: {} has up", slaveAddr);
}
slaveUp(ip, port);
} else if ("master".equals(parts[0])) {
String ip = parts[2];
String port = parts[3];
@ -303,6 +297,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
private void slaveUp(String ip, String port) {
if (getEntry(singleSlotRange).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) {
String slaveAddr = ip + ":" + port;
log.info("slave: {} has up", slaveAddr);
}
}
private void onMasterChange(SentinelServersConfig cfg, URI addr, String msg) {
String[] parts = msg.split(" ");
@ -324,14 +325,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
private void addSlave(String host, int port) {
getEntry(0).addSlave(host, port);
}
private void slaveUp(String host, int port) {
getEntry(0).slaveUp(host, port, FreezeReason.MANAGER);
}
@Override
public void shutdown() {
super.shutdown();

@ -134,10 +134,12 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager {
connection.closeAsync();
}
synchronized (connectionEntry) {
List<RedisPubSubConnection> list = new ArrayList<RedisPubSubConnection>(connectionEntry.getAllSubscribeConnections());
connectionEntry.getAllSubscribeConnections().clear();
return list;
}
}
public Future<RedisPubSubConnection> nextPubSubConnection() {
return pubSubEntries.get();

Loading…
Cancel
Save