From 71c6a2131837d32ea150cc4b3256c0f27eb79fef Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Sat, 6 Jan 2024 12:37:13 +0300 Subject: [PATCH] Fixed - RAtomicDouble, RAtomicLong, RMap, RScoredSortedSet, RSet listeners aren't deleted properly. --- .../org/redisson/RedissonAtomicDouble.java | 11 ++++++++++ .../java/org/redisson/RedissonAtomicLong.java | 12 +++++++++++ .../main/java/org/redisson/RedissonMap.java | 10 ++++++++++ .../org/redisson/RedissonScoredSortedSet.java | 12 +++-------- .../main/java/org/redisson/RedissonSet.java | 12 +++++++++++ .../redisson/RedissonScoredSortedSetTest.java | 20 +++++++++++++++---- 6 files changed, 64 insertions(+), 13 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java b/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java index efd9586d1..660eeb0b5 100644 --- a/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java +++ b/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java @@ -191,4 +191,15 @@ public class RedissonAtomicDouble extends RedissonExpirable implements RAtomicDo return super.addListenerAsync(listener); } + @Override + public void removeListener(int listenerId) { + removeListener(listenerId, "__keyevent@*:incrby"); + super.removeListener(listenerId); + } + + @Override + public RFuture removeListenerAsync(int listenerId) { + return removeListenerAsync(super.removeListenerAsync(listenerId), listenerId, "__keyevent@*:incrby"); + } + } diff --git a/redisson/src/main/java/org/redisson/RedissonAtomicLong.java b/redisson/src/main/java/org/redisson/RedissonAtomicLong.java index 18ae0b12b..931de4838 100644 --- a/redisson/src/main/java/org/redisson/RedissonAtomicLong.java +++ b/redisson/src/main/java/org/redisson/RedissonAtomicLong.java @@ -188,4 +188,16 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong } return super.addListenerAsync(listener); } + + @Override + public void removeListener(int listenerId) { + removeListener(listenerId, "__keyevent@*:incrby"); + super.removeListener(listenerId); + } + + @Override + public RFuture removeListenerAsync(int listenerId) { + return removeListenerAsync(super.removeListenerAsync(listenerId), listenerId, "__keyevent@*:incrby"); + } + } diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index e0760dbd9..4fbd012d8 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -1912,5 +1912,15 @@ public class RedissonMap extends RedissonExpirable implements RMap { return super.addListenerAsync(listener); } + @Override + public void removeListener(int listenerId) { + removeListener(listenerId, "__keyevent@*:hset", "__keyevent@*:hdel"); + super.removeListener(listenerId); + } + + @Override + public RFuture removeListenerAsync(int listenerId) { + return removeListenerAsync(super.removeListenerAsync(listenerId), listenerId, "__keyevent@*:hset", "__keyevent@*:hdel"); + } } diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index e62583498..69e47af5b 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -34,7 +34,6 @@ import java.math.BigDecimal; import java.time.Duration; import java.util.*; import java.util.Map.Entry; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Stream; @@ -2066,19 +2065,14 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc @Override public void removeListener(int listenerId) { - RPatternTopic expiredTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:zadd"); - expiredTopic.removeListener(listenerId); - + removeListener(listenerId, "__keyevent@*:zadd", "__keyevent@*:zrem"); super.removeListener(listenerId); } @Override public RFuture removeListenerAsync(int listenerId) { - RPatternTopic setTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:zadd"); - RFuture f1 = setTopic.removeListenerAsync(listenerId); - RFuture f2 = super.removeListenerAsync(listenerId); - CompletableFuture f = CompletableFuture.allOf(f1.toCompletableFuture(), f2.toCompletableFuture()); - return new CompletableFutureWrapper<>(f); + return removeListenerAsync(super.removeListenerAsync(listenerId), listenerId, + "__keyevent@*:zadd", "__keyevent@*:zrem"); } } diff --git a/redisson/src/main/java/org/redisson/RedissonSet.java b/redisson/src/main/java/org/redisson/RedissonSet.java index 7d3009be3..e5b6f1625 100644 --- a/redisson/src/main/java/org/redisson/RedissonSet.java +++ b/redisson/src/main/java/org/redisson/RedissonSet.java @@ -896,5 +896,17 @@ public class RedissonSet extends RedissonExpirable implements RSet, ScanIt return super.addListenerAsync(listener); } + @Override + public void removeListener(int listenerId) { + removeListener(listenerId, "__keyevent@*:sadd", "__keyevent@*:srem", "__keyevent@*:spop"); + super.removeListener(listenerId); + } + + @Override + public RFuture removeListenerAsync(int listenerId) { + return removeListenerAsync(super.removeListenerAsync(listenerId), listenerId, + "__keyevent@*:sadd", "__keyevent@*:srem", "__keyevent@*:spop"); + } + } diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java index c986d4646..910c85760 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java @@ -1,5 +1,6 @@ package org.redisson; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.redisson.api.*; @@ -17,6 +18,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -1767,20 +1769,30 @@ public class RedissonScoredSortedSetTest extends RedisDockerTest { public void testAddListener() { testWithParams(redisson -> { RScoredSortedSet ss = redisson.getScoredSortedSet("test"); - CountDownLatch latch = new CountDownLatch(1); - ss.addListener(new ScoredSortedSetAddListener() { + AtomicInteger latch = new AtomicInteger(); + int id = ss.addListener(new ScoredSortedSetAddListener() { @Override public void onAdd(String name) { - latch.countDown(); + latch.incrementAndGet(); } }); ss.add(1, 1); + Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> { + assertThat(latch.get()).isEqualTo(1); + }); + + ss.removeListener(id); + + ss.add(1, 1); + try { - assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue(); + Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } + + assertThat(latch.get()).isEqualTo(1); }, NOTIFY_KEYSPACE_EVENTS, "Ez"); }