diff --git a/src/main/java/org/redisson/client/RedisClient.java b/src/main/java/org/redisson/client/RedisClient.java index 82722f9cf..70fb61b3b 100644 --- a/src/main/java/org/redisson/client/RedisClient.java +++ b/src/main/java/org/redisson/client/RedisClient.java @@ -16,6 +16,7 @@ package org.redisson.client; import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; import org.redisson.client.handler.RedisCommandsQueue; import org.redisson.client.handler.RedisDecoder; @@ -27,22 +28,25 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; public class RedisClient { - private Class socketChannelClass = NioSocketChannel.class; private Bootstrap bootstrap; - private EventLoopGroup group = new NioEventLoopGroup(); private InetSocketAddress addr; - private Channel channel; + + private long timeout; + private TimeUnit timeoutUnit; public RedisClient(String host, int port) { + this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 60*1000); + } + + public RedisClient(EventLoopGroup group, Class socketChannelClass, String host, int port, int timeout) { addr = new InetSocketAddress(host, port); bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr); bootstrap.handler(new ChannelInitializer() { @@ -56,13 +60,40 @@ public class RedisClient { } }); + + setTimeout(timeout, TimeUnit.MILLISECONDS); + } + + /** + * Set the default timeout for {@link RedisConnection connections} created by + * this client. The timeout applies to connection attempts and non-blocking + * commands. + * + * @param timeout Сonnection timeout. + * @param unit Unit of time for the timeout. + */ + public void setTimeout(long timeout, TimeUnit unit) { + this.timeout = timeout; + this.timeoutUnit = unit; + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) unit.toMillis(timeout)); + } + + long getTimeout() { + return timeout; + } + + TimeUnit getTimeoutUnit() { + return timeoutUnit; + } + + Bootstrap getBootstrap() { + return bootstrap; } public RedisConnection connect() { ChannelFuture future = bootstrap.connect(); - channel = future.channel(); future.syncUninterruptibly(); - return new RedisConnection(bootstrap, channel); + return new RedisConnection(this, future.channel()); } public static void main(String[] args) throws InterruptedException { diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index 9ee3f99c7..1a2f07fc5 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -4,22 +4,37 @@ import org.redisson.client.handler.RedisData; import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.RedisCommand; -import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; public class RedisConnection { - final Bootstrap bootstrap; final Channel channel; + final RedisClient redisClient; - public RedisConnection(Bootstrap bootstrap, Channel channel) { + public RedisConnection(RedisClient redisClient, Channel channel) { super(); - this.bootstrap = bootstrap; + this.redisClient = redisClient; this.channel = channel; } + public R await(Future cmd) { + if (!cmd.awaitUninterruptibly(redisClient.getTimeout(), redisClient.getTimeoutUnit())) { + Promise promise = (Promise)cmd; + RedisTimeoutException ex = new RedisTimeoutException(); + promise.setFailure(ex); + throw ex; + } + if (!cmd.isSuccess()) { + if (cmd.cause() instanceof RedisException) { + throw (RedisException) cmd.cause(); + } + throw new RedisException("Unexpected exception while processing command", cmd.cause()); + } + return cmd.getNow(); + } + public V get(Future future) { future.awaitUninterruptibly(); if (future.isSuccess()) { @@ -34,11 +49,11 @@ public class RedisConnection { public R sync(Codec encoder, RedisCommand command, Object ... params) { Future r = async(encoder, command, params); - return get(r); + return await(r); } public Future async(Codec encoder, RedisCommand command, Object ... params) { - Promise promise = bootstrap.group().next().newPromise(); + Promise promise = redisClient.getBootstrap().group().next().newPromise(); channel.writeAndFlush(new RedisData(promise, encoder, command, params)); return promise; } diff --git a/src/main/java/org/redisson/client/RedisException.java b/src/main/java/org/redisson/client/RedisException.java index f5bb25971..63fce633b 100644 --- a/src/main/java/org/redisson/client/RedisException.java +++ b/src/main/java/org/redisson/client/RedisException.java @@ -4,6 +4,9 @@ public class RedisException extends RuntimeException { private static final long serialVersionUID = 3389820652701696154L; + public RedisException() { + } + public RedisException(String message, Throwable cause) { super(message, cause); } diff --git a/src/main/java/org/redisson/client/RedisTimeoutException.java b/src/main/java/org/redisson/client/RedisTimeoutException.java new file mode 100644 index 000000000..6cfdb8ad7 --- /dev/null +++ b/src/main/java/org/redisson/client/RedisTimeoutException.java @@ -0,0 +1,7 @@ +package org.redisson.client; + +public class RedisTimeoutException extends RedisException { + + private static final long serialVersionUID = -8418769175260962404L; + +}