Async reconnection to channel. #274

pull/282/head
Nikita 9 years ago
parent 8410d6f620
commit 46138c7f14

@ -15,8 +15,10 @@
*/
package org.redisson.client;
import io.netty.util.concurrent.Promise;
public interface ReconnectListener {
void onReconnect(RedisConnection redisConnection) throws RedisException;
void onReconnect(RedisConnection redisConnection, Promise<RedisConnection> connectionFuture) throws RedisException;
}

@ -33,6 +33,9 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
@ -112,21 +115,21 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
private void reconnect(final RedisConnection connection, final Channel channel) {
if (connection.getReconnectListener() != null) {
bootstrap.group().execute(new Runnable() {
// new connection used only for channel init
RedisConnection rc = new RedisConnection(connection.getRedisClient(), channel);
Promise<RedisConnection> connectionFuture = bootstrap.group().next().newPromise();
connection.getReconnectListener().onReconnect(rc, connectionFuture);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void run() {
// new connection used only for channel init
RedisConnection rc = new RedisConnection(connection.getRedisClient(), channel);
connection.getReconnectListener().onReconnect(rc);
connection.updateChannel(channel);
resubscribe(connection);
public void operationComplete(Future<RedisConnection> future) throws Exception {
if (future.isSuccess()) {
connection.updateChannel(channel);
resubscribe(connection);
}
}
});
} else {
connection.updateChannel(channel);
resubscribe(connection);
}
}

@ -136,8 +136,8 @@ public interface RedisCommands {
RedisStrictCommand<Long> INCRBY = new RedisStrictCommand<Long>("INCRBY");
RedisStrictCommand<Long> DECR = new RedisStrictCommand<Long>("DECR");
RedisStrictCommand<String> AUTH = new RedisStrictCommand<String>("AUTH", new StringReplayDecoder());
RedisStrictCommand<String> SELECT = new RedisStrictCommand<String>("SELECT", new StringReplayDecoder());
RedisStrictCommand<Void> AUTH = new RedisStrictCommand<Void>("AUTH", new VoidReplayConvertor());
RedisStrictCommand<Void> SELECT = new RedisStrictCommand<Void>("SELECT", new VoidReplayConvertor());
RedisStrictCommand<Boolean> CLIENT_SETNAME = new RedisStrictCommand<Boolean>("CLIENT", "SETNAME", new BooleanReplayConvertor());
RedisStrictCommand<String> CLIENT_GETNAME = new RedisStrictCommand<String>("CLIENT", "GETNAME", new StringDataDecoder());
RedisStrictCommand<Void> FLUSHDB = new RedisStrictCommand<Void>("FLUSHDB", new VoidReplayConvertor());

@ -16,11 +16,11 @@
package org.redisson.cluster;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.DefaultConnectionListener;
import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.connection.DefaultConnectionListener;
import org.redisson.connection.FutureConnectionListener;
public class ClusterConnectionListener extends DefaultConnectionListener {
@ -31,10 +31,10 @@ public class ClusterConnectionListener extends DefaultConnectionListener {
}
@Override
public void onConnect(MasterSlaveServersConfig config, RedisConnection conn, Mode serverMode) throws RedisException {
super.onConnect(config, conn, serverMode);
public void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener) throws RedisException {
super.onConnect(config, serverMode, connectionListener);
if (serverMode == Mode.SLAVE && readFromSlaves) {
conn.sync(RedisCommands.READONLY);
connectionListener.addCommand(RedisCommands.READONLY);
}
}

@ -24,12 +24,12 @@ import org.redisson.client.ReconnectListener;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
public class ConnectionEntry {
@ -93,6 +93,7 @@ public class ConnectionEntry {
}
public Future<RedisConnection> connect(final MasterSlaveServersConfig config) {
final Promise<RedisConnection> connectionFuture = client.getBootstrap().group().next().newPromise();
Future<RedisConnection> future = client.connectAsync();
future.addListener(new FutureListener<RedisConnection>() {
@Override
@ -103,31 +104,29 @@ public class ConnectionEntry {
RedisConnection conn = future.getNow();
log.debug("new connection created: {}", conn);
connectListener.onConnect(config, conn, serverMode);
conn.setReconnectListener(new ReconnectListener() {
@Override
public void onReconnect(RedisConnection conn) {
connectListener.onConnect(config, conn, serverMode);
}
});
FutureConnectionListener<RedisConnection> listener = new FutureConnectionListener<RedisConnection>(connectionFuture, conn);
connectListener.onConnect(config, serverMode, listener);
listener.executeCommands();
addReconnectListener(config, conn);
}
});
return future;
return connectionFuture;
}
private void prepareConnection(MasterSlaveServersConfig config, RedisConnection conn) {
if (config.getPassword() != null) {
conn.sync(RedisCommands.AUTH, config.getPassword());
}
if (config.getDatabase() != 0) {
conn.sync(RedisCommands.SELECT, config.getDatabase());
}
if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
}
private void addReconnectListener(final MasterSlaveServersConfig config, RedisConnection conn) {
conn.setReconnectListener(new ReconnectListener() {
@Override
public void onReconnect(RedisConnection conn, Promise<RedisConnection> connectionFuture) {
FutureConnectionListener<RedisConnection> listener = new FutureConnectionListener<RedisConnection>(connectionFuture, conn);
connectListener.onConnect(config, serverMode, listener);
listener.executeCommands();
}
});
}
public Future<RedisPubSubConnection> connectPubSub(final MasterSlaveServersConfig config) {
final Promise<RedisPubSubConnection> connectionFuture = client.getBootstrap().group().next().newPromise();
Future<RedisPubSubConnection> future = client.connectPubSubAsync();
future.addListener(new FutureListener<RedisPubSubConnection>() {
@Override
@ -138,16 +137,14 @@ public class ConnectionEntry {
RedisPubSubConnection conn = future.getNow();
log.debug("new pubsub connection created: {}", conn);
connectListener.onConnect(config, conn, serverMode);
conn.setReconnectListener(new ReconnectListener() {
@Override
public void onReconnect(RedisConnection conn) {
connectListener.onConnect(config, conn, serverMode);
}
});
FutureConnectionListener<RedisPubSubConnection> listener = new FutureConnectionListener<RedisPubSubConnection>(connectionFuture, conn);
connectListener.onConnect(config, serverMode, listener);
listener.executeCommands();
addReconnectListener(config, conn);
}
});
return future;
return connectionFuture;
}
@Override

@ -16,12 +16,11 @@
package org.redisson.connection;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.connection.ConnectionEntry.Mode;
public interface ConnectionListener {
void onConnect(MasterSlaveServersConfig config, RedisConnection redisConnection, Mode serverMode) throws RedisException;
void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener) throws RedisException;
}

@ -16,7 +16,6 @@
package org.redisson.connection;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionEntry.Mode;
@ -24,16 +23,16 @@ import org.redisson.connection.ConnectionEntry.Mode;
public class DefaultConnectionListener implements ConnectionListener {
@Override
public void onConnect(MasterSlaveServersConfig config, RedisConnection conn, Mode serverMode)
public void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener)
throws RedisException {
if (config.getPassword() != null) {
conn.sync(RedisCommands.AUTH, config.getPassword());
connectionListener.addCommand(RedisCommands.AUTH, config.getPassword());
}
if (config.getDatabase() != 0) {
conn.sync(RedisCommands.SELECT, config.getDatabase());
connectionListener.addCommand(RedisCommands.SELECT, config.getDatabase());
}
if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
connectionListener.addCommand(RedisCommands.CLIENT_SETNAME, config.getClientName());
}
}

@ -0,0 +1,77 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommand;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
public class FutureConnectionListener<T extends RedisConnection> implements FutureListener<Object> {
private final AtomicInteger commandsCounter = new AtomicInteger();
private final Promise<T> connectionPromise;
private final T connection;
private final List<Runnable> commands = new ArrayList<Runnable>(4);
public FutureConnectionListener(Promise<T> connectionFuture, T connection) {
super();
this.connectionPromise = connectionFuture;
this.connection = connection;
}
public void addCommand(final RedisCommand<?> command, final Object ... params) {
commandsCounter.incrementAndGet();
commands.add(new Runnable() {
@Override
public void run() {
Future<Object> future = connection.async(command, params);
future.addListener(FutureConnectionListener.this);
}
});
}
public void executeCommands() {
if (commands.isEmpty()) {
connectionPromise.setSuccess(connection);
return;
}
for (Runnable command : commands) {
command.run();
}
commands.clear();
}
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (!future.isSuccess()) {
connectionPromise.tryFailure(future.cause());
return;
}
if (commandsCounter.decrementAndGet() == 0) {
connectionPromise.trySuccess(connection);
}
}
}
Loading…
Cancel
Save