diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index d0bc0cf36..7ac9a601e 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -346,8 +346,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { failedSlaves.removeAll(currentPart.getFailedSlaveAddresses()); for (URI uri : failedSlaves) { currentPart.addFailedSlaveAddress(uri); - slaveDown(entry, uri.getHost(), uri.getPort(), FreezeReason.MANAGER); - log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges()); + if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { + log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges()); + } } } @@ -358,8 +359,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { for (URI uri : removedSlaves) { currentPart.removeSlaveAddress(uri); - slaveDown(entry, uri.getHost(), uri.getPort(), FreezeReason.MANAGER); - log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges()); + if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { + log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges()); + } } Set addedSlaves = new HashSet(newPart.getSlaveAddresses()); diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 21a275ddb..4bd1a720c 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -23,11 +23,11 @@ import java.util.concurrent.TimeUnit; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; +import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.cluster.ClusterSlotRange; -import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.core.NodeType; import org.redisson.misc.InfinitySemaphoreLatch; @@ -44,6 +44,8 @@ import io.netty.util.concurrent.Promise; */ public interface ConnectionManager { + void reattachPubSub(Collection allPubSubConnections); + boolean isClusterMode(); Future newSucceededFuture(R value); @@ -62,8 +64,6 @@ public interface ConnectionManager { Future newFailedFuture(Throwable cause); - void slaveDown(MasterSlaveEntry entry, String host, int port, FreezeReason freezeReason); - Collection getClients(); void shutdownAsync(RedisClient client); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 125df525d..2bf246e3f 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -542,13 +542,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return null; } - public void slaveDown(MasterSlaveEntry entry, String host, int port, FreezeReason freezeReason) { - Collection allPubSubConnections = entry.slaveDown(host, port, freezeReason); - if (allPubSubConnections.isEmpty()) { - return; - } - - // reattach listeners to other channels + @Override + public void reattachPubSub(Collection allPubSubConnections) { for (Entry mapEntry : name2PubSubConnection.entrySet()) { for (RedisPubSubConnection redisPubSubConnection : allPubSubConnections) { PubSubConnectionEntry pubSubEntry = mapEntry.getValue(); @@ -620,8 +615,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } protected void slaveDown(ClusterSlotRange slotRange, String host, int port, FreezeReason freezeReason) { - MasterSlaveEntry entry = getEntry(slotRange); - slaveDown(entry, host, port, freezeReason); + getEntry(slotRange).slaveDown(host, port, freezeReason); } protected void changeMaster(ClusterSlotRange slotRange, String host, int port) { diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 7582e98fc..139030165 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -90,9 +90,11 @@ public class MasterSlaveEntry { return writeConnectionHolder.add(masterEntry); } - public Collection slaveDown(String host, int port, FreezeReason freezeReason) { - Collection conns = slaveBalancer.freeze(host, port, freezeReason); - + public boolean slaveDown(String host, int port, FreezeReason freezeReason) { + if (!slaveBalancer.freeze(host, port, freezeReason)) { + return false; + } + // add master as slave if no more slaves available if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) { InetSocketAddress addr = masterEntry.getClient().getAddr(); @@ -100,7 +102,7 @@ public class MasterSlaveEntry { log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort()); } } - return conns; + return true; } public Future addSlave(String host, int port) { @@ -134,7 +136,7 @@ public class MasterSlaveEntry { // exclude master from slaves if (config.getReadMode() == ReadMode.SLAVE && (!addr.getHostName().equals(host) || port != addr.getPort())) { - connectionManager.slaveDown(this, addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM); + slaveDown(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM); log.info("master {}:{} excluded from slaves", addr.getHostName(), addr.getPort()); } return true; @@ -155,7 +157,7 @@ public class MasterSlaveEntry { // more than one slave available, so master can be removed from slaves if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() > 1) { - connectionManager.slaveDown(this, host, port, FreezeReason.SYSTEM); + slaveDown(host, port, FreezeReason.SYSTEM); } connectionManager.shutdownAsync(oldMaster.getClient()); } diff --git a/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index 56a372b98..2b28c343f 100644 --- a/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -16,7 +16,6 @@ package org.redisson.connection.balancer; import java.net.InetSocketAddress; -import java.util.Collection; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; @@ -37,7 +36,7 @@ public interface LoadBalancerManager { boolean unfreeze(String host, int port, FreezeReason freezeReason); - Collection freeze(String host, int port, FreezeReason freezeReason); + boolean freeze(String host, int port, FreezeReason freezeReason); Future add(ClientConnectionsEntry entry); diff --git a/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java b/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java index 45f55ecc0..8c73c4d8d 100644 --- a/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java +++ b/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java @@ -16,10 +16,6 @@ package org.redisson.connection.balancer; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; import java.util.Map; import org.redisson.MasterSlaveServersConfig; @@ -99,16 +95,20 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager { return false; } - public Collection freeze(String host, int port, FreezeReason freezeReason) { + public boolean freeze(String host, int port, FreezeReason freezeReason) { InetSocketAddress addr = new InetSocketAddress(host, port); ClientConnectionsEntry connectionEntry = addr2Entry.get(addr); if (connectionEntry == null) { - return Collections.emptyList(); + return false; } synchronized (connectionEntry) { - log.debug("{} freezed", addr); + if (connectionEntry.isFreezed()) { + return false; + } + connectionEntry.setFreezed(true); + // only RECONNECT freeze reason could be replaced if (connectionEntry.getFreezeReason() == null || connectionEntry.getFreezeReason() == FreezeReason.RECONNECT) { @@ -134,11 +134,9 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager { connection.closeAsync(); } - synchronized (connectionEntry) { - List list = new ArrayList(connectionEntry.getAllSubscribeConnections()); - connectionEntry.getAllSubscribeConnections().clear(); - return list; - } + connectionManager.reattachPubSub(connectionEntry.getAllSubscribeConnections()); + connectionEntry.getAllSubscribeConnections().clear(); + return true; } public Future nextPubSubConnection() { diff --git a/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 58998a2dd..00c1c6886 100644 --- a/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -277,7 +277,7 @@ abstract class ConnectionPool { private void checkForReconnect(ClientConnectionsEntry entry) { if (entry.getNodeType() == NodeType.SLAVE) { - connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(), + masterSlaveEntry.slaveDown(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); log.warn("slave {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts()); scheduleCheck(entry);