From ea22f59060e16b2e3397c349ce053349f4c99b97 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 18 Nov 2015 16:42:16 +0300 Subject: [PATCH] shutdown case handling --- .../redisson/CommandBatchExecutorService.java | 33 ++++----------- .../org/redisson/CommandExecutorService.java | 38 +++++------------- .../connection/ConnectionManager.java | 3 -- .../MasterSlaveConnectionManager.java | 40 ++++++++++++++++--- .../org/redisson/misc/ConnectionPool.java | 2 +- 5 files changed, 54 insertions(+), 62 deletions(-) diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index 2aeacd31e..73186d90c 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -216,14 +216,16 @@ public class CommandBatchExecutorService extends CommandExecutorService { final TimerTask retryTimerTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { + if (connectionFuture.cancel(false)) { + connectionManager.getShutdownLatch().release(); + } + if (attemptPromise.isDone()) { return; } - connectionFuture.cancel(false); - if (attempt == connectionManager.getConfig().getRetryAttempts()) { - attemptPromise.setFailure(ex.get()); + attemptPromise.tryFailure(ex.get()); return; } if (!attemptPromise.cancel(false)) { @@ -236,7 +238,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { }; ex.set(new RedisTimeoutException("Batch command execution timeout")); - final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); + final Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); connectionFuture.addListener(new FutureListener() { @Override @@ -246,12 +248,9 @@ public class CommandBatchExecutorService extends CommandExecutorService { } if (!connFuture.isSuccess()) { timeout.cancel(); - if (!connectionManager.getShutdownLatch().acquire()) { - return; - } ex.set(convertException(connFuture)); - connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); return; } @@ -273,12 +272,8 @@ public class CommandBatchExecutorService extends CommandExecutorService { if (!future.isSuccess()) { timeout.cancel(); - if (!connectionManager.getShutdownLatch().acquire()) { - return; - } - ex.set(new WriteRedisConnectionException("Can't write commands batch to channel: " + future.channel(), future.cause())); - connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); } } }); @@ -300,28 +295,16 @@ public class CommandBatchExecutorService extends CommandExecutorService { } if (future.cause() instanceof RedisMovedException) { - if (!connectionManager.getShutdownLatch().acquire()) { - return; - } - RedisMovedException ex = (RedisMovedException)future.cause(); execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), mainPromise, slots, attempt); return; } if (future.cause() instanceof RedisAskException) { - if (!connectionManager.getShutdownLatch().acquire()) { - return; - } - RedisAskException ex = (RedisAskException)future.cause(); execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), mainPromise, slots, attempt); return; } if (future.cause() instanceof RedisLoadingException) { - if (!connectionManager.getShutdownLatch().acquire()) { - return; - } - execute(entry, source, mainPromise, slots, attempt); return; } diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index 2a50e237f..58757323b 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -259,7 +259,7 @@ public class CommandExecutorService implements CommandExecutor { private R async(boolean readOnlyMode, Codec codec, NodeSource source, SyncOperation operation, int attempt) { if (!connectionManager.getShutdownLatch().acquire()) { - return null; + throw new IllegalStateException("Redisson is shutdown"); } try { @@ -432,12 +432,14 @@ public class CommandExecutorService implements CommandExecutor { final TimerTask retryTimerTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { + if (connectionFuture.cancel(false)) { + connectionManager.getShutdownLatch().release(); + } + if (attemptPromise.isDone()) { return; } - connectionFuture.cancel(false); - if (attempt == connectionManager.getConfig().getRetryAttempts()) { attemptPromise.tryFailure(ex.get()); return; @@ -452,7 +454,7 @@ public class CommandExecutorService implements CommandExecutor { }; ex.set(new RedisTimeoutException("Command execution timeout for command: " + command + " with params " + Arrays.toString(params))); - final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); + final Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); connectionFuture.addListener(new FutureListener() { @Override @@ -462,11 +464,8 @@ public class CommandExecutorService implements CommandExecutor { } if (!connFuture.isSuccess()) { timeout.cancel(); - if (!connectionManager.getShutdownLatch().acquire()) { - return; - } ex.set(convertException(connFuture)); - connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); return; } @@ -493,12 +492,9 @@ public class CommandExecutorService implements CommandExecutor { } if (!future.isSuccess()) { timeout.cancel(); - if (!connectionManager.getShutdownLatch().acquire()) { - return; - } ex.set(new WriteRedisConnectionException( "Can't write command: " + command + ", params: " + params + " to channel: " + future.channel(), future.cause())); - connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); } } }); @@ -520,33 +516,21 @@ public class CommandExecutorService implements CommandExecutor { } if (future.cause() instanceof RedisMovedException) { - if (!connectionManager.getShutdownLatch().acquire()) { - return; - } - RedisMovedException ex = (RedisMovedException)future.cause(); - connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), messageDecoder, codec, command, params, mainPromise, attempt); return; } if (future.cause() instanceof RedisAskException) { - if (!connectionManager.getShutdownLatch().acquire()) { - return; - } - RedisAskException ex = (RedisAskException)future.cause(); - connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); async(readOnlyMode, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), messageDecoder, codec, command, params, mainPromise, attempt); return; } if (future.cause() instanceof RedisLoadingException) { - if (!connectionManager.getShutdownLatch().acquire()) { - return; - } - - connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, attempt); return; } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 7a823c42f..d18197022 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -31,7 +31,6 @@ import org.redisson.connection.ConnectionEntry.FreezeReason; import org.redisson.misc.InfinitySemaphoreLatch; import io.netty.channel.EventLoopGroup; -import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; @@ -56,8 +55,6 @@ public interface ConnectionManager { int calcSlot(String key); - HashedWheelTimer getTimer(); - MasterSlaveServersConfig getConfig(); Codec getCodec(); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 612b40e7a..db0ee8cd9 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -50,6 +50,7 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; +import io.netty.util.Timer; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -63,6 +64,33 @@ import io.netty.util.internal.PlatformDependent; */ public class MasterSlaveConnectionManager implements ConnectionManager { + private final Timeout dummyTimeout = new Timeout() { + @Override + public Timer timer() { + return null; + } + + @Override + public TimerTask task() { + return null; + } + + @Override + public boolean isExpired() { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean cancel() { + return false; + } + }; + protected static final int MAX_SLOT = 16384; protected final ClusterSlotRange singleSlotRange = new ClusterSlotRange(0, MAX_SLOT); @@ -92,11 +120,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected MasterSlaveConnectionManager() { } - @Override - public HashedWheelTimer getTimer() { - return timer; - } - @Override public MasterSlaveServersConfig getConfig() { return config; @@ -655,7 +678,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { - return timer.newTimeout(task, delay, unit); + try { + return timer.newTimeout(task, delay, unit); + } catch (IllegalStateException e) { + // timer is shutdown + return dummyTimeout; + } } public InfinitySemaphoreLatch getShutdownLatch() { diff --git a/src/main/java/org/redisson/misc/ConnectionPool.java b/src/main/java/org/redisson/misc/ConnectionPool.java index 2ab171783..d47db3d1d 100644 --- a/src/main/java/org/redisson/misc/ConnectionPool.java +++ b/src/main/java/org/redisson/misc/ConnectionPool.java @@ -254,7 +254,7 @@ public class ConnectionPool { } private void scheduleCheck(final SubscribesConnectionEntry entry) { - connectionManager.getTimer().newTimeout(new TimerTask() { + connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { if (entry.getFreezeReason() != FreezeReason.RECONNECT