diff --git a/README.md b/README.md index 73048a7bd..f2f4b4b3a 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,11 @@ Recent Releases ================================ ####Please Note: trunk is current development branch. +####22-Jul-2015 - version 1.3.1 released +Fixed - requests state sync during shutdown +Fixed - netty-transport-native-epoll is now has a provided scope +Fixed - NPE during `BlockingQueue.poll` invocation + ####04-Jul-2015 - version 1.3.0 released Feature - `RQueue.pollLastAndOfferFirstTo` method added Feature - `RObject.rename`, `RObject.renameAsync`, `RObject.renamenx`, `RObject.renamenxAsync` methods added diff --git a/pom.xml b/pom.xml index 4d574e84d..59001cd4e 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ org.redisson redisson - 1.3.1-SNAPSHOT + 1.3.2-SNAPSHOT bundle Redisson @@ -93,6 +93,7 @@ io.netty netty-transport-native-epoll 4.0.28.Final + provided io.netty @@ -196,7 +197,7 @@ org.apache.maven.plugins maven-pmd-plugin - 3.0.1 + 3.5 verify @@ -217,7 +218,7 @@ org.apache.maven.plugins maven-checkstyle-plugin - 2.10 + 2.15 verify @@ -227,7 +228,6 @@ - src/main/java/org/redisson/ true false /checkstyle.xml diff --git a/src/main/java/org/redisson/BaseConfig.java b/src/main/java/org/redisson/BaseConfig.java index 3e2d52635..6bb24f9c6 100644 --- a/src/main/java/org/redisson/BaseConfig.java +++ b/src/main/java/org/redisson/BaseConfig.java @@ -59,6 +59,7 @@ class BaseConfig> { setRetryInterval(config.getRetryInterval()); setDatabase(config.getDatabase()); setTimeout(config.getTimeout()); + setClientName(config.getClientName()); } /** diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index f351c087f..eca3752e3 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -111,6 +111,15 @@ public class CommandBatchExecutorService extends CommandExecutorService { if (executed) { throw new IllegalStateException("Batch already executed!"); } + if (!connectionManager.getShutdownLatch().acquireAmount(commands.size())) { + IllegalStateException fail = new IllegalStateException("Redisson is shutdown"); + for (Entry e : commands.values()) { + for (CommandEntry entry : e.getCommands()) { + entry.getCommand().getPromise().setFailure(fail); + } + } + return connectionManager.getGroup().next().newFailedFuture(fail); + } if (commands.isEmpty()) { return connectionManager.getGroup().next().newSucceededFuture(null); diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index 840d2b0e9..d4a3913a5 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -157,6 +157,10 @@ public class CommandExecutorService implements CommandExecutor { } private R async(boolean readOnlyMode, int slot, SyncOperation operation, int attempt) { + if (!connectionManager.getShutdownLatch().acquire()) { + return null; + } + try { RedisConnection connection; if (readOnlyMode) { @@ -175,6 +179,7 @@ public class CommandExecutorService implements CommandExecutor { attempt++; return async(readOnlyMode, slot, operation, attempt); } finally { + connectionManager.getShutdownLatch().release(); if (readOnlyMode) { connectionManager.releaseRead(slot, connection); } else { @@ -268,6 +273,11 @@ public class CommandExecutorService implements CommandExecutor { protected void async(final boolean readOnlyMode, final int slot, final MultiDecoder messageDecoder, final Codec codec, final RedisCommand command, final Object[] params, final Promise mainPromise, final int attempt) { + if (!connectionManager.getShutdownLatch().acquire()) { + mainPromise.setFailure(new IllegalStateException("Redisson is shutdown")); + return; + } + final Promise attemptPromise = connectionManager.newPromise(); final AtomicReference ex = new AtomicReference(); diff --git a/src/main/java/org/redisson/Config.java b/src/main/java/org/redisson/Config.java index cd5580bdd..dae4fb0cd 100644 --- a/src/main/java/org/redisson/Config.java +++ b/src/main/java/org/redisson/Config.java @@ -194,6 +194,7 @@ public class Config { /** * Activates an unix socket if servers binded to loopback interface. * Also used for epoll transport activation. + * netty-transport-native-epoll library should be in classpath * * @param useLinuxNativeEpoll * @return diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 348efd7de..282458ef8 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -24,6 +24,7 @@ import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.pubsub.PubSubStatusMessage; +import org.redisson.misc.InfinitySemaphoreLatch; import io.netty.channel.EventLoopGroup; import io.netty.util.HashedWheelTimer; @@ -88,4 +89,7 @@ public interface ConnectionManager { EventLoopGroup getGroup(); Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); + + InfinitySemaphoreLatch getShutdownLatch(); + } diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index b7dcfd87b..cbcd4731c 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -36,6 +36,7 @@ import org.redisson.client.RedisPubSubListener; import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type; +import org.redisson.misc.InfinitySemaphoreLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +68,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected EventLoopGroup group; + protected Class socketChannelClass; protected final ConcurrentMap name2PubSubConnection = new ConcurrentHashMap(); @@ -75,6 +77,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected final NavigableMap entries = new ConcurrentSkipListMap(); + private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch(); + MasterSlaveConnectionManager() { } @@ -138,6 +142,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return new FutureListener() { @Override public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + shutdownLatch.release(); timeout.cancel(); releaseWrite(slot, conn); } @@ -149,6 +154,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return new FutureListener() { @Override public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + shutdownLatch.release(); timeout.cancel(); releaseRead(slot, conn); } @@ -448,6 +454,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public void shutdown() { + shutdownLatch.closeAndAwaitUninterruptibly(); for (MasterSlaveEntry entry : entries.values()) { entry.shutdown(); } @@ -470,4 +477,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return timer.newTimeout(task, delay, unit); } + public InfinitySemaphoreLatch getShutdownLatch() { + return shutdownLatch; + } + } diff --git a/src/main/java/org/redisson/misc/InfinitySemaphoreLatch.java b/src/main/java/org/redisson/misc/InfinitySemaphoreLatch.java new file mode 100644 index 000000000..4e6e238e6 --- /dev/null +++ b/src/main/java/org/redisson/misc/InfinitySemaphoreLatch.java @@ -0,0 +1,104 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.misc; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; + +/** + * + * Code parts from Manik Surtani (manik@jboss.org) + * @author Nikita Koksharov + */ +public class InfinitySemaphoreLatch extends AbstractQueuedSynchronizer { + + private static final long serialVersionUID = 1744280161777661090l; + + volatile boolean closed; + AtomicInteger sharedResources = new AtomicInteger(); + + // the following states are used in the AQS. + private static final int OPEN_STATE = 0; + + public InfinitySemaphoreLatch() { + setState(OPEN_STATE); + } + + @Override + public final int tryAcquireShared(int ignored) { + // return 1 if we allow the requestor to proceed, -1 if we want the + // requestor to block. + return getState() == OPEN_STATE ? 1 : -1; + } + + @Override + public final boolean tryReleaseShared(int state) { + // used as a mechanism to set the state of the Sync. + setState(state); + return true; + } + + public final boolean acquireAmount(int amount) { + if (closed) { + return false; + } + releaseShared(sharedResources.addAndGet(amount)); + return true; + } + + public final boolean acquire() { + if (closed) { + return false; + } + releaseShared(sharedResources.incrementAndGet()); + return true; + } + + public final void release() { + // do not use setState() directly since this won't notify parked + // threads. + releaseShared(sharedResources.decrementAndGet()); + } + + public boolean isOpened() { + return getState() == OPEN_STATE; + } + + // waiting for an open state + public final boolean closeAndAwaitUninterruptibly() { + closed = true; + try { + return await(15, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + + private boolean await(long time, TimeUnit unit) throws InterruptedException { + return tryAcquireSharedNanos(1, unit.toNanos(time)); // the 1 is a dummy + // value that is + // not used. + } + + @Override + public String toString() { + int s = getState(); + String q = hasQueuedThreads() ? "non" : ""; + return "ReclosableLatch [State = " + s + ", " + q + "empty queue]"; + } +} diff --git a/src/main/java/org/redisson/misc/ReclosableLatch.java b/src/main/java/org/redisson/misc/ReclosableLatch.java index 7557190a3..a7d165cf9 100644 --- a/src/main/java/org/redisson/misc/ReclosableLatch.java +++ b/src/main/java/org/redisson/misc/ReclosableLatch.java @@ -69,6 +69,7 @@ public class ReclosableLatch extends AbstractQueuedSynchronizer { return getState() == OPEN_STATE; } + // waiting for an open state public final void await() throws InterruptedException { acquireSharedInterruptibly(1); // the 1 is a dummy value that is not used. }