From 01fc8f5f57c7d84026e4d802f3a74a96e96a3f52 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 8 Feb 2017 17:45:37 +0300 Subject: [PATCH 1/2] RTopic.removeAllListeners method added. #752 --- .../main/java/org/redisson/RedissonTopic.java | 18 ++++++++++++++++++ .../src/main/java/org/redisson/api/RTopic.java | 5 +++++ .../connection/PubSubConnectionEntry.java | 8 ++++++++ .../java/org/redisson/RedissonTopicTest.java | 16 ++++++++++++++++ 4 files changed, 47 insertions(+) diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index bce6b1297..6faf4c193 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -83,6 +83,24 @@ public class RedissonTopic implements RTopic { return System.identityHashCode(pubSubListener); } + public void removeAllListeners() { + AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + semaphore.acquireUninterruptibly(); + + PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); + if (entry == null) { + semaphore.release(); + return; + } + + entry.removeAllListeners(name); + if (!entry.hasListeners(name)) { + commandExecutor.getConnectionManager().unsubscribe(name, semaphore); + } else { + semaphore.release(); + } + } + @Override public void removeListener(int listenerId) { AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); diff --git a/redisson/src/main/java/org/redisson/api/RTopic.java b/redisson/src/main/java/org/redisson/api/RTopic.java index 5cc2e2cec..868ba406f 100644 --- a/redisson/src/main/java/org/redisson/api/RTopic.java +++ b/redisson/src/main/java/org/redisson/api/RTopic.java @@ -71,4 +71,9 @@ public interface RTopic extends RTopicAsync { */ void removeListener(int listenerId); + /** + * Removes all listeners from this topic + */ + void removeAllListeners(); + } diff --git a/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index 88ad18529..9fe9da637 100644 --- a/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -88,6 +88,14 @@ public class PubSubConnectionEntry { conn.addListener(listener); } + public boolean removeAllListeners(String channelName) { + Queue> listeners = channelListeners.get(channelName); + for (RedisPubSubListener listener : listeners) { + removeListener(channelName, listener); + } + return !listeners.isEmpty(); + } + // TODO optimize public boolean removeListener(String channelName, int listenerId) { Queue> listeners = channelListeners.get(channelName); diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 6417786f6..3d9939602 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -318,6 +318,22 @@ public class RedissonTopicTest { redisson.shutdown(); } + @Test + public void testRemoveAllListeners() throws InterruptedException { + RedissonClient redisson = BaseTest.createInstance(); + RTopic topic1 = redisson.getTopic("topic1"); + for (int i = 0; i < 10; i++) { + topic1.addListener((channel, msg) -> { + Assert.fail(); + }); + } + + topic1 = redisson.getTopic("topic1"); + topic1.removeAllListeners(); + topic1.publish(new Message("123")); + + redisson.shutdown(); + } @Test public void testLazyUnsubscribe() throws InterruptedException { From fc484ee58ac143184fbeb99a1bb1bf83b9cfbe4b Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 8 Feb 2017 17:47:13 +0300 Subject: [PATCH 2/2] Refactoring --- .../org/redisson/RedissonLocalCachedMap.java | 22 +++---------------- .../redisson/RedissonLocalCachedMapTest.java | 5 ++++- 2 files changed, 7 insertions(+), 20 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index d6755959d..14ae25699 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -415,24 +415,6 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } - @Override - public void putAll(Map m) { - Map cacheMap = new HashMap(m.size()); - for (java.util.Map.Entry entry : m.entrySet()) { - CacheKey cacheKey = toCacheKey(entry.getKey()); - CacheValue cacheValue = new CacheValue(entry.getKey(), entry.getValue()); - cacheMap.put(cacheKey, cacheValue); - } - cache.putAll(cacheMap); - super.putAll(m); - - if (invalidateEntryOnChange == 1) { - for (CacheKey cacheKey : cacheMap.keySet()) { - invalidationTopic.publish(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); - } - } - } - @Override public RFuture deleteAsync() { cache.clear(); @@ -754,6 +736,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } params.addAll(msgs); + final RPromise result = newPromise(); RFuture future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, "redis.call('hmset', KEYS[1], unpack(ARGV, 3, tonumber(ARGV[2]) + 2));" + "if ARGV[1] == '1' then " @@ -771,9 +754,10 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } cacheMap(map); + result.trySuccess(null); } }); - return future; + return result; } @Override diff --git a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java index 8bb73c6bb..9a8044c6b 100644 --- a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java @@ -323,7 +323,7 @@ public class RedissonLocalCachedMapTest extends BaseTest { } @Test - public void testPutAll() { + public void testPutAll() throws InterruptedException { Map map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults()); Map map1 = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults()); Cache cache = Deencapsulation.getField(map, "cache"); @@ -344,6 +344,9 @@ public class RedissonLocalCachedMapTest extends BaseTest { map1.putAll(joinMap); + // waiting for cache cleanup listeners triggering + Thread.sleep(500); + assertThat(cache.size()).isEqualTo(3); assertThat(cache1.size()).isEqualTo(3); }