PubSub message decoding under heavy load fixed. #276

pull/282/head
Nikita 9 years ago
parent 2eeb592a3d
commit 7ddf605385

@ -95,7 +95,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
state().setDecoderState(null); state().setDecoderState(null);
if (data == null) { if (data == null) {
decode(in, null, null, ctx.channel(), currentDecoder); decode(in, null, null, ctx.channel(), currentDecoder);
} else if (data instanceof CommandData) { } else if (data instanceof CommandData) {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data; CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
try { try {
@ -110,40 +110,45 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} else if (data instanceof CommandsData) { } else if (data instanceof CommandsData) {
CommandsData commands = (CommandsData)data; CommandsData commands = (CommandsData)data;
int i = state().getIndex(); handleCommandsDataResponse(ctx, in, data, currentDecoder, commands);
return;
}
while (in.writerIndex() > in.readerIndex()) { ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx);
CommandData<Object, Object> cmd = null;
try {
checkpoint();
state().setIndex(i);
cmd = (CommandData<Object, Object>) commands.getCommands().get(i);
decode(in, cmd, null, ctx.channel(), currentDecoder);
i++;
} catch (IOException e) {
cmd.getPromise().setFailure(e);
}
}
if (i == commands.getCommands().size()) { state(null);
Promise<Void> promise = commands.getPromise(); }
if (!promise.trySuccess(null)) {
log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data);
}
ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx); private void handleCommandsDataResponse(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data,
Decoder<Object> currentDecoder, CommandsData commands) throws Exception {
int i = state().getIndex();
state(null); while (in.writerIndex() > in.readerIndex()) {
} else { CommandData<Object, Object> cmd = null;
try {
checkpoint(); checkpoint();
state().setIndex(i); state().setIndex(i);
cmd = (CommandData<Object, Object>) commands.getCommands().get(i);
decode(in, cmd, null, ctx.channel(), currentDecoder);
i++;
} catch (IOException e) {
cmd.getPromise().setFailure(e);
} }
return;
} }
ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx); if (i == commands.getCommands().size()) {
Promise<Void> promise = commands.getPromise();
if (!promise.trySuccess(null)) {
log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data);
}
state(null); ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx);
state(null);
} else {
checkpoint();
state().setIndex(i);
}
} }
private void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, Decoder<Object> currentDecoder) throws IOException { private void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, Decoder<Object> currentDecoder) throws IOException {
@ -206,6 +211,9 @@ public class CommandDecoder extends ReplayingDecoder<State> {
Object result = decoder.decode(respParts, state()); Object result = decoder.decode(respParts, state());
// store current message index
checkpoint();
if (result instanceof Message) { if (result instanceof Message) {
handleMultiResult(data, null, channel, result); handleMultiResult(data, null, channel, result);
// has next messages? // has next messages?

@ -203,6 +203,8 @@ public class RedissonTopicTest {
redisson2.shutdown(); redisson2.shutdown();
} }
volatile long counter;
@Test @Test
public void testHeavyLoad() throws InterruptedException { public void testHeavyLoad() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1000); final CountDownLatch messageRecieved = new CountDownLatch(1000);
@ -214,6 +216,7 @@ public class RedissonTopicTest {
public void onMessage(String channel, Message msg) { public void onMessage(String channel, Message msg) {
Assert.assertEquals(new Message("123"), msg); Assert.assertEquals(new Message("123"), msg);
messageRecieved.countDown(); messageRecieved.countDown();
counter++;
} }
}); });
@ -233,11 +236,13 @@ public class RedissonTopicTest {
messageRecieved.await(); messageRecieved.await();
Thread.sleep(1000);
Assert.assertEquals(500, counter);
redisson1.shutdown(); redisson1.shutdown();
redisson2.shutdown(); redisson2.shutdown();
} }
@Test @Test
public void testListenerRemove() throws InterruptedException { public void testListenerRemove() throws InterruptedException {
Redisson redisson1 = BaseTest.createInstance(); Redisson redisson1 = BaseTest.createInstance();

Loading…
Cancel
Save