From 698cdb60e17e94213e345689338c8d0ae2f3e6f3 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 7 Aug 2024 14:02:00 +0300 Subject: [PATCH] Fixed - destroy() method doesn't remove listeners. #6005 --- .../org/redisson/RedissonDelayedQueue.java | 1 + .../main/java/org/redisson/RedissonMap.java | 1 + .../java/org/redisson/RedissonMapCache.java | 60 +++++++++++++++---- .../java/org/redisson/RedissonObject.java | 51 +++++++++++++++- .../java/org/redisson/RedissonSetCache.java | 1 + .../java/org/redisson/RedissonTimeSeries.java | 1 + .../org/redisson/RedissonMapCacheTest.java | 21 ++++++- 7 files changed, 123 insertions(+), 13 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java index b7589f3ff..8b058d8bc 100644 --- a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java @@ -534,6 +534,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay @Override public void destroy() { commandExecutor.getServiceManager().getQueueTransferService().remove(queueName); + removeListeners(); } } diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index 17d16662b..66477582a 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -1728,6 +1728,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { if (writeBehindService != null) { writeBehindService.stop(getRawName()); } + removeListeners(); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index c6fedd331..f71adb0a6 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -2896,6 +2896,21 @@ public class RedissonMapCache extends RedissonMap implements RMapCac res.completeExceptionally(new IllegalArgumentException("Wrong listener type " + listener.getClass())); return res; }); + f = f.thenApply(id -> { + if (listener instanceof EntryRemovedListener) { + addListenerId(getRemovedChannelName(), id); + } + if (listener instanceof EntryUpdatedListener) { + addListenerId(getUpdatedChannelName(), id); + } + if (listener instanceof EntryCreatedListener) { + addListenerId(getCreatedChannelName(), id); + } + if (listener instanceof EntryExpiredListener) { + addListenerId(getExpiredChannelName(), id); + } + return id; + }); return new CompletableFutureWrapper<>(f); } @@ -2903,22 +2918,33 @@ public class RedissonMapCache extends RedissonMap implements RMapCac public void removeListener(int listenerId) { super.removeListener(listenerId); - RTopic removedTopic = getTopic(getRemovedChannelName()); - removedTopic.removeListener(listenerId); - - RTopic createdTopic = getTopic(getCreatedChannelName()); - createdTopic.removeListener(listenerId); + String topicName = getNameByListenerId(listenerId); + if (topicName != null) { + RTopic topic = getTopic(topicName); + removeListenerId(topicName, listenerId); + topic.removeListener(listenerId); + } + } - RTopic updatedTopic = getTopic(getUpdatedChannelName()); - updatedTopic.removeListener(listenerId); + @Override + public RFuture removeListenerAsync(int listenerId) { + CompletionStage r = super.removeListenerAsync(listenerId); + r = r.thenCompose(v -> { + String topicName = getNameByListenerId(listenerId); + if (topicName != null) { + RTopic topic = getTopic(topicName); + removeListenerId(topicName, listenerId); + return topic.removeListenerAsync(listenerId); + } + return CompletableFuture.completedFuture(null); + }); - RTopic expiredTopic = getTopic(getExpiredChannelName()); - expiredTopic.removeListener(listenerId); + return new CompletableFutureWrapper<>(r); } @Override public RFuture sizeInMemoryAsync() { - List keys = Arrays.asList(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName, optionsName); + List keys = Arrays.asList(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName, optionsName); return super.sizeInMemoryAsync(keys); } @@ -3253,5 +3279,19 @@ public class RedissonMapCache extends RedissonMap implements RMapCac evictionScheduler.remove(getRawName()); } super.destroy(); + + List channels = Arrays.asList(getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(), getExpiredChannelName()); + for (String channel : channels) { + Collection ids = getListenerIdsByName(channel); + if (ids.isEmpty()) { + continue; + } + + RTopic topic = getTopic(channel); + for (Integer listenerId : ids) { + removeListenerId(channel, listenerId); + topic.removeListener(listenerId); + } + } } } diff --git a/redisson/src/main/java/org/redisson/RedissonObject.java b/redisson/src/main/java/org/redisson/RedissonObject.java index 3e5662b72..ea946a5d0 100644 --- a/redisson/src/main/java/org/redisson/RedissonObject.java +++ b/redisson/src/main/java/org/redisson/RedissonObject.java @@ -36,6 +36,7 @@ import org.redisson.pubsub.PublishSubscribeService; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -53,6 +54,7 @@ public abstract class RedissonObject implements RObject { protected CommandAsyncExecutor commandExecutor; protected String name; protected final Codec codec; + private final Map> listeners = new ConcurrentHashMap<>(); public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) { this.codec = commandExecutor.getServiceManager().getCodec(codec); @@ -536,9 +538,33 @@ public abstract class RedissonObject implements RObject { for (String name : names) { RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, name); topic.removeListener(listenerId); + removeListenerId(name, listenerId); } } + protected final Collection getListenerIdsByName(String name) { + return listeners.getOrDefault(name, Collections.emptyList()); + } + + protected final String getNameByListenerId(int listenerId) { + for (Map.Entry> entry : listeners.entrySet()) { + if (entry.getValue().contains(listenerId)) { + return entry.getKey(); + } + } + return null; + } + + protected final void removeListenerId(String name, int listenerId) { + listeners.computeIfPresent(name, (k, ids) -> { + ids.remove(listenerId); + if (ids.isEmpty()) { + return null; + } + return ids; + }); + } + protected final RFuture removeListenerAsync(RFuture future, int listenerId, String... names) { List> futures = new ArrayList<>(names.length + 1); if (future != null) { @@ -548,6 +574,7 @@ public abstract class RedissonObject implements RObject { RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, name); RFuture f1 = topic.removeListenerAsync(listenerId); futures.add(f1.toCompletableFuture()); + removeListenerId(name, listenerId); } CompletableFuture f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); return new CompletableFutureWrapper<>(f); @@ -571,20 +598,40 @@ public abstract class RedissonObject implements RObject { protected int addListener(String name, T listener, BiConsumer consumer) { RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, name); - return topic.addListener(String.class, (pattern, channel, msg) -> { + int id = topic.addListener(String.class, (pattern, channel, msg) -> { if (msg.equals(getRawName())) { consumer.accept(listener, msg); } }); + addListenerId(name, id); + return id; } protected RFuture addListenerAsync(String name, T listener, BiConsumer consumer) { RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, name); - return topic.addListenerAsync(String.class, (pattern, channel, msg) -> { + RFuture f = topic.addListenerAsync(String.class, (pattern, channel, msg) -> { if (msg.equals(getRawName())) { consumer.accept(listener, msg); } }); + CompletionStage r = f.thenApply(id -> { + addListenerId(name, id); + return id; + }); + return new CompletableFutureWrapper<>(r); + } + + protected final void addListenerId(String name, Integer id) { + Collection ids = listeners.computeIfAbsent(name, k -> Collections.newSetFromMap(new ConcurrentHashMap<>())); + ids.add(id); + } + + protected final void removeListeners() { + for (Map.Entry> entry : listeners.entrySet()) { + for (Integer id : entry.getValue()) { + removeListener(id, name); + } + } } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonSetCache.java b/redisson/src/main/java/org/redisson/RedissonSetCache.java index 561a69e0e..4b7080371 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetCache.java +++ b/redisson/src/main/java/org/redisson/RedissonSetCache.java @@ -449,6 +449,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< if (evictionScheduler != null) { evictionScheduler.remove(getRawName()); } + removeListeners(); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonTimeSeries.java b/redisson/src/main/java/org/redisson/RedissonTimeSeries.java index ef3817bd9..a74cc5985 100644 --- a/redisson/src/main/java/org/redisson/RedissonTimeSeries.java +++ b/redisson/src/main/java/org/redisson/RedissonTimeSeries.java @@ -981,6 +981,7 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTime if (evictionScheduler != null) { evictionScheduler.remove(getRawName()); } + removeListeners(); } @Override diff --git a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java index 2c4354f61..3802e0e42 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java @@ -210,12 +210,31 @@ public class RedissonMapCacheTest extends BaseMapTest { @Test public void testDestroy() { RMapCache cache = redisson.getMapCache("test"); - + AtomicInteger counter = new AtomicInteger(); + cache.addListener(new EntryCreatedListener<>() { + @Override + public void onCreated(EntryEvent event) { + counter.incrementAndGet(); + } + }); + + cache.fastPut("1", "2"); + + Awaitility.await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> assertThat(counter.get()).isEqualTo(1)); + EvictionScheduler evictionScheduler = ((Redisson)redisson).getEvictionScheduler(); Map map = Reflect.on(evictionScheduler).get("tasks"); assertThat(map.isEmpty()).isFalse(); cache.destroy(); assertThat(map.isEmpty()).isTrue(); + + RMapCache cache2 = redisson.getMapCache("test"); + cache2.fastPut("3", "4"); + + Awaitility.await().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(2)) + .untilAsserted(() -> assertThat(counter.get()).isEqualTo(1)); + } @Override