RedisClient pipeline support. #42

pull/243/head
Nikita 10 years ago
parent c751f88148
commit dd5bf7c353

@ -20,6 +20,7 @@ import java.util.concurrent.ExecutionException;
import org.redisson.client.handler.CommandDecoder; import org.redisson.client.handler.CommandDecoder;
import org.redisson.client.handler.CommandEncoder; import org.redisson.client.handler.CommandEncoder;
import org.redisson.client.handler.CommandsListEncoder;
import org.redisson.client.handler.CommandsQueue; import org.redisson.client.handler.CommandsQueue;
import org.redisson.client.handler.ConnectionWatchdog; import org.redisson.client.handler.ConnectionWatchdog;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
@ -59,6 +60,7 @@ public class RedisClient {
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addFirst(new ConnectionWatchdog(bootstrap, channels), ch.pipeline().addFirst(new ConnectionWatchdog(bootstrap, channels),
new CommandEncoder(), new CommandEncoder(),
new CommandsListEncoder(),
new CommandsQueue(), new CommandsQueue(),
new CommandDecoder()); new CommandDecoder());
} }
@ -76,7 +78,7 @@ public class RedisClient {
return timeout; return timeout;
} }
Bootstrap getBootstrap() { public Bootstrap getBootstrap() {
return bootstrap; return bootstrap;
} }

@ -15,10 +15,12 @@
*/ */
package org.redisson.client; package org.redisson.client;
import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.redisson.client.handler.CommandData;
import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.RedisStrictCommand;
@ -79,6 +81,10 @@ public class RedisConnection implements RedisCommands {
channel.writeAndFlush(data); channel.writeAndFlush(data);
} }
public void send(List<CommandData<? extends Object, ? extends Object>> data) {
channel.writeAndFlush(new CommandsData(data));
}
public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) { public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) {
Future<R> r = async(encoder, command, params); Future<R> r = async(encoder, command, params);
return await(r); return await(r);
@ -90,6 +96,11 @@ public class RedisConnection implements RedisCommands {
return promise; return promise;
} }
public <T, R> CommandData<T, R> create(Codec encoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
return new CommandData<T, R>(promise, encoder, command, params);
}
public void setClosed(boolean reconnect) { public void setClosed(boolean reconnect) {
this.closed = reconnect; this.closed = reconnect;
} }

@ -18,8 +18,8 @@ package org.redisson.client;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import org.redisson.client.handler.CommandData;
import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder;

@ -26,7 +26,10 @@ import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException; import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.handler.CommandsQueue.QueueCommands; import org.redisson.client.handler.CommandsQueue.QueueCommands;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.pubsub.PubSubMessage; import org.redisson.client.protocol.pubsub.PubSubMessage;
@ -59,7 +62,7 @@ public class CommandDecoder extends ReplayingDecoder<Void> {
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
CommandData<Object, Object> data = ctx.channel().attr(CommandsQueue.REPLAY).get(); QueueCommand data = ctx.channel().attr(CommandsQueue.REPLAY).get();
Decoder<Object> currentDecoder = null; Decoder<Object> currentDecoder = null;
if (data == null) { if (data == null) {
@ -75,10 +78,26 @@ public class CommandDecoder extends ReplayingDecoder<Void> {
log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8)); log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8));
} }
try { if (data instanceof CommandData) {
decode(in, data, null, ctx.channel(), currentDecoder); CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
} catch (IOException e) { try {
data.getPromise().setFailure(e); decode(in, cmd, null, ctx.channel(), currentDecoder);
} catch (IOException e) {
cmd.getPromise().setFailure(e);
}
} else if (data instanceof CommandsData) {
CommandsData commands = (CommandsData)data;
int i = 0;
while (in.writerIndex() > in.readerIndex()) {
CommandData<Object, Object> cmd = null;
try {
cmd = (CommandData<Object, Object>) commands.getCommands().get(i);
decode(in, cmd, null, ctx.channel(), currentDecoder);
i++;
} catch (IOException e) {
cmd.getPromise().setFailure(e);
}
}
} }
ctx.channel().attr(CommandsQueue.REPLAY).remove(); ctx.channel().attr(CommandsQueue.REPLAY).remove();

@ -15,6 +15,7 @@
*/ */
package org.redisson.client.handler; package org.redisson.client.handler;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.Encoder; import org.redisson.client.protocol.Encoder;
import org.redisson.client.protocol.StringParamsEncoder; import org.redisson.client.protocol.StringParamsEncoder;
import org.slf4j.Logger; import org.slf4j.Logger;

@ -0,0 +1,41 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.handler;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
*
* @author Nikita Koksharov
*
*/
public class CommandsListEncoder extends MessageToByteEncoder<CommandsData> {
private final CommandEncoder encoder = new CommandEncoder();
@Override
protected void encode(ChannelHandlerContext ctx, CommandsData msg, ByteBuf out) throws Exception {
for (CommandData<?, ?> commandData : msg.getCommands()) {
encoder.encode(ctx, (CommandData<Object, Object>)commandData, out);
}
}
}

