Fixed probably thread blocking issues #455

pull/469/head
Nikita 9 years ago
parent 33748b8650
commit d03e0de847

@ -53,7 +53,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
public void await() throws InterruptedException {
Future<RedissonCountDownLatchEntry> promise = subscribe();
try {
promise.await();
get(promise);
while (getCount() > 0) {
// waiting for open state
@ -71,7 +71,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
public boolean await(long time, TimeUnit unit) throws InterruptedException {
Future<RedissonCountDownLatchEntry> promise = subscribe();
try {
if (!promise.await(time, unit)) {
if (!await(promise, time, unit)) {
return false;
}

@ -110,7 +110,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
Future<RedissonLockEntry> future = subscribe();
future.sync();
get(future);
try {
while (true) {
@ -229,7 +229,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
Future<RedissonLockEntry> future = subscribe();
if (!future.await(time, TimeUnit.MILLISECONDS)) {
if (!await(future, time, TimeUnit.MILLISECONDS)) {
future.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {

@ -15,6 +15,8 @@
*/
package org.redisson;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
@ -45,6 +47,10 @@ abstract class RedissonObject implements RObject {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}
protected boolean await(Future<?> future, long timeout, TimeUnit timeoutUnit) throws InterruptedException {
return commandExecutor.await(future, timeout, timeoutUnit);
}
protected <V> V get(Future<V> future) {
return commandExecutor.get(future);
}

@ -70,7 +70,8 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
return;
}
Future<RedissonLockEntry> future = subscribe().sync();
Future<RedissonLockEntry> future = subscribe();
get(future);
try {
while (true) {
if (tryAcquire(permits)) {
@ -113,7 +114,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
long time = unit.toMillis(waitTime);
Future<RedissonLockEntry> future = subscribe();
if (!future.await(time, TimeUnit.MILLISECONDS)) {
if (!await(future, time, TimeUnit.MILLISECONDS)) {
return false;
}

@ -15,6 +15,7 @@
*/
package org.redisson.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
@ -111,21 +112,34 @@ public class RedisConnection implements RedisCommands {
return redisClient;
}
public <R> R await(Future<R> cmd) {
// TODO change connectTimeout to timeout
if (!cmd.awaitUninterruptibly(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) {
Promise<R> promise = (Promise<R>)cmd;
RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr());
promise.setFailure(ex);
throw ex;
}
if (!cmd.isSuccess()) {
if (cmd.cause() instanceof RedisException) {
throw (RedisException) cmd.cause();
public <R> R await(Future<R> future) {
final CountDownLatch l = new CountDownLatch(1);
future.addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
l.countDown();
}
});
try {
// TODO change connectTimeout to timeout
if (!l.await(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) {
Promise<R> promise = (Promise<R>)future;
RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr());
promise.setFailure(ex);
throw ex;
}
if (!future.isSuccess()) {
if (future.cause() instanceof RedisException) {
throw (RedisException) future.cause();
}
throw new RedisException("Unexpected exception while processing command", future.cause());
}
throw new RedisException("Unexpected exception while processing command", cmd.cause());
return future.getNow();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
return cmd.getNow();
}
public <T> T sync(RedisStrictCommand<T> command, Object ... params) {

@ -1,5 +1,4 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -173,7 +172,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (code == '+') {
String result = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
in.skipBytes(2);
handleResult(data, parts, result, false, channel);
} else if (code == '-') {
String error = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
@ -206,9 +205,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
}
} else if (code == ':') {
String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
in.skipBytes(2);
Object result = Long.valueOf(status);
Long result = readLong(in);
handleResult(data, parts, result, false, channel);
} else if (code == '$') {
ByteBuf buf = readBytes(in);

@ -23,10 +23,10 @@ import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.QueueCommandHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.AttributeKey;
import io.netty.util.internal.PlatformDependent;
@ -37,7 +37,7 @@ import io.netty.util.internal.PlatformDependent;
* @author Nikita Koksharov
*
*/
public class CommandsQueue extends ChannelDuplexHandler {
public class CommandsQueue extends ChannelOutboundHandlerAdapter {
public static final AttributeKey<QueueCommand> CURRENT_COMMAND = AttributeKey.valueOf("promise");

@ -18,6 +18,7 @@ package org.redisson.command;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.redisson.SlotCallback;
import org.redisson.client.RedisException;
@ -38,6 +39,8 @@ public interface CommandAsyncExecutor {
<V> RedisException convertException(Future<V> future);
boolean await(Future<?> future, long timeout, TimeUnit timeoutUnit) throws InterruptedException;
<V> V get(Future<V> future);
<T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params);

@ -22,9 +22,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.RedisClientResult;
import org.redisson.RedissonShutdownException;
@ -82,13 +84,38 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public <V> V get(Future<V> future) {
future.awaitUninterruptibly();
final CountDownLatch l = new CountDownLatch(1);
future.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
l.countDown();
}
});
try {
l.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// commented out due to blocking issues up to 200 ms per minute for each thread
// future.awaitUninterruptibly();
if (future.isSuccess()) {
return future.getNow();
}
throw convertException(future);
}
@Override
public boolean await(Future<?> future, long timeout, TimeUnit timeoutUnit) throws InterruptedException {
final CountDownLatch l = new CountDownLatch(1);
future.addListener(new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
l.countDown();
}
});
return l.await(timeout, timeoutUnit);
}
@Override
public <T, R> Future<R> readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();

@ -153,7 +153,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
}
StringBuilder errorMsg = new StringBuilder("Publish/Subscribe connection pool exhausted! All connections are busy. Try to increase Publish/Subscribe connection pool size.");
StringBuilder errorMsg = new StringBuilder("Connection pool exhausted! All connections are busy. Increase connection pool size.");
// if (!freezed.isEmpty()) {
// errorMsg.append(" Disconnected hosts: " + freezed);
// }

Loading…
Cancel
Save