From 7a64333f0f209e3e3709aec3886cfe3db27f22d6 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 18 Dec 2015 16:40:33 +0300 Subject: [PATCH] CommandsQueue object allocations optimization. #338 --- .../client/handler/CommandDecoder.java | 4 +-- .../client/handler/CommandsQueue.java | 36 ++++++++++--------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index d05c9ff27..2330c6aef 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -118,7 +118,7 @@ public class CommandDecoder extends ReplayingDecoder { return; } - ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx); + ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel()); state(null); } @@ -146,7 +146,7 @@ public class CommandDecoder extends ReplayingDecoder { log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data); } - ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx); + ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel()); state(null); } else { diff --git a/src/main/java/org/redisson/client/handler/CommandsQueue.java b/src/main/java/org/redisson/client/handler/CommandsQueue.java index 91d5cfdee..22f3aee0a 100644 --- a/src/main/java/org/redisson/client/handler/CommandsQueue.java +++ b/src/main/java/org/redisson/client/handler/CommandsQueue.java @@ -22,6 +22,7 @@ import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.QueueCommandHolder; +import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -42,10 +43,19 @@ public class CommandsQueue extends ChannelDuplexHandler { private final Queue queue = PlatformDependent.newMpscQueue(); - public void sendNextCommand(ChannelHandlerContext ctx) { - ctx.channel().attr(CommandsQueue.REPLAY).remove(); + private final ChannelFutureListener listener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + sendNextCommand(future.channel()); + } + } + }; + + public void sendNextCommand(Channel channel) { + channel.attr(CommandsQueue.REPLAY).remove(); queue.poll(); - sendData(ctx); + sendData(channel); } @Override @@ -57,14 +67,14 @@ public class CommandsQueue extends ChannelDuplexHandler { super.write(ctx, msg, promise); } else { queue.add(new QueueCommandHolder(data, promise)); - sendData(ctx); + sendData(ctx.channel()); } } else { super.write(ctx, msg, promise); } } - private void sendData(final ChannelHandlerContext ctx) { + private void sendData(final Channel ch) { QueueCommandHolder command = queue.peek(); if (command != null && command.getSended().compareAndSet(false, true)) { QueueCommand data = command.getCommand(); @@ -72,21 +82,15 @@ public class CommandsQueue extends ChannelDuplexHandler { if (!pubSubOps.isEmpty()) { for (CommandData cd : pubSubOps) { for (Object channel : cd.getParams()) { - ctx.pipeline().get(CommandDecoder.class).addChannel(channel.toString(), cd); + ch.pipeline().get(CommandDecoder.class).addChannel(channel.toString(), cd); } } } else { - ctx.channel().attr(REPLAY).set(data); + ch.attr(REPLAY).set(data); } - command.getChannelPromise().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - sendNextCommand(ctx); - } - } - }); - ctx.channel().writeAndFlush(data, command.getChannelPromise()); + + command.getChannelPromise().addListener(listener); + ch.writeAndFlush(data, command.getChannelPromise()); } }