Merge branch 'master' into new_pubsub

pull/555/head
Nikita 9 years ago
commit 2b0b722dbf

@ -22,7 +22,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
@ -80,7 +79,7 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void operationComplete(Future<RedisConnection> future) throws Exception { public void operationComplete(Future<RedisConnection> future) throws Exception {
Future<String> r = c.async(RedisCommands.PING); Future<String> r = c.async(connectionManager.getConfig().getPingTimeout(), RedisCommands.PING);
result.put(c, r); result.put(c, r);
latch.countDown(); latch.countDown();
} }
@ -94,7 +93,7 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
try { try {
latch.await(connectionManager.getConfig().getConnectTimeout(), TimeUnit.MILLISECONDS); latch.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
@ -110,7 +109,7 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
boolean res = true; boolean res = true;
for (Entry<RedisConnection, Future<String>> entry : result.entrySet()) { for (Entry<RedisConnection, Future<String>> entry : result.entrySet()) {
Future<String> f = entry.getValue(); Future<String> f = entry.getValue();
f.awaitUninterruptibly(connectionManager.getConfig().getPingTimeout(), TimeUnit.MILLISECONDS); f.awaitUninterruptibly();
if (!"PONG".equals(f.getNow())) { if (!"PONG".equals(f.getNow())) {
res = false; res = false;
} }

@ -138,7 +138,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
@Override @Override
public Future<Long> getCountAsync() { public Future<Long> getCountAsync() {
return commandExecutor.readAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName()); return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName());
} }
@Override @Override

@ -391,12 +391,12 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override @Override
public boolean isHeldByCurrentThread() { public boolean isHeldByCurrentThread() {
return commandExecutor.read(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(Thread.currentThread().getId())); return commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(Thread.currentThread().getId()));
} }
@Override @Override
public int getHoldCount() { public int getHoldCount() {
Long res = commandExecutor.read(getName(), LongCodec.INSTANCE, RedisCommands.HGET, getName(), getLockName(Thread.currentThread().getId())); Long res = commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.HGET, getName(), getLockName(Thread.currentThread().getId()));
if (res == null) { if (res == null) {
return 0; return 0;
} }

@ -434,7 +434,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override @Override
public int availablePermits() { public int availablePermits() {
Long res = commandExecutor.read(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName()); Long res = commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName());
return res.intValue(); return res.intValue();
} }

@ -46,13 +46,19 @@ import java.util.Map;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.misc.URIBuilder; import org.redisson.misc.URIBuilder;
/**
* Low-level Redis client
*
* @author Nikita Koksharov
*
*/
public class RedisClient { public class RedisClient {
private final Bootstrap bootstrap; private final Bootstrap bootstrap;
private final InetSocketAddress addr; private final InetSocketAddress addr;
private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private final long timeout; private final long commandTimeout;
private boolean hasOwnGroup; private boolean hasOwnGroup;
public RedisClient(String address) { public RedisClient(String address) {
@ -69,15 +75,19 @@ public class RedisClient {
} }
public RedisClient(String host, int port) { public RedisClient(String host, int port) {
this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 60 * 1000); this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 3000);
hasOwnGroup = true; hasOwnGroup = true;
} }
public RedisClient(EventLoopGroup group, String host, int port) { public RedisClient(EventLoopGroup group, String host, int port) {
this(group, NioSocketChannel.class, host, port, 60 * 1000); this(group, NioSocketChannel.class, host, port, 3000);
} }
public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port, int timeout) { public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port, int connectTimeout) {
this(group, socketChannelClass, host, port, connectTimeout, 3000);
}
public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port, int connectTimeout, int commandTimeout) {
addr = new InetSocketAddress(host, port); addr = new InetSocketAddress(host, port);
bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr); bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr);
bootstrap.handler(new ChannelInitializer<Channel>() { bootstrap.handler(new ChannelInitializer<Channel>() {
@ -91,16 +101,17 @@ public class RedisClient {
} }
}); });
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
this.timeout = timeout; this.commandTimeout = commandTimeout;
} }
public InetSocketAddress getAddr() { public InetSocketAddress getAddr() {
return addr; return addr;
} }
long getTimeout() { public long getCommandTimeout() {
return timeout; return commandTimeout;
} }
public Bootstrap getBootstrap() { public Bootstrap getBootstrap() {

@ -122,8 +122,7 @@ public class RedisConnection implements RedisCommands {
}); });
try { try {
// TODO change connectTimeout to timeout if (!l.await(redisClient.getCommandTimeout(), TimeUnit.MILLISECONDS)) {
if (!l.await(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) {
Promise<R> promise = (Promise<R>)future; Promise<R> promise = (Promise<R>)future;
RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr()); RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr());
promise.setFailure(ex); promise.setFailure(ex);
@ -143,8 +142,7 @@ public class RedisConnection implements RedisCommands {
} }
public <T> T sync(RedisStrictCommand<T> command, Object ... params) { public <T> T sync(RedisStrictCommand<T> command, Object ... params) {
Future<T> r = async(null, command, params); return sync(null, command, params);
return await(r);
} }
public <T, R> ChannelFuture send(CommandData<T, R> data) { public <T, R> ChannelFuture send(CommandData<T, R> data) {
@ -156,29 +154,37 @@ public class RedisConnection implements RedisCommands {
} }
public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) { public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) {
Future<R> r = async(encoder, command, params); Promise<R> promise = ImmediateEventExecutor.INSTANCE.newPromise();
return await(r); send(new CommandData<T, R>(promise, encoder, command, params));
return await(promise);
} }
public <T, R> Future<R> async(RedisCommand<T> command, Object ... params) { public <T, R> Future<R> async(RedisCommand<T> command, Object ... params) {
return async(null, command, params); return async(null, command, params);
} }
public <T, R> Future<R> async(long timeout, RedisCommand<T> command, Object ... params) {
return async(null, command, params);
}
public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = ImmediateEventExecutor.INSTANCE.newPromise(); return async(-1, encoder, command, params);
send(new CommandData<T, R>(promise, encoder, command, params));
return promise;
} }
public <T, R> Future<R> asyncWithTimeout(Codec encoder, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> async(long timeout, Codec encoder, RedisCommand<T> command, Object ... params) {
final Promise<R> promise = ImmediateEventExecutor.INSTANCE.newPromise(); final Promise<R> promise = ImmediateEventExecutor.INSTANCE.newPromise();
if (timeout == -1) {
timeout = redisClient.getCommandTimeout();
}
final ScheduledFuture<?> scheduledFuture = redisClient.getBootstrap().group().next().schedule(new Runnable() { final ScheduledFuture<?> scheduledFuture = redisClient.getBootstrap().group().next().schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr()); RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr());
promise.tryFailure(ex); promise.tryFailure(ex);
} }
}, redisClient.getTimeout(), TimeUnit.MILLISECONDS); }, timeout, TimeUnit.MILLISECONDS);
promise.addListener(new FutureListener<R>() { promise.addListener(new FutureListener<R>() {
@Override @Override
public void operationComplete(Future<R> future) throws Exception { public void operationComplete(Future<R> future) throws Exception {

@ -200,7 +200,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
} }
final RedisConnection connection = future.getNow(); final RedisConnection connection = future.getNow();
Future<Map<String, String>> clusterFuture = connection.asyncWithTimeout(null, RedisCommands.CLUSTER_INFO); Future<Map<String, String>> clusterFuture = connection.async(RedisCommands.CLUSTER_INFO);
clusterFuture.addListener(new FutureListener<Map<String, String>>() { clusterFuture.addListener(new FutureListener<Map<String, String>>() {
@Override @Override
@ -322,7 +322,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
} }
private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, final Iterator<URI> iterator) { private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, final Iterator<URI> iterator) {
Future<List<ClusterNodeInfo>> future = connection.asyncWithTimeout(null, RedisCommands.CLUSTER_NODES); Future<List<ClusterNodeInfo>> future = connection.async(RedisCommands.CLUSTER_NODES);
future.addListener(new FutureListener<List<ClusterNodeInfo>>() { future.addListener(new FutureListener<List<ClusterNodeInfo>>() {
@Override @Override
public void operationComplete(Future<List<ClusterNodeInfo>> future) throws Exception { public void operationComplete(Future<List<ClusterNodeInfo>> future) throws Exception {

@ -367,7 +367,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
}; };
if (entry.getConfig().getPassword() != null) { if (entry.getConfig().getPassword() != null) {
Future<Void> temp = c.asyncWithTimeout(null, RedisCommands.AUTH, config.getPassword()); Future<Void> temp = c.async(RedisCommands.AUTH, config.getPassword());
FutureListener<Void> listener = new FutureListener<Void> () { FutureListener<Void> listener = new FutureListener<Void> () {
@Override public void operationComplete (Future < Void > future)throws Exception { @Override public void operationComplete (Future < Void > future)throws Exception {
@ -386,7 +386,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
} }
private void ping(RedisConnection c, final FutureListener<String> pingListener) { private void ping(RedisConnection c, final FutureListener<String> pingListener) {
Future<String> f = c.asyncWithTimeout(null, RedisCommands.PING); Future<String> f = c.async(RedisCommands.PING);
f.addListener(pingListener); f.addListener(pingListener);
} }

Loading…
Cancel
Save