Fixed - CommandDecoder doesn't remove command with failed response from commands queue. #1377

pull/1423/head
Nikita 7 years ago
parent 4d9daa2e69
commit a42a20ba11

@ -98,6 +98,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
} catch (Exception e) {
log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e);
sendNext(ctx);
throw e;
}
} else if (data instanceof CommandData) {
@ -111,6 +112,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} catch (Exception e) {
log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e);
cmd.tryFailure(e);
sendNext(ctx);
throw e;
}
} else if (data instanceof CommandsData) {
@ -119,13 +121,17 @@ public class CommandDecoder extends ReplayingDecoder<State> {
decodeCommandBatch(ctx, in, data, commands);
} catch (Exception e) {
commands.getPromise().tryFailure(e);
sendNext(ctx);
throw e;
}
return;
}
ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel());
sendNext(ctx);
}
protected void sendNext(ChannelHandlerContext ctx) {
ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel());
state(null);
}
@ -242,9 +248,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
}
ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel());
state(null);
sendNext(ctx);
} else {
checkpoint();
state().setBatchIndex(i);

@ -1,7 +1,7 @@
package org.redisson;
import static org.awaitility.Awaitility.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.redisson.BaseTest.createInstance;
import java.io.IOException;
@ -31,31 +31,44 @@ import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.ClusterNode;
import org.redisson.api.Node;
import org.redisson.api.Node.InfoSection;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.StatusListener;
import org.redisson.api.NodesGroup;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisOutOfMemoryException;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.codec.SerializationCodec;
import org.redisson.config.Config;
import org.redisson.connection.ConnectionListener;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.misc.HashValue;
import io.netty.buffer.Unpooled;
public class RedissonTest {
protected RedissonClient redisson;
protected static RedissonClient defaultRedisson;
@Test
public void testDecoderError() {
redisson.getBucket("testbucket", new StringCodec()).set("{INVALID JSON!}");
for (int i = 0; i < 256; i++) {
try {
redisson.getBucket("testbucket", new JsonJacksonCodec()).get();
Assert.fail();
} catch (Exception e) {
// skip
}
}
redisson.getBucket("testbucket2").set("should work");
}
@Test
public void testSmallPool() throws InterruptedException {
Config config = new Config();

Loading…
Cancel
Save