Merge branch 'master' into 2.0

Conflicts:
	src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java
pull/243/head
Nikita 10 years ago
commit 4c88e3af6a

@ -54,6 +54,11 @@ Recent Releases
================================
####Please Note: trunk is current development branch.
####22-Jul-2015 - version 1.3.1 released
Fixed - requests state sync during shutdown
Fixed - netty-transport-native-epoll is now has a provided scope
Fixed - NPE during `BlockingQueue.poll` invocation
####04-Jul-2015 - version 1.3.0 released
Feature - `RQueue.pollLastAndOfferFirstTo` method added
Feature - `RObject.rename`, `RObject.renameAsync`, `RObject.renamenx`, `RObject.renamenxAsync` methods added

@ -3,7 +3,7 @@
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>1.3.1-SNAPSHOT</version>
<version>1.3.2-SNAPSHOT</version>
<packaging>bundle</packaging>
<name>Redisson</name>
@ -93,6 +93,7 @@
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>4.0.28.Final</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
@ -196,7 +197,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-pmd-plugin</artifactId>
<version>3.0.1</version>
<version>3.5</version>
<executions>
<execution>
<phase>verify</phase>
@ -217,7 +218,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.10</version>
<version>2.15</version>
<executions>
<execution>
<phase>verify</phase>
@ -227,7 +228,6 @@
</execution>
</executions>
<configuration>
<includes>src/main/java/org/redisson/</includes>
<consoleOutput>true</consoleOutput>
<enableRSS>false</enableRSS>
<configLocation>/checkstyle.xml</configLocation>

@ -59,6 +59,7 @@ class BaseConfig<T extends BaseConfig<T>> {
setRetryInterval(config.getRetryInterval());
setDatabase(config.getDatabase());
setTimeout(config.getTimeout());
setClientName(config.getClientName());
}
/**

@ -111,6 +111,15 @@ public class CommandBatchExecutorService extends CommandExecutorService {
if (executed) {
throw new IllegalStateException("Batch already executed!");
}
if (!connectionManager.getShutdownLatch().acquireAmount(commands.size())) {
IllegalStateException fail = new IllegalStateException("Redisson is shutdown");
for (Entry e : commands.values()) {
for (CommandEntry entry : e.getCommands()) {
entry.getCommand().getPromise().setFailure(fail);
}
}
return connectionManager.getGroup().next().newFailedFuture(fail);
}
if (commands.isEmpty()) {
return connectionManager.getGroup().next().newSucceededFuture(null);

@ -157,6 +157,10 @@ public class CommandExecutorService implements CommandExecutor {
}
private <R> R async(boolean readOnlyMode, int slot, SyncOperation<R> operation, int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) {
return null;
}
try {
RedisConnection connection;
if (readOnlyMode) {
@ -175,6 +179,7 @@ public class CommandExecutorService implements CommandExecutor {
attempt++;
return async(readOnlyMode, slot, operation, attempt);
} finally {
connectionManager.getShutdownLatch().release();
if (readOnlyMode) {
connectionManager.releaseRead(slot, connection);
} else {
@ -268,6 +273,11 @@ public class CommandExecutorService implements CommandExecutor {
protected <V, R> void async(final boolean readOnlyMode, final int slot, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command,
final Object[] params, final Promise<R> mainPromise, final int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) {
mainPromise.setFailure(new IllegalStateException("Redisson is shutdown"));
return;
}
final Promise<R> attemptPromise = connectionManager.newPromise();
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();

@ -194,6 +194,7 @@ public class Config {
/**
* Activates an unix socket if servers binded to loopback interface.
* Also used for epoll transport activation.
* <b>netty-transport-native-epoll</b> library should be in classpath
*
* @param useLinuxNativeEpoll
* @return

@ -24,6 +24,7 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.misc.InfinitySemaphoreLatch;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
@ -88,4 +89,7 @@ public interface ConnectionManager {
EventLoopGroup getGroup();
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
InfinitySemaphoreLatch getShutdownLatch();
}

@ -36,6 +36,7 @@ import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -67,6 +68,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected EventLoopGroup group;
protected Class<? extends SocketChannel> socketChannelClass;
protected final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<String, PubSubConnectionEntry>();
@ -75,6 +77,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected final NavigableMap<Integer, MasterSlaveEntry> entries = new ConcurrentSkipListMap<Integer, MasterSlaveEntry>();
private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch();
MasterSlaveConnectionManager() {
}
@ -138,6 +142,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return new FutureListener<T>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
shutdownLatch.release();
timeout.cancel();
releaseWrite(slot, conn);
}
@ -149,6 +154,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return new FutureListener<T>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
shutdownLatch.release();
timeout.cancel();
releaseRead(slot, conn);
}
@ -448,6 +454,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public void shutdown() {
shutdownLatch.closeAndAwaitUninterruptibly();
for (MasterSlaveEntry entry : entries.values()) {
entry.shutdown();
}
@ -470,4 +477,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return timer.newTimeout(task, delay, unit);
}
public InfinitySemaphoreLatch getShutdownLatch() {
return shutdownLatch;
}
}

@ -0,0 +1,104 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
*
* Code parts from Manik Surtani (<a href="mailto:manik@jboss.org">manik@jboss.org</a>)
* @author Nikita Koksharov
*/
public class InfinitySemaphoreLatch extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1744280161777661090l;
volatile boolean closed;
AtomicInteger sharedResources = new AtomicInteger();
// the following states are used in the AQS.
private static final int OPEN_STATE = 0;
public InfinitySemaphoreLatch() {
setState(OPEN_STATE);
}
@Override
public final int tryAcquireShared(int ignored) {
// return 1 if we allow the requestor to proceed, -1 if we want the
// requestor to block.
return getState() == OPEN_STATE ? 1 : -1;
}
@Override
public final boolean tryReleaseShared(int state) {
// used as a mechanism to set the state of the Sync.
setState(state);
return true;
}
public final boolean acquireAmount(int amount) {
if (closed) {
return false;
}
releaseShared(sharedResources.addAndGet(amount));
return true;
}
public final boolean acquire() {
if (closed) {
return false;
}
releaseShared(sharedResources.incrementAndGet());
return true;
}
public final void release() {
// do not use setState() directly since this won't notify parked
// threads.
releaseShared(sharedResources.decrementAndGet());
}
public boolean isOpened() {
return getState() == OPEN_STATE;
}
// waiting for an open state
public final boolean closeAndAwaitUninterruptibly() {
closed = true;
try {
return await(15, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
private boolean await(long time, TimeUnit unit) throws InterruptedException {
return tryAcquireSharedNanos(1, unit.toNanos(time)); // the 1 is a dummy
// value that is
// not used.
}
@Override
public String toString() {
int s = getState();
String q = hasQueuedThreads() ? "non" : "";
return "ReclosableLatch [State = " + s + ", " + q + "empty queue]";
}
}

@ -69,6 +69,7 @@ public class ReclosableLatch extends AbstractQueuedSynchronizer {
return getState() == OPEN_STATE;
}
// waiting for an open state
public final void await() throws InterruptedException {
acquireSharedInterruptibly(1); // the 1 is a dummy value that is not used.
}

Loading…
Cancel
Save