Fixed - destroy() method doesn't remove listeners.

pull/6077/head
Nikita Koksharov
parent 15bd94ed55
commit 698cdb60e1

@ -534,6 +534,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
@Override
public void destroy() {
commandExecutor.getServiceManager().getQueueTransferService().remove(queueName);
removeListeners();
}
}

@ -1728,6 +1728,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
if (writeBehindService != null) {
writeBehindService.stop(getRawName());
}
removeListeners();
}
@Override

@ -2896,6 +2896,21 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
res.completeExceptionally(new IllegalArgumentException("Wrong listener type " + listener.getClass()));
return res;
});
f = f.thenApply(id -> {
if (listener instanceof EntryRemovedListener) {
addListenerId(getRemovedChannelName(), id);
}
if (listener instanceof EntryUpdatedListener) {
addListenerId(getUpdatedChannelName(), id);
}
if (listener instanceof EntryCreatedListener) {
addListenerId(getCreatedChannelName(), id);
}
if (listener instanceof EntryExpiredListener) {
addListenerId(getExpiredChannelName(), id);
}
return id;
});
return new CompletableFutureWrapper<>(f);
}
@ -2903,22 +2918,33 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
public void removeListener(int listenerId) {
super.removeListener(listenerId);
RTopic removedTopic = getTopic(getRemovedChannelName());
removedTopic.removeListener(listenerId);
RTopic createdTopic = getTopic(getCreatedChannelName());
createdTopic.removeListener(listenerId);
String topicName = getNameByListenerId(listenerId);
if (topicName != null) {
RTopic topic = getTopic(topicName);
removeListenerId(topicName, listenerId);
topic.removeListener(listenerId);
}
}
RTopic updatedTopic = getTopic(getUpdatedChannelName());
updatedTopic.removeListener(listenerId);
@Override
public RFuture<Void> removeListenerAsync(int listenerId) {
CompletionStage<Void> r = super.removeListenerAsync(listenerId);
r = r.thenCompose(v -> {
String topicName = getNameByListenerId(listenerId);
if (topicName != null) {
RTopic topic = getTopic(topicName);
removeListenerId(topicName, listenerId);
return topic.removeListenerAsync(listenerId);
}
return CompletableFuture.completedFuture(null);
});
RTopic expiredTopic = getTopic(getExpiredChannelName());
expiredTopic.removeListener(listenerId);
return new CompletableFutureWrapper<>(r);
}
@Override
public RFuture<Long> sizeInMemoryAsync() {
List<Object> keys = Arrays.<Object>asList(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName, optionsName);
List<Object> keys = Arrays.asList(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName, optionsName);
return super.sizeInMemoryAsync(keys);
}
@ -3253,5 +3279,19 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
evictionScheduler.remove(getRawName());
}
super.destroy();
List<String> channels = Arrays.asList(getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(), getExpiredChannelName());
for (String channel : channels) {
Collection<Integer> ids = getListenerIdsByName(channel);
if (ids.isEmpty()) {
continue;
}
RTopic topic = getTopic(channel);
for (Integer listenerId : ids) {
removeListenerId(channel, listenerId);
topic.removeListener(listenerId);
}
}
}
}

