diff --git a/src/main/java/org/redisson/client/RedisClient.java b/src/main/java/org/redisson/client/RedisClient.java index 70fb61b3b..67e664358 100644 --- a/src/main/java/org/redisson/client/RedisClient.java +++ b/src/main/java/org/redisson/client/RedisClient.java @@ -16,7 +16,6 @@ package org.redisson.client; import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; import org.redisson.client.handler.RedisCommandsQueue; import org.redisson.client.handler.RedisDecoder; @@ -30,17 +29,21 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.ChannelGroupFuture; +import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.GlobalEventExecutor; public class RedisClient { - private Bootstrap bootstrap; - private InetSocketAddress addr; + private final Bootstrap bootstrap; + private final InetSocketAddress addr; + private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); - private long timeout; - private TimeUnit timeoutUnit; + private final long timeout; public RedisClient(String host, int port) { this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 60*1000); @@ -61,31 +64,18 @@ public class RedisClient { }); - setTimeout(timeout, TimeUnit.MILLISECONDS); + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout); + this.timeout = timeout; } - /** - * Set the default timeout for {@link RedisConnection connections} created by - * this client. The timeout applies to connection attempts and non-blocking - * commands. - * - * @param timeout Сonnection timeout. - * @param unit Unit of time for the timeout. - */ - public void setTimeout(long timeout, TimeUnit unit) { - this.timeout = timeout; - this.timeoutUnit = unit; - bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) unit.toMillis(timeout)); + public InetSocketAddress getAddr() { + return addr; } long getTimeout() { return timeout; } - TimeUnit getTimeoutUnit() { - return timeoutUnit; - } - Bootstrap getBootstrap() { return bootstrap; } @@ -93,17 +83,25 @@ public class RedisClient { public RedisConnection connect() { ChannelFuture future = bootstrap.connect(); future.syncUninterruptibly(); + channels.add(future.channel()); return new RedisConnection(this, future.channel()); } + public ChannelGroupFuture shutdownAsync() { + return channels.close(); + } + public static void main(String[] args) throws InterruptedException { final RedisClient c = new RedisClient("127.0.0.1", 6379); RedisConnection rc = c.connect(); // for (int i = 0; i < 10000; i++) { - String res1 = rc.sync(new StringCodec(), RedisCommands.CLIENT_SETNAME, "12333"); + String res1 = rc.sync(RedisCommands.CLIENT_SETNAME, "12333"); System.out.println("res 12: " + res1); - String res2 = rc.sync(new StringCodec(), RedisCommands.CLIENT_GETNAME); + String res2 = rc.sync(RedisCommands.CLIENT_GETNAME); System.out.println("res name: " + res2); + Boolean res3 = rc.sync(new StringCodec(), RedisCommands.EXISTS, "33"); + System.out.println("res name 2: " + res3); + /* Future res = rc.execute(new StringCodec(), RedisCommands.SET, "test", "" + Math.random()); res.addListener(new FutureListener() { diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index 1a2f07fc5..97d006ee5 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -1,14 +1,19 @@ package org.redisson.client; +import java.util.concurrent.TimeUnit; + import org.redisson.client.handler.RedisData; import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.RedisStringCommand; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -public class RedisConnection { +public class RedisConnection implements RedisCommands { final Channel channel; final RedisClient redisClient; @@ -19,8 +24,12 @@ public class RedisConnection { this.channel = channel; } + public RedisClient getRedisClient() { + return redisClient; + } + public R await(Future cmd) { - if (!cmd.awaitUninterruptibly(redisClient.getTimeout(), redisClient.getTimeoutUnit())) { + if (!cmd.awaitUninterruptibly(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) { Promise promise = (Promise)cmd; RedisTimeoutException ex = new RedisTimeoutException(); promise.setFailure(ex); @@ -47,6 +56,21 @@ public class RedisConnection { throw new RedisException("Unexpected exception while processing command", future.cause()); } + public String sync(RedisStringCommand command, Object ... params) { + Future r = async(command, params); + return await(r); + } + + public Future async(RedisStringCommand command, Object ... params) { + Promise promise = redisClient.getBootstrap().group().next().newPromise(); + channel.writeAndFlush(new RedisData(promise, command.getCodec(), command, params)); + return promise; + } + + public void send(RedisData data) { + channel.writeAndFlush(data); + } + public R sync(Codec encoder, RedisCommand command, Object ... params) { Future r = async(encoder, command, params); return await(r); @@ -58,6 +82,10 @@ public class RedisConnection { return promise; } + public ChannelFuture closeAsync() { + return channel.close(); + } + // public Future execute(Codec encoder, RedisCommand command, Object ... params) { // Promise promise = bootstrap.group().next().newPromise(); // channel.writeAndFlush(new RedisData(promise, encoder, command, params)); diff --git a/src/main/java/org/redisson/client/handler/RedisDecoder.java b/src/main/java/org/redisson/client/handler/RedisDecoder.java index 4d64cd23e..5b822df7d 100644 --- a/src/main/java/org/redisson/client/handler/RedisDecoder.java +++ b/src/main/java/org/redisson/client/handler/RedisDecoder.java @@ -43,6 +43,9 @@ public class RedisDecoder extends ReplayingDecoder { } else if (code == '-') { Object result = data.getCommand().getReponseDecoder().decode(in); data.getPromise().setFailure(new RedisException(result.toString())); + } else if (code == ':') { + Object result = data.getCommand().getReponseDecoder().decode(in); + data.getPromise().setSuccess(result); } else if (code == '$') { Decoder decoder = data.getCommand().getReponseDecoder(); if (decoder == null) { @@ -51,7 +54,7 @@ public class RedisDecoder extends ReplayingDecoder { Object result = decoder.decode(readBytes(in)); data.getPromise().setSuccess(result); } else { - throw new IllegalStateException("Can't decode replay"); + throw new IllegalStateException("Can't decode replay " + (char)code); } ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND); diff --git a/src/main/java/org/redisson/client/protocol/BooleanReplayDecoder.java b/src/main/java/org/redisson/client/protocol/BooleanReplayDecoder.java new file mode 100644 index 000000000..a0b7cb92b --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/BooleanReplayDecoder.java @@ -0,0 +1,30 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class BooleanReplayDecoder implements Decoder { + + @Override + public Boolean decode(ByteBuf buf) { + String status = buf.readBytes(buf.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); + buf.skipBytes(2); + return Boolean.valueOf(status); + } + +} diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index eeb379a2f..a6c5d755a 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -15,14 +15,32 @@ */ package org.redisson.client.protocol; +import org.redisson.client.handler.RedisData; + +import io.netty.util.concurrent.Future; + public interface RedisCommands { - RedisCommand AUTH = new RedisCommand("AUTH", new StringReplayDecoder()); - RedisCommand SELECT = new RedisCommand("SELECT", new StringReplayDecoder()); - RedisCommand CLIENT_SETNAME = new RedisCommand("CLIENT", "SETNAME", new StringReplayDecoder(), 1); - RedisCommand CLIENT_GETNAME = new RedisCommand("CLIENT", "GETNAME"); + RedisStringCommand AUTH = new RedisStringCommand("AUTH", new StringReplayDecoder()); + RedisStringCommand SELECT = new RedisStringCommand("SELECT", new StringReplayDecoder()); + RedisStringCommand CLIENT_SETNAME = new RedisStringCommand("CLIENT", "SETNAME", new StringReplayDecoder(), 1); + RedisStringCommand CLIENT_GETNAME = new RedisStringCommand("CLIENT", "GETNAME"); RedisCommand GET = new RedisCommand("GET"); RedisCommand SET = new RedisCommand("SET", new StringReplayDecoder(), 1); + RedisCommand SETEX = new RedisCommand("SETEX", new StringReplayDecoder(), 2); + RedisCommand EXISTS = new RedisCommand("EXISTS", new BooleanReplayDecoder(), 1); + + + + String sync(RedisStringCommand command, Object ... params); + + Future async(RedisStringCommand command, Object ... params); + + R sync(Codec encoder, RedisCommand command, Object ... params); + + Future async(Codec encoder, RedisCommand command, Object ... params); + + void send(RedisData data); } diff --git a/src/main/java/org/redisson/client/protocol/RedisStringCommand.java b/src/main/java/org/redisson/client/protocol/RedisStringCommand.java new file mode 100644 index 000000000..bb0b2cc43 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/RedisStringCommand.java @@ -0,0 +1,28 @@ +package org.redisson.client.protocol; + +public class RedisStringCommand extends RedisCommand { + + private Codec codec = new StringCodec(); + + public RedisStringCommand(String name, int... encodeParamIndexes) { + super(name, encodeParamIndexes); + } + + public RedisStringCommand(String name, String subName, Decoder reponseDecoder, + int... encodeParamIndexes) { + super(name, subName, reponseDecoder, encodeParamIndexes); + } + + public RedisStringCommand(String name, String subName, int... encodeParamIndexes) { + super(name, subName, encodeParamIndexes); + } + + public RedisStringCommand(String name, Decoder reponseDecoder, int... encodeParamIndexes) { + super(name, reponseDecoder, encodeParamIndexes); + } + + public Codec getCodec() { + return codec; + } + +}