Send channelFuture support improvement. Reconnection detection speedup

pull/243/head
Nikita 10 years ago
parent bf64e3a95d
commit 2019283a2e

@ -25,7 +25,7 @@ class BaseConfig<T extends BaseConfig<T>> {
*/ */
private int timeout = 60000; private int timeout = 60000;
private int retryAttempts = 5; private int retryAttempts = 20;
private int retryInterval = 1000; private int retryInterval = 1000;

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisConnectionClosedException;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException; import org.redisson.client.RedisMovedException;
@ -35,6 +36,8 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.Timeout; import io.netty.util.Timeout;
import io.netty.util.TimerTask; import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
@ -170,7 +173,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
final Promise<Void> attemptPromise = connectionManager.newPromise(); final Promise<Void> attemptPromise = connectionManager.newPromise();
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>(); final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();
TimerTask timerTask = new TimerTask() { final TimerTask retryTimerTask = new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
if (attemptPromise.isDone()) { if (attemptPromise.isDone()) {
@ -199,10 +202,21 @@ public class CommandBatchExecutorService extends CommandExecutorService {
for (CommandEntry c : entry.getCommands()) { for (CommandEntry c : entry.getCommands()) {
list.add(c.getCommand()); list.add(c.getCommand());
} }
connection.send(new CommandsData(attemptPromise, list)); ChannelFuture future = connection.send(new CommandsData(attemptPromise, list));
ex.set(new RedisTimeoutException()); ex.set(new RedisTimeoutException());
Timeout timeout = connectionManager.getTimer().newTimeout(timerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
timeout.cancel();
ex.set(new RedisConnectionClosedException("channel: " + future.channel() + " closed"));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
}
});
if (entry.isReadOnlyMode()) { if (entry.isReadOnlyMode()) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout)); attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout));
@ -211,7 +225,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
} }
} catch (RedisConnectionException e) { } catch (RedisConnectionException e) {
ex.set(e); ex.set(e);
connectionManager.getTimer().newTimeout(timerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
} }
attemptPromise.addListener(new FutureListener<Void>() { attemptPromise.addListener(new FutureListener<Void>() {
@Override @Override

@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionClosedException;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException; import org.redisson.client.RedisMovedException;
@ -38,6 +39,8 @@ import org.redisson.connection.ConnectionManager;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.Timeout; import io.netty.util.Timeout;
import io.netty.util.TimerTask; import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.DefaultPromise;
@ -335,7 +338,7 @@ public class CommandExecutorService implements CommandExecutor {
final Promise<R> attemptPromise = connectionManager.newPromise(); final Promise<R> attemptPromise = connectionManager.newPromise();
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>(); final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();
TimerTask timerTask = new TimerTask() { final TimerTask retryTimerTask = new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
if (attemptPromise.isDone()) { if (attemptPromise.isDone()) {
@ -360,10 +363,21 @@ public class CommandExecutorService implements CommandExecutor {
connection = connectionManager.connectionWriteOp(slot); connection = connectionManager.connectionWriteOp(slot);
} }
log.debug("getting connection for command {} via slot {} using {}", command, slot, connection.getRedisClient().getAddr()); log.debug("getting connection for command {} via slot {} using {}", command, slot, connection.getRedisClient().getAddr());
connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params)); ChannelFuture future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
ex.set(new RedisTimeoutException()); ex.set(new RedisTimeoutException());
Timeout timeout = connectionManager.getTimer().newTimeout(timerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
timeout.cancel();
ex.set(new RedisConnectionClosedException("channel: " + future.channel() + " closed"));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
}
});
if (readOnlyMode) { if (readOnlyMode) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout)); attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout));
@ -372,7 +386,7 @@ public class CommandExecutorService implements CommandExecutor {
} }
} catch (RedisConnectionException e) { } catch (RedisConnectionException e) {
ex.set(e); ex.set(e);
connectionManager.getTimer().newTimeout(timerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
} }
attemptPromise.addListener(new FutureListener<R>() { attemptPromise.addListener(new FutureListener<R>() {
@Override @Override
@ -384,6 +398,7 @@ public class CommandExecutorService implements CommandExecutor {
if (future.cause() instanceof RedisMovedException) { if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause(); RedisMovedException ex = (RedisMovedException)future.cause();
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
async(readOnlyMode, ex.getSlot(), messageDecoder, codec, command, params, mainPromise, attempt); async(readOnlyMode, ex.getSlot(), messageDecoder, codec, command, params, mainPromise, attempt);
return; return;
} }

@ -16,7 +16,6 @@
package org.redisson; package org.redisson;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException;
import org.redisson.misc.URIBuilder; import org.redisson.misc.URIBuilder;

@ -1,3 +1,18 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson; package org.redisson;
public interface SlotCallback<T, R> { public interface SlotCallback<T, R> {

@ -107,5 +107,10 @@ public class RedisClient {
return channels.close(); return channels.close();
} }
@Override
public String toString() {
return "RedisClient [addr=" + addr + "]";
}
} }

@ -91,7 +91,7 @@ public class RedisConnection implements RedisCommands {
public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise(); Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
channel.writeAndFlush(new CommandData<T, R>(promise, encoder, command, params)); send(new CommandData<T, R>(promise, encoder, command, params));
return promise; return promise;
} }
@ -113,4 +113,9 @@ public class RedisConnection implements RedisCommands {
return channel.close(); return channel.close();
} }
@Override
public String toString() {
return getClass().getSimpleName() + " [redisClient=" + redisClient + ", channel=" + channel + "]";
}
} }

