diff --git a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index 0aac16670..3a227d7c5 100644 --- a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -76,7 +76,7 @@ public class ClientConnectionsEntry { } public boolean isMasterForRead() { - return getFreezeReason() == FreezeReason.SYSTEM && getConfig().getReadMode() == ReadMode.MASTER_SLAVE; + return getFreezeReason() == FreezeReason.SYSTEM && getConfig().getReadMode() == ReadMode.MASTER_SLAVE && getNodeType() == NodeType.MASTER; } public void setNodeType(NodeType nodeType) { diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 03957704d..c9d0563d1 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -188,6 +188,10 @@ public class MasterSlaveEntry { } private boolean slaveDown(ClientConnectionsEntry entry) { + if (entry.isMasterForRead()) { + return false; + } + // add master as slave if no more slaves available if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) { if (slaveBalancer.unfreeze(masterEntry.getClient().getAddr(), FreezeReason.SYSTEM)) { @@ -201,12 +205,24 @@ public class MasterSlaveEntry { connection.closeAsync(); reattachBlockingQueue(connection); } + while (true) { + RedisConnection connection = entry.pollConnection(); + if (connection == null) { + break; + } + } entry.getAllConnections().clear(); for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) { connection.closeAsync(); connectionManager.getSubscribeService().reattachPubSub(connection); } + while (true) { + RedisConnection connection = entry.pollSubscribeConnection(); + if (connection == null) { + break; + } + } entry.getAllSubscribeConnections().clear(); return true; @@ -436,6 +452,8 @@ public class MasterSlaveEntry { slaveBalancer.changeType(oldMaster.getClient().getAddr(), NodeType.SLAVE); slaveBalancer.changeType(newMasterClient.getAddr(), NodeType.MASTER); + // freeze in slaveBalancer + slaveDown(oldMaster.getClient().getAddr(), FreezeReason.MANAGER); // more than one slave available, so master can be removed from slaves if (!config.checkSkipSlavesInit() diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index ea56b9a48..bfc4ab999 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -164,7 +164,11 @@ public class LoadBalancerManager { synchronized (connectionEntry) { // only RECONNECT freeze reason could be replaced if (connectionEntry.getFreezeReason() == null - || connectionEntry.getFreezeReason() == FreezeReason.RECONNECT) { + || connectionEntry.getFreezeReason() == FreezeReason.RECONNECT + || (freezeReason == FreezeReason.MANAGER + && connectionEntry.getFreezeReason() != FreezeReason.MANAGER + && connectionEntry.getNodeType() == NodeType.SLAVE + )) { connectionEntry.setFreezed(true); connectionEntry.setFreezeReason(freezeReason); return connectionEntry;