timeout added

pull/243/head
Nikita 10 years ago
parent d50e99762c
commit cf99a63779

@ -16,6 +16,7 @@
package org.redisson.client;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.redisson.client.handler.RedisCommandsQueue;
import org.redisson.client.handler.RedisDecoder;
@ -27,22 +28,25 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
public class RedisClient {
private Class<? extends SocketChannel> socketChannelClass = NioSocketChannel.class;
private Bootstrap bootstrap;
private EventLoopGroup group = new NioEventLoopGroup();
private InetSocketAddress addr;
private Channel channel;
private long timeout;
private TimeUnit timeoutUnit;
public RedisClient(String host, int port) {
this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 60*1000);
}
public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port, int timeout) {
addr = new InetSocketAddress(host, port);
bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr);
bootstrap.handler(new ChannelInitializer<Channel>() {
@ -56,13 +60,40 @@ public class RedisClient {
}
});
setTimeout(timeout, TimeUnit.MILLISECONDS);
}
/**
* Set the default timeout for {@link RedisConnection connections} created by
* this client. The timeout applies to connection attempts and non-blocking
* commands.
*
* @param timeout Сonnection timeout.
* @param unit Unit of time for the timeout.
*/
public void setTimeout(long timeout, TimeUnit unit) {
this.timeout = timeout;
this.timeoutUnit = unit;
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) unit.toMillis(timeout));
}
long getTimeout() {
return timeout;
}
TimeUnit getTimeoutUnit() {
return timeoutUnit;
}
Bootstrap getBootstrap() {
return bootstrap;
}
public RedisConnection connect() {
ChannelFuture future = bootstrap.connect();
channel = future.channel();
future.syncUninterruptibly();
return new RedisConnection(bootstrap, channel);
return new RedisConnection(this, future.channel());
}
public static void main(String[] args) throws InterruptedException {

@ -4,22 +4,37 @@ import org.redisson.client.handler.RedisData;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.RedisCommand;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
public class RedisConnection {
final Bootstrap bootstrap;
final Channel channel;
final RedisClient redisClient;
public RedisConnection(Bootstrap bootstrap, Channel channel) {
public RedisConnection(RedisClient redisClient, Channel channel) {
super();
this.bootstrap = bootstrap;
this.redisClient = redisClient;
this.channel = channel;
}
public <R> R await(Future<R> cmd) {
if (!cmd.awaitUninterruptibly(redisClient.getTimeout(), redisClient.getTimeoutUnit())) {
Promise<R> promise = (Promise<R>)cmd;
RedisTimeoutException ex = new RedisTimeoutException();
promise.setFailure(ex);
throw ex;
}
if (!cmd.isSuccess()) {
if (cmd.cause() instanceof RedisException) {
throw (RedisException) cmd.cause();
}
throw new RedisException("Unexpected exception while processing command", cmd.cause());
}
return cmd.getNow();
}
public <V> V get(Future<V> future) {
future.awaitUninterruptibly();
if (future.isSuccess()) {
@ -34,11 +49,11 @@ public class RedisConnection {
public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) {
Future<R> r = async(encoder, command, params);
return get(r);
return await(r);
}
public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = bootstrap.group().next().<R>newPromise();
Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
channel.writeAndFlush(new RedisData<T, R>(promise, encoder, command, params));
return promise;
}

@ -4,6 +4,9 @@ public class RedisException extends RuntimeException {
private static final long serialVersionUID = 3389820652701696154L;
public RedisException() {
}
public RedisException(String message, Throwable cause) {
super(message, cause);
}

@ -0,0 +1,7 @@
package org.redisson.client;
public class RedisTimeoutException extends RedisException {
private static final long serialVersionUID = -8418769175260962404L;
}
Loading…
Cancel
Save