diff --git a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index ac410070c..54270890d 100644 --- a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -23,7 +23,6 @@ import org.redisson.client.RedisException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.CommandData; -import org.redisson.client.protocol.QueueCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,26 +150,28 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { } private void refresh(RedisConnection connection, Channel channel) { - CommandData commandData = connection.getCurrentCommand(); + CommandData commandData = connection.getCurrentCommand(); connection.updateChannel(channel); reattachBlockingQueue(connection, commandData); reattachPubSub(connection); } - private void reattachBlockingQueue(RedisConnection connection, final CommandData commandData) { - if (commandData != null - && QueueCommand.TIMEOUTLESS_COMMANDS.contains(commandData.getCommand().getName())) { - ChannelFuture future = connection.send(commandData); - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - log.error("Can't reconnect blocking queue to new connection {}", commandData); - } - } - }); + private void reattachBlockingQueue(RedisConnection connection, final CommandData commandData) { + if (commandData == null + || !commandData.isBlockingCommand()) { + return; } + + ChannelFuture future = connection.send(commandData); + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + log.error("Can't reconnect blocking queue to new connection. {}", commandData); + } + } + }); } } diff --git a/src/main/java/org/redisson/client/protocol/CommandData.java b/src/main/java/org/redisson/client/protocol/CommandData.java index 6cb93ad77..a47227c9e 100644 --- a/src/main/java/org/redisson/client/protocol/CommandData.java +++ b/src/main/java/org/redisson/client/protocol/CommandData.java @@ -77,5 +77,9 @@ public class CommandData implements QueueCommand { } return Collections.emptyList(); } + + public boolean isBlockingCommand() { + return QueueCommand.TIMEOUTLESS_COMMANDS.contains(command.getName()) && !promise.isDone(); + } } diff --git a/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index abaf8fd4c..1d9c3d6da 100644 --- a/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -158,19 +158,19 @@ public class ClientConnectionsEntry { } private void addReconnectListener(Promise connectionFuture, T conn) { - connectionManager.getConnectListener().onConnect(connectionFuture, conn, nodeType, connectionManager.getConfig()); - addFireEventListener(connectionFuture); + addFireEventListener(conn, connectionFuture); conn.setReconnectListener(new ReconnectListener() { @Override public void onReconnect(RedisConnection conn, Promise connectionFuture) { - connectionManager.getConnectListener().onConnect(connectionFuture, conn, nodeType, connectionManager.getConfig()); - addFireEventListener(connectionFuture); + addFireEventListener(conn, connectionFuture); } }); } - private void addFireEventListener(Promise connectionFuture) { + private void addFireEventListener(T conn, Promise connectionFuture) { + connectionManager.getConnectListener().onConnect(connectionFuture, conn, nodeType, connectionManager.getConfig()); + if (connectionFuture.isSuccess()) { connectionManager.getConnectionEventsHub().fireConnect(connectionFuture.getNow().getRedisClient().getAddr()); return; @@ -196,10 +196,12 @@ public class ClientConnectionsEntry { connectionFuture.tryFailure(future.cause()); return; } + RedisPubSubConnection conn = future.getNow(); log.debug("new pubsub connection created: {}", conn); addReconnectListener(connectionFuture, conn); + allSubscribeConnections.add(conn); } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 4bd1a720c..dd3ad00f2 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; -import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; @@ -44,8 +43,6 @@ import io.netty.util.concurrent.Promise; */ public interface ConnectionManager { - void reattachPubSub(Collection allPubSubConnections); - boolean isClusterMode(); Future newSucceededFuture(R value); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 2bf246e3f..63f8e175f 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -542,78 +542,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return null; } - @Override - public void reattachPubSub(Collection allPubSubConnections) { - for (Entry mapEntry : name2PubSubConnection.entrySet()) { - for (RedisPubSubConnection redisPubSubConnection : allPubSubConnections) { - PubSubConnectionEntry pubSubEntry = mapEntry.getValue(); - String channelName = mapEntry.getKey(); - - if (!pubSubEntry.getConnection().equals(redisPubSubConnection)) { - continue; - } - - synchronized (pubSubEntry) { - pubSubEntry.close(); - - Collection listeners = pubSubEntry.getListeners(channelName); - if (pubSubEntry.getConnection().getPatternChannels().get(channelName) != null) { - reattachPatternPubSubListeners(channelName, listeners); - } else { - reattachPubSubListeners(channelName, listeners); - } - } - } - } - } - - private void reattachPubSubListeners(final String channelName, final Collection listeners) { - Codec subscribeCodec = unsubscribe(channelName); - if (!listeners.isEmpty()) { - Future future = subscribe(subscribeCodec, channelName, null); - future.addListener(new FutureListener() { - - @Override - public void operationComplete(Future future) - throws Exception { - if (!future.isSuccess()) { - log.error("Can't resubscribe topic channel: " + channelName); - return; - } - PubSubConnectionEntry newEntry = future.getNow(); - for (RedisPubSubListener redisPubSubListener : listeners) { - newEntry.addListener(channelName, redisPubSubListener); - } - log.debug("resubscribed listeners for '{}' channel", channelName); - } - }); - } - } - - private void reattachPatternPubSubListeners(final String channelName, - final Collection listeners) { - Codec subscribeCodec = punsubscribe(channelName); - if (!listeners.isEmpty()) { - Future future = psubscribe(channelName, subscribeCodec); - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) - throws Exception { - if (!future.isSuccess()) { - log.error("Can't resubscribe topic channel: " + channelName); - return; - } - - PubSubConnectionEntry newEntry = future.getNow(); - for (RedisPubSubListener redisPubSubListener : listeners) { - newEntry.addListener(channelName, redisPubSubListener); - } - log.debug("resubscribed listeners for '{}' channel-pattern", channelName); - } - }); - } - } - protected void slaveDown(ClusterSlotRange slotRange, String host, int port, FreezeReason freezeReason) { getEntry(slotRange).slaveDown(host, port, freezeReason); } diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 139030165..ce8bea5c6 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -28,6 +28,9 @@ import org.redisson.ReadMode; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; +import org.redisson.client.RedisPubSubListener; +import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.CommandData; import org.redisson.cluster.ClusterSlotRange; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.balancer.LoadBalancerManager; @@ -37,7 +40,10 @@ import org.redisson.core.NodeType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; /** * @@ -91,7 +97,8 @@ public class MasterSlaveEntry { } public boolean slaveDown(String host, int port, FreezeReason freezeReason) { - if (!slaveBalancer.freeze(host, port, freezeReason)) { + ClientConnectionsEntry entry = slaveBalancer.freeze(host, port, freezeReason); + if (entry == null) { return false; } @@ -102,8 +109,154 @@ public class MasterSlaveEntry { log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort()); } } + + // close all connections + while (true) { + final RedisConnection connection = entry.pollConnection(); + if (connection == null) { + break; + } + + connection.closeAsync().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + reattachBlockingQueue(connection); + } + }); + } + + // close all pub/sub connections + while (true) { + RedisPubSubConnection connection = entry.pollSubscribeConnection(); + if (connection == null) { + break; + } + connection.closeAsync(); + } + + for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) { + reattachPubSub(connection); + } + entry.getAllSubscribeConnections().clear(); + return true; } + + private void reattachPubSub(RedisPubSubConnection redisPubSubConnection) { + for (String channelName : redisPubSubConnection.getChannels().keySet()) { + PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName); + + synchronized (pubSubEntry) { + pubSubEntry.close(); + + Collection listeners = pubSubEntry.getListeners(channelName); + reattachPubSubListeners(channelName, listeners); + } + } + + for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) { + PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName); + + synchronized (pubSubEntry) { + pubSubEntry.close(); + + Collection listeners = pubSubEntry.getListeners(channelName); + reattachPatternPubSubListeners(channelName, listeners); + } + } + } + + private void reattachPubSubListeners(final String channelName, final Collection listeners) { + Codec subscribeCodec = connectionManager.unsubscribe(channelName); + if (!listeners.isEmpty()) { + Future future = connectionManager.subscribe(subscribeCodec, channelName, null); + future.addListener(new FutureListener() { + + @Override + public void operationComplete(Future future) + throws Exception { + if (!future.isSuccess()) { + log.error("Can't resubscribe topic channel: " + channelName); + return; + } + PubSubConnectionEntry newEntry = future.getNow(); + for (RedisPubSubListener redisPubSubListener : listeners) { + newEntry.addListener(channelName, redisPubSubListener); + } + log.debug("resubscribed listeners for '{}' channel", channelName); + } + }); + } + } + + private void reattachPatternPubSubListeners(final String channelName, + final Collection listeners) { + Codec subscribeCodec = connectionManager.punsubscribe(channelName); + if (!listeners.isEmpty()) { + Future future = connectionManager.psubscribe(channelName, subscribeCodec); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) + throws Exception { + if (!future.isSuccess()) { + log.error("Can't resubscribe topic channel: " + channelName); + return; + } + + PubSubConnectionEntry newEntry = future.getNow(); + for (RedisPubSubListener redisPubSubListener : listeners) { + newEntry.addListener(channelName, redisPubSubListener); + } + log.debug("resubscribed listeners for '{}' channel-pattern", channelName); + } + }); + } + } + + private void reattachBlockingQueue(RedisConnection connection) { + final CommandData commandData = connection.getCurrentCommand(); + + if (commandData == null + || !commandData.isBlockingCommand()) { + return; + } + + Future newConnection = connectionReadOp(); + newConnection.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + log.error("Can't resubscribe blocking queue {}", commandData); + return; + } + + final RedisConnection newConnection = future.getNow(); + + final FutureListener listener = new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + releaseRead(newConnection); + } + }; + commandData.getPromise().addListener(listener); + if (commandData.getPromise().isDone()) { + return; + } + ChannelFuture channelFuture = newConnection.send(commandData); + channelFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + listener.operationComplete(null); + commandData.getPromise().removeListener(listener); + releaseRead(newConnection); + log.error("Can't resubscribe blocking queue {}", commandData); + } + } + }); + } + }); + } public Future addSlave(String host, int port) { return addSlave(host, port, true, NodeType.SLAVE); diff --git a/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index 2b28c343f..edef7d2bb 100644 --- a/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -36,7 +36,7 @@ public interface LoadBalancerManager { boolean unfreeze(String host, int port, FreezeReason freezeReason); - boolean freeze(String host, int port, FreezeReason freezeReason); + ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason); Future add(ClientConnectionsEntry entry); diff --git a/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java b/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java index 8c73c4d8d..38135c62d 100644 --- a/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java +++ b/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java @@ -95,16 +95,16 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager { return false; } - public boolean freeze(String host, int port, FreezeReason freezeReason) { + public ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason) { InetSocketAddress addr = new InetSocketAddress(host, port); ClientConnectionsEntry connectionEntry = addr2Entry.get(addr); if (connectionEntry == null) { - return false; + return null; } synchronized (connectionEntry) { if (connectionEntry.isFreezed()) { - return false; + return null; } connectionEntry.setFreezed(true); @@ -116,27 +116,7 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager { } } - // close all connections - while (true) { - RedisConnection connection = connectionEntry.pollConnection(); - if (connection == null) { - break; - } - connection.closeAsync(); - } - - // close all pub/sub connections - while (true) { - RedisPubSubConnection connection = connectionEntry.pollSubscribeConnection(); - if (connection == null) { - break; - } - connection.closeAsync(); - } - - connectionManager.reattachPubSub(connectionEntry.getAllSubscribeConnections()); - connectionEntry.getAllSubscribeConnections().clear(); - return true; + return connectionEntry; } public Future nextPubSubConnection() {