diff --git a/src/main/java/org/redisson/RedisNodes.java b/src/main/java/org/redisson/RedisNodes.java index f2d203d7f..07fd116f9 100644 --- a/src/main/java/org/redisson/RedisNodes.java +++ b/src/main/java/org/redisson/RedisNodes.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.redisson.client.RedisConnection; import org.redisson.client.protocol.RedisCommands; @@ -80,7 +79,7 @@ public class RedisNodes implements NodesGroup { connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - Future r = c.async(RedisCommands.PING); + Future r = c.async(connectionManager.getConfig().getPingTimeout(), RedisCommands.PING); result.put(c, r); latch.countDown(); } @@ -94,7 +93,7 @@ public class RedisNodes implements NodesGroup { long time = System.currentTimeMillis(); try { - latch.await(connectionManager.getConfig().getConnectTimeout(), TimeUnit.MILLISECONDS); + latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -110,7 +109,7 @@ public class RedisNodes implements NodesGroup { boolean res = true; for (Entry> entry : result.entrySet()) { Future f = entry.getValue(); - f.awaitUninterruptibly(connectionManager.getConfig().getPingTimeout(), TimeUnit.MILLISECONDS); + f.awaitUninterruptibly(); if (!"PONG".equals(f.getNow())) { res = false; } diff --git a/src/main/java/org/redisson/RedissonCountDownLatch.java b/src/main/java/org/redisson/RedissonCountDownLatch.java index 1b4e49d45..6e1a977f3 100644 --- a/src/main/java/org/redisson/RedissonCountDownLatch.java +++ b/src/main/java/org/redisson/RedissonCountDownLatch.java @@ -138,7 +138,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown @Override public Future getCountAsync() { - return commandExecutor.readAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName()); + return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName()); } @Override diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index f98173d44..1089ec907 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -391,12 +391,12 @@ public class RedissonLock extends RedissonExpirable implements RLock { @Override public boolean isHeldByCurrentThread() { - return commandExecutor.read(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(Thread.currentThread().getId())); + return commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(Thread.currentThread().getId())); } @Override public int getHoldCount() { - Long res = commandExecutor.read(getName(), LongCodec.INSTANCE, RedisCommands.HGET, getName(), getLockName(Thread.currentThread().getId())); + Long res = commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.HGET, getName(), getLockName(Thread.currentThread().getId())); if (res == null) { return 0; } diff --git a/src/main/java/org/redisson/RedissonSemaphore.java b/src/main/java/org/redisson/RedissonSemaphore.java index 6fce66b17..4f60ef927 100644 --- a/src/main/java/org/redisson/RedissonSemaphore.java +++ b/src/main/java/org/redisson/RedissonSemaphore.java @@ -434,7 +434,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { @Override public int availablePermits() { - Long res = commandExecutor.read(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName()); + Long res = commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName()); return res.intValue(); } diff --git a/src/main/java/org/redisson/client/RedisClient.java b/src/main/java/org/redisson/client/RedisClient.java index 011d7c529..1a545052e 100644 --- a/src/main/java/org/redisson/client/RedisClient.java +++ b/src/main/java/org/redisson/client/RedisClient.java @@ -46,13 +46,19 @@ import java.util.Map; import org.redisson.client.protocol.RedisCommands; import org.redisson.misc.URIBuilder; +/** + * Low-level Redis client + * + * @author Nikita Koksharov + * + */ public class RedisClient { private final Bootstrap bootstrap; private final InetSocketAddress addr; private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); - private final long timeout; + private final long commandTimeout; private boolean hasOwnGroup; public RedisClient(String address) { @@ -69,15 +75,19 @@ public class RedisClient { } public RedisClient(String host, int port) { - this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 60 * 1000); + this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 3000); hasOwnGroup = true; } public RedisClient(EventLoopGroup group, String host, int port) { - this(group, NioSocketChannel.class, host, port, 60 * 1000); + this(group, NioSocketChannel.class, host, port, 3000); } - public RedisClient(EventLoopGroup group, Class socketChannelClass, String host, int port, int timeout) { + public RedisClient(EventLoopGroup group, Class socketChannelClass, String host, int port, int connectTimeout) { + this(group, socketChannelClass, host, port, connectTimeout, 3000); + } + + public RedisClient(EventLoopGroup group, Class socketChannelClass, String host, int port, int connectTimeout, int commandTimeout) { addr = new InetSocketAddress(host, port); bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr); bootstrap.handler(new ChannelInitializer() { @@ -91,16 +101,17 @@ public class RedisClient { } }); - bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout); - this.timeout = timeout; + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout); + this.commandTimeout = commandTimeout; } + public InetSocketAddress getAddr() { return addr; } - long getTimeout() { - return timeout; + public long getCommandTimeout() { + return commandTimeout; } public Bootstrap getBootstrap() { diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index df79fda72..a389295b2 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -122,8 +122,7 @@ public class RedisConnection implements RedisCommands { }); try { - // TODO change connectTimeout to timeout - if (!l.await(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) { + if (!l.await(redisClient.getCommandTimeout(), TimeUnit.MILLISECONDS)) { Promise promise = (Promise)future; RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr()); promise.setFailure(ex); @@ -143,8 +142,7 @@ public class RedisConnection implements RedisCommands { } public T sync(RedisStrictCommand command, Object ... params) { - Future r = async(null, command, params); - return await(r); + return sync(null, command, params); } public ChannelFuture send(CommandData data) { @@ -156,29 +154,37 @@ public class RedisConnection implements RedisCommands { } public R sync(Codec encoder, RedisCommand command, Object ... params) { - Future r = async(encoder, command, params); - return await(r); + Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); + send(new CommandData(promise, encoder, command, params)); + return await(promise); } public Future async(RedisCommand command, Object ... params) { return async(null, command, params); } + + public Future async(long timeout, RedisCommand command, Object ... params) { + return async(null, command, params); + } public Future async(Codec encoder, RedisCommand command, Object ... params) { - Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); - send(new CommandData(promise, encoder, command, params)); - return promise; + return async(-1, encoder, command, params); } - public Future asyncWithTimeout(Codec encoder, RedisCommand command, Object ... params) { + public Future async(long timeout, Codec encoder, RedisCommand command, Object ... params) { final Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); + if (timeout == -1) { + timeout = redisClient.getCommandTimeout(); + } + final ScheduledFuture scheduledFuture = redisClient.getBootstrap().group().next().schedule(new Runnable() { @Override public void run() { RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr()); promise.tryFailure(ex); } - }, redisClient.getTimeout(), TimeUnit.MILLISECONDS); + }, timeout, TimeUnit.MILLISECONDS); + promise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 1857f4bd1..5fe6b97af 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -200,7 +200,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } final RedisConnection connection = future.getNow(); - Future> clusterFuture = connection.asyncWithTimeout(null, RedisCommands.CLUSTER_INFO); + Future> clusterFuture = connection.async(RedisCommands.CLUSTER_INFO); clusterFuture.addListener(new FutureListener>() { @Override @@ -322,7 +322,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, final Iterator iterator) { - Future> future = connection.asyncWithTimeout(null, RedisCommands.CLUSTER_NODES); + Future> future = connection.async(RedisCommands.CLUSTER_NODES); future.addListener(new FutureListener>() { @Override public void operationComplete(Future> future) throws Exception { diff --git a/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 9a0a52697..6dcc4c263 100644 --- a/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -367,7 +367,7 @@ abstract class ConnectionPool { }; if (entry.getConfig().getPassword() != null) { - Future temp = c.asyncWithTimeout(null, RedisCommands.AUTH, config.getPassword()); + Future temp = c.async(RedisCommands.AUTH, config.getPassword()); FutureListener listener = new FutureListener () { @Override public void operationComplete (Future < Void > future)throws Exception { @@ -386,7 +386,7 @@ abstract class ConnectionPool { } private void ping(RedisConnection c, final FutureListener pingListener) { - Future f = c.asyncWithTimeout(null, RedisCommands.PING); + Future f = c.async(RedisCommands.PING); f.addListener(pingListener); }