Fixed - RAtomicDouble, RAtomicLong, RMap, RScoredSortedSet, RSet listeners aren't deleted properly.

pull/5564/head
Nikita Koksharov 1 year ago
parent 343dc63312
commit 71c6a21318

@ -191,4 +191,15 @@ public class RedissonAtomicDouble extends RedissonExpirable implements RAtomicDo
return super.addListenerAsync(listener);
}
@Override
public void removeListener(int listenerId) {
removeListener(listenerId, "__keyevent@*:incrby");
super.removeListener(listenerId);
}
@Override
public RFuture<Void> removeListenerAsync(int listenerId) {
return removeListenerAsync(super.removeListenerAsync(listenerId), listenerId, "__keyevent@*:incrby");
}
}

@ -188,4 +188,16 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
}
return super.addListenerAsync(listener);
}
@Override
public void removeListener(int listenerId) {
removeListener(listenerId, "__keyevent@*:incrby");
super.removeListener(listenerId);
}
@Override
public RFuture<Void> removeListenerAsync(int listenerId) {
return removeListenerAsync(super.removeListenerAsync(listenerId), listenerId, "__keyevent@*:incrby");
}
}

@ -1912,5 +1912,15 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return super.addListenerAsync(listener);
}
@Override
public void removeListener(int listenerId) {
removeListener(listenerId, "__keyevent@*:hset", "__keyevent@*:hdel");
super.removeListener(listenerId);
}
@Override
public RFuture<Void> removeListenerAsync(int listenerId) {
return removeListenerAsync(super.removeListenerAsync(listenerId), listenerId, "__keyevent@*:hset", "__keyevent@*:hdel");
}
}

@ -34,7 +34,6 @@ import java.math.BigDecimal;
import java.time.Duration;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
@ -2066,19 +2065,14 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public void removeListener(int listenerId) {
RPatternTopic expiredTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:zadd");
expiredTopic.removeListener(listenerId);
removeListener(listenerId, "__keyevent@*:zadd", "__keyevent@*:zrem");
super.removeListener(listenerId);
}
@Override
public RFuture<Void> removeListenerAsync(int listenerId) {
RPatternTopic setTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:zadd");
RFuture<Void> f1 = setTopic.removeListenerAsync(listenerId);
RFuture<Void> f2 = super.removeListenerAsync(listenerId);
CompletableFuture<Void> f = CompletableFuture.allOf(f1.toCompletableFuture(), f2.toCompletableFuture());
return new CompletableFutureWrapper<>(f);
return removeListenerAsync(super.removeListenerAsync(listenerId), listenerId,
"__keyevent@*:zadd", "__keyevent@*:zrem");
}
}

@ -896,5 +896,17 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
return super.addListenerAsync(listener);
}
@Override
public void removeListener(int listenerId) {
removeListener(listenerId, "__keyevent@*:sadd", "__keyevent@*:srem", "__keyevent@*:spop");
super.removeListener(listenerId);
}
@Override
public RFuture<Void> removeListenerAsync(int listenerId) {
return removeListenerAsync(super.removeListenerAsync(listenerId), listenerId,
"__keyevent@*:sadd", "__keyevent@*:srem", "__keyevent@*:spop");
}
}

@ -1,5 +1,6 @@
package org.redisson;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redisson.api.*;
@ -17,6 +18,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -1767,20 +1769,30 @@ public class RedissonScoredSortedSetTest extends RedisDockerTest {
public void testAddListener() {
testWithParams(redisson -> {
RScoredSortedSet<Integer> ss = redisson.getScoredSortedSet("test");
CountDownLatch latch = new CountDownLatch(1);
ss.addListener(new ScoredSortedSetAddListener() {
AtomicInteger latch = new AtomicInteger();
int id = ss.addListener(new ScoredSortedSetAddListener() {
@Override
public void onAdd(String name) {
latch.countDown();
latch.incrementAndGet();
}
});
ss.add(1, 1);
Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> {
assertThat(latch.get()).isEqualTo(1);
});
ss.removeListener(id);
ss.add(1, 1);
try {
assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue();
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(latch.get()).isEqualTo(1);
}, NOTIFY_KEYSPACE_EVENTS, "Ez");
}

Loading…
Cancel
Save