@ -17,19 +17,27 @@ package org.redisson.client.handler;
import java.util.Queue; import java.util.Queue;
import org.redisson.client.protocol.QueueCommand;
import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
/**
*
*
* @author Nikita Koksharov
*
*/
public class CommandsQueue extends ChannelDuplexHandler { public class CommandsQueue extends ChannelDuplexHandler {
public enum QueueCommands {NEXT_COMMAND} public enum QueueCommands {NEXT_COMMAND}
public static final AttributeKey<CommandData<Object, Object>> REPLAY = AttributeKey.valueOf("promise"); public static final AttributeKey<QueueCommand> REPLAY = AttributeKey.valueOf("promise");
private final Queue<CommandData<Object, Object>> queue = PlatformDependent.newMpscQueue(); private final Queue<QueueCommand> queue = PlatformDependent.newMpscQueue();
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
@ -43,8 +51,8 @@ public class CommandsQueue extends ChannelDuplexHandler {
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof CommandData) { if (msg instanceof QueueCommand) {
CommandData<Object, Object> data = (CommandData<Object, Object>) msg; QueueCommand data = (QueueCommand) msg;
if (data.getSended().get()) { if (data.getSended().get()) {
super.write(ctx, msg, promise); super.write(ctx, msg, promise);
} else { } else {
@ -57,7 +65,7 @@ public class CommandsQueue extends ChannelDuplexHandler {
} }
private void sendData(ChannelHandlerContext ctx) throws Exception { private void sendData(ChannelHandlerContext ctx) throws Exception {
CommandData<Object, Object> data = queue.peek(); QueueCommand data = queue.peek();
if (data != null && data.getSended().compareAndSet(false, true)) { if (data != null && data.getSended().compareAndSet(false, true)) {
ctx.channel().attr(REPLAY).set(data); ctx.channel().attr(REPLAY).set(data);
ctx.channel().writeAndFlush(data); ctx.channel().writeAndFlush(data);

@ -13,17 +13,15 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.client.handler; package org.redisson.client.protocol;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
public class CommandData<T, R> { public class CommandData<T, R> implements QueueCommand {
final Promise<R> promise; final Promise<R> promise;
final RedisCommand<T> command; final RedisCommand<T> command;

@ -0,0 +1,39 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
public class CommandsData implements QueueCommand {
private final List<CommandData<?, ?>> commands;
private final AtomicBoolean sended = new AtomicBoolean();
public CommandsData(List<CommandData<?, ?>> commands) {
super();
this.commands = commands;
}
public List<CommandData<? extends Object, ? extends Object>> getCommands() {
return commands;
}
public AtomicBoolean getSended() {
return sended;
}
}

@ -0,0 +1,24 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol;
import java.util.concurrent.atomic.AtomicBoolean;
public interface QueueCommand {
AtomicBoolean getSended();
}

@ -41,6 +41,8 @@ import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
public interface RedisCommands { public interface RedisCommands {
RedisStrictCommand<String> PING = new RedisStrictCommand<String>("PING");
RedisStrictCommand<Boolean> UNWATCH = new RedisStrictCommand<Boolean>("UNWATCH", new BooleanReplayConvertor()); RedisStrictCommand<Boolean> UNWATCH = new RedisStrictCommand<Boolean>("UNWATCH", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> WATCH = new RedisStrictCommand<Boolean>("WATCH", new BooleanReplayConvertor()); RedisStrictCommand<Boolean> WATCH = new RedisStrictCommand<Boolean>("WATCH", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> MULTI = new RedisStrictCommand<Boolean>("MULTI", new BooleanReplayConvertor()); RedisStrictCommand<Boolean> MULTI = new RedisStrictCommand<Boolean>("MULTI", new BooleanReplayConvertor());

@ -43,8 +43,8 @@ import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException; import org.redisson.client.RedisTimeoutException;
import org.redisson.client.handler.CommandData;
import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.StringCodec; import org.redisson.client.protocol.StringCodec;
import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder;

@ -0,0 +1,71 @@
package org.redisson;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec;
import io.netty.util.concurrent.Promise;
public class RedisClientTest {
@Test
public void test() throws InterruptedException {
RedisClient c = new RedisClient("localhost", 6379);
final RedisConnection conn = c.connect();
conn.sync(StringCodec.INSTANCE, RedisCommands.SET, "test", 0);
ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2);
for (int i = 0; i < 100000; i++) {
pool.execute(new Runnable() {
@Override
public void run() {
conn.async(StringCodec.INSTANCE, RedisCommands.INCR, "test");
}
});
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.HOURS);
Assert.assertEquals(100000L, conn.sync(LongCodec.INSTANCE, RedisCommands.GET, "test"));
}
@Test
public void testPipeline() throws InterruptedException, ExecutionException {
RedisClient c = new RedisClient("localhost", 6379);
RedisConnection conn = c.connect();
conn.sync(StringCodec.INSTANCE, RedisCommands.SET, "test", 0);
List<CommandData<?, ?>> commands = new ArrayList<CommandData<?, ?>>();
CommandData<String, String> cmd1 = conn.create(null, RedisCommands.PING);
commands.add(cmd1);
CommandData<Long, Long> cmd2 = conn.create(null, RedisCommands.INCR, "test");
commands.add(cmd2);
CommandData<Long, Long> cmd3 = conn.create(null, RedisCommands.INCR, "test");
commands.add(cmd3);
CommandData<String, String> cmd4 = conn.create(null, RedisCommands.PING);
commands.add(cmd4);
conn.send(commands);
Assert.assertEquals("PONG", cmd1.getPromise().get());
Assert.assertEquals(1, (long)cmd2.getPromise().get());
Assert.assertEquals(2, (long)cmd3.getPromise().get());
Assert.assertEquals("PONG", cmd4.getPromise().get());
}
}
Loading…
Cancel
Save