Fixed - ConnectionListener.onConnect() method isn't triggered during Redisson start #3522

pull/3941/head
Nikita Koksharov 4 years ago
parent ad33c15f28
commit acbff29eab

@ -20,10 +20,7 @@ import org.redisson.client.DefaultNettyHook;
import org.redisson.client.NettyHook;
import org.redisson.client.codec.Codec;
import org.redisson.codec.MarshallingCodec;
import org.redisson.connection.AddressResolverGroupFactory;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.DnsAddressResolverGroupFactory;
import org.redisson.connection.ReplicatedConnectionManager;
import org.redisson.connection.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -87,6 +84,8 @@ public class Config {
private NettyHook nettyHook = new DefaultNettyHook();
private ConnectionListener connectionListener;
private boolean useThreadClassLoader = true;
private AddressResolverGroupFactory addressResolverGroupFactory = new DnsAddressResolverGroupFactory();
@ -103,6 +102,7 @@ public class Config {
oldConf.setCodec(new MarshallingCodec());
}
setConnectionListener(oldConf.getConnectionListener());
setUseThreadClassLoader(oldConf.isUseThreadClassLoader());
setMinCleanUpDelay(oldConf.getMinCleanUpDelay());
setMaxCleanUpDelay(oldConf.getMaxCleanUpDelay());
@ -798,4 +798,20 @@ public class Config {
this.reliableTopicWatchdogTimeout = timeout;
return this;
}
public ConnectionListener getConnectionListener() {
return connectionListener;
}
/**
* Sets connection listener which is triggered
* when Redisson connected/disconnected to Redis server
*
* @param connectionListener - connection listener
* @return config
*/
public Config setConnectionListener(ConnectionListener connectionListener) {
this.connectionListener = connectionListener;
return this;
}
}

@ -18,6 +18,7 @@ package org.redisson.connection;
import java.net.InetSocketAddress;
/**
* Redis connection listener
*
* @author Nikita Koksharov
*

@ -200,6 +200,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
this.cfg = cfg;
this.codec = cfg.getCodec();
if (cfg.getConnectionListener() != null) {
connectionEventsHub.addListener(cfg.getConnectionListener());
}
}
protected void closeNodeConnections() {

@ -231,10 +231,7 @@ public class RedissonTest extends BaseTest {
Config config = new Config();
config.useSingleServer().setAddress(p.getRedisServerAddressAndPort());
RedissonClient r = Redisson.create(config);
int id = r.getNodesGroup().addConnectionListener(new ConnectionListener() {
config.setConnectionListener(new ConnectionListener() {
@Override
public void onDisconnect(InetSocketAddress addr) {
@ -249,11 +246,11 @@ public class RedissonTest extends BaseTest {
}
});
assertThat(id).isNotZero();
RedissonClient r = Redisson.create(config);
r.getBucket("1").get();
Assert.assertEquals(0, p.stop());
await().atMost(2, TimeUnit.SECONDS).until(() -> disconnectCounter.get() == 1);
try {
@ -261,7 +258,7 @@ public class RedissonTest extends BaseTest {
} catch (Exception e) {
}
assertThat(connectCounter.get()).isEqualTo(0);
assertThat(connectCounter.get()).isEqualTo(1);
assertThat(disconnectCounter.get()).isEqualTo(1);
RedisProcess pp = new RedisRunner()
@ -272,7 +269,7 @@ public class RedissonTest extends BaseTest {
r.getBucket("1").get();
assertThat(connectCounter.get()).isEqualTo(1);
assertThat(connectCounter.get()).isEqualTo(2);
assertThat(disconnectCounter.get()).isEqualTo(1);
r.shutdown();
@ -387,7 +384,7 @@ public class RedissonTest extends BaseTest {
System.out.println("errors " + errors + " success " + success + " readonly " + readonlyErrors);
assertThat(errors).isLessThan(600);
assertThat(errors).isLessThan(800);
assertThat(readonlyErrors).isZero();
redisson.shutdown();

Loading…
Cancel
Save