shutdown case handling

pull/297/head
Nikita 9 years ago
parent b5590c9e5d
commit ea22f59060

@ -216,14 +216,16 @@ public class CommandBatchExecutorService extends CommandExecutorService {
final TimerTask retryTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (connectionFuture.cancel(false)) {
connectionManager.getShutdownLatch().release();
}
if (attemptPromise.isDone()) {
return;
}
connectionFuture.cancel(false);
if (attempt == connectionManager.getConfig().getRetryAttempts()) {
attemptPromise.setFailure(ex.get());
attemptPromise.tryFailure(ex.get());
return;
}
if (!attemptPromise.cancel(false)) {
@ -236,7 +238,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
};
ex.set(new RedisTimeoutException("Batch command execution timeout"));
final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
final Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
@ -246,12 +248,9 @@ public class CommandBatchExecutorService extends CommandExecutorService {
}
if (!connFuture.isSuccess()) {
timeout.cancel();
if (!connectionManager.getShutdownLatch().acquire()) {
return;
}
ex.set(convertException(connFuture));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
return;
}
@ -273,12 +272,8 @@ public class CommandBatchExecutorService extends CommandExecutorService {
if (!future.isSuccess()) {
timeout.cancel();
if (!connectionManager.getShutdownLatch().acquire()) {
return;
}
ex.set(new WriteRedisConnectionException("Can't write commands batch to channel: " + future.channel(), future.cause()));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
}
});
@ -300,28 +295,16 @@ public class CommandBatchExecutorService extends CommandExecutorService {
}
if (future.cause() instanceof RedisMovedException) {
if (!connectionManager.getShutdownLatch().acquire()) {
return;
}
RedisMovedException ex = (RedisMovedException)future.cause();
execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), mainPromise, slots, attempt);
return;
}
if (future.cause() instanceof RedisAskException) {
if (!connectionManager.getShutdownLatch().acquire()) {
return;
}
RedisAskException ex = (RedisAskException)future.cause();
execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), mainPromise, slots, attempt);
return;
}
if (future.cause() instanceof RedisLoadingException) {
if (!connectionManager.getShutdownLatch().acquire()) {
return;
}
execute(entry, source, mainPromise, slots, attempt);
return;
}

@ -259,7 +259,7 @@ public class CommandExecutorService implements CommandExecutor {
private <R> R async(boolean readOnlyMode, Codec codec, NodeSource source, SyncOperation<R> operation, int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) {
return null;
throw new IllegalStateException("Redisson is shutdown");
}
try {
@ -432,12 +432,14 @@ public class CommandExecutorService implements CommandExecutor {
final TimerTask retryTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (connectionFuture.cancel(false)) {
connectionManager.getShutdownLatch().release();
}
if (attemptPromise.isDone()) {
return;
}
connectionFuture.cancel(false);
if (attempt == connectionManager.getConfig().getRetryAttempts()) {
attemptPromise.tryFailure(ex.get());
return;
@ -452,7 +454,7 @@ public class CommandExecutorService implements CommandExecutor {
};
ex.set(new RedisTimeoutException("Command execution timeout for command: " + command + " with params " + Arrays.toString(params)));
final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
final Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
@ -462,11 +464,8 @@ public class CommandExecutorService implements CommandExecutor {
}
if (!connFuture.isSuccess()) {
timeout.cancel();
if (!connectionManager.getShutdownLatch().acquire()) {
return;
}
ex.set(convertException(connFuture));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
return;
}
@ -493,12 +492,9 @@ public class CommandExecutorService implements CommandExecutor {
}
if (!future.isSuccess()) {
timeout.cancel();
if (!connectionManager.getShutdownLatch().acquire()) {
return;
}
ex.set(new WriteRedisConnectionException(
"Can't write command: " + command + ", params: " + params + " to channel: " + future.channel(), future.cause()));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
}
});
@ -520,33 +516,21 @@ public class CommandExecutorService implements CommandExecutor {
}
if (future.cause() instanceof RedisMovedException) {
if (!connectionManager.getShutdownLatch().acquire()) {
return;
}
RedisMovedException ex = (RedisMovedException)future.cause();
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), messageDecoder, codec, command, params, mainPromise, attempt);
return;
}
if (future.cause() instanceof RedisAskException) {
if (!connectionManager.getShutdownLatch().acquire()) {
return;
}
RedisAskException ex = (RedisAskException)future.cause();
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), messageDecoder, codec, command, params, mainPromise, attempt);
return;
}
if (future.cause() instanceof RedisLoadingException) {
if (!connectionManager.getShutdownLatch().acquire()) {
return;
}
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, attempt);
return;
}

@ -31,7 +31,6 @@ import org.redisson.connection.ConnectionEntry.FreezeReason;
import org.redisson.misc.InfinitySemaphoreLatch;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
@ -56,8 +55,6 @@ public interface ConnectionManager {
int calcSlot(String key);
HashedWheelTimer getTimer();
MasterSlaveServersConfig getConfig();
Codec getCodec();

@ -50,6 +50,7 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -63,6 +64,33 @@ import io.netty.util.internal.PlatformDependent;
*/
public class MasterSlaveConnectionManager implements ConnectionManager {
private final Timeout dummyTimeout = new Timeout() {
@Override
public Timer timer() {
return null;
}
@Override
public TimerTask task() {
return null;
}
@Override
public boolean isExpired() {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean cancel() {
return false;
}
};
protected static final int MAX_SLOT = 16384;
protected final ClusterSlotRange singleSlotRange = new ClusterSlotRange(0, MAX_SLOT);
@ -92,11 +120,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected MasterSlaveConnectionManager() {
}
@Override
public HashedWheelTimer getTimer() {
return timer;
}
@Override
public MasterSlaveServersConfig getConfig() {
return config;
@ -655,7 +678,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
try {
return timer.newTimeout(task, delay, unit);
} catch (IllegalStateException e) {
// timer is shutdown
return dummyTimeout;
}
}
public InfinitySemaphoreLatch getShutdownLatch() {

@ -254,7 +254,7 @@ public class ConnectionPool<T extends RedisConnection> {
}
private void scheduleCheck(final SubscribesConnectionEntry entry) {
connectionManager.getTimer().newTimeout(new TimerTask() {
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (entry.getFreezeReason() != FreezeReason.RECONNECT

Loading…
Cancel
Save