From e9c2f57b6d146b8d53ff9511fa60e2da32e38180 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 13 Aug 2015 17:47:46 +0300 Subject: [PATCH] Sentinel single master handling. #41 --- .../redisson/connection/BaseLoadBalancer.java | 14 +++++++- .../redisson/connection/ConnectionEntry.java | 5 +++ .../org/redisson/connection/LoadBalancer.java | 2 ++ .../redisson/connection/MasterSlaveEntry.java | 15 +++++--- .../connection/SentinelConnectionManager.java | 36 ++++++++++--------- 5 files changed, 50 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/redisson/connection/BaseLoadBalancer.java b/src/main/java/org/redisson/connection/BaseLoadBalancer.java index 1dba95e7a..9b6f5bccb 100644 --- a/src/main/java/org/redisson/connection/BaseLoadBalancer.java +++ b/src/main/java/org/redisson/connection/BaseLoadBalancer.java @@ -50,7 +50,19 @@ abstract class BaseLoadBalancer implements LoadBalancer { public synchronized void add(SubscribesConnectionEntry entry) { clients.add(entry); - clientsEmpty.open(); + if (!entry.isFreezed()) { + clientsEmpty.open(); + } + } + + public int getAvailableClients() { + int count = 0; + for (SubscribesConnectionEntry connectionEntry : clients) { + if (!connectionEntry.isFreezed()) { + count++; + } + } + return count; } public synchronized void unfreeze(String host, int port) { diff --git a/src/main/java/org/redisson/connection/ConnectionEntry.java b/src/main/java/org/redisson/connection/ConnectionEntry.java index e1f358af7..5993f9782 100644 --- a/src/main/java/org/redisson/connection/ConnectionEntry.java +++ b/src/main/java/org/redisson/connection/ConnectionEntry.java @@ -78,4 +78,9 @@ public class ConnectionEntry { return conn; } + @Override + public String toString() { + return "ConnectionEntry [freezed=" + freezed + ", client=" + client + "]"; + } + } diff --git a/src/main/java/org/redisson/connection/LoadBalancer.java b/src/main/java/org/redisson/connection/LoadBalancer.java index 87b32e579..1f0f8c044 100644 --- a/src/main/java/org/redisson/connection/LoadBalancer.java +++ b/src/main/java/org/redisson/connection/LoadBalancer.java @@ -23,6 +23,8 @@ import org.redisson.client.RedisPubSubConnection; public interface LoadBalancer { + int getAvailableClients(); + void shutdownAsync(); void shutdown(); diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index f34ca95fb..09fb493d0 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -81,12 +81,14 @@ public class MasterSlaveEntry { } public void addSlave(String host, int port) { - slaveDown(masterEntry.getClient().getAddr().getHostName(), masterEntry.getClient().getAddr().getPort()); +// slaveDown(masterEntry.getClient().getAddr().getHostName(), masterEntry.getClient().getAddr().getPort()); RedisClient client = connectionManager.createClient(host, port); - slaveBalancer.add(new SubscribesConnectionEntry(client, + SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client, this.config.getSlaveConnectionPoolSize(), - this.config.getSlaveSubscriptionConnectionPoolSize())); + this.config.getSlaveSubscriptionConnectionPoolSize()); + entry.setFreezed(true); + slaveBalancer.add(entry); } public RedisClient getClient() { @@ -94,6 +96,9 @@ public class MasterSlaveEntry { } public void slaveUp(String host, int port) { + if (!masterEntry.getClient().getAddr().getHostName().equals(host) && port != masterEntry.getClient().getAddr().getPort()) { + slaveDown(masterEntry.getClient().getAddr().getHostName(), masterEntry.getClient().getAddr().getPort()); + } slaveBalancer.unfreeze(host, port); } @@ -106,7 +111,9 @@ public class MasterSlaveEntry { public void changeMaster(String host, int port) { ConnectionEntry oldMaster = masterEntry; setupMasterEntry(host, port); - slaveDown(host, port); + if (slaveBalancer.getAvailableClients() > 1) { + slaveDown(host, port); + } connectionManager.shutdownAsync(oldMaster.getClient()); } diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 0874b3077..1b68b7401 100644 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.redisson.Config; import org.redisson.MasterSlaveServersConfig; import org.redisson.SentinelServersConfig; +import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; @@ -110,7 +111,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { sentinels.add(client); RedisPubSubConnection pubsub = client.connectPubSub(); - pubsub.addListener(new RedisPubSubListener() { + pubsub.addListener(new BaseRedisPubSubListener() { @Override public void onMessage(String channel, String msg) { @@ -128,10 +129,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } } - @Override - public void onPatternMessage(String pattern, String channel, String message) { - } - @Override public boolean onStatus(PubSubType type, String channel) { if (type == PubSubType.SUBSCRIBE) { @@ -161,27 +158,32 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { addSlave(ip, Integer.valueOf(port)); } } else { - log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); + log.warn("onSlaveAdded. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); } } private void onSlaveDown(final Set freezeSlaves, final URI addr, String msg) { String[] parts = msg.split(" "); - if (parts.length > 4 - && "slave".equals(parts[0])) { - String ip = parts[2]; - String port = parts[3]; + if (parts.length > 3) { + if ("slave".equals(parts[0])) { + String ip = parts[2]; + String port = parts[3]; - String slaveAddr = ip + ":" + port; + String slaveAddr = ip + ":" + port; - // to avoid freeze twice - if (freezeSlaves.add(slaveAddr)) { - log.debug("Slave has down - {}", slaveAddr); - slaveDown(0, ip, Integer.valueOf(port)); + // to avoid freeze twice + if (freezeSlaves.add(slaveAddr)) { + log.debug("Slave has down - {}", slaveAddr); + slaveDown(0, ip, Integer.valueOf(port)); + } + } else if ("sentinel".equals(parts[0]) || "master".equals(parts[0])) { + // skip + } else { + log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); } } else { - log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); + log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); } } @@ -199,7 +201,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { slaveUp(ip, Integer.valueOf(port)); } } else { - log.warn("Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); + log.warn("onSlaveUp. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); } }