Fixed - connection listener is not invoked in some cases

pull/709/merge
Nikita 8 years ago
parent 519956b683
commit a6fbdf5cb4

@ -15,8 +15,6 @@
*/
package org.redisson.client;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -27,7 +25,6 @@ import org.redisson.client.handler.CommandsQueue;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.QueueCommandHolder;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
@ -58,6 +55,8 @@ public class RedisConnection implements RedisCommands {
private RPromise<?> connectionPromise;
private long lastUsageTime;
private Runnable connectedListener;
private Runnable disconnectedListener;
public <C> RedisConnection(RedisClient redisClient, Channel channel, RPromise<C> connectionPromise) {
this.redisClient = redisClient;
@ -71,6 +70,26 @@ public class RedisConnection implements RedisCommands {
this.redisClient = redisClient;
}
public void fireConnected() {
if (connectedListener != null) {
connectedListener.run();
}
}
public void setConnectedListener(Runnable connectedListener) {
this.connectedListener = connectedListener;
}
public void fireDisconnected() {
if (disconnectedListener != null) {
disconnectedListener.run();
}
}
public void setDisconnectedListener(Runnable disconnectedListener) {
this.disconnectedListener = disconnectedListener;
}
public <C extends RedisConnection> RPromise<C> getConnectionPromise() {
return (RPromise<C>) connectionPromise;
}
@ -253,7 +272,4 @@ public class RedisConnection implements RedisCommands {
return getClass().getSimpleName() + "@" + System.identityHashCode(this) + " [redisClient=" + redisClient + ", channel=" + channel + "]";
}
public void onDisconnect() {
}
}

@ -131,7 +131,9 @@ public class RedisPubSubConnection extends RedisConnection {
}
@Override
public void onDisconnect() {
public void fireDisconnected() {
super.fireDisconnected();
Set<String> channels = new HashSet<String>();
Set<String> pchannels = new HashSet<String>();
synchronized (this) {

@ -16,7 +16,6 @@
package org.redisson.client.handler;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@ -24,7 +23,6 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.QueueCommandHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,7 +64,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
RedisConnection connection = RedisConnection.getFrom(ctx.channel());
if (connection != null) {
connection.onDisconnect();
connection.fireDisconnected();
if (!connection.isClosed()) {
if (connection.isFastReconnect()) {
tryReconnect(connection, 1);
@ -152,6 +150,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
private void refresh(RedisConnection connection, Channel channel) {
CommandData<?, ?> currentCommand = connection.getCurrentCommand();
connection.fireConnected();
connection.updateChannel(channel);
reattachBlockingQueue(connection, currentCommand);

@ -137,14 +137,34 @@ public class ClientConnectionsEntry {
if (!future.isSuccess()) {
return;
}
RedisConnection conn = future.getNow();
onConnect(conn);
log.debug("new connection created: {}", conn);
}
});
return future;
}
private void onConnect(final RedisConnection conn) {
conn.setConnectedListener(new Runnable() {
@Override
public void run() {
if (!connectionManager.isShuttingDown()) {
connectionManager.getConnectionEventsHub().fireConnect(conn.getRedisClient().getAddr());
}
}
});
return future;
conn.setDisconnectedListener(new Runnable() {
@Override
public void run() {
if (!connectionManager.isShuttingDown()) {
connectionManager.getConnectionEventsHub().fireDisconnect(conn.getRedisClient().getAddr());
}
}
});
connectionManager.getConnectionEventsHub().fireConnect(conn.getRedisClient().getAddr());
}
public MasterSlaveServersConfig getConfig() {
@ -161,10 +181,9 @@ public class ClientConnectionsEntry {
}
RedisPubSubConnection conn = future.getNow();
onConnect(conn);
log.debug("new pubsub connection created: {}", conn);
connectionManager.getConnectionEventsHub().fireConnect(conn.getRedisClient().getAddr());
allSubscribeConnections.add(conn);
}
});

@ -290,7 +290,7 @@ public class RedissonTest {
Assert.assertEquals(0, pp.stop());
await().atMost(2, TimeUnit.SECONDS).until(() -> assertThat(connectCounter.get()).isEqualTo(1));
await().until(() -> assertThat(disconnectCounter.get()).isEqualTo(1));
await().atMost(2, TimeUnit.SECONDS).until(() -> assertThat(disconnectCounter.get()).isEqualTo(1));
}
@Test

Loading…
Cancel
Save