From 043b2c2e099095ed42a5e907294c4f029f63ac20 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 7 Sep 2015 10:13:56 +0300 Subject: [PATCH] channel configuration during reconnection. #198 --- .../redisson/client/ReconnectListener.java | 7 ++++ .../org/redisson/client/RedisConnection.java | 14 +++++++- .../client/handler/ConnectionWatchdog.java | 8 +++-- .../redisson/connection/BaseLoadBalancer.java | 15 +------- .../redisson/connection/ConnectionEntry.java | 35 ++++++++++++++++--- .../org/redisson/connection/SingleEntry.java | 14 +------- .../connection/SubscribesConnectionEntry.java | 8 +++++ 7 files changed, 66 insertions(+), 35 deletions(-) create mode 100644 src/main/java/org/redisson/client/ReconnectListener.java diff --git a/src/main/java/org/redisson/client/ReconnectListener.java b/src/main/java/org/redisson/client/ReconnectListener.java new file mode 100644 index 000000000..a02e3a356 --- /dev/null +++ b/src/main/java/org/redisson/client/ReconnectListener.java @@ -0,0 +1,7 @@ +package org.redisson.client; + +public interface ReconnectListener { + + void onReconnect(RedisConnection redisConnection); + +} diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index c521ff7dd..c547cbdb0 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -38,6 +38,7 @@ public class RedisConnection implements RedisCommands { private volatile boolean closed; volatile Channel channel; + private ReconnectListener reconnectListener; public RedisConnection(RedisClient redisClient, Channel channel) { super(); @@ -46,11 +47,22 @@ public class RedisConnection implements RedisCommands { updateChannel(channel); } - public void updateChannel(Channel channel) { + public void setReconnectListener(ReconnectListener reconnectListener) { + this.reconnectListener = reconnectListener; + } + + private void updateChannel(Channel channel) { this.channel = channel; channel.attr(CONNECTION).set(this); } + public void onReconnect(Channel channel) { + updateChannel(channel); + if (reconnectListener != null) { + reconnectListener.onReconnect(this); + } + } + public RedisClient getRedisClient() { return redisClient; } diff --git a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index 690fa06ba..dd6d05eb4 100644 --- a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -23,11 +23,11 @@ import org.slf4j.LoggerFactory; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.EventLoopGroup; import io.netty.channel.group.ChannelGroup; -import io.netty.util.concurrent.GenericFutureListener; public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { @@ -74,7 +74,8 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { log.debug("reconnecting {} to {} ", connection, connection.getRedisClient().getAddr(), connection); - bootstrap.connect().addListener(new GenericFutureListener() { + bootstrap.connect().addListener(new ChannelFutureListener() { + @Override public void operationComplete(ChannelFuture future) throws Exception { if (connection.isClosed()) { @@ -83,7 +84,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { if (future.isSuccess()) { log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr()); - connection.updateChannel(future.channel()); + connection.onReconnect(future.channel()); return; } @@ -95,6 +96,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { } }, timeout, TimeUnit.MILLISECONDS); } + }); } diff --git a/src/main/java/org/redisson/connection/BaseLoadBalancer.java b/src/main/java/org/redisson/connection/BaseLoadBalancer.java index 5a0d8330e..20976ea83 100644 --- a/src/main/java/org/redisson/connection/BaseLoadBalancer.java +++ b/src/main/java/org/redisson/connection/BaseLoadBalancer.java @@ -28,7 +28,6 @@ import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisException; import org.redisson.client.RedisPubSubConnection; -import org.redisson.client.protocol.RedisCommands; import org.redisson.misc.ReclosableLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -149,19 +148,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { if (conn != null) { return conn; } - conn = entry.getClient().connectPubSub(); - if (config.getPassword() != null) { - conn.sync(RedisCommands.AUTH, config.getPassword()); - } - if (config.getDatabase() != 0) { - conn.sync(RedisCommands.SELECT, config.getDatabase()); - } - if (config.getClientName() != null) { - conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName()); - } - - entry.registerSubscribeConnection(conn); - return conn; + return entry.connectPubSub(config); } catch (RedisConnectionException e) { entry.getSubscribeConnectionsSemaphore().release(); // TODO connection scoring diff --git a/src/main/java/org/redisson/connection/ConnectionEntry.java b/src/main/java/org/redisson/connection/ConnectionEntry.java index 5993f9782..3adbe3cad 100644 --- a/src/main/java/org/redisson/connection/ConnectionEntry.java +++ b/src/main/java/org/redisson/connection/ConnectionEntry.java @@ -20,18 +20,20 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Semaphore; import org.redisson.MasterSlaveServersConfig; +import org.redisson.client.ReconnectListener; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; +import org.redisson.client.RedisPubSubConnection; import org.redisson.client.protocol.RedisCommands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ConnectionEntry { - private final Logger log = LoggerFactory.getLogger(getClass()); + final Logger log = LoggerFactory.getLogger(getClass()); private volatile boolean freezed; - private final RedisClient client; + final RedisClient client; private final Queue connections = new ConcurrentLinkedQueue(); private final Semaphore connectionsSemaphore; @@ -61,8 +63,22 @@ public class ConnectionEntry { return connections; } - public RedisConnection connect(MasterSlaveServersConfig config) { + public RedisConnection connect(final MasterSlaveServersConfig config) { RedisConnection conn = client.connect(); + log.debug("new connection created: {}", conn); + + prepareConnection(config, conn); + conn.setReconnectListener(new ReconnectListener() { + @Override + public void onReconnect(RedisConnection conn) { + prepareConnection(config, conn); + } + }); + + return conn; + } + + private void prepareConnection(MasterSlaveServersConfig config, RedisConnection conn) { if (config.getPassword() != null) { conn.sync(RedisCommands.AUTH, config.getPassword()); } @@ -72,8 +88,19 @@ public class ConnectionEntry { if (config.getClientName() != null) { conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName()); } + } - log.debug("new connection created: {}", conn); + public RedisPubSubConnection connectPubSub(final MasterSlaveServersConfig config) { + RedisPubSubConnection conn = client.connectPubSub(); + log.debug("new pubsub connection created: {}", conn); + + prepareConnection(config, conn); + conn.setReconnectListener(new ReconnectListener() { + @Override + public void onReconnect(RedisConnection conn) { + prepareConnection(config, conn); + } + }); return conn; } diff --git a/src/main/java/org/redisson/connection/SingleEntry.java b/src/main/java/org/redisson/connection/SingleEntry.java index f4d3cc193..62159402f 100644 --- a/src/main/java/org/redisson/connection/SingleEntry.java +++ b/src/main/java/org/redisson/connection/SingleEntry.java @@ -20,7 +20,6 @@ import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisPubSubConnection; -import org.redisson.client.protocol.RedisCommands; public class SingleEntry extends MasterSlaveEntry { @@ -54,18 +53,7 @@ public class SingleEntry extends MasterSlaveEntry { } try { - conn = masterEntry.getClient().connectPubSub(); - if (config.getPassword() != null) { - conn.sync(RedisCommands.AUTH, config.getPassword()); - } - if (config.getDatabase() != 0) { - conn.sync(RedisCommands.SELECT, config.getDatabase()); - } - if (config.getClientName() != null) { - conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName()); - } - - return conn; + return masterEntry.connectPubSub(config); } catch (RedisConnectionException e) { ((SubscribesConnectionEntry)masterEntry).getSubscribeConnectionsSemaphore().release(); throw e; diff --git a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java b/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java index b7331f4ef..9fe0f8e82 100644 --- a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java +++ b/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java @@ -19,6 +19,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Semaphore; +import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; import org.redisson.client.RedisPubSubConnection; @@ -53,5 +54,12 @@ public class SubscribesConnectionEntry extends ConnectionEntry { return subscribeConnectionsSemaphore; } + public RedisPubSubConnection connectPubSub(MasterSlaveServersConfig config) { + RedisPubSubConnection conn = super.connectPubSub(config); + allSubscribeConnections.offer(conn); + return conn; + } + + }