Log unexpected errors in netty handlers. #617

pull/616/head
Nikita 9 years ago
parent f02f0ae684
commit 120fd686a2

@ -21,6 +21,8 @@ import java.util.Queue;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.QueueCommandHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@ -39,6 +41,8 @@ import io.netty.util.internal.PlatformDependent;
*/
public class CommandsQueue extends ChannelOutboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(CommandsQueue.class);
public static final AttributeKey<QueueCommand> CURRENT_COMMAND = AttributeKey.valueOf("promise");
private final Queue<QueueCommandHolder> queue = PlatformDependent.newMpscQueue();
@ -53,7 +57,7 @@ public class CommandsQueue extends ChannelOutboundHandlerAdapter {
};
public void sendNextCommand(Channel channel) {
channel.attr(CommandsQueue.CURRENT_COMMAND).remove();
channel.attr(CommandsQueue.CURRENT_COMMAND).set(null);
queue.poll();
sendData(channel);
}
@ -94,4 +98,18 @@ public class CommandsQueue extends ChannelOutboundHandlerAdapter {
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
QueueCommand command = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get();
if (command != null) {
if (!command.tryFailure(cause)) {
log.error("Exception occured. Channel: " + ctx.channel() + " Command: " + command, cause);
}
sendNextCommand(ctx.channel());
return;
}
log.error("Exception occured. Channel: " + ctx.channel(), cause);
}
}

@ -40,7 +40,6 @@ import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
@ -156,6 +155,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
super.exceptionCaught(ctx, cause);
}
private void refresh(RedisConnection connection, Channel channel) {

@ -21,6 +21,13 @@ import org.redisson.client.RedisRedirectException;
import org.redisson.client.codec.Codec;
import org.redisson.misc.RPromise;
/**
*
* @author Nikita Koksharov
*
* @param <T> input type
* @param <R> output type
*/
public class BatchCommandData<T, R> extends CommandData<T, R> implements Comparable<BatchCommandData<T, R>> {
private final int index;

@ -23,6 +23,13 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.misc.RPromise;
/**
*
* @author Nikita Koksharov
*
* @param <T> input type
* @param <R> output type
*/
public class CommandData<T, R> implements QueueCommand {
final RPromise<R> promise;

@ -20,6 +20,11 @@ import java.util.List;
import org.redisson.misc.RPromise;
/**
*
* @author Nikita Koksharov
*
*/
public class CommandsData implements QueueCommand {
private final List<CommandData<?, ?>> commands;
@ -50,4 +55,9 @@ public class CommandsData implements QueueCommand {
return result;
}
@Override
public boolean tryFailure(Throwable cause) {
return promise.tryFailure(cause);
}
}

@ -20,6 +20,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
*
* @author Nikita Koksharov
*
*/
public interface QueueCommand {
Set<String> PUBSUB_COMMANDS = new HashSet<String>(Arrays.asList("PSUBSCRIBE", "SUBSCRIBE", "PUNSUBSCRIBE", "UNSUBSCRIBE"));
@ -29,4 +34,6 @@ public interface QueueCommand {
List<CommandData<Object, Object>> getPubSubOperations();
boolean tryFailure(Throwable cause);
}

Loading…
Cancel
Save