From dd5bf7c353d8f9d6e673996ff251478ef0fa3e34 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 17 Jul 2015 17:20:54 +0300 Subject: [PATCH] RedisClient pipeline support. #42 --- .../java/org/redisson/client/RedisClient.java | 4 +- .../org/redisson/client/RedisConnection.java | 13 +++- .../client/RedisPubSubConnection.java | 2 +- .../client/handler/CommandDecoder.java | 29 ++++++-- .../client/handler/CommandEncoder.java | 1 + .../client/handler/CommandsListEncoder.java | 41 +++++++++++ .../client/handler/CommandsQueue.java | 18 +++-- .../{handler => protocol}/CommandData.java | 6 +- .../client/protocol/CommandsData.java | 39 ++++++++++ .../client/protocol/QueueCommand.java | 24 +++++++ .../client/protocol/RedisCommands.java | 2 + .../MasterSlaveConnectionManager.java | 2 +- .../java/org/redisson/RedisClientTest.java | 71 +++++++++++++++++++ 13 files changed, 234 insertions(+), 18 deletions(-) create mode 100644 src/main/java/org/redisson/client/handler/CommandsListEncoder.java rename src/main/java/org/redisson/client/{handler => protocol}/CommandData.java (92%) create mode 100644 src/main/java/org/redisson/client/protocol/CommandsData.java create mode 100644 src/main/java/org/redisson/client/protocol/QueueCommand.java create mode 100644 src/test/java/org/redisson/RedisClientTest.java diff --git a/src/main/java/org/redisson/client/RedisClient.java b/src/main/java/org/redisson/client/RedisClient.java index bf32fb690..c15c89b7b 100644 --- a/src/main/java/org/redisson/client/RedisClient.java +++ b/src/main/java/org/redisson/client/RedisClient.java @@ -20,6 +20,7 @@ import java.util.concurrent.ExecutionException; import org.redisson.client.handler.CommandDecoder; import org.redisson.client.handler.CommandEncoder; +import org.redisson.client.handler.CommandsListEncoder; import org.redisson.client.handler.CommandsQueue; import org.redisson.client.handler.ConnectionWatchdog; import org.redisson.client.protocol.RedisCommands; @@ -59,6 +60,7 @@ public class RedisClient { protected void initChannel(Channel ch) throws Exception { ch.pipeline().addFirst(new ConnectionWatchdog(bootstrap, channels), new CommandEncoder(), + new CommandsListEncoder(), new CommandsQueue(), new CommandDecoder()); } @@ -76,7 +78,7 @@ public class RedisClient { return timeout; } - Bootstrap getBootstrap() { + public Bootstrap getBootstrap() { return bootstrap; } diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index 204e0d588..6917f7715 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -15,10 +15,12 @@ */ package org.redisson.client; +import java.util.List; import java.util.concurrent.TimeUnit; -import org.redisson.client.handler.CommandData; import org.redisson.client.protocol.Codec; +import org.redisson.client.protocol.CommandData; +import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; @@ -79,6 +81,10 @@ public class RedisConnection implements RedisCommands { channel.writeAndFlush(data); } + public void send(List> data) { + channel.writeAndFlush(new CommandsData(data)); + } + public R sync(Codec encoder, RedisCommand command, Object ... params) { Future r = async(encoder, command, params); return await(r); @@ -90,6 +96,11 @@ public class RedisConnection implements RedisCommands { return promise; } + public CommandData create(Codec encoder, RedisCommand command, Object ... params) { + Promise promise = redisClient.getBootstrap().group().next().newPromise(); + return new CommandData(promise, encoder, command, params); + } + public void setClosed(boolean reconnect) { this.closed = reconnect; } diff --git a/src/main/java/org/redisson/client/RedisPubSubConnection.java b/src/main/java/org/redisson/client/RedisPubSubConnection.java index 8d117d7e5..ddfbccdbb 100644 --- a/src/main/java/org/redisson/client/RedisPubSubConnection.java +++ b/src/main/java/org/redisson/client/RedisPubSubConnection.java @@ -18,8 +18,8 @@ package org.redisson.client; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; -import org.redisson.client.handler.CommandData; import org.redisson.client.protocol.Codec; +import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.MultiDecoder; diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index 42e9c5b1e..1e7730e40 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -26,7 +26,10 @@ import org.redisson.client.RedisException; import org.redisson.client.RedisMovedException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.handler.CommandsQueue.QueueCommands; +import org.redisson.client.protocol.CommandData; +import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.pubsub.PubSubMessage; @@ -59,7 +62,7 @@ public class CommandDecoder extends ReplayingDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - CommandData data = ctx.channel().attr(CommandsQueue.REPLAY).get(); + QueueCommand data = ctx.channel().attr(CommandsQueue.REPLAY).get(); Decoder currentDecoder = null; if (data == null) { @@ -75,10 +78,26 @@ public class CommandDecoder extends ReplayingDecoder { log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8)); } - try { - decode(in, data, null, ctx.channel(), currentDecoder); - } catch (IOException e) { - data.getPromise().setFailure(e); + if (data instanceof CommandData) { + CommandData cmd = (CommandData)data; + try { + decode(in, cmd, null, ctx.channel(), currentDecoder); + } catch (IOException e) { + cmd.getPromise().setFailure(e); + } + } else if (data instanceof CommandsData) { + CommandsData commands = (CommandsData)data; + int i = 0; + while (in.writerIndex() > in.readerIndex()) { + CommandData cmd = null; + try { + cmd = (CommandData) commands.getCommands().get(i); + decode(in, cmd, null, ctx.channel(), currentDecoder); + i++; + } catch (IOException e) { + cmd.getPromise().setFailure(e); + } + } } ctx.channel().attr(CommandsQueue.REPLAY).remove(); diff --git a/src/main/java/org/redisson/client/handler/CommandEncoder.java b/src/main/java/org/redisson/client/handler/CommandEncoder.java index ab3a831db..88144b026 100644 --- a/src/main/java/org/redisson/client/handler/CommandEncoder.java +++ b/src/main/java/org/redisson/client/handler/CommandEncoder.java @@ -15,6 +15,7 @@ */ package org.redisson.client.handler; +import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.Encoder; import org.redisson.client.protocol.StringParamsEncoder; import org.slf4j.Logger; diff --git a/src/main/java/org/redisson/client/handler/CommandsListEncoder.java b/src/main/java/org/redisson/client/handler/CommandsListEncoder.java new file mode 100644 index 000000000..d49b4a2d1 --- /dev/null +++ b/src/main/java/org/redisson/client/handler/CommandsListEncoder.java @@ -0,0 +1,41 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.handler; + +import org.redisson.client.protocol.CommandData; +import org.redisson.client.protocol.CommandsData; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +/** + * + * @author Nikita Koksharov + * + */ +public class CommandsListEncoder extends MessageToByteEncoder { + + private final CommandEncoder encoder = new CommandEncoder(); + + @Override + protected void encode(ChannelHandlerContext ctx, CommandsData msg, ByteBuf out) throws Exception { + for (CommandData commandData : msg.getCommands()) { + encoder.encode(ctx, (CommandData)commandData, out); + } + } + +} diff --git a/src/main/java/org/redisson/client/handler/CommandsQueue.java b/src/main/java/org/redisson/client/handler/CommandsQueue.java index b12d86bee..61d6ea60f 100644 --- a/src/main/java/org/redisson/client/handler/CommandsQueue.java +++ b/src/main/java/org/redisson/client/handler/CommandsQueue.java @@ -17,19 +17,27 @@ package org.redisson.client.handler; import java.util.Queue; +import org.redisson.client.protocol.QueueCommand; + import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.util.AttributeKey; import io.netty.util.internal.PlatformDependent; +/** + * + * + * @author Nikita Koksharov + * + */ public class CommandsQueue extends ChannelDuplexHandler { public enum QueueCommands {NEXT_COMMAND} - public static final AttributeKey> REPLAY = AttributeKey.valueOf("promise"); + public static final AttributeKey REPLAY = AttributeKey.valueOf("promise"); - private final Queue> queue = PlatformDependent.newMpscQueue(); + private final Queue queue = PlatformDependent.newMpscQueue(); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { @@ -43,8 +51,8 @@ public class CommandsQueue extends ChannelDuplexHandler { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (msg instanceof CommandData) { - CommandData data = (CommandData) msg; + if (msg instanceof QueueCommand) { + QueueCommand data = (QueueCommand) msg; if (data.getSended().get()) { super.write(ctx, msg, promise); } else { @@ -57,7 +65,7 @@ public class CommandsQueue extends ChannelDuplexHandler { } private void sendData(ChannelHandlerContext ctx) throws Exception { - CommandData data = queue.peek(); + QueueCommand data = queue.peek(); if (data != null && data.getSended().compareAndSet(false, true)) { ctx.channel().attr(REPLAY).set(data); ctx.channel().writeAndFlush(data); diff --git a/src/main/java/org/redisson/client/handler/CommandData.java b/src/main/java/org/redisson/client/protocol/CommandData.java similarity index 92% rename from src/main/java/org/redisson/client/handler/CommandData.java rename to src/main/java/org/redisson/client/protocol/CommandData.java index 5cd18c70c..685b679a5 100644 --- a/src/main/java/org/redisson/client/handler/CommandData.java +++ b/src/main/java/org/redisson/client/protocol/CommandData.java @@ -13,17 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.redisson.client.handler; +package org.redisson.client.protocol; import java.util.concurrent.atomic.AtomicBoolean; -import org.redisson.client.protocol.Codec; -import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.decoder.MultiDecoder; import io.netty.util.concurrent.Promise; -public class CommandData { +public class CommandData implements QueueCommand { final Promise promise; final RedisCommand command; diff --git a/src/main/java/org/redisson/client/protocol/CommandsData.java b/src/main/java/org/redisson/client/protocol/CommandsData.java new file mode 100644 index 000000000..be2a5b15a --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/CommandsData.java @@ -0,0 +1,39 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +public class CommandsData implements QueueCommand { + + private final List> commands; + private final AtomicBoolean sended = new AtomicBoolean(); + + public CommandsData(List> commands) { + super(); + this.commands = commands; + } + + public List> getCommands() { + return commands; + } + + public AtomicBoolean getSended() { + return sended; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/QueueCommand.java b/src/main/java/org/redisson/client/protocol/QueueCommand.java new file mode 100644 index 000000000..3e8847a48 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/QueueCommand.java @@ -0,0 +1,24 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol; + +import java.util.concurrent.atomic.AtomicBoolean; + +public interface QueueCommand { + + AtomicBoolean getSended(); + +} diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index 82937c232..b43b5ac4d 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -41,6 +41,8 @@ import org.redisson.client.protocol.pubsub.PubSubStatusDecoder; public interface RedisCommands { + RedisStrictCommand PING = new RedisStrictCommand("PING"); + RedisStrictCommand UNWATCH = new RedisStrictCommand("UNWATCH", new BooleanReplayConvertor()); RedisStrictCommand WATCH = new RedisStrictCommand("WATCH", new BooleanReplayConvertor()); RedisStrictCommand MULTI = new RedisStrictCommand("MULTI", new BooleanReplayConvertor()); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index b6053a66d..86730d235 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -43,8 +43,8 @@ import org.redisson.client.RedisMovedException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisTimeoutException; -import org.redisson.client.handler.CommandData; import org.redisson.client.protocol.Codec; +import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.StringCodec; import org.redisson.client.protocol.decoder.MultiDecoder; diff --git a/src/test/java/org/redisson/RedisClientTest.java b/src/test/java/org/redisson/RedisClientTest.java new file mode 100644 index 000000000..9f98e5ea0 --- /dev/null +++ b/src/test/java/org/redisson/RedisClientTest.java @@ -0,0 +1,71 @@ +package org.redisson; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Test; +import org.redisson.client.RedisClient; +import org.redisson.client.RedisConnection; +import org.redisson.client.protocol.CommandData; +import org.redisson.client.protocol.LongCodec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.StringCodec; + +import io.netty.util.concurrent.Promise; + +public class RedisClientTest { + + @Test + public void test() throws InterruptedException { + RedisClient c = new RedisClient("localhost", 6379); + final RedisConnection conn = c.connect(); + + conn.sync(StringCodec.INSTANCE, RedisCommands.SET, "test", 0); + ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2); + for (int i = 0; i < 100000; i++) { + pool.execute(new Runnable() { + @Override + public void run() { + conn.async(StringCodec.INSTANCE, RedisCommands.INCR, "test"); + } + }); + } + + pool.shutdown(); + pool.awaitTermination(1, TimeUnit.HOURS); + + Assert.assertEquals(100000L, conn.sync(LongCodec.INSTANCE, RedisCommands.GET, "test")); + } + + @Test + public void testPipeline() throws InterruptedException, ExecutionException { + RedisClient c = new RedisClient("localhost", 6379); + RedisConnection conn = c.connect(); + + conn.sync(StringCodec.INSTANCE, RedisCommands.SET, "test", 0); + + List> commands = new ArrayList>(); + CommandData cmd1 = conn.create(null, RedisCommands.PING); + commands.add(cmd1); + CommandData cmd2 = conn.create(null, RedisCommands.INCR, "test"); + commands.add(cmd2); + CommandData cmd3 = conn.create(null, RedisCommands.INCR, "test"); + commands.add(cmd3); + CommandData cmd4 = conn.create(null, RedisCommands.PING); + commands.add(cmd4); + + conn.send(commands); + + Assert.assertEquals("PONG", cmd1.getPromise().get()); + Assert.assertEquals(1, (long)cmd2.getPromise().get()); + Assert.assertEquals(2, (long)cmd3.getPromise().get()); + Assert.assertEquals("PONG", cmd4.getPromise().get()); + } + + +}