@ -36,6 +36,7 @@ import org.redisson.pubsub.PublishSubscribeService;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@ -53,6 +54,7 @@ public abstract class RedissonObject implements RObject {
protected CommandAsyncExecutor commandExecutor;
protected String name;
protected final Codec codec;
private final Map<String, Collection<Integer>> listeners = new ConcurrentHashMap<>();
public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
this.codec = commandExecutor.getServiceManager().getCodec(codec);
@ -536,9 +538,33 @@ public abstract class RedissonObject implements RObject {
for (String name : names) {
RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, name);
topic.removeListener(listenerId);
removeListenerId(name, listenerId);
}
}
protected final Collection<Integer> getListenerIdsByName(String name) {
return listeners.getOrDefault(name, Collections.emptyList());
}
protected final String getNameByListenerId(int listenerId) {
for (Map.Entry<String, Collection<Integer>> entry : listeners.entrySet()) {
if (entry.getValue().contains(listenerId)) {
return entry.getKey();
}
}
return null;
}
protected final void removeListenerId(String name, int listenerId) {
listeners.computeIfPresent(name, (k, ids) -> {
ids.remove(listenerId);
if (ids.isEmpty()) {
return null;
}
return ids;
});
}
protected final RFuture<Void> removeListenerAsync(RFuture<Void> future, int listenerId, String... names) {
List<CompletableFuture<Void>> futures = new ArrayList<>(names.length + 1);
if (future != null) {
@ -548,6 +574,7 @@ public abstract class RedissonObject implements RObject {
RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, name);
RFuture<Void> f1 = topic.removeListenerAsync(listenerId);
futures.add(f1.toCompletableFuture());
removeListenerId(name, listenerId);
}
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return new CompletableFutureWrapper<>(f);
@ -571,20 +598,40 @@ public abstract class RedissonObject implements RObject {
protected <T extends ObjectListener> int addListener(String name, T listener, BiConsumer<T, String> consumer) {
RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, name);
return topic.addListener(String.class, (pattern, channel, msg) -> {
int id = topic.addListener(String.class, (pattern, channel, msg) -> {
if (msg.equals(getRawName())) {
consumer.accept(listener, msg);
}
});
addListenerId(name, id);
return id;
}
protected <T extends ObjectListener> RFuture<Integer> addListenerAsync(String name, T listener, BiConsumer<T, String> consumer) {
RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, name);
return topic.addListenerAsync(String.class, (pattern, channel, msg) -> {
RFuture<Integer> f = topic.addListenerAsync(String.class, (pattern, channel, msg) -> {
if (msg.equals(getRawName())) {
consumer.accept(listener, msg);
}
});
CompletionStage<Integer> r = f.thenApply(id -> {
addListenerId(name, id);
return id;
});
return new CompletableFutureWrapper<>(r);
}
protected final void addListenerId(String name, Integer id) {
Collection<Integer> ids = listeners.computeIfAbsent(name, k -> Collections.newSetFromMap(new ConcurrentHashMap<>()));
ids.add(id);
}
protected final void removeListeners() {
for (Map.Entry<String, Collection<Integer>> entry : listeners.entrySet()) {
for (Integer id : entry.getValue()) {
removeListener(id, name);
}
}
}
@Override

@ -449,6 +449,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
if (evictionScheduler != null) {
evictionScheduler.remove(getRawName());
}
removeListeners();
}
@Override

@ -981,6 +981,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
if (evictionScheduler != null) {
evictionScheduler.remove(getRawName());
}
removeListeners();
}
@Override

@ -210,12 +210,31 @@ public class RedissonMapCacheTest extends BaseMapTest {
@Test
public void testDestroy() {
RMapCache<String, String> cache = redisson.getMapCache("test");
AtomicInteger counter = new AtomicInteger();
cache.addListener(new EntryCreatedListener<>() {
@Override
public void onCreated(EntryEvent<Object, Object> event) {
counter.incrementAndGet();
}
});
cache.fastPut("1", "2");
Awaitility.await().atMost(Duration.ofSeconds(1))
.untilAsserted(() -> assertThat(counter.get()).isEqualTo(1));
EvictionScheduler evictionScheduler = ((Redisson)redisson).getEvictionScheduler();
Map<?, ?> map = Reflect.on(evictionScheduler).get("tasks");
assertThat(map.isEmpty()).isFalse();
cache.destroy();
assertThat(map.isEmpty()).isTrue();
RMapCache<String, String> cache2 = redisson.getMapCache("test");
cache2.fastPut("3", "4");
Awaitility.await().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(2))
.untilAsserted(() -> assertThat(counter.get()).isEqualTo(1));
}
@Override

Loading…
Cancel
Save