Merge branch 'mrniko/master' into redisson-reference

pull/605/head
jackygurui 9 years ago
commit ea5a0275b1

@ -170,6 +170,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
private RMap<K, V> map;
private Cache<CacheKey, CacheValue> cache;
private int invalidateEntryOnChange;
private int invalidationListenerId;
protected RedissonLocalCachedMap(RedissonClient redisson, CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options) {
super(commandExecutor, name);
@ -198,7 +199,8 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
}
invalidationTopic = redisson.getTopic(name + ":topic");
invalidationTopic.addListener(new MessageListener<Object>() {
if (options.isInvalidateEntryOnChange()) {
invalidationListenerId = invalidationTopic.addListener(new MessageListener<Object>() {
@Override
public void onMessage(String channel, Object msg) {
if (msg instanceof LocalCachedMapClear) {
@ -211,6 +213,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
}
});
}
}
@Override
public int size() {
@ -360,6 +363,13 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
encodedKey, encodedValue, msg, invalidateEntryOnChange);
}
@Override
public void destroy() {
if (invalidationListenerId != 0) {
invalidationTopic.removeListener(invalidationListenerId);
}
}
@Override
public V remove(Object key) {
return get(removeAsync((K)key));

@ -46,15 +46,22 @@ public class RedissonNode {
private static final Logger log = LoggerFactory.getLogger(RedissonNode.class);
private ExecutorService executor;
private boolean hasRedissonInstance;
private RedissonClient redisson;
private final RedissonNodeConfig config;
private final String id;
private InetSocketAddress remoteAddress;
private InetSocketAddress localAddress;
private RedissonNode(RedissonNodeConfig config) {
private RedissonNode(RedissonNodeConfig config, Redisson redisson) {
this.config = new RedissonNodeConfig(config);
this.id = generateId();
this.redisson = redisson;
hasRedissonInstance = redisson == null;
}
public RedissonClient getRedisson() {
return redisson;
}
public InetSocketAddress getLocalAddress() {
@ -122,7 +129,9 @@ public class RedissonNode {
Thread.currentThread().interrupt();
}
}
if (hasRedissonInstance) {
redisson.shutdown();
}
log.info("Redisson node has been shutdown successfully");
}
@ -136,12 +145,14 @@ public class RedissonNode {
executor = Executors.newFixedThreadPool(config.getExecutorServiceThreads(), new RedissonThreadFactory());
}
if (hasRedissonInstance) {
redisson = Redisson.create(config);
}
retrieveAdresses();
if (config.getRedissonNodeInitializer() != null) {
config.getRedissonNodeInitializer().onStartup(redisson, this);
config.getRedissonNodeInitializer().onStartup(this);
}
for (Entry<String, Integer> entry : config.getExecutorServiceWorkers().entrySet()) {
@ -185,11 +196,22 @@ public class RedissonNode {
* @return RedissonNode instance
*/
public static RedissonNode create(RedissonNodeConfig config) {
return create(config, null);
}
/**
* Create Redisson node instance with provided config and Redisson instance
*
* @param config
* @param redisson
* @return RedissonNode instance
*/
public static RedissonNode create(RedissonNodeConfig config, Redisson redisson) {
if (config.getExecutorServiceWorkers().isEmpty()) {
throw new IllegalArgumentException("Executor service workers are empty");
}
return new RedissonNode(config);
return new RedissonNode(config, redisson);
}
}

@ -135,6 +135,11 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
}
private void tryAcquireAsync(final AtomicLong time, final int permits, final RFuture<RedissonLockEntry> subscribeFuture, final RPromise<Boolean> result) {
if (result.isDone()) {
unsubscribe(subscribeFuture);
return;
}
RFuture<Boolean> tryAcquireFuture = tryAcquireAsync(permits);
tryAcquireFuture.addListener(new FutureListener<Boolean>() {
@Override
@ -147,7 +152,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
if (future.getNow()) {
unsubscribe(subscribeFuture);
result.trySuccess(true);
if (!result.trySuccess(true)) {
releaseAsync(permits);
}
return;
}
@ -170,8 +177,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override
public void run() {
executed.set(true);
if (futureRef.get() != null) {
futureRef.get().cancel();
if (futureRef.get() != null && !futureRef.get().cancel()) {
entry.getLatch().release();
return;
}
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
@ -206,19 +214,26 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
}
private void acquireAsync(final int permits, final RFuture<RedissonLockEntry> subscribeFuture, final RPromise<Void> result) {
if (result.isDone()) {
unsubscribe(subscribeFuture);
return;
}
RFuture<Boolean> tryAcquireFuture = tryAcquireAsync(permits);
tryAcquireFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
unsubscribe(subscribeFuture);
result.setFailure(future.cause());
result.tryFailure(future.cause());
return;
}
if (future.getNow()) {
unsubscribe(subscribeFuture);
result.setSuccess(null);
if (!result.trySuccess(null)) {
releaseAsync(permits);
}
return;
}
@ -321,12 +336,14 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
result.tryFailure(future.cause());
return;
}
if (future.getNow()) {
result.setSuccess(true);
if (!result.trySuccess(true)) {
releaseAsync(permits);
}
return;
}
@ -337,7 +354,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
result.tryFailure(future.cause());
return;
}
@ -456,4 +473,41 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
Arrays.<Object>asList(getName(), getChannelName()), permits);
}
@Override
public boolean trySetPermits(int permits) {
return get(trySetPermitsAsync(permits));
}
@Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]); " +
"if (value == false or value == 0) then "
+ "redis.call('set', KEYS[1], ARGV[1]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(getName(), getChannelName()), permits);
}
@Override
public void reducePermits(int permits) {
get(reducePermitsAsync(permits));
}
@Override
public RFuture<Void> reducePermitsAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException();
}
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local value = redis.call('get', KEYS[1]); " +
"if (value == false) then "
+ "value = 0;"
+ "end;"
+ "redis.call('set', KEYS[1], value - ARGV[1]); ",
Arrays.<Object>asList(getName(), getChannelName()), permits);
}
}

