CommandsQueue object allocations optimization. #338

pull/365/head
Nikita 9 years ago
parent ca6be04b0e
commit 7a64333f0f

@ -118,7 +118,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
return; return;
} }
ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx); ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel());
state(null); state(null);
} }
@ -146,7 +146,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data); log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data);
} }
ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx); ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel());
state(null); state(null);
} else { } else {

@ -22,6 +22,7 @@ import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.QueueCommandHolder; import org.redisson.client.protocol.QueueCommandHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -42,10 +43,19 @@ public class CommandsQueue extends ChannelDuplexHandler {
private final Queue<QueueCommandHolder> queue = PlatformDependent.newMpscQueue(); private final Queue<QueueCommandHolder> queue = PlatformDependent.newMpscQueue();
public void sendNextCommand(ChannelHandlerContext ctx) { private final ChannelFutureListener listener = new ChannelFutureListener() {
ctx.channel().attr(CommandsQueue.REPLAY).remove(); @Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
sendNextCommand(future.channel());
}
}
};
public void sendNextCommand(Channel channel) {
channel.attr(CommandsQueue.REPLAY).remove();
queue.poll(); queue.poll();
sendData(ctx); sendData(channel);
} }
@Override @Override
@ -57,14 +67,14 @@ public class CommandsQueue extends ChannelDuplexHandler {
super.write(ctx, msg, promise); super.write(ctx, msg, promise);
} else { } else {
queue.add(new QueueCommandHolder(data, promise)); queue.add(new QueueCommandHolder(data, promise));
sendData(ctx); sendData(ctx.channel());
} }
} else { } else {
super.write(ctx, msg, promise); super.write(ctx, msg, promise);
} }
} }
private void sendData(final ChannelHandlerContext ctx) { private void sendData(final Channel ch) {
QueueCommandHolder command = queue.peek(); QueueCommandHolder command = queue.peek();
if (command != null && command.getSended().compareAndSet(false, true)) { if (command != null && command.getSended().compareAndSet(false, true)) {
QueueCommand data = command.getCommand(); QueueCommand data = command.getCommand();
@ -72,21 +82,15 @@ public class CommandsQueue extends ChannelDuplexHandler {
if (!pubSubOps.isEmpty()) { if (!pubSubOps.isEmpty()) {
for (CommandData<Object, Object> cd : pubSubOps) { for (CommandData<Object, Object> cd : pubSubOps) {
for (Object channel : cd.getParams()) { for (Object channel : cd.getParams()) {
ctx.pipeline().get(CommandDecoder.class).addChannel(channel.toString(), cd); ch.pipeline().get(CommandDecoder.class).addChannel(channel.toString(), cd);
} }
} }
} else { } else {
ctx.channel().attr(REPLAY).set(data); ch.attr(REPLAY).set(data);
} }
command.getChannelPromise().addListener(new ChannelFutureListener() {
@Override command.getChannelPromise().addListener(listener);
public void operationComplete(ChannelFuture future) throws Exception { ch.writeAndFlush(data, command.getChannelPromise());
if (!future.isSuccess()) {
sendNextCommand(ctx);
}
}
});
ctx.channel().writeAndFlush(data, command.getChannelPromise());
} }
} }

Loading…
Cancel
Save