From a6fbdf5cb45dbe744902729c3f5f7ad671248df9 Mon Sep 17 00:00:00 2001 From: Nikita Date: Sat, 10 Jun 2017 22:18:33 +0300 Subject: [PATCH] Fixed - connection listener is not invoked in some cases --- .../org/redisson/client/RedisConnection.java | 28 ++++++++++++++---- .../client/RedisPubSubConnection.java | 6 ++-- .../client/handler/ConnectionWatchdog.java | 5 ++-- .../connection/ClientConnectionsEntry.java | 29 +++++++++++++++---- .../test/java/org/redisson/RedissonTest.java | 2 +- 5 files changed, 53 insertions(+), 17 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/RedisConnection.java b/redisson/src/main/java/org/redisson/client/RedisConnection.java index ad67e1151..f0eb265ee 100644 --- a/redisson/src/main/java/org/redisson/client/RedisConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisConnection.java @@ -15,8 +15,6 @@ */ package org.redisson.client; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -27,7 +25,6 @@ import org.redisson.client.handler.CommandsQueue; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.QueueCommand; -import org.redisson.client.protocol.QueueCommandHolder; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; @@ -58,6 +55,8 @@ public class RedisConnection implements RedisCommands { private RPromise connectionPromise; private long lastUsageTime; + private Runnable connectedListener; + private Runnable disconnectedListener; public RedisConnection(RedisClient redisClient, Channel channel, RPromise connectionPromise) { this.redisClient = redisClient; @@ -71,6 +70,26 @@ public class RedisConnection implements RedisCommands { this.redisClient = redisClient; } + public void fireConnected() { + if (connectedListener != null) { + connectedListener.run(); + } + } + + public void setConnectedListener(Runnable connectedListener) { + this.connectedListener = connectedListener; + } + + public void fireDisconnected() { + if (disconnectedListener != null) { + disconnectedListener.run(); + } + } + + public void setDisconnectedListener(Runnable disconnectedListener) { + this.disconnectedListener = disconnectedListener; + } + public RPromise getConnectionPromise() { return (RPromise) connectionPromise; } @@ -253,7 +272,4 @@ public class RedisConnection implements RedisCommands { return getClass().getSimpleName() + "@" + System.identityHashCode(this) + " [redisClient=" + redisClient + ", channel=" + channel + "]"; } - public void onDisconnect() { - } - } diff --git a/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java b/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java index d3c49bdb1..4b38acf4f 100644 --- a/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java @@ -131,7 +131,9 @@ public class RedisPubSubConnection extends RedisConnection { } @Override - public void onDisconnect() { + public void fireDisconnected() { + super.fireDisconnected(); + Set channels = new HashSet(); Set pchannels = new HashSet(); synchronized (this) { @@ -145,7 +147,7 @@ public class RedisPubSubConnection extends RedisConnection { onMessage(new PubSubStatusMessage(PubSubType.PUNSUBSCRIBE, channel)); } } - + public void punsubscribe(final String ... channels) { synchronized (this) { for (String ch : channels) { diff --git a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index 2a4330f79..17dd7bbdb 100644 --- a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -16,7 +16,6 @@ package org.redisson.client.handler; import java.util.Map.Entry; -import java.util.Queue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -24,7 +23,6 @@ import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.CommandData; -import org.redisson.client.protocol.QueueCommandHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +64,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { public void channelInactive(ChannelHandlerContext ctx) throws Exception { RedisConnection connection = RedisConnection.getFrom(ctx.channel()); if (connection != null) { - connection.onDisconnect(); + connection.fireDisconnected(); if (!connection.isClosed()) { if (connection.isFastReconnect()) { tryReconnect(connection, 1); @@ -152,6 +150,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { private void refresh(RedisConnection connection, Channel channel) { CommandData currentCommand = connection.getCurrentCommand(); + connection.fireConnected(); connection.updateChannel(channel); reattachBlockingQueue(connection, currentCommand); diff --git a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index 45bc74c8b..c148c35e8 100644 --- a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -137,15 +137,35 @@ public class ClientConnectionsEntry { if (!future.isSuccess()) { return; } + RedisConnection conn = future.getNow(); + onConnect(conn); log.debug("new connection created: {}", conn); - - connectionManager.getConnectionEventsHub().fireConnect(conn.getRedisClient().getAddr()); } - }); return future; } + + private void onConnect(final RedisConnection conn) { + conn.setConnectedListener(new Runnable() { + @Override + public void run() { + if (!connectionManager.isShuttingDown()) { + connectionManager.getConnectionEventsHub().fireConnect(conn.getRedisClient().getAddr()); + } + } + }); + conn.setDisconnectedListener(new Runnable() { + @Override + public void run() { + if (!connectionManager.isShuttingDown()) { + connectionManager.getConnectionEventsHub().fireDisconnect(conn.getRedisClient().getAddr()); + } + } + }); + + connectionManager.getConnectionEventsHub().fireConnect(conn.getRedisClient().getAddr()); + } public MasterSlaveServersConfig getConfig() { return connectionManager.getConfig(); @@ -161,10 +181,9 @@ public class ClientConnectionsEntry { } RedisPubSubConnection conn = future.getNow(); + onConnect(conn); log.debug("new pubsub connection created: {}", conn); - connectionManager.getConnectionEventsHub().fireConnect(conn.getRedisClient().getAddr()); - allSubscribeConnections.add(conn); } }); diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index 38843b04a..d80e83bb1 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -290,7 +290,7 @@ public class RedissonTest { Assert.assertEquals(0, pp.stop()); await().atMost(2, TimeUnit.SECONDS).until(() -> assertThat(connectCounter.get()).isEqualTo(1)); - await().until(() -> assertThat(disconnectCounter.get()).isEqualTo(1)); + await().atMost(2, TimeUnit.SECONDS).until(() -> assertThat(disconnectCounter.get()).isEqualTo(1)); } @Test