new commands added

pull/243/head
Nikita 10 years ago
parent cf99a63779
commit 6c182ed77e

@ -16,7 +16,6 @@
package org.redisson.client; package org.redisson.client;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.redisson.client.handler.RedisCommandsQueue; import org.redisson.client.handler.RedisCommandsQueue;
import org.redisson.client.handler.RedisDecoder; import org.redisson.client.handler.RedisDecoder;
@ -30,17 +29,21 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.GlobalEventExecutor;
public class RedisClient { public class RedisClient {
private Bootstrap bootstrap; private final Bootstrap bootstrap;
private InetSocketAddress addr; private final InetSocketAddress addr;
private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private long timeout; private final long timeout;
private TimeUnit timeoutUnit;
public RedisClient(String host, int port) { public RedisClient(String host, int port) {
this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 60*1000); this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 60*1000);
@ -61,31 +64,18 @@ public class RedisClient {
}); });
setTimeout(timeout, TimeUnit.MILLISECONDS); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
this.timeout = timeout;
} }
/** public InetSocketAddress getAddr() {
* Set the default timeout for {@link RedisConnection connections} created by return addr;
* this client. The timeout applies to connection attempts and non-blocking
* commands.
*
* @param timeout Сonnection timeout.
* @param unit Unit of time for the timeout.
*/
public void setTimeout(long timeout, TimeUnit unit) {
this.timeout = timeout;
this.timeoutUnit = unit;
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) unit.toMillis(timeout));
} }
long getTimeout() { long getTimeout() {
return timeout; return timeout;
} }
TimeUnit getTimeoutUnit() {
return timeoutUnit;
}
Bootstrap getBootstrap() { Bootstrap getBootstrap() {
return bootstrap; return bootstrap;
} }
@ -93,17 +83,25 @@ public class RedisClient {
public RedisConnection connect() { public RedisConnection connect() {
ChannelFuture future = bootstrap.connect(); ChannelFuture future = bootstrap.connect();
future.syncUninterruptibly(); future.syncUninterruptibly();
channels.add(future.channel());
return new RedisConnection(this, future.channel()); return new RedisConnection(this, future.channel());
} }
public ChannelGroupFuture shutdownAsync() {
return channels.close();
}
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
final RedisClient c = new RedisClient("127.0.0.1", 6379); final RedisClient c = new RedisClient("127.0.0.1", 6379);
RedisConnection rc = c.connect(); RedisConnection rc = c.connect();
// for (int i = 0; i < 10000; i++) { // for (int i = 0; i < 10000; i++) {
String res1 = rc.sync(new StringCodec(), RedisCommands.CLIENT_SETNAME, "12333"); String res1 = rc.sync(RedisCommands.CLIENT_SETNAME, "12333");
System.out.println("res 12: " + res1); System.out.println("res 12: " + res1);
String res2 = rc.sync(new StringCodec(), RedisCommands.CLIENT_GETNAME); String res2 = rc.sync(RedisCommands.CLIENT_GETNAME);
System.out.println("res name: " + res2); System.out.println("res name: " + res2);
Boolean res3 = rc.sync(new StringCodec(), RedisCommands.EXISTS, "33");
System.out.println("res name 2: " + res3);
/* Future<String> res = rc.execute(new StringCodec(), RedisCommands.SET, "test", "" + Math.random()); /* Future<String> res = rc.execute(new StringCodec(), RedisCommands.SET, "test", "" + Math.random());
res.addListener(new FutureListener<String>() { res.addListener(new FutureListener<String>() {

@ -1,14 +1,19 @@
package org.redisson.client; package org.redisson.client;
import java.util.concurrent.TimeUnit;
import org.redisson.client.handler.RedisData; import org.redisson.client.handler.RedisData;
import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStringCommand;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
public class RedisConnection { public class RedisConnection implements RedisCommands {
final Channel channel; final Channel channel;
final RedisClient redisClient; final RedisClient redisClient;
@ -19,8 +24,12 @@ public class RedisConnection {
this.channel = channel; this.channel = channel;
} }
public RedisClient getRedisClient() {
return redisClient;
}
public <R> R await(Future<R> cmd) { public <R> R await(Future<R> cmd) {
if (!cmd.awaitUninterruptibly(redisClient.getTimeout(), redisClient.getTimeoutUnit())) { if (!cmd.awaitUninterruptibly(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) {
Promise<R> promise = (Promise<R>)cmd; Promise<R> promise = (Promise<R>)cmd;
RedisTimeoutException ex = new RedisTimeoutException(); RedisTimeoutException ex = new RedisTimeoutException();
promise.setFailure(ex); promise.setFailure(ex);
@ -47,6 +56,21 @@ public class RedisConnection {
throw new RedisException("Unexpected exception while processing command", future.cause()); throw new RedisException("Unexpected exception while processing command", future.cause());
} }
public String sync(RedisStringCommand command, Object ... params) {
Future<String> r = async(command, params);
return await(r);
}
public Future<String> async(RedisStringCommand command, Object ... params) {
Promise<String> promise = redisClient.getBootstrap().group().next().<String>newPromise();
channel.writeAndFlush(new RedisData<String, String>(promise, command.getCodec(), command, params));
return promise;
}
public <T, R> void send(RedisData<T, R> data) {
channel.writeAndFlush(data);
}
public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) { public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) {
Future<R> r = async(encoder, command, params); Future<R> r = async(encoder, command, params);
return await(r); return await(r);
@ -58,6 +82,10 @@ public class RedisConnection {
return promise; return promise;
} }
public ChannelFuture closeAsync() {
return channel.close();
}
// public <R> Future<R> execute(Codec encoder, RedisCommand<R> command, Object ... params) { // public <R> Future<R> execute(Codec encoder, RedisCommand<R> command, Object ... params) {
// Promise<R> promise = bootstrap.group().next().<R>newPromise(); // Promise<R> promise = bootstrap.group().next().<R>newPromise();
// channel.writeAndFlush(new RedisData<R, R>(promise, encoder, command, params)); // channel.writeAndFlush(new RedisData<R, R>(promise, encoder, command, params));

@ -43,6 +43,9 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
} else if (code == '-') { } else if (code == '-') {
Object result = data.getCommand().getReponseDecoder().decode(in); Object result = data.getCommand().getReponseDecoder().decode(in);
data.getPromise().setFailure(new RedisException(result.toString())); data.getPromise().setFailure(new RedisException(result.toString()));
} else if (code == ':') {
Object result = data.getCommand().getReponseDecoder().decode(in);
data.getPromise().setSuccess(result);
} else if (code == '$') { } else if (code == '$') {
Decoder<Object> decoder = data.getCommand().getReponseDecoder(); Decoder<Object> decoder = data.getCommand().getReponseDecoder();
if (decoder == null) { if (decoder == null) {
@ -51,7 +54,7 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
Object result = decoder.decode(readBytes(in)); Object result = decoder.decode(readBytes(in));
data.getPromise().setSuccess(result); data.getPromise().setSuccess(result);
} else { } else {
throw new IllegalStateException("Can't decode replay"); throw new IllegalStateException("Can't decode replay " + (char)code);
} }
ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND); ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND);

@ -0,0 +1,30 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class BooleanReplayDecoder implements Decoder<Boolean> {
@Override
public Boolean decode(ByteBuf buf) {
String status = buf.readBytes(buf.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
buf.skipBytes(2);
return Boolean.valueOf(status);
}
}

@ -15,14 +15,32 @@
*/ */
package org.redisson.client.protocol; package org.redisson.client.protocol;
import org.redisson.client.handler.RedisData;
import io.netty.util.concurrent.Future;
public interface RedisCommands { public interface RedisCommands {
RedisCommand<String> AUTH = new RedisCommand<String>("AUTH", new StringReplayDecoder()); RedisStringCommand AUTH = new RedisStringCommand("AUTH", new StringReplayDecoder());
RedisCommand<String> SELECT = new RedisCommand<String>("SELECT", new StringReplayDecoder()); RedisStringCommand SELECT = new RedisStringCommand("SELECT", new StringReplayDecoder());
RedisCommand<String> CLIENT_SETNAME = new RedisCommand<String>("CLIENT", "SETNAME", new StringReplayDecoder(), 1); RedisStringCommand CLIENT_SETNAME = new RedisStringCommand("CLIENT", "SETNAME", new StringReplayDecoder(), 1);
RedisCommand<String> CLIENT_GETNAME = new RedisCommand<String>("CLIENT", "GETNAME"); RedisStringCommand CLIENT_GETNAME = new RedisStringCommand("CLIENT", "GETNAME");
RedisCommand<Object> GET = new RedisCommand<Object>("GET"); RedisCommand<Object> GET = new RedisCommand<Object>("GET");
RedisCommand<String> SET = new RedisCommand<String>("SET", new StringReplayDecoder(), 1); RedisCommand<String> SET = new RedisCommand<String>("SET", new StringReplayDecoder(), 1);
RedisCommand<String> SETEX = new RedisCommand<String>("SETEX", new StringReplayDecoder(), 2);
RedisCommand<Boolean> EXISTS = new RedisCommand<Boolean>("EXISTS", new BooleanReplayDecoder(), 1);
String sync(RedisStringCommand command, Object ... params);
Future<String> async(RedisStringCommand command, Object ... params);
<T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params);
<T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params);
<T, R> void send(RedisData<T, R> data);
} }

@ -0,0 +1,28 @@
package org.redisson.client.protocol;
public class RedisStringCommand extends RedisCommand<String> {
private Codec codec = new StringCodec();
public RedisStringCommand(String name, int... encodeParamIndexes) {
super(name, encodeParamIndexes);
}
public RedisStringCommand(String name, String subName, Decoder<String> reponseDecoder,
int... encodeParamIndexes) {
super(name, subName, reponseDecoder, encodeParamIndexes);
}
public RedisStringCommand(String name, String subName, int... encodeParamIndexes) {
super(name, subName, encodeParamIndexes);
}
public RedisStringCommand(String name, Decoder<String> reponseDecoder, int... encodeParamIndexes) {
super(name, reponseDecoder, encodeParamIndexes);
}
public Codec getCodec() {
return codec;
}
}
Loading…
Cancel
Save