From 9dfa71ce78e68863c03721e917a0c8cdc9cc0fbd Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 28 Dec 2015 14:37:37 +0300 Subject: [PATCH] ConnectionListener moved to NodesGroup --- src/main/java/org/redisson/Config.java | 6 +++-- src/main/java/org/redisson/RedisNodes.java | 11 ++++++++ .../connection/ConnectionEventsHub.java | 25 +++++++++++++------ .../MasterSlaveConnectionManager.java | 4 +-- .../java/org/redisson/core/NodesGroup.java | 17 +++++++++++++ src/test/java/org/redisson/RedissonTest.java | 12 ++++++--- 6 files changed, 59 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/redisson/Config.java b/src/main/java/org/redisson/Config.java index 80ebf9a9d..7b1374ae7 100644 --- a/src/main/java/org/redisson/Config.java +++ b/src/main/java/org/redisson/Config.java @@ -337,16 +337,18 @@ public class Config { return useLinuxNativeEpoll; } + @Deprecated public ConnectionListener getConnectionListener() { return connectionListener; } /** - * Setup connect listener which will be triggered - * when Redisson has just been connected to or disconnected from redis server + * Use {@code org.redisson.core.NodesGroup#addConnectionListener(ConnectionListener)} * * @param connectionListener + * @return */ + @Deprecated public Config setConnectionListener(ConnectionListener connectionListener) { this.connectionListener = connectionListener; return this; diff --git a/src/main/java/org/redisson/RedisNodes.java b/src/main/java/org/redisson/RedisNodes.java index 64d879072..7ffd68a4e 100644 --- a/src/main/java/org/redisson/RedisNodes.java +++ b/src/main/java/org/redisson/RedisNodes.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.redisson.client.RedisConnection; import org.redisson.client.protocol.RedisCommands; +import org.redisson.connection.ConnectionListener; import org.redisson.connection.ConnectionManager; import org.redisson.connection.RedisClientEntry; import org.redisson.core.Node; @@ -106,4 +107,14 @@ public class RedisNodes implements NodesGroup { return res && result.size() == clients.size(); } + @Override + public int addConnectionListener(ConnectionListener connectionListener) { + return connectionManager.getConnectionEventsHub().addListener(connectionListener); + } + + @Override + public void removeConnectionListener(int listenerId) { + connectionManager.getConnectionEventsHub().removeListener(listenerId); + } + } diff --git a/src/main/java/org/redisson/connection/ConnectionEventsHub.java b/src/main/java/org/redisson/connection/ConnectionEventsHub.java index a382f5dbd..3ce29ddfe 100644 --- a/src/main/java/org/redisson/connection/ConnectionEventsHub.java +++ b/src/main/java/org/redisson/connection/ConnectionEventsHub.java @@ -16,6 +16,7 @@ package org.redisson.connection; import java.net.InetSocketAddress; +import java.util.Map; import java.util.concurrent.ConcurrentMap; import io.netty.util.internal.PlatformDependent; @@ -26,30 +27,40 @@ public class ConnectionEventsHub { private final ConcurrentMap maps = PlatformDependent.newConcurrentHashMap(); - private final ConnectionListener connectionListener; + private final Map listenersMap = PlatformDependent.newConcurrentHashMap(); - public ConnectionEventsHub(ConnectionListener connectionListener) { - this.connectionListener = connectionListener; + public int addListener(ConnectionListener listener) { + int id = System.identityHashCode(listener); + listenersMap.put(id, listener); + return id; + } + + public void removeListener(int listenerId) { + listenersMap.remove(listenerId); } public void fireConnect(InetSocketAddress addr) { - if (connectionListener == null || maps.get(addr) == Status.CONNECTED) { + if (listenersMap.isEmpty() || maps.get(addr) == Status.CONNECTED) { return; } if (maps.putIfAbsent(addr, Status.CONNECTED) == null || maps.replace(addr, Status.DISCONNECTED, Status.CONNECTED)) { - connectionListener.onConnect(addr); + for (ConnectionListener listener : listenersMap.values()) { + listener.onConnect(addr); + } } } public void fireDisconnect(InetSocketAddress addr) { - if (connectionListener == null || maps.get(addr) == Status.DISCONNECTED) { + if (listenersMap.isEmpty() || maps.get(addr) == Status.DISCONNECTED) { return; } if (maps.replace(addr, Status.CONNECTED, Status.DISCONNECTED)) { - connectionListener.onDisconnect(addr); + for (ConnectionListener listener : listenersMap.values()) { + listener.onDisconnect(addr); + } } } diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index b77ec7dc2..a16ff609c 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -121,7 +121,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { private IdleConnectionWatcher connectionWatcher; - private ConnectionEventsHub connectionEventsHub; + private final ConnectionEventsHub connectionEventsHub = new ConnectionEventsHub(); public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) { init(config); @@ -196,8 +196,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { this.socketChannelClass = NioSocketChannel.class; } this.codec = cfg.getCodec(); - - connectionEventsHub = new ConnectionEventsHub(cfg.getConnectionListener()); } @Override diff --git a/src/main/java/org/redisson/core/NodesGroup.java b/src/main/java/org/redisson/core/NodesGroup.java index 00ebea4cd..fd5026a08 100644 --- a/src/main/java/org/redisson/core/NodesGroup.java +++ b/src/main/java/org/redisson/core/NodesGroup.java @@ -17,6 +17,8 @@ package org.redisson.core; import java.util.Collection; +import org.redisson.connection.ConnectionListener; + /** * * @author Nikita Koksharov @@ -24,6 +26,21 @@ import java.util.Collection; */ public interface NodesGroup { + /** + * Adds connect listener which will be triggered + * when Redisson has just been connected to or disconnected from redis server + * + * @param connectionListener + */ + int addConnectionListener(ConnectionListener connectionListener); + + /** + * Removes connect listener by id + * + * @param listenerId + */ + void removeConnectionListener(int listenerId); + /** * All Redis nodes used by Redisson. * This collection may change during master change, cluster topology update and etc. diff --git a/src/test/java/org/redisson/RedissonTest.java b/src/test/java/org/redisson/RedissonTest.java index 64d2fd93c..81defdb76 100644 --- a/src/test/java/org/redisson/RedissonTest.java +++ b/src/test/java/org/redisson/RedissonTest.java @@ -50,8 +50,12 @@ public class RedissonTest { final Waiter onDisconnectWaiter = new Waiter(); Config config = new Config(); - config.useSingleServer().setAddress("127.0.0.1:6319").setFailedAttempts(1).setRetryAttempts(1); - config.setConnectionListener(new ConnectionListener() { + config.useSingleServer().setAddress("127.0.0.1:6319").setFailedAttempts(1).setRetryAttempts(1) + .setConnectionMinimumIdleSize(0); + + RedissonClient r = Redisson.create(config); + + int id = r.getNodesGroup().addConnectionListener(new ConnectionListener() { @Override public void onDisconnect(InetSocketAddress addr) { @@ -66,9 +70,9 @@ public class RedissonTest { } }); - RedissonClient r = Redisson.create(config); - r.getBucket("1").get(); + assertThat(id).isNotZero(); + r.getBucket("1").get(); p.destroy(); Assert.assertEquals(1, p.waitFor());