@ -0,0 +1,30 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client;
public class RedisConnectionClosedException extends RedisException {
private static final long serialVersionUID = -4756928186967834601L;
public RedisConnectionClosedException(String msg) {
super(msg);
}
public RedisConnectionClosedException(String msg, Throwable e) {
super(msg, e);
}
}

@ -20,6 +20,7 @@ import java.util.Queue;
import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.QueueCommandHolder;
import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -39,7 +40,7 @@ public class CommandsQueue extends ChannelDuplexHandler {
public static final AttributeKey<QueueCommand> REPLAY = AttributeKey.valueOf("promise"); public static final AttributeKey<QueueCommand> REPLAY = AttributeKey.valueOf("promise");
private final Queue<QueueCommand> queue = PlatformDependent.newMpscQueue(); private final Queue<QueueCommandHolder> queue = PlatformDependent.newMpscQueue();
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
@ -55,10 +56,10 @@ public class CommandsQueue extends ChannelDuplexHandler {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof QueueCommand) { if (msg instanceof QueueCommand) {
QueueCommand data = (QueueCommand) msg; QueueCommand data = (QueueCommand) msg;
if (data.getSended().get()) { if (queue.peek() != null && queue.peek().getCommand() == data) {
super.write(ctx, msg, promise); super.write(ctx, msg, promise);
} else { } else {
queue.add(data); queue.add(new QueueCommandHolder(data, promise));
sendData(ctx); sendData(ctx);
} }
} else { } else {
@ -67,8 +68,9 @@ public class CommandsQueue extends ChannelDuplexHandler {
} }
private void sendData(ChannelHandlerContext ctx) throws Exception { private void sendData(ChannelHandlerContext ctx) throws Exception {
QueueCommand data = queue.peek(); QueueCommandHolder command = queue.peek();
if (data != null && data.getSended().compareAndSet(false, true)) { if (command != null && command.getSended().compareAndSet(false, true)) {
QueueCommand data = command.getCommand();
List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations(); List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
if (!pubSubOps.isEmpty()) { if (!pubSubOps.isEmpty()) {
for (CommandData<Object, Object> cd : pubSubOps) { for (CommandData<Object, Object> cd : pubSubOps) {
@ -79,7 +81,7 @@ public class CommandsQueue extends ChannelDuplexHandler {
} else { } else {
ctx.channel().attr(REPLAY).set(data); ctx.channel().attr(REPLAY).set(data);
} }
ctx.channel().writeAndFlush(data); ctx.channel().writeAndFlush(data, command.getChannelPromise());
} }
} }

@ -30,7 +30,6 @@ public class CommandData<T, R> implements QueueCommand {
final RedisCommand<T> command; final RedisCommand<T> command;
final Object[] params; final Object[] params;
final Codec codec; final Codec codec;
final AtomicBoolean sended = new AtomicBoolean();
final MultiDecoder<Object> messageDecoder; final MultiDecoder<Object> messageDecoder;
public CommandData(Promise<R> promise, Codec codec, RedisCommand<T> command, Object[] params) { public CommandData(Promise<R> promise, Codec codec, RedisCommand<T> command, Object[] params) {
@ -61,10 +60,6 @@ public class CommandData<T, R> implements QueueCommand {
return promise; return promise;
} }
public AtomicBoolean getSended() {
return sended;
}
public Codec getCodec() { public Codec getCodec() {
return codec; return codec;
} }
@ -72,8 +67,7 @@ public class CommandData<T, R> implements QueueCommand {
@Override @Override
public String toString() { public String toString() {
return "CommandData [promise=" + promise + ", command=" + command + ", params=" return "CommandData [promise=" + promise + ", command=" + command + ", params="
+ Arrays.toString(params) + ", codec=" + codec + ", sended=" + sended + ", messageDecoder=" + Arrays.toString(params) + ", codec=" + codec + "]";
+ messageDecoder + "]";
} }
@Override @Override

@ -26,7 +26,6 @@ public class CommandsData implements QueueCommand {
private final List<CommandData<?, ?>> commands; private final List<CommandData<?, ?>> commands;
private final Promise<Void> promise; private final Promise<Void> promise;
private final AtomicBoolean sended = new AtomicBoolean();
public CommandsData(Promise<Void> promise, List<CommandData<?, ?>> commands) { public CommandsData(Promise<Void> promise, List<CommandData<?, ?>> commands) {
super(); super();
@ -42,10 +41,6 @@ public class CommandsData implements QueueCommand {
return commands; return commands;
} }
public AtomicBoolean getSended() {
return sended;
}
@Override @Override
public List<CommandData<Object, Object>> getPubSubOperations() { public List<CommandData<Object, Object>> getPubSubOperations() {
List<CommandData<Object, Object>> result = new ArrayList<CommandData<Object, Object>>(); List<CommandData<Object, Object>> result = new ArrayList<CommandData<Object, Object>>();

@ -16,12 +16,9 @@
package org.redisson.client.protocol; package org.redisson.client.protocol;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
public interface QueueCommand { public interface QueueCommand {
List<CommandData<Object, Object>> getPubSubOperations(); List<CommandData<Object, Object>> getPubSubOperations();
AtomicBoolean getSended();
} }

@ -0,0 +1,46 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.channel.ChannelPromise;
public class QueueCommandHolder {
final AtomicBoolean sended = new AtomicBoolean();
final ChannelPromise channelPromise;
final QueueCommand command;
public QueueCommandHolder(QueueCommand command, ChannelPromise channelPromise) {
super();
this.command = command;
this.channelPromise = channelPromise;
}
public QueueCommand getCommand() {
return command;
}
public ChannelPromise getChannelPromise() {
return channelPromise;
}
public AtomicBoolean getSended() {
return sended;
}
}

@ -189,6 +189,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
if (config.getClientName() != null) { if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName()); conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
} }
log.debug("new connection created: {}", conn);
return conn; return conn;
} catch (RedisConnectionException e) { } catch (RedisConnectionException e) {

@ -134,6 +134,8 @@ public class MasterSlaveEntry {
if (config.getClientName() != null) { if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName()); conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
} }
log.debug("new connection created: {}", conn);
return conn; return conn;
} catch (RedisConnectionException e) { } catch (RedisConnectionException e) {
masterEntry.getConnectionsSemaphore().release(); masterEntry.getConnectionsSemaphore().release();

Loading…
Cancel
Save