From 29d235f717ba7caae4c3c80dab3e716860c414eb Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 15 Jul 2015 18:01:30 +0300 Subject: [PATCH] graceful shutdown. #185 --- .../MasterSlaveConnectionManager.java | 34 ++++++ .../redisson/misc/InfinitySemaphoreLatch.java | 109 ++++++++++++++++++ .../org/redisson/misc/ReclosableLatch.java | 1 + 3 files changed, 144 insertions(+) create mode 100644 src/main/java/org/redisson/misc/InfinitySemaphoreLatch.java diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 1e5123dd1..85a9ac840 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -46,6 +46,7 @@ import org.redisson.async.AsyncOperation; import org.redisson.async.SyncInterruptedOperation; import org.redisson.async.SyncOperation; import org.redisson.codec.RedisCodecWrapper; +import org.redisson.misc.InfinitySemaphoreLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +73,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { private final HashedWheelTimer timer = new HashedWheelTimer(); + private InfinitySemaphoreLatch latch = new InfinitySemaphoreLatch(); + protected RedisCodec codec; protected EventLoopGroup group; @@ -124,6 +127,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { public FutureListener createReleaseWriteListener(final int slot, final RedisConnection conn, final Timeout timeout) { + latch.release(); return new FutureListener() { @Override public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { @@ -135,6 +139,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { public FutureListener createReleaseReadListener(final int slot, final RedisConnection conn, final Timeout timeout) { + latch.release(); return new FutureListener() { @Override public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { @@ -154,6 +159,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } private void writeAllAsync(final int slot, final AsyncOperation asyncOperation, final AtomicInteger counter, final Promise mainPromise, final int attempt) { + if (!latch.acquire()) { + mainPromise.setFailure(new IllegalStateException("Redisson is shutdown")); + return; + } + final Promise promise = getGroup().next().newPromise(); final AtomicReference ex = new AtomicReference(); @@ -225,6 +235,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } private void writeAsync(final int slot, final AsyncOperation asyncOperation, final Promise mainPromise, final int attempt) { + if (!latch.acquire()) { + mainPromise.setFailure(new IllegalStateException("Redisson is shutdown")); + return; + } + final Promise promise = getGroup().next().newPromise(); final AtomicReference ex = new AtomicReference(); @@ -290,6 +305,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } private R write(int slot, SyncInterruptedOperation operation, int attempt) throws InterruptedException { + if (!latch.acquire()) { + return null; + } try { RedisConnection connection = connectionWriteOp(slot); try { @@ -305,6 +323,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } catch (InterruptedException e) { throw e; } finally { + latch.release(); releaseWrite(slot, connection); } } catch (RedisConnectionException e) { @@ -331,6 +350,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } private R write(int slot, SyncOperation operation, int attempt) { + if (!latch.acquire()) { + return null; + } try { RedisConnection connection = connectionWriteOp(slot); try { @@ -344,6 +366,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { attempt++; return write(slot, operation, attempt); } finally { + latch.release(); releaseWrite(slot, connection); } } catch (RedisConnectionException e) { @@ -370,6 +393,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } private R read(int slot, SyncOperation operation, int attempt) { + if (!latch.acquire()) { + return null; + } + try { RedisConnection connection = connectionReadOp(slot); try { @@ -383,6 +410,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { attempt++; return read(slot, operation, attempt); } finally { + latch.release(); releaseRead(slot, connection); } } catch (RedisConnectionException e) { @@ -461,6 +489,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } private void readAsync(final int slot, final AsyncOperation asyncOperation, final Promise mainPromise, final int attempt) { + if (!latch.acquire()) { + mainPromise.setFailure(new IllegalStateException("Redisson is shutdown")); + return; + } + final Promise promise = getGroup().next().newPromise(); final AtomicReference ex = new AtomicReference(); @@ -793,6 +826,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public void shutdown() { + latch.closeAndAwaitUninterruptibly(); for (MasterSlaveEntry entry : entries.values()) { entry.shutdown(); } diff --git a/src/main/java/org/redisson/misc/InfinitySemaphoreLatch.java b/src/main/java/org/redisson/misc/InfinitySemaphoreLatch.java new file mode 100644 index 000000000..6371b4633 --- /dev/null +++ b/src/main/java/org/redisson/misc/InfinitySemaphoreLatch.java @@ -0,0 +1,109 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.misc; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; + +/** + * + * Code parts from Manik Surtani (manik@jboss.org) + * @author Nikita Koksharov + */ +public class InfinitySemaphoreLatch extends AbstractQueuedSynchronizer { + + private static final long serialVersionUID = 1744280161777661090l; + + volatile boolean closed; + AtomicInteger sharedResources = new AtomicInteger(); + + // the following states are used in the AQS. + private static final int OPEN_STATE = 0; + + public InfinitySemaphoreLatch() { + setState(OPEN_STATE); + } + + @Override + public final int tryAcquireShared(int ignored) { + // return 1 if we allow the requestor to proceed, -1 if we want the + // requestor to block. + return getState() == OPEN_STATE ? 1 : -1; + } + + @Override + public final boolean tryReleaseShared(int state) { + // used as a mechanism to set the state of the Sync. + setState(state); + return true; + } + + public final boolean acquire() { + if (closed) { + return false; + } + releaseShared(sharedResources.incrementAndGet()); + return true; + } + + public final void release() { + // do not use setState() directly since this won't notify parked + // threads. + releaseShared(sharedResources.decrementAndGet()); + } + + public boolean isOpened() { + return getState() == OPEN_STATE; + } + + // waiting for an open state + public final void closeAndAwaitUninterruptibly() { + closed = true; + try { + await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + // waiting for an open state + public final void await() throws InterruptedException { + acquireSharedInterruptibly(1); // the 1 is a dummy value that is not + // used. + } + + public final void awaitUninterruptibly() { + try { + await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + public final boolean await(long time, TimeUnit unit) throws InterruptedException { + return tryAcquireSharedNanos(1, unit.toNanos(time)); // the 1 is a dummy + // value that is + // not used. + } + + @Override + public String toString() { + int s = getState(); + String q = hasQueuedThreads() ? "non" : ""; + return "ReclosableLatch [State = " + s + ", " + q + "empty queue]"; + } +} diff --git a/src/main/java/org/redisson/misc/ReclosableLatch.java b/src/main/java/org/redisson/misc/ReclosableLatch.java index 7557190a3..a7d165cf9 100644 --- a/src/main/java/org/redisson/misc/ReclosableLatch.java +++ b/src/main/java/org/redisson/misc/ReclosableLatch.java @@ -69,6 +69,7 @@ public class ReclosableLatch extends AbstractQueuedSynchronizer { return getState() == OPEN_STATE; } + // waiting for an open state public final void await() throws InterruptedException { acquireSharedInterruptibly(1); // the 1 is a dummy value that is not used. }