From 2ac65d2fc24bb98724794d73ecc264accf71b63f Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 12 Aug 2015 09:53:26 +0300 Subject: [PATCH] Command encoding errors handling bug fixed. #216 --- .../redisson/CommandBatchExecutorService.java | 4 +- .../org/redisson/CommandExecutorService.java | 5 +- ...ava => RedisConnectionWriteException.java} | 6 +- .../client/handler/CommandDecoder.java | 8 +-- .../client/handler/CommandsQueue.java | 26 ++++--- .../connection/SingleConnectionManager.java | 1 + src/test/java/org/redisson/RedissonTest.java | 68 +++++++++++++++++++ 7 files changed, 94 insertions(+), 24 deletions(-) rename src/main/java/org/redisson/client/{RedisConnectionClosedException.java => RedisConnectionWriteException.java} (80%) create mode 100644 src/test/java/org/redisson/RedissonTest.java diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index 7122a2b48..2eedbe329 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import org.redisson.client.RedisConnectionClosedException; +import org.redisson.client.RedisConnectionWriteException; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisException; import org.redisson.client.RedisMovedException; @@ -212,7 +212,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { timeout.cancel(); - ex.set(new RedisConnectionClosedException("channel: " + future.channel() + " closed")); + ex.set(new RedisConnectionWriteException("channel: " + future.channel() + " closed")); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); } } diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index 907b0ede8..05d0efcca 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.redisson.client.RedisConnection; -import org.redisson.client.RedisConnectionClosedException; +import org.redisson.client.RedisConnectionWriteException; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisException; import org.redisson.client.RedisMovedException; @@ -422,7 +422,8 @@ public class CommandExecutorService implements CommandExecutor { public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { timeout.cancel(); - ex.set(new RedisConnectionClosedException("channel: " + future.channel() + " closed")); + ex.set(new RedisConnectionWriteException( + "Can't send command: " + command + ", params: " + params + ", channel: " + future.channel(), future.cause())); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); } } diff --git a/src/main/java/org/redisson/client/RedisConnectionClosedException.java b/src/main/java/org/redisson/client/RedisConnectionWriteException.java similarity index 80% rename from src/main/java/org/redisson/client/RedisConnectionClosedException.java rename to src/main/java/org/redisson/client/RedisConnectionWriteException.java index aeca217ec..5b08a22a0 100644 --- a/src/main/java/org/redisson/client/RedisConnectionClosedException.java +++ b/src/main/java/org/redisson/client/RedisConnectionWriteException.java @@ -15,15 +15,15 @@ */ package org.redisson.client; -public class RedisConnectionClosedException extends RedisException { +public class RedisConnectionWriteException extends RedisException { private static final long serialVersionUID = -4756928186967834601L; - public RedisConnectionClosedException(String msg) { + public RedisConnectionWriteException(String msg) { super(msg); } - public RedisConnectionClosedException(String msg, Throwable e) { + public RedisConnectionWriteException(String msg, Throwable e) { super(msg, e); } diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index 15ffabd31..a3c27728d 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -21,12 +21,10 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CancellationException; import org.redisson.client.RedisException; import org.redisson.client.RedisMovedException; import org.redisson.client.RedisPubSubConnection; -import org.redisson.client.handler.CommandsQueue.QueueCommands; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.Decoder; @@ -132,8 +130,7 @@ public class CommandDecoder extends ReplayingDecoder { log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data); } - ctx.channel().attr(CommandsQueue.REPLAY).remove(); - ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND); + ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx); state(null); } else { @@ -143,8 +140,7 @@ public class CommandDecoder extends ReplayingDecoder { return; } - ctx.channel().attr(CommandsQueue.REPLAY).remove(); - ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND); + ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx); state(null); } diff --git a/src/main/java/org/redisson/client/handler/CommandsQueue.java b/src/main/java/org/redisson/client/handler/CommandsQueue.java index 056e31b35..c16409746 100644 --- a/src/main/java/org/redisson/client/handler/CommandsQueue.java +++ b/src/main/java/org/redisson/client/handler/CommandsQueue.java @@ -23,6 +23,8 @@ import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.QueueCommandHolder; import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.util.AttributeKey; @@ -36,20 +38,14 @@ import io.netty.util.internal.PlatformDependent; */ public class CommandsQueue extends ChannelDuplexHandler { - public enum QueueCommands {NEXT_COMMAND} - public static final AttributeKey REPLAY = AttributeKey.valueOf("promise"); private final Queue queue = PlatformDependent.newMpscQueue(); - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt == QueueCommands.NEXT_COMMAND) { - queue.poll(); - sendData(ctx); - } else { - super.userEventTriggered(ctx, evt); - } + public void sendNextCommand(ChannelHandlerContext ctx) throws Exception { + ctx.channel().attr(CommandsQueue.REPLAY).remove(); + queue.poll(); + sendData(ctx); } @Override @@ -67,7 +63,7 @@ public class CommandsQueue extends ChannelDuplexHandler { } } - private void sendData(ChannelHandlerContext ctx) throws Exception { + private void sendData(final ChannelHandlerContext ctx) throws Exception { QueueCommandHolder command = queue.peek(); if (command != null && command.getSended().compareAndSet(false, true)) { QueueCommand data = command.getCommand(); @@ -81,6 +77,14 @@ public class CommandsQueue extends ChannelDuplexHandler { } else { ctx.channel().attr(REPLAY).set(data); } + command.getChannelPromise().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + sendNextCommand(ctx); + } + } + }); ctx.channel().writeAndFlush(data, command.getChannelPromise()); } } diff --git a/src/main/java/org/redisson/connection/SingleConnectionManager.java b/src/main/java/org/redisson/connection/SingleConnectionManager.java index fbc8ce0ec..07b794756 100644 --- a/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -27,6 +27,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { newconfig.setRetryAttempts(cfg.getRetryAttempts()); newconfig.setRetryInterval(cfg.getRetryInterval()); newconfig.setTimeout(cfg.getTimeout()); + newconfig.setPingTimeout(cfg.getPingTimeout()); newconfig.setPassword(cfg.getPassword()); newconfig.setDatabase(cfg.getDatabase()); newconfig.setClientName(cfg.getClientName()); diff --git a/src/test/java/org/redisson/RedissonTest.java b/src/test/java/org/redisson/RedissonTest.java new file mode 100644 index 000000000..9625f0b66 --- /dev/null +++ b/src/test/java/org/redisson/RedissonTest.java @@ -0,0 +1,68 @@ +package org.redisson; + +import java.util.Iterator; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; +import org.redisson.client.RedisConnectionWriteException; +import org.redisson.codec.SerializationCodec; +import org.redisson.core.ClusterNode; +import org.redisson.core.Node; +import org.redisson.core.NodesGroup; + +public class RedissonTest extends BaseTest { + + public static class Dummy { + private String field; + } + + @Test(expected = RedisConnectionWriteException.class) + public void testSer() { + Config config = new Config(); + config.useSingleServer().setAddress("127.0.0.1:6379"); + config.setCodec(new SerializationCodec()); + Redisson r = Redisson.create(config); + r.getMap("test").put("1", new Dummy()); + } + + +// @Test + public void test() { + NodesGroup nodes = redisson.getNodesGroup(); + Assert.assertEquals(1, nodes.getNodes().size()); + Iterator iter = nodes.getNodes().iterator(); + + Node node1 = iter.next(); + Assert.assertTrue(node1.ping()); + + Assert.assertTrue(nodes.pingAll()); + } + +// @Test + public void testSentinel() { + NodesGroup nodes = redisson.getNodesGroup(); + Assert.assertEquals(5, nodes.getNodes().size()); + + for (Node node : nodes.getNodes()) { + Assert.assertTrue(node.ping()); + } + + Assert.assertTrue(nodes.pingAll()); + } + + @Test + public void testCluster() { + NodesGroup nodes = redisson.getClusterNodesGroup(); + Assert.assertEquals(2, nodes.getNodes().size()); + + for (ClusterNode node : nodes.getNodes()) { + Map params = node.info(); + Assert.assertNotNull(params); + Assert.assertTrue(node.ping()); + } + + Assert.assertTrue(nodes.pingAll()); + } + +}