From 46138c7f14edfa182570a9add3d2aba404665511 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 4 Nov 2015 13:50:25 +0300 Subject: [PATCH] Async reconnection to channel. #274 --- .../redisson/client/ReconnectListener.java | 4 +- .../client/handler/ConnectionWatchdog.java | 23 +++--- .../client/protocol/RedisCommands.java | 4 +- .../cluster/ClusterConnectionListener.java | 10 +-- .../redisson/connection/ConnectionEntry.java | 51 ++++++------ .../connection/ConnectionListener.java | 3 +- .../connection/DefaultConnectionListener.java | 9 +-- .../connection/FutureConnectionListener.java | 77 +++++++++++++++++++ 8 files changed, 129 insertions(+), 52 deletions(-) create mode 100644 src/main/java/org/redisson/connection/FutureConnectionListener.java diff --git a/src/main/java/org/redisson/client/ReconnectListener.java b/src/main/java/org/redisson/client/ReconnectListener.java index 289e4c1ac..9a7a0d1ba 100644 --- a/src/main/java/org/redisson/client/ReconnectListener.java +++ b/src/main/java/org/redisson/client/ReconnectListener.java @@ -15,8 +15,10 @@ */ package org.redisson.client; +import io.netty.util.concurrent.Promise; + public interface ReconnectListener { - void onReconnect(RedisConnection redisConnection) throws RedisException; + void onReconnect(RedisConnection redisConnection, Promise connectionFuture) throws RedisException; } diff --git a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index f02112b8d..7f643d83b 100644 --- a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -33,6 +33,9 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.EventLoopGroup; import io.netty.channel.group.ChannelGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { @@ -112,21 +115,21 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { private void reconnect(final RedisConnection connection, final Channel channel) { if (connection.getReconnectListener() != null) { - bootstrap.group().execute(new Runnable() { + // new connection used only for channel init + RedisConnection rc = new RedisConnection(connection.getRedisClient(), channel); + Promise connectionFuture = bootstrap.group().next().newPromise(); + connection.getReconnectListener().onReconnect(rc, connectionFuture); + connectionFuture.addListener(new FutureListener() { @Override - public void run() { - // new connection used only for channel init - RedisConnection rc = new RedisConnection(connection.getRedisClient(), channel); - connection.getReconnectListener().onReconnect(rc); - connection.updateChannel(channel); - - resubscribe(connection); + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + connection.updateChannel(channel); + resubscribe(connection); + } } - }); } else { connection.updateChannel(channel); - resubscribe(connection); } } diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index fa118e3d7..ef31a6f65 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -136,8 +136,8 @@ public interface RedisCommands { RedisStrictCommand INCRBY = new RedisStrictCommand("INCRBY"); RedisStrictCommand DECR = new RedisStrictCommand("DECR"); - RedisStrictCommand AUTH = new RedisStrictCommand("AUTH", new StringReplayDecoder()); - RedisStrictCommand SELECT = new RedisStrictCommand("SELECT", new StringReplayDecoder()); + RedisStrictCommand AUTH = new RedisStrictCommand("AUTH", new VoidReplayConvertor()); + RedisStrictCommand SELECT = new RedisStrictCommand("SELECT", new VoidReplayConvertor()); RedisStrictCommand CLIENT_SETNAME = new RedisStrictCommand("CLIENT", "SETNAME", new BooleanReplayConvertor()); RedisStrictCommand CLIENT_GETNAME = new RedisStrictCommand("CLIENT", "GETNAME", new StringDataDecoder()); RedisStrictCommand FLUSHDB = new RedisStrictCommand("FLUSHDB", new VoidReplayConvertor()); diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionListener.java b/src/main/java/org/redisson/cluster/ClusterConnectionListener.java index 4f683408e..5b5a44901 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionListener.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionListener.java @@ -16,11 +16,11 @@ package org.redisson.cluster; import org.redisson.MasterSlaveServersConfig; -import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; import org.redisson.client.protocol.RedisCommands; -import org.redisson.connection.DefaultConnectionListener; import org.redisson.connection.ConnectionEntry.Mode; +import org.redisson.connection.DefaultConnectionListener; +import org.redisson.connection.FutureConnectionListener; public class ClusterConnectionListener extends DefaultConnectionListener { @@ -31,10 +31,10 @@ public class ClusterConnectionListener extends DefaultConnectionListener { } @Override - public void onConnect(MasterSlaveServersConfig config, RedisConnection conn, Mode serverMode) throws RedisException { - super.onConnect(config, conn, serverMode); + public void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener) throws RedisException { + super.onConnect(config, serverMode, connectionListener); if (serverMode == Mode.SLAVE && readFromSlaves) { - conn.sync(RedisCommands.READONLY); + connectionListener.addCommand(RedisCommands.READONLY); } } diff --git a/src/main/java/org/redisson/connection/ConnectionEntry.java b/src/main/java/org/redisson/connection/ConnectionEntry.java index 6fdab9035..3b2cc7cfe 100644 --- a/src/main/java/org/redisson/connection/ConnectionEntry.java +++ b/src/main/java/org/redisson/connection/ConnectionEntry.java @@ -24,12 +24,12 @@ import org.redisson.client.ReconnectListener; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; -import org.redisson.client.protocol.RedisCommands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; public class ConnectionEntry { @@ -93,6 +93,7 @@ public class ConnectionEntry { } public Future connect(final MasterSlaveServersConfig config) { + final Promise connectionFuture = client.getBootstrap().group().next().newPromise(); Future future = client.connectAsync(); future.addListener(new FutureListener() { @Override @@ -103,31 +104,29 @@ public class ConnectionEntry { RedisConnection conn = future.getNow(); log.debug("new connection created: {}", conn); - connectListener.onConnect(config, conn, serverMode); - conn.setReconnectListener(new ReconnectListener() { - @Override - public void onReconnect(RedisConnection conn) { - connectListener.onConnect(config, conn, serverMode); - } - }); + FutureConnectionListener listener = new FutureConnectionListener(connectionFuture, conn); + connectListener.onConnect(config, serverMode, listener); + listener.executeCommands(); + addReconnectListener(config, conn); } + }); - return future; + return connectionFuture; } - private void prepareConnection(MasterSlaveServersConfig config, RedisConnection conn) { - if (config.getPassword() != null) { - conn.sync(RedisCommands.AUTH, config.getPassword()); - } - if (config.getDatabase() != 0) { - conn.sync(RedisCommands.SELECT, config.getDatabase()); - } - if (config.getClientName() != null) { - conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName()); - } + private void addReconnectListener(final MasterSlaveServersConfig config, RedisConnection conn) { + conn.setReconnectListener(new ReconnectListener() { + @Override + public void onReconnect(RedisConnection conn, Promise connectionFuture) { + FutureConnectionListener listener = new FutureConnectionListener(connectionFuture, conn); + connectListener.onConnect(config, serverMode, listener); + listener.executeCommands(); + } + }); } public Future connectPubSub(final MasterSlaveServersConfig config) { + final Promise connectionFuture = client.getBootstrap().group().next().newPromise(); Future future = client.connectPubSubAsync(); future.addListener(new FutureListener() { @Override @@ -138,16 +137,14 @@ public class ConnectionEntry { RedisPubSubConnection conn = future.getNow(); log.debug("new pubsub connection created: {}", conn); - connectListener.onConnect(config, conn, serverMode); - conn.setReconnectListener(new ReconnectListener() { - @Override - public void onReconnect(RedisConnection conn) { - connectListener.onConnect(config, conn, serverMode); - } - }); + FutureConnectionListener listener = new FutureConnectionListener(connectionFuture, conn); + connectListener.onConnect(config, serverMode, listener); + listener.executeCommands(); + + addReconnectListener(config, conn); } }); - return future; + return connectionFuture; } @Override diff --git a/src/main/java/org/redisson/connection/ConnectionListener.java b/src/main/java/org/redisson/connection/ConnectionListener.java index a502ec228..e9a9a5f7f 100644 --- a/src/main/java/org/redisson/connection/ConnectionListener.java +++ b/src/main/java/org/redisson/connection/ConnectionListener.java @@ -16,12 +16,11 @@ package org.redisson.connection; import org.redisson.MasterSlaveServersConfig; -import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; import org.redisson.connection.ConnectionEntry.Mode; public interface ConnectionListener { - void onConnect(MasterSlaveServersConfig config, RedisConnection redisConnection, Mode serverMode) throws RedisException; + void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener) throws RedisException; } diff --git a/src/main/java/org/redisson/connection/DefaultConnectionListener.java b/src/main/java/org/redisson/connection/DefaultConnectionListener.java index 2354d1a4e..47577d3f0 100644 --- a/src/main/java/org/redisson/connection/DefaultConnectionListener.java +++ b/src/main/java/org/redisson/connection/DefaultConnectionListener.java @@ -16,7 +16,6 @@ package org.redisson.connection; import org.redisson.MasterSlaveServersConfig; -import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; import org.redisson.client.protocol.RedisCommands; import org.redisson.connection.ConnectionEntry.Mode; @@ -24,16 +23,16 @@ import org.redisson.connection.ConnectionEntry.Mode; public class DefaultConnectionListener implements ConnectionListener { @Override - public void onConnect(MasterSlaveServersConfig config, RedisConnection conn, Mode serverMode) + public void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener) throws RedisException { if (config.getPassword() != null) { - conn.sync(RedisCommands.AUTH, config.getPassword()); + connectionListener.addCommand(RedisCommands.AUTH, config.getPassword()); } if (config.getDatabase() != 0) { - conn.sync(RedisCommands.SELECT, config.getDatabase()); + connectionListener.addCommand(RedisCommands.SELECT, config.getDatabase()); } if (config.getClientName() != null) { - conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName()); + connectionListener.addCommand(RedisCommands.CLIENT_SETNAME, config.getClientName()); } } diff --git a/src/main/java/org/redisson/connection/FutureConnectionListener.java b/src/main/java/org/redisson/connection/FutureConnectionListener.java new file mode 100644 index 000000000..a7b58a9a2 --- /dev/null +++ b/src/main/java/org/redisson/connection/FutureConnectionListener.java @@ -0,0 +1,77 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.connection; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.redisson.client.RedisConnection; +import org.redisson.client.protocol.RedisCommand; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; + +public class FutureConnectionListener implements FutureListener { + + private final AtomicInteger commandsCounter = new AtomicInteger(); + + private final Promise connectionPromise; + private final T connection; + private final List commands = new ArrayList(4); + + public FutureConnectionListener(Promise connectionFuture, T connection) { + super(); + this.connectionPromise = connectionFuture; + this.connection = connection; + } + + public void addCommand(final RedisCommand command, final Object ... params) { + commandsCounter.incrementAndGet(); + commands.add(new Runnable() { + @Override + public void run() { + Future future = connection.async(command, params); + future.addListener(FutureConnectionListener.this); + } + }); + } + + public void executeCommands() { + if (commands.isEmpty()) { + connectionPromise.setSuccess(connection); + return; + } + + for (Runnable command : commands) { + command.run(); + } + commands.clear(); + } + + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + connectionPromise.tryFailure(future.cause()); + return; + } + if (commandsCounter.decrementAndGet() == 0) { + connectionPromise.trySuccess(connection); + } + } + +}