ConnectionListener moved to NodesGroup

pull/365/head
Nikita 9 years ago
parent e195642a45
commit 9dfa71ce78

@ -337,16 +337,18 @@ public class Config {
return useLinuxNativeEpoll; return useLinuxNativeEpoll;
} }
@Deprecated
public ConnectionListener getConnectionListener() { public ConnectionListener getConnectionListener() {
return connectionListener; return connectionListener;
} }
/** /**
* Setup connect listener which will be triggered * Use {@code org.redisson.core.NodesGroup#addConnectionListener(ConnectionListener)}
* when Redisson has just been connected to or disconnected from redis server
* *
* @param connectionListener * @param connectionListener
* @return
*/ */
@Deprecated
public Config setConnectionListener(ConnectionListener connectionListener) { public Config setConnectionListener(ConnectionListener connectionListener) {
this.connectionListener = connectionListener; this.connectionListener = connectionListener;
return this; return this;

@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionListener;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.RedisClientEntry; import org.redisson.connection.RedisClientEntry;
import org.redisson.core.Node; import org.redisson.core.Node;
@ -106,4 +107,14 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
return res && result.size() == clients.size(); return res && result.size() == clients.size();
} }
@Override
public int addConnectionListener(ConnectionListener connectionListener) {
return connectionManager.getConnectionEventsHub().addListener(connectionListener);
}
@Override
public void removeConnectionListener(int listenerId) {
connectionManager.getConnectionEventsHub().removeListener(listenerId);
}
} }

@ -16,6 +16,7 @@
package org.redisson.connection; package org.redisson.connection;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
@ -26,30 +27,40 @@ public class ConnectionEventsHub {
private final ConcurrentMap<InetSocketAddress, Status> maps = PlatformDependent.newConcurrentHashMap(); private final ConcurrentMap<InetSocketAddress, Status> maps = PlatformDependent.newConcurrentHashMap();
private final ConnectionListener connectionListener; private final Map<Integer, ConnectionListener> listenersMap = PlatformDependent.newConcurrentHashMap();
public ConnectionEventsHub(ConnectionListener connectionListener) { public int addListener(ConnectionListener listener) {
this.connectionListener = connectionListener; int id = System.identityHashCode(listener);
listenersMap.put(id, listener);
return id;
}
public void removeListener(int listenerId) {
listenersMap.remove(listenerId);
} }
public void fireConnect(InetSocketAddress addr) { public void fireConnect(InetSocketAddress addr) {
if (connectionListener == null || maps.get(addr) == Status.CONNECTED) { if (listenersMap.isEmpty() || maps.get(addr) == Status.CONNECTED) {
return; return;
} }
if (maps.putIfAbsent(addr, Status.CONNECTED) == null if (maps.putIfAbsent(addr, Status.CONNECTED) == null
|| maps.replace(addr, Status.DISCONNECTED, Status.CONNECTED)) { || maps.replace(addr, Status.DISCONNECTED, Status.CONNECTED)) {
connectionListener.onConnect(addr); for (ConnectionListener listener : listenersMap.values()) {
listener.onConnect(addr);
}
} }
} }
public void fireDisconnect(InetSocketAddress addr) { public void fireDisconnect(InetSocketAddress addr) {
if (connectionListener == null || maps.get(addr) == Status.DISCONNECTED) { if (listenersMap.isEmpty() || maps.get(addr) == Status.DISCONNECTED) {
return; return;
} }
if (maps.replace(addr, Status.CONNECTED, Status.DISCONNECTED)) { if (maps.replace(addr, Status.CONNECTED, Status.DISCONNECTED)) {
connectionListener.onDisconnect(addr); for (ConnectionListener listener : listenersMap.values()) {
listener.onDisconnect(addr);
}
} }
} }

@ -121,7 +121,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private IdleConnectionWatcher connectionWatcher; private IdleConnectionWatcher connectionWatcher;
private ConnectionEventsHub connectionEventsHub; private final ConnectionEventsHub connectionEventsHub = new ConnectionEventsHub();
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) { public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) {
init(config); init(config);
@ -196,8 +196,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
this.socketChannelClass = NioSocketChannel.class; this.socketChannelClass = NioSocketChannel.class;
} }
this.codec = cfg.getCodec(); this.codec = cfg.getCodec();
connectionEventsHub = new ConnectionEventsHub(cfg.getConnectionListener());
} }
@Override @Override

@ -17,6 +17,8 @@ package org.redisson.core;
import java.util.Collection; import java.util.Collection;
import org.redisson.connection.ConnectionListener;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -24,6 +26,21 @@ import java.util.Collection;
*/ */
public interface NodesGroup<N extends Node> { public interface NodesGroup<N extends Node> {
/**
* Adds connect listener which will be triggered
* when Redisson has just been connected to or disconnected from redis server
*
* @param connectionListener
*/
int addConnectionListener(ConnectionListener connectionListener);
/**
* Removes connect listener by id
*
* @param listenerId
*/
void removeConnectionListener(int listenerId);
/** /**
* All Redis nodes used by Redisson. * All Redis nodes used by Redisson.
* This collection may change during master change, cluster topology update and etc. * This collection may change during master change, cluster topology update and etc.

@ -50,8 +50,12 @@ public class RedissonTest {
final Waiter onDisconnectWaiter = new Waiter(); final Waiter onDisconnectWaiter = new Waiter();
Config config = new Config(); Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6319").setFailedAttempts(1).setRetryAttempts(1); config.useSingleServer().setAddress("127.0.0.1:6319").setFailedAttempts(1).setRetryAttempts(1)
config.setConnectionListener(new ConnectionListener() { .setConnectionMinimumIdleSize(0);
RedissonClient r = Redisson.create(config);
int id = r.getNodesGroup().addConnectionListener(new ConnectionListener() {
@Override @Override
public void onDisconnect(InetSocketAddress addr) { public void onDisconnect(InetSocketAddress addr) {
@ -66,9 +70,9 @@ public class RedissonTest {
} }
}); });
RedissonClient r = Redisson.create(config); assertThat(id).isNotZero();
r.getBucket("1").get();
r.getBucket("1").get();
p.destroy(); p.destroy();
Assert.assertEquals(1, p.waitFor()); Assert.assertEquals(1, p.waitFor());

Loading…
Cancel
Save