Sentinel single master handling. #41

pull/226/head
Nikita 10 years ago
parent 6b0c25471a
commit e9c2f57b6d

@ -50,7 +50,19 @@ abstract class BaseLoadBalancer implements LoadBalancer {
public synchronized void add(SubscribesConnectionEntry entry) {
clients.add(entry);
clientsEmpty.open();
if (!entry.isFreezed()) {
clientsEmpty.open();
}
}
public int getAvailableClients() {
int count = 0;
for (SubscribesConnectionEntry connectionEntry : clients) {
if (!connectionEntry.isFreezed()) {
count++;
}
}
return count;
}
public synchronized void unfreeze(String host, int port) {

@ -78,4 +78,9 @@ public class ConnectionEntry {
return conn;
}
@Override
public String toString() {
return "ConnectionEntry [freezed=" + freezed + ", client=" + client + "]";
}
}

@ -23,6 +23,8 @@ import org.redisson.client.RedisPubSubConnection;
public interface LoadBalancer {
int getAvailableClients();
void shutdownAsync();
void shutdown();

@ -81,12 +81,14 @@ public class MasterSlaveEntry {
}
public void addSlave(String host, int port) {
slaveDown(masterEntry.getClient().getAddr().getHostName(), masterEntry.getClient().getAddr().getPort());
// slaveDown(masterEntry.getClient().getAddr().getHostName(), masterEntry.getClient().getAddr().getPort());
RedisClient client = connectionManager.createClient(host, port);
slaveBalancer.add(new SubscribesConnectionEntry(client,
SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client,
this.config.getSlaveConnectionPoolSize(),
this.config.getSlaveSubscriptionConnectionPoolSize()));
this.config.getSlaveSubscriptionConnectionPoolSize());
entry.setFreezed(true);
slaveBalancer.add(entry);
}
public RedisClient getClient() {
@ -94,6 +96,9 @@ public class MasterSlaveEntry {
}
public void slaveUp(String host, int port) {
if (!masterEntry.getClient().getAddr().getHostName().equals(host) && port != masterEntry.getClient().getAddr().getPort()) {
slaveDown(masterEntry.getClient().getAddr().getHostName(), masterEntry.getClient().getAddr().getPort());
}
slaveBalancer.unfreeze(host, port);
}
@ -106,7 +111,9 @@ public class MasterSlaveEntry {
public void changeMaster(String host, int port) {
ConnectionEntry oldMaster = masterEntry;
setupMasterEntry(host, port);
slaveDown(host, port);
if (slaveBalancer.getAvailableClients() > 1) {
slaveDown(host, port);
}
connectionManager.shutdownAsync(oldMaster.getClient());
}

@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.redisson.Config;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.SentinelServersConfig;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
@ -110,7 +111,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
sentinels.add(client);
RedisPubSubConnection pubsub = client.connectPubSub();
pubsub.addListener(new RedisPubSubListener<String>() {
pubsub.addListener(new BaseRedisPubSubListener<String>() {
@Override
public void onMessage(String channel, String msg) {
@ -128,10 +129,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
@Override
public void onPatternMessage(String pattern, String channel, String message) {
}
@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.SUBSCRIBE) {
@ -161,27 +158,32 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
addSlave(ip, Integer.valueOf(port));
}
} else {
log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
log.warn("onSlaveAdded. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
}
}
private void onSlaveDown(final Set<String> freezeSlaves, final URI addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 4
&& "slave".equals(parts[0])) {
String ip = parts[2];
String port = parts[3];
if (parts.length > 3) {
if ("slave".equals(parts[0])) {
String ip = parts[2];
String port = parts[3];
String slaveAddr = ip + ":" + port;
String slaveAddr = ip + ":" + port;
// to avoid freeze twice
if (freezeSlaves.add(slaveAddr)) {
log.debug("Slave has down - {}", slaveAddr);
slaveDown(0, ip, Integer.valueOf(port));
// to avoid freeze twice
if (freezeSlaves.add(slaveAddr)) {
log.debug("Slave has down - {}", slaveAddr);
slaveDown(0, ip, Integer.valueOf(port));
}
} else if ("sentinel".equals(parts[0]) || "master".equals(parts[0])) {
// skip
} else {
log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
}
} else {
log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
}
}
@ -199,7 +201,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
slaveUp(ip, Integer.valueOf(port));
}
} else {
log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
log.warn("onSlaveUp. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
}
}

Loading…
Cancel
Save