Command encoding errors handling bug fixed. #216

pull/218/head
Nikita 10 years ago
parent 252493e56b
commit 2ac65d2fc2

@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisConnectionClosedException;
import org.redisson.client.RedisConnectionWriteException;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
@ -212,7 +212,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
timeout.cancel();
ex.set(new RedisConnectionClosedException("channel: " + future.channel() + " closed"));
ex.set(new RedisConnectionWriteException("channel: " + future.channel() + " closed"));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
}

@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionClosedException;
import org.redisson.client.RedisConnectionWriteException;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
@ -422,7 +422,8 @@ public class CommandExecutorService implements CommandExecutor {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
timeout.cancel();
ex.set(new RedisConnectionClosedException("channel: " + future.channel() + " closed"));
ex.set(new RedisConnectionWriteException(
"Can't send command: " + command + ", params: " + params + ", channel: " + future.channel(), future.cause()));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
}

@ -15,15 +15,15 @@
*/
package org.redisson.client;
public class RedisConnectionClosedException extends RedisException {
public class RedisConnectionWriteException extends RedisException {
private static final long serialVersionUID = -4756928186967834601L;
public RedisConnectionClosedException(String msg) {
public RedisConnectionWriteException(String msg) {
super(msg);
}
public RedisConnectionClosedException(String msg, Throwable e) {
public RedisConnectionWriteException(String msg, Throwable e) {
super(msg, e);
}

@ -21,12 +21,10 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.handler.CommandsQueue.QueueCommands;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.Decoder;
@ -132,8 +130,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data);
}
ctx.channel().attr(CommandsQueue.REPLAY).remove();
ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND);
ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx);
state(null);
} else {
@ -143,8 +140,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
return;
}
ctx.channel().attr(CommandsQueue.REPLAY).remove();
ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND);
ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx);
state(null);
}

@ -23,6 +23,8 @@ import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.QueueCommandHolder;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.AttributeKey;
@ -36,20 +38,14 @@ import io.netty.util.internal.PlatformDependent;
*/
public class CommandsQueue extends ChannelDuplexHandler {
public enum QueueCommands {NEXT_COMMAND}
public static final AttributeKey<QueueCommand> REPLAY = AttributeKey.valueOf("promise");
private final Queue<QueueCommandHolder> queue = PlatformDependent.newMpscQueue();
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == QueueCommands.NEXT_COMMAND) {
queue.poll();
sendData(ctx);
} else {
super.userEventTriggered(ctx, evt);
}
public void sendNextCommand(ChannelHandlerContext ctx) throws Exception {
ctx.channel().attr(CommandsQueue.REPLAY).remove();
queue.poll();
sendData(ctx);
}
@Override
@ -67,7 +63,7 @@ public class CommandsQueue extends ChannelDuplexHandler {
}
}
private void sendData(ChannelHandlerContext ctx) throws Exception {
private void sendData(final ChannelHandlerContext ctx) throws Exception {
QueueCommandHolder command = queue.peek();
if (command != null && command.getSended().compareAndSet(false, true)) {
QueueCommand data = command.getCommand();
@ -81,6 +77,14 @@ public class CommandsQueue extends ChannelDuplexHandler {
} else {
ctx.channel().attr(REPLAY).set(data);
}
command.getChannelPromise().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
sendNextCommand(ctx);
}
}
});
ctx.channel().writeAndFlush(data, command.getChannelPromise());
}
}

@ -27,6 +27,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
newconfig.setRetryAttempts(cfg.getRetryAttempts());
newconfig.setRetryInterval(cfg.getRetryInterval());
newconfig.setTimeout(cfg.getTimeout());
newconfig.setPingTimeout(cfg.getPingTimeout());
newconfig.setPassword(cfg.getPassword());
newconfig.setDatabase(cfg.getDatabase());
newconfig.setClientName(cfg.getClientName());

@ -0,0 +1,68 @@
package org.redisson;
import java.util.Iterator;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.client.RedisConnectionWriteException;
import org.redisson.codec.SerializationCodec;
import org.redisson.core.ClusterNode;
import org.redisson.core.Node;
import org.redisson.core.NodesGroup;
public class RedissonTest extends BaseTest {
public static class Dummy {
private String field;
}
@Test(expected = RedisConnectionWriteException.class)
public void testSer() {
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6379");
config.setCodec(new SerializationCodec());
Redisson r = Redisson.create(config);
r.getMap("test").put("1", new Dummy());
}
// @Test
public void test() {
NodesGroup<Node> nodes = redisson.getNodesGroup();
Assert.assertEquals(1, nodes.getNodes().size());
Iterator<Node> iter = nodes.getNodes().iterator();
Node node1 = iter.next();
Assert.assertTrue(node1.ping());
Assert.assertTrue(nodes.pingAll());
}
// @Test
public void testSentinel() {
NodesGroup<Node> nodes = redisson.getNodesGroup();
Assert.assertEquals(5, nodes.getNodes().size());
for (Node node : nodes.getNodes()) {
Assert.assertTrue(node.ping());
}
Assert.assertTrue(nodes.pingAll());
}
@Test
public void testCluster() {
NodesGroup<ClusterNode> nodes = redisson.getClusterNodesGroup();
Assert.assertEquals(2, nodes.getNodes().size());
for (ClusterNode node : nodes.getNodes()) {
Map<String, String> params = node.info();
Assert.assertNotNull(params);
Assert.assertTrue(node.ping());
}
Assert.assertTrue(nodes.pingAll());
}
}
Loading…
Cancel
Save