From 1605d6f9de112e8cc538c9d53598216ee707a10f Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 5 Sep 2016 11:08:28 +0300 Subject: [PATCH 1/4] LocalCachedMap should implements RDestroyable. #592 --- .../org/redisson/RedissonLocalCachedMap.java | 32 ++++++++++++------- .../java/org/redisson/api/RDestroyable.java | 15 +++++++++ .../org/redisson/api/RLocalCachedMap.java | 2 +- 3 files changed, 37 insertions(+), 12 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/api/RDestroyable.java diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index 503085e21..8888af165 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -170,6 +170,7 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R private RMap map; private Cache cache; private int invalidateEntryOnChange; + private int invalidationListenerId; protected RedissonLocalCachedMap(RedissonClient redisson, CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options) { super(commandExecutor, name); @@ -198,18 +199,20 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R } invalidationTopic = redisson.getTopic(name + ":topic"); - invalidationTopic.addListener(new MessageListener() { - @Override - public void onMessage(String channel, Object msg) { - if (msg instanceof LocalCachedMapClear) { - cache.clear(); - } - if (msg instanceof LocalCachedMapInvalidate) { - CacheKey key = new CacheKey(((LocalCachedMapInvalidate)msg).getKeyHash()); - cache.remove(key); + if (options.isInvalidateEntryOnChange()) { + invalidationListenerId = invalidationTopic.addListener(new MessageListener() { + @Override + public void onMessage(String channel, Object msg) { + if (msg instanceof LocalCachedMapClear) { + cache.clear(); + } + if (msg instanceof LocalCachedMapInvalidate) { + CacheKey key = new CacheKey(((LocalCachedMapInvalidate)msg).getKeyHash()); + cache.remove(key); + } } - } - }); + }); + } } @Override @@ -359,6 +362,13 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)), encodedKey, encodedValue, msg, invalidateEntryOnChange); } + + @Override + public void destroy() { + if (invalidationListenerId != 0) { + invalidationTopic.removeListener(invalidationListenerId); + } + } @Override public V remove(Object key) { diff --git a/redisson/src/main/java/org/redisson/api/RDestroyable.java b/redisson/src/main/java/org/redisson/api/RDestroyable.java new file mode 100644 index 000000000..27a64f39c --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RDestroyable.java @@ -0,0 +1,15 @@ +package org.redisson.api; + +/** + * + * @author Nikita Koksharov + * + */ +public interface RDestroyable { + + /** + * Allows to destroy object then it's not necessary anymore. + */ + void destroy(); + +} diff --git a/redisson/src/main/java/org/redisson/api/RLocalCachedMap.java b/redisson/src/main/java/org/redisson/api/RLocalCachedMap.java index 6cfab311b..8a99bb3b3 100644 --- a/redisson/src/main/java/org/redisson/api/RLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/api/RLocalCachedMap.java @@ -28,7 +28,7 @@ import java.util.Map; * @param * @param */ -public interface RLocalCachedMap extends Map, RExpirable, RLocalCachedMapAsync { +public interface RLocalCachedMap extends Map, RExpirable, RLocalCachedMapAsync, RDestroyable { /** * Associates the specified value with the specified key. From cd4c7103556de570fe47f2dc7c60384fe0eaf972 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 5 Sep 2016 12:57:25 +0300 Subject: [PATCH 2/4] license header added --- .../main/java/org/redisson/api/RDestroyable.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/redisson/src/main/java/org/redisson/api/RDestroyable.java b/redisson/src/main/java/org/redisson/api/RDestroyable.java index 27a64f39c..3d9615a21 100644 --- a/redisson/src/main/java/org/redisson/api/RDestroyable.java +++ b/redisson/src/main/java/org/redisson/api/RDestroyable.java @@ -1,3 +1,18 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson.api; /** From 0e5e4014f765ad88c3e0d27ef4d89abcc227bff9 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 5 Sep 2016 12:58:39 +0300 Subject: [PATCH 3/4] Redisson node could be created with existing Redisson instance --- .../main/java/org/redisson/RedissonNode.java | 32 ++++++++++++++++--- .../redisson/api/RedissonNodeInitializer.java | 3 +- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonNode.java b/redisson/src/main/java/org/redisson/RedissonNode.java index 7585c5f34..09dc3ecf3 100644 --- a/redisson/src/main/java/org/redisson/RedissonNode.java +++ b/redisson/src/main/java/org/redisson/RedissonNode.java @@ -46,15 +46,22 @@ public class RedissonNode { private static final Logger log = LoggerFactory.getLogger(RedissonNode.class); private ExecutorService executor; + private boolean hasRedissonInstance; private RedissonClient redisson; private final RedissonNodeConfig config; private final String id; private InetSocketAddress remoteAddress; private InetSocketAddress localAddress; - private RedissonNode(RedissonNodeConfig config) { + private RedissonNode(RedissonNodeConfig config, Redisson redisson) { this.config = new RedissonNodeConfig(config); this.id = generateId(); + this.redisson = redisson; + hasRedissonInstance = redisson == null; + } + + public RedissonClient getRedisson() { + return redisson; } public InetSocketAddress getLocalAddress() { @@ -122,7 +129,9 @@ public class RedissonNode { Thread.currentThread().interrupt(); } } - redisson.shutdown(); + if (hasRedissonInstance) { + redisson.shutdown(); + } log.info("Redisson node has been shutdown successfully"); } @@ -136,12 +145,14 @@ public class RedissonNode { executor = Executors.newFixedThreadPool(config.getExecutorServiceThreads(), new RedissonThreadFactory()); } - redisson = Redisson.create(config); + if (hasRedissonInstance) { + redisson = Redisson.create(config); + } retrieveAdresses(); if (config.getRedissonNodeInitializer() != null) { - config.getRedissonNodeInitializer().onStartup(redisson, this); + config.getRedissonNodeInitializer().onStartup(this); } for (Entry entry : config.getExecutorServiceWorkers().entrySet()) { @@ -185,11 +196,22 @@ public class RedissonNode { * @return RedissonNode instance */ public static RedissonNode create(RedissonNodeConfig config) { + return create(config, null); + } + + /** + * Create Redisson node instance with provided config and Redisson instance + * + * @param config + * @param redisson + * @return RedissonNode instance + */ + public static RedissonNode create(RedissonNodeConfig config, Redisson redisson) { if (config.getExecutorServiceWorkers().isEmpty()) { throw new IllegalArgumentException("Executor service workers are empty"); } - return new RedissonNode(config); + return new RedissonNode(config, redisson); } } diff --git a/redisson/src/main/java/org/redisson/api/RedissonNodeInitializer.java b/redisson/src/main/java/org/redisson/api/RedissonNodeInitializer.java index b6ce2e5a6..4720be52c 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonNodeInitializer.java +++ b/redisson/src/main/java/org/redisson/api/RedissonNodeInitializer.java @@ -27,9 +27,8 @@ public interface RedissonNodeInitializer { /** * Invoked during Redisson Node startup * - * @param redisson * @param redissonNode */ - void onStartup(RedissonClient redisson, RedissonNode redissonNode); + void onStartup(RedissonNode redissonNode); } From 8943f0655d4a05e19e3aac844e031f0f2a0854e0 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 6 Sep 2016 17:54:58 +0300 Subject: [PATCH 4/4] RSemaphore.reducePermits method added. #603 --- .../java/org/redisson/RedissonSemaphore.java | 74 ++++++++++++++++--- .../org/redisson/RedissonSemaphoreTest.java | 45 ++++++++--- 2 files changed, 97 insertions(+), 22 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonSemaphore.java b/redisson/src/main/java/org/redisson/RedissonSemaphore.java index 000eff669..6ab2dc169 100644 --- a/redisson/src/main/java/org/redisson/RedissonSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonSemaphore.java @@ -135,6 +135,11 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { } private void tryAcquireAsync(final AtomicLong time, final int permits, final RFuture subscribeFuture, final RPromise result) { + if (result.isDone()) { + unsubscribe(subscribeFuture); + return; + } + RFuture tryAcquireFuture = tryAcquireAsync(permits); tryAcquireFuture.addListener(new FutureListener() { @Override @@ -144,10 +149,12 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { result.tryFailure(future.cause()); return; } - + if (future.getNow()) { unsubscribe(subscribeFuture); - result.trySuccess(true); + if (!result.trySuccess(true)) { + releaseAsync(permits); + } return; } @@ -170,8 +177,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { @Override public void run() { executed.set(true); - if (futureRef.get() != null) { - futureRef.get().cancel(); + if (futureRef.get() != null && !futureRef.get().cancel()) { + entry.getLatch().release(); + return; } long elapsed = System.currentTimeMillis() - current; time.addAndGet(-elapsed); @@ -206,19 +214,26 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { } private void acquireAsync(final int permits, final RFuture subscribeFuture, final RPromise result) { + if (result.isDone()) { + unsubscribe(subscribeFuture); + return; + } + RFuture tryAcquireFuture = tryAcquireAsync(permits); tryAcquireFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { unsubscribe(subscribeFuture); - result.setFailure(future.cause()); + result.tryFailure(future.cause()); return; } if (future.getNow()) { unsubscribe(subscribeFuture); - result.setSuccess(null); + if (!result.trySuccess(null)) { + releaseAsync(permits); + } return; } @@ -321,12 +336,14 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - result.setFailure(future.cause()); + result.tryFailure(future.cause()); return; } if (future.getNow()) { - result.setSuccess(true); + if (!result.trySuccess(true)) { + releaseAsync(permits); + } return; } @@ -337,14 +354,14 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - result.setFailure(future.cause()); + result.tryFailure(future.cause()); return; } if (futureRef.get() != null) { futureRef.get().cancel(); } - + long elapsed = System.currentTimeMillis() - current; time.addAndGet(-elapsed); @@ -455,5 +472,42 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { + "end;", Arrays.asList(getName(), getChannelName()), permits); } + + @Override + public boolean trySetPermits(int permits) { + return get(trySetPermitsAsync(permits)); + } + + @Override + public RFuture trySetPermitsAsync(int permits) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local value = redis.call('get', KEYS[1]); " + + "if (value == false or value == 0) then " + + "redis.call('set', KEYS[1], ARGV[1]); " + + "redis.call('publish', KEYS[2], ARGV[1]); " + + "return 1;" + + "end;" + + "return 0;", + Arrays.asList(getName(), getChannelName()), permits); + } + @Override + public void reducePermits(int permits) { + get(reducePermitsAsync(permits)); + } + + @Override + public RFuture reducePermitsAsync(int permits) { + if (permits < 0) { + throw new IllegalArgumentException(); + } + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, + "local value = redis.call('get', KEYS[1]); " + + "if (value == false) then " + + "value = 0;" + + "end;" + + "redis.call('set', KEYS[1], value - ARGV[1]); ", + Arrays.asList(getName(), getChannelName()), permits); + } + } diff --git a/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java b/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java index 391234f8c..dfccf562f 100644 --- a/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java +++ b/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java @@ -3,22 +3,43 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import org.junit.Assume; +import org.junit.Assume; import org.junit.Test; import org.redisson.api.RSemaphore; public class RedissonSemaphoreTest extends BaseConcurrentTest { + @Test + public void testTrySetPermits() { + RSemaphore s = redisson.getSemaphore("test"); + assertThat(s.trySetPermits(10)).isTrue(); + assertThat(s.availablePermits()).isEqualTo(10); + assertThat(s.trySetPermits(15)).isFalse(); + assertThat(s.availablePermits()).isEqualTo(10); + } + + @Test + public void testReducePermits() throws InterruptedException { + RSemaphore s = redisson.getSemaphore("test"); + s.trySetPermits(10); + + s.acquire(10); + s.reducePermits(5); + assertThat(s.availablePermits()).isEqualTo(-5); + s.release(10); + assertThat(s.availablePermits()).isEqualTo(5); + s.acquire(5); + assertThat(s.availablePermits()).isEqualTo(0); + } + @Test public void testBlockingAcquire() throws InterruptedException { RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(1); + s.trySetPermits(1); s.acquire(); Thread t = new Thread() { @@ -46,7 +67,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { @Test public void testBlockingNAcquire() throws InterruptedException { RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(5); + s.trySetPermits(5); s.acquire(3); Thread t = new Thread() { @@ -80,7 +101,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { @Test public void testTryNAcquire() throws InterruptedException { RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(5); + s.trySetPermits(5); assertThat(s.tryAcquire(3)).isTrue(); Thread t = new Thread() { @@ -126,7 +147,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { @Test public void testDrainPermits() throws InterruptedException { RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(10); + s.trySetPermits(10); s.acquire(3); assertThat(s.drainPermits()).isEqualTo(7); @@ -136,7 +157,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { @Test public void testReleaseAcquire() throws InterruptedException { RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(10); + s.trySetPermits(10); s.acquire(); assertThat(s.availablePermits()).isEqualTo(9); s.release(); @@ -153,7 +174,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { final AtomicInteger lockedCounter = new AtomicInteger(); RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(1); + s.trySetPermits(1); int iterations = 15; testSingleInstanceConcurrency(iterations, r -> { @@ -178,7 +199,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { final AtomicInteger lockedCounter = new AtomicInteger(); RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(1); + s.trySetPermits(1); testMultiInstanceConcurrency(16, r -> { for (int i = 0; i < iterations; i++) { @@ -208,7 +229,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { final AtomicInteger lockedCounter = new AtomicInteger(); RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(1); + s.trySetPermits(1); testMultiInstanceConcurrency(iterations, r -> { RSemaphore s1 = r.getSemaphore("test"); @@ -233,7 +254,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { final AtomicInteger lockedCounter = new AtomicInteger(); RSemaphore s = redisson.getSemaphore("test"); - s.setPermits(10); + s.trySetPermits(10); final AtomicInteger checkPermits = new AtomicInteger(s.availablePermits()); final CyclicBarrier barrier = new CyclicBarrier(s.availablePermits());