channel configuration during reconnection. #198

pull/238/head
Nikita 10 years ago
parent abe97f75e8
commit 043b2c2e09

@ -0,0 +1,7 @@
package org.redisson.client;
public interface ReconnectListener {
void onReconnect(RedisConnection redisConnection);
}

@ -38,6 +38,7 @@ public class RedisConnection implements RedisCommands {
private volatile boolean closed;
volatile Channel channel;
private ReconnectListener reconnectListener;
public RedisConnection(RedisClient redisClient, Channel channel) {
super();
@ -46,11 +47,22 @@ public class RedisConnection implements RedisCommands {
updateChannel(channel);
}
public void updateChannel(Channel channel) {
public void setReconnectListener(ReconnectListener reconnectListener) {
this.reconnectListener = reconnectListener;
}
private void updateChannel(Channel channel) {
this.channel = channel;
channel.attr(CONNECTION).set(this);
}
public void onReconnect(Channel channel) {
updateChannel(channel);
if (reconnectListener != null) {
reconnectListener.onReconnect(this);
}
}
public RedisClient getRedisClient() {
return redisClient;
}

@ -23,11 +23,11 @@ import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.concurrent.GenericFutureListener;
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
@ -74,7 +74,8 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
log.debug("reconnecting {} to {} ", connection, connection.getRedisClient().getAddr(), connection);
bootstrap.connect().addListener(new GenericFutureListener<ChannelFuture>() {
bootstrap.connect().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (connection.isClosed()) {
@ -83,7 +84,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
if (future.isSuccess()) {
log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr());
connection.updateChannel(future.channel());
connection.onReconnect(future.channel());
return;
}
@ -95,6 +96,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
}
}, timeout, TimeUnit.MILLISECONDS);
}
});
}

@ -28,7 +28,6 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.misc.ReclosableLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -149,19 +148,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
if (conn != null) {
return conn;
}
conn = entry.getClient().connectPubSub();
if (config.getPassword() != null) {
conn.sync(RedisCommands.AUTH, config.getPassword());
}
if (config.getDatabase() != 0) {
conn.sync(RedisCommands.SELECT, config.getDatabase());
}
if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
}
entry.registerSubscribeConnection(conn);
return conn;
return entry.connectPubSub(config);
} catch (RedisConnectionException e) {
entry.getSubscribeConnectionsSemaphore().release();
// TODO connection scoring

@ -20,18 +20,20 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.ReconnectListener;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConnectionEntry {
private final Logger log = LoggerFactory.getLogger(getClass());
final Logger log = LoggerFactory.getLogger(getClass());
private volatile boolean freezed;
private final RedisClient client;
final RedisClient client;
private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>();
private final Semaphore connectionsSemaphore;
@ -61,8 +63,22 @@ public class ConnectionEntry {
return connections;
}
public RedisConnection connect(MasterSlaveServersConfig config) {
public RedisConnection connect(final MasterSlaveServersConfig config) {
RedisConnection conn = client.connect();
log.debug("new connection created: {}", conn);
prepareConnection(config, conn);
conn.setReconnectListener(new ReconnectListener() {
@Override
public void onReconnect(RedisConnection conn) {
prepareConnection(config, conn);
}
});
return conn;
}
private void prepareConnection(MasterSlaveServersConfig config, RedisConnection conn) {
if (config.getPassword() != null) {
conn.sync(RedisCommands.AUTH, config.getPassword());
}
@ -72,8 +88,19 @@ public class ConnectionEntry {
if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
}
}
log.debug("new connection created: {}", conn);
public RedisPubSubConnection connectPubSub(final MasterSlaveServersConfig config) {
RedisPubSubConnection conn = client.connectPubSub();
log.debug("new pubsub connection created: {}", conn);
prepareConnection(config, conn);
conn.setReconnectListener(new ReconnectListener() {
@Override
public void onReconnect(RedisConnection conn) {
prepareConnection(config, conn);
}
});
return conn;
}

@ -20,7 +20,6 @@ import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommands;
public class SingleEntry extends MasterSlaveEntry {
@ -54,18 +53,7 @@ public class SingleEntry extends MasterSlaveEntry {
}
try {
conn = masterEntry.getClient().connectPubSub();
if (config.getPassword() != null) {
conn.sync(RedisCommands.AUTH, config.getPassword());
}
if (config.getDatabase() != 0) {
conn.sync(RedisCommands.SELECT, config.getDatabase());
}
if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
}
return conn;
return masterEntry.connectPubSub(config);
} catch (RedisConnectionException e) {
((SubscribesConnectionEntry)masterEntry).getSubscribeConnectionsSemaphore().release();
throw e;

@ -19,6 +19,7 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisPubSubConnection;
@ -53,5 +54,12 @@ public class SubscribesConnectionEntry extends ConnectionEntry {
return subscribeConnectionsSemaphore;
}
public RedisPubSubConnection connectPubSub(MasterSlaveServersConfig config) {
RedisPubSubConnection conn = super.connectPubSub(config);
allSubscribeConnections.offer(conn);
return conn;
}
}

Loading…
Cancel
Save