@ -0,0 +1,30 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
/**
*
* @author Nikita Koksharov
*
*/
public interface RDestroyable {
/**
* Allows to destroy object then it's not necessary anymore.
*/
void destroy();
}

@ -28,7 +28,7 @@ import java.util.Map;
* @param <K>
* @param <V>
*/
public interface RLocalCachedMap<K, V> extends Map<K, V>, RExpirable, RLocalCachedMapAsync<K, V> {
public interface RLocalCachedMap<K, V> extends Map<K, V>, RExpirable, RLocalCachedMapAsync<K, V>, RDestroyable {
/**
* Associates the specified <code>value</code> with the specified <code>key</code>.

@ -27,9 +27,8 @@ public interface RedissonNodeInitializer {
/**
* Invoked during Redisson Node startup
*
* @param redisson
* @param redissonNode
*/
void onStartup(RedissonClient redisson, RedissonNode redissonNode);
void onStartup(RedissonNode redissonNode);
}

@ -3,22 +3,43 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assume;
import org.junit.Assume;
import org.junit.Test;
import org.redisson.api.RSemaphore;
public class RedissonSemaphoreTest extends BaseConcurrentTest {
@Test
public void testTrySetPermits() {
RSemaphore s = redisson.getSemaphore("test");
assertThat(s.trySetPermits(10)).isTrue();
assertThat(s.availablePermits()).isEqualTo(10);
assertThat(s.trySetPermits(15)).isFalse();
assertThat(s.availablePermits()).isEqualTo(10);
}
@Test
public void testReducePermits() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.trySetPermits(10);
s.acquire(10);
s.reducePermits(5);
assertThat(s.availablePermits()).isEqualTo(-5);
s.release(10);
assertThat(s.availablePermits()).isEqualTo(5);
s.acquire(5);
assertThat(s.availablePermits()).isEqualTo(0);
}
@Test
public void testBlockingAcquire() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(1);
s.trySetPermits(1);
s.acquire();
Thread t = new Thread() {
@ -46,7 +67,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
@Test
public void testBlockingNAcquire() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(5);
s.trySetPermits(5);
s.acquire(3);
Thread t = new Thread() {
@ -80,7 +101,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
@Test
public void testTryNAcquire() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(5);
s.trySetPermits(5);
assertThat(s.tryAcquire(3)).isTrue();
Thread t = new Thread() {
@ -126,7 +147,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
@Test
public void testDrainPermits() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(10);
s.trySetPermits(10);
s.acquire(3);
assertThat(s.drainPermits()).isEqualTo(7);
@ -136,7 +157,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
@Test
public void testReleaseAcquire() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(10);
s.trySetPermits(10);
s.acquire();
assertThat(s.availablePermits()).isEqualTo(9);
s.release();
@ -153,7 +174,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
final AtomicInteger lockedCounter = new AtomicInteger();
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(1);
s.trySetPermits(1);
int iterations = 15;
testSingleInstanceConcurrency(iterations, r -> {
@ -178,7 +199,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
final AtomicInteger lockedCounter = new AtomicInteger();
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(1);
s.trySetPermits(1);
testMultiInstanceConcurrency(16, r -> {
for (int i = 0; i < iterations; i++) {
@ -208,7 +229,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
final AtomicInteger lockedCounter = new AtomicInteger();
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(1);
s.trySetPermits(1);
testMultiInstanceConcurrency(iterations, r -> {
RSemaphore s1 = r.getSemaphore("test");
@ -233,7 +254,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
final AtomicInteger lockedCounter = new AtomicInteger();
RSemaphore s = redisson.getSemaphore("test");
s.setPermits(10);
s.trySetPermits(10);
final AtomicInteger checkPermits = new AtomicInteger(s.availablePermits());
final CyclicBarrier barrier = new CyclicBarrier(s.availablePermits());

Loading…
Cancel
Save