From 1173c9590c95dbb7b171d1e1cd214ca4ba93e4c2 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 14 Oct 2016 18:33:28 +0300 Subject: [PATCH] PubSub connection re-subscription doesn't work in some cases of when there are only one slave available. #663 --- .../connection/ConnectionManager.java | 4 +- .../MasterSlaveConnectionManager.java | 49 ++++++-- .../redisson/connection/MasterSlaveEntry.java | 119 +++++++++++------- .../connection/PubSubConnectionEntry.java | 2 - .../connection/SentinelConnectionManager.java | 21 ++-- 5 files changed, 128 insertions(+), 67 deletions(-) diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index 4332fa3f3..9171851aa 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -109,9 +109,9 @@ public interface ConnectionManager { Codec unsubscribe(String channelName, AsyncSemaphore lock); - Codec unsubscribe(String channelName); + RFuture unsubscribe(String channelName, boolean temporaryDown); - Codec punsubscribe(String channelName); + RFuture punsubscribe(String channelName, boolean temporaryDown); Codec punsubscribe(String channelName, AsyncSemaphore lock); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index a1bda8e09..067ae6fb7 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -53,7 +53,6 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; -import org.redisson.misc.RedissonThreadFactory; import org.redisson.pubsub.AsyncSemaphore; import org.redisson.pubsub.TransferListener; import org.slf4j.Logger; @@ -540,16 +539,32 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public Codec unsubscribe(String channelName) { + public RFuture unsubscribe(final String channelName, boolean temporaryDown) { final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); if (entry == null) { return null; } + freePubSubConnections.remove(entry); - Codec entryCodec = entry.getConnection().getChannels().get(channelName); + final Codec entryCodec = entry.getConnection().getChannels().get(channelName); + if (temporaryDown) { + final RPromise result = newPromise(); + entry.unsubscribe(channelName, new BaseRedisPubSubListener() { + + @Override + public boolean onStatus(PubSubType type, String channel) { + if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) { + result.trySuccess(entryCodec); + return true; + } + return false; + } + + }); + return result; + } entry.unsubscribe(channelName, null); - - return entryCodec; + return newSucceededFuture(entryCodec); } public Codec punsubscribe(final String channelName, final AsyncSemaphore lock) { @@ -583,16 +598,32 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override - public Codec punsubscribe(final String channelName) { + public RFuture punsubscribe(final String channelName, boolean temporaryDown) { final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); if (entry == null) { return null; } + freePubSubConnections.remove(entry); - Codec entryCodec = entry.getConnection().getPatternChannels().get(channelName); + final Codec entryCodec = entry.getConnection().getChannels().get(channelName); + if (temporaryDown) { + final RPromise result = newPromise(); + entry.punsubscribe(channelName, new BaseRedisPubSubListener() { + + @Override + public boolean onStatus(PubSubType type, String channel) { + if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) { + result.trySuccess(entryCodec); + return true; + } + return false; + } + + }); + return result; + } entry.punsubscribe(channelName, null); - - return entryCodec; + return newSucceededFuture(entryCodec); } @Override diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index bad7ef82a..e64689f9b 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -108,7 +108,7 @@ public class MasterSlaveEntry { return false; } - return slaveDown(e); + return slaveDown(e, freezeReason == FreezeReason.SYSTEM); } public boolean slaveDown(String host, int port, FreezeReason freezeReason) { @@ -117,10 +117,10 @@ public class MasterSlaveEntry { return false; } - return slaveDown(entry); + return slaveDown(entry, freezeReason == FreezeReason.SYSTEM); } - private boolean slaveDown(ClientConnectionsEntry entry) { + private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) { // add master as slave if no more slaves available if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) { InetSocketAddress addr = masterEntry.getClient().getAddr(); @@ -154,33 +154,45 @@ public class MasterSlaveEntry { } for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) { - reattachPubSub(connection); + reattachPubSub(connection, temporaryDown); } entry.getAllSubscribeConnections().clear(); return true; } - private void reattachPubSub(RedisPubSubConnection redisPubSubConnection) { + private void reattachPubSub(RedisPubSubConnection redisPubSubConnection, boolean temporaryDown) { for (String channelName : redisPubSubConnection.getChannels().keySet()) { PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName); Collection> listeners = pubSubEntry.getListeners(channelName); - reattachPubSubListeners(channelName, listeners); + reattachPubSubListeners(channelName, listeners, temporaryDown); } for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) { PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName); Collection> listeners = pubSubEntry.getListeners(channelName); - reattachPatternPubSubListeners(channelName, listeners); + reattachPatternPubSubListeners(channelName, listeners, temporaryDown); } } - private void reattachPubSubListeners(final String channelName, final Collection> listeners) { - Codec subscribeCodec = connectionManager.unsubscribe(channelName); + private void reattachPubSubListeners(final String channelName, final Collection> listeners, boolean temporaryDown) { + RFuture subscribeCodec = connectionManager.unsubscribe(channelName, temporaryDown); if (listeners.isEmpty()) { return; } + subscribeCodec.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + Codec subscribeCodec = future.get(); + subscribe(channelName, listeners, subscribeCodec); + } + + }); + } + + private void subscribe(final String channelName, final Collection> listeners, + final Codec subscribeCodec) { RFuture subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null); subscribeFuture.addListener(new FutureListener() { @@ -188,42 +200,54 @@ public class MasterSlaveEntry { public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - log.error("Can't resubscribe topic channel: " + channelName); + subscribe(channelName, listeners, subscribeCodec); return; } PubSubConnectionEntry newEntry = future.getNow(); for (RedisPubSubListener redisPubSubListener : listeners) { newEntry.addListener(channelName, redisPubSubListener); } - log.debug("resubscribed listeners for '{}' channel", channelName); + log.debug("resubscribed listeners of '{}' channel to {}", channelName, newEntry.getConnection().getRedisClient()); } }); } - private void reattachPatternPubSubListeners(final String channelName, - final Collection> listeners) { - Codec subscribeCodec = connectionManager.punsubscribe(channelName); - if (!listeners.isEmpty()) { - RFuture future = connectionManager.psubscribe(channelName, subscribeCodec, null); - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) - throws Exception { - if (!future.isSuccess()) { - log.error("Can't resubscribe topic channel: " + channelName); - return; - } - - PubSubConnectionEntry newEntry = future.getNow(); - for (RedisPubSubListener redisPubSubListener : listeners) { - newEntry.addListener(channelName, redisPubSubListener); - } - log.debug("resubscribed listeners for '{}' channel-pattern", channelName); - } - }); + private void reattachPatternPubSubListeners(final String channelName, final Collection> listeners, boolean temporaryDown) { + RFuture subscribeCodec = connectionManager.punsubscribe(channelName, temporaryDown); + if (listeners.isEmpty()) { + return; } + + subscribeCodec.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + Codec subscribeCodec = future.get(); + psubscribe(channelName, listeners, subscribeCodec); + } + }); } + private void psubscribe(final String channelName, final Collection> listeners, + final Codec subscribeCodec) { + RFuture subscribeFuture = connectionManager.psubscribe(channelName, subscribeCodec, null); + subscribeFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) + throws Exception { + if (!future.isSuccess()) { + psubscribe(channelName, listeners, subscribeCodec); + return; + } + + PubSubConnectionEntry newEntry = future.getNow(); + for (RedisPubSubListener redisPubSubListener : listeners) { + newEntry.addListener(channelName, redisPubSubListener); + } + log.debug("resubscribed listeners for '{}' channel-pattern", channelName); + } + }); + } + private void reattachBlockingQueue(RedisConnection connection) { final CommandData commandData = connection.getCurrentCommand(); @@ -272,7 +296,7 @@ public class MasterSlaveEntry { public RFuture addSlave(String host, int port) { return addSlave(host, port, true, NodeType.SLAVE); } - + private RFuture addSlave(String host, int port, boolean freezed, NodeType mode) { RedisClient client = connectionManager.createClient(NodeType.SLAVE, host, port); ClientConnectionsEntry entry = new ClientConnectionsEntry(client, @@ -316,18 +340,23 @@ public class MasterSlaveEntry { * @param host of Redis * @param port of Redis */ - public void changeMaster(String host, int port) { - ClientConnectionsEntry oldMaster = masterEntry; - setupMasterEntry(host, port); - writeConnectionHolder.remove(oldMaster); - slaveDown(oldMaster, FreezeReason.MANAGER); - - // more than one slave available, so master can be removed from slaves - if (config.getReadMode() == ReadMode.SLAVE - && slaveBalancer.getAvailableClients() > 1) { - slaveDown(host, port, FreezeReason.SYSTEM); - } - connectionManager.shutdownAsync(oldMaster.getClient()); + public void changeMaster(final String host, final int port) { + final ClientConnectionsEntry oldMaster = masterEntry; + RFuture future = setupMasterEntry(host, port); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + writeConnectionHolder.remove(oldMaster); + slaveDown(oldMaster, FreezeReason.MANAGER); + + // more than one slave available, so master can be removed from slaves + if (config.getReadMode() == ReadMode.SLAVE + && slaveBalancer.getAvailableClients() > 1) { + slaveDown(host, port, FreezeReason.SYSTEM); + } + connectionManager.shutdownAsync(oldMaster.getClient()); + } + }); } public boolean isFreezed() { diff --git a/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index 2a6a5ce7e..88ad18529 100644 --- a/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -34,8 +34,6 @@ import io.netty.util.concurrent.Future; public class PubSubConnectionEntry { - public enum Status {ACTIVE, INACTIVE} - private final AtomicInteger subscribedChannelsAmount; private final RedisPubSubConnection conn; diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 227ab8a82..dbce1ec03 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -218,8 +218,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { final String slaveAddr = ip + ":" + port; // to avoid addition twice - if (slaves.putIfAbsent(slaveAddr, true) == null && config.getReadMode() != ReadMode.MASTER) { - RFuture future = getEntry(singleSlotRange.getStartSlot()).addSlave(ip, Integer.valueOf(port)); + if (slaves.putIfAbsent(slaveAddr, true) == null) { + final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); + RFuture future = entry.addSlave(ip, Integer.valueOf(port)); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -229,7 +230,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return; } - if (getEntry(singleSlotRange.getStartSlot()).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { + if (entry.slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { String slaveAddr = ip + ":" + port; log.info("slave: {} added", slaveAddr); } @@ -266,12 +267,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String ip = parts[2]; String port = parts[3]; - MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); - if (entry.getFreezeReason() != FreezeReason.MANAGER) { - entry.freeze(); - String addr = ip + ":" + port; - log.warn("master: {} has down", addr); - } +// should be resolved by master switch event +// +// MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); +// if (entry.getFreezeReason() != FreezeReason.MANAGER) { +// entry.freeze(); +// String addr = ip + ":" + port; +// log.warn("master: {} has down", addr); +// } } } else { log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, sentinelAddr.getHost(), sentinelAddr.getPort());