From 59d756f22149c9a17cbfc98e3ba11623064a49ba Mon Sep 17 00:00:00 2001 From: Nikita Koksharov <nkoksharov@redisson.pro> Date: Mon, 23 Dec 2024 14:46:18 +0300 Subject: [PATCH] Fixed - destroy() method doesn't work if called immediately after creation of RDelayedQueue object. #5537 --- .../java/org/redisson/QueueTransferTask.java | 19 ++- .../redisson/RedissonDelayedQueueTest.java | 127 ++++++++++-------- 2 files changed, 88 insertions(+), 58 deletions(-) diff --git a/redisson/src/main/java/org/redisson/QueueTransferTask.java b/redisson/src/main/java/org/redisson/QueueTransferTask.java index a68960bd7..2dd7e5df2 100644 --- a/redisson/src/main/java/org/redisson/QueueTransferTask.java +++ b/redisson/src/main/java/org/redisson/QueueTransferTask.java @@ -58,7 +58,7 @@ public abstract class QueueTransferTask { } - private int usage = 1; + private volatile int usage = 1; private final AtomicReference<TimeoutTask> lastTimeout = new AtomicReference<TimeoutTask>(); private final ServiceManager serviceManager; @@ -99,14 +99,23 @@ public abstract class QueueTransferTask { public void stop() { RTopic schedulerTopic = getTopic(); schedulerTopic.removeListener(messageListenerId, statusListenerId); + + TimeoutTask oldTimeout = lastTimeout.get(); + if (oldTimeout != null) { + oldTimeout.getTask().cancel(); + } } private void scheduleTask(final Long startTime) { - TimeoutTask oldTimeout = lastTimeout.get(); + if (usage == 0) { + return; + } + if (startTime == null) { return; } - + + TimeoutTask oldTimeout = lastTimeout.get(); if (oldTimeout != null) { oldTimeout.getTask().cancel(); } @@ -137,6 +146,10 @@ public abstract class QueueTransferTask { protected abstract RFuture<Long> pushTaskAsync(); private void pushTask() { + if (usage == 0) { + return; + } + RFuture<Long> startTimeFuture = pushTaskAsync(); startTimeFuture.whenComplete((res, e) -> { if (e != null) { diff --git a/redisson/src/test/java/org/redisson/RedissonDelayedQueueTest.java b/redisson/src/test/java/org/redisson/RedissonDelayedQueueTest.java index 2afd044f1..52d6e322d 100644 --- a/redisson/src/test/java/org/redisson/RedissonDelayedQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonDelayedQueueTest.java @@ -1,6 +1,7 @@ package org.redisson; import org.junit.jupiter.api.Test; +import org.redisson.api.RBlockingDeque; import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RQueue; @@ -12,11 +13,27 @@ import static org.assertj.core.api.Assertions.assertThat; public class RedissonDelayedQueueTest extends RedisDockerTest { + @Test + public void testDestroy() throws InterruptedException { + RBlockingDeque<Integer> blockingDeque = redisson.getBlockingDeque("test"); + RDelayedQueue<Integer> delayedQueue = redisson.getDelayedQueue(blockingDeque); + delayedQueue.offer(1, 2, TimeUnit.SECONDS); + delayedQueue.destroy(); + + RDelayedQueue<Integer> delayedQueue2 = redisson.getDelayedQueue(blockingDeque); + delayedQueue2.destroy(); + + Thread.sleep(1000); + + Object s = blockingDeque.poll(3, TimeUnit.SECONDS); + assertThat(s).isNull(); + } + @Test public void testRemove() throws InterruptedException { RBlockingQueue<String> blockingFairQueue = redisson.getBlockingQueue("delay_queue"); RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingFairQueue); - + delayedQueue.offer("1_1_1", 3, TimeUnit.SECONDS); delayedQueue.offer("1_1_2", 7, TimeUnit.SECONDS); assertThat(delayedQueue.contains("1_1_1")).isTrue(); @@ -24,15 +41,15 @@ public class RedissonDelayedQueueTest extends RedisDockerTest { assertThat(delayedQueue.contains("1_1_1")).isFalse(); Thread.sleep(9000); - + assertThat(blockingFairQueue).containsOnly("1_1_2"); } - + @Test public void testRemoveAll() throws InterruptedException { RBlockingQueue<String> blockingFairQueue = redisson.getBlockingQueue("delay_queue"); RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingFairQueue); - + delayedQueue.offer("1_1_1", 3, TimeUnit.SECONDS); delayedQueue.offer("1_1_2", 7, TimeUnit.SECONDS); assertThat(delayedQueue.contains("1_1_1")).isTrue(); @@ -40,13 +57,13 @@ public class RedissonDelayedQueueTest extends RedisDockerTest { assertThat(delayedQueue.removeAll(Arrays.asList("1_1_1", "1_1_2"))).isTrue(); assertThat(delayedQueue.contains("1_1_1")).isFalse(); assertThat(delayedQueue.contains("1_1_2")).isFalse(); - + Thread.sleep(9000); - + assertThat(blockingFairQueue.isEmpty()).isTrue(); } - + @Test public void testDealyedQueueRetainAll() { RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test"); @@ -58,13 +75,13 @@ public class RedissonDelayedQueueTest extends RedisDockerTest { assertThat(dealyedQueue.retainAll(Arrays.asList(1, 2, 3))).isFalse(); assertThat(dealyedQueue.retainAll(Arrays.asList(3, 1, 2, 8))).isFalse(); assertThat(dealyedQueue.readAll()).containsExactly(3, 1, 2); - + assertThat(dealyedQueue.retainAll(Arrays.asList(1, 2))).isTrue(); assertThat(dealyedQueue.readAll()).containsExactly(1, 2); - + dealyedQueue.destroy(); } - + @Test public void testDealyedQueueReadAll() { RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test"); @@ -74,10 +91,10 @@ public class RedissonDelayedQueueTest extends RedisDockerTest { dealyedQueue.offer(2, 1, TimeUnit.SECONDS); assertThat(dealyedQueue.readAll()).containsExactly(3, 1, 2); - + dealyedQueue.destroy(); } - + @Test public void testDealyedQueueRemoveAll() { RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test"); @@ -85,24 +102,24 @@ public class RedissonDelayedQueueTest extends RedisDockerTest { dealyedQueue.offer(3, 5, TimeUnit.SECONDS); dealyedQueue.offer(1, 2, TimeUnit.SECONDS); dealyedQueue.offer(2, 1, TimeUnit.SECONDS); - + assertThat(dealyedQueue.removeAll(Arrays.asList(1, 2))).isTrue(); assertThat(dealyedQueue).containsExactly(3); assertThat(dealyedQueue.removeAll(Arrays.asList(3, 4))).isTrue(); assertThat(dealyedQueue).isEmpty(); - + dealyedQueue.destroy(); } - + @Test public void testDealyedQueueContainsAll() { RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test"); RDelayedQueue<Integer> dealyedQueue = redisson.getDelayedQueue(queue1); - + dealyedQueue.offer(3, 5, TimeUnit.SECONDS); dealyedQueue.offer(1, 2, TimeUnit.SECONDS); dealyedQueue.offer(2, 1, TimeUnit.SECONDS); - + assertThat(dealyedQueue.containsAll(Arrays.asList(1, 2))).isTrue(); assertThat(dealyedQueue.containsAll(Arrays.asList(1, 2, 4))).isFalse(); @@ -111,19 +128,19 @@ public class RedissonDelayedQueueTest extends RedisDockerTest { dealyedQueue.destroy(); } - + @Test public void testDealyedQueueContains() { RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test"); RDelayedQueue<Integer> dealyedQueue = redisson.getDelayedQueue(queue1); - + dealyedQueue.offer(3, 5, TimeUnit.SECONDS); dealyedQueue.offer(1, 2, TimeUnit.SECONDS); dealyedQueue.offer(2, 1, TimeUnit.SECONDS); - + assertThat(dealyedQueue.contains(1)).isTrue(); assertThat(dealyedQueue.contains(4)).isFalse(); - + dealyedQueue.destroy(); } @@ -131,37 +148,37 @@ public class RedissonDelayedQueueTest extends RedisDockerTest { public void testDealyedQueueRemove() { RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test"); RDelayedQueue<Integer> dealyedQueue = redisson.getDelayedQueue(queue1); - + dealyedQueue.offer(3, 5, TimeUnit.SECONDS); dealyedQueue.offer(1, 2, TimeUnit.SECONDS); dealyedQueue.offer(2, 1, TimeUnit.SECONDS); - + assertThat(dealyedQueue.remove(4)).isFalse(); assertThat(dealyedQueue.remove(3)).isTrue(); assertThat(dealyedQueue).containsExactly(1, 2); - + dealyedQueue.destroy(); } - + @Test public void testDealyedQueuePeek() { RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test"); RDelayedQueue<Integer> dealyedQueue = redisson.getDelayedQueue(queue1); - + dealyedQueue.offer(3, 5, TimeUnit.SECONDS); dealyedQueue.offer(1, 2, TimeUnit.SECONDS); dealyedQueue.offer(2, 1, TimeUnit.SECONDS); - + assertThat(dealyedQueue.peek()).isEqualTo(3); - + dealyedQueue.destroy(); } - + @Test public void testDealyedQueuePollLastAndOfferFirstTo() { RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test"); RDelayedQueue<Integer> dealyedQueue = redisson.getDelayedQueue(queue1); - + dealyedQueue.offer(3, 5, TimeUnit.SECONDS); dealyedQueue.offer(2, 2, TimeUnit.SECONDS); dealyedQueue.offer(1, 1, TimeUnit.SECONDS); @@ -173,31 +190,31 @@ public class RedissonDelayedQueueTest extends RedisDockerTest { assertThat(dealyedQueue.pollLastAndOfferFirstTo(queue2.getName())).isEqualTo(1); assertThat(queue2).containsExactly(1, 6, 5, 4); - + dealyedQueue.destroy(); } - + @Test public void testDelayedQueueOrder() { RBlockingQueue<String> queue = redisson.getBlockingQueue("test"); RDelayedQueue<String> dealyedQueue = redisson.getDelayedQueue(queue); - + dealyedQueue.offer("1", 1, TimeUnit.SECONDS); dealyedQueue.offer("4", 4, TimeUnit.SECONDS); dealyedQueue.offer("3", 3, TimeUnit.SECONDS); dealyedQueue.offer("2", 2, TimeUnit.SECONDS); - + assertThat(dealyedQueue).containsExactly("1", "4", "3", "2"); - + assertThat(dealyedQueue.poll()).isEqualTo("1"); assertThat(dealyedQueue.poll()).isEqualTo("4"); assertThat(dealyedQueue.poll()).isEqualTo("3"); assertThat(dealyedQueue.poll()).isEqualTo("2"); - + assertThat(queue.isEmpty()).isTrue(); - + assertThat(queue.poll()).isNull(); - + dealyedQueue.destroy(); } @@ -224,57 +241,57 @@ public class RedissonDelayedQueueTest extends RedisDockerTest { dealyedQueue.destroy(); } - + @Test public void testPoll() throws InterruptedException { RBlockingQueue<String> queue = redisson.getBlockingQueue("test"); RDelayedQueue<String> dealyedQueue = redisson.getDelayedQueue(queue); - + dealyedQueue.offer("1", 1, TimeUnit.SECONDS); dealyedQueue.offer("2", 2, TimeUnit.SECONDS); dealyedQueue.offer("3", 3, TimeUnit.SECONDS); dealyedQueue.offer("4", 4, TimeUnit.SECONDS); - + assertThat(dealyedQueue.poll()).isEqualTo("1"); assertThat(dealyedQueue.poll()).isEqualTo("2"); assertThat(dealyedQueue.poll()).isEqualTo("3"); assertThat(dealyedQueue.poll()).isEqualTo("4"); - + Thread.sleep(3000); assertThat(queue.isEmpty()).isTrue(); - + assertThat(queue.poll()).isNull(); assertThat(queue.poll()).isNull(); - + dealyedQueue.destroy(); } - + @Test public void testDealyedQueue() throws InterruptedException { RBlockingQueue<String> queue = redisson.getBlockingQueue("test"); RDelayedQueue<String> dealyedQueue = redisson.getDelayedQueue(queue); - + dealyedQueue.offer("1", 1, TimeUnit.SECONDS); dealyedQueue.offer("2", 5, TimeUnit.SECONDS); dealyedQueue.offer("4", 4, TimeUnit.SECONDS); dealyedQueue.offer("2", 2, TimeUnit.SECONDS); dealyedQueue.offer("3", 3, TimeUnit.SECONDS); - + assertThat(dealyedQueue).containsExactly("1", "2", "4", "2", "3"); - + Thread.sleep(500); assertThat(queue.isEmpty()).isTrue(); Thread.sleep(600); assertThat(queue).containsExactly("1"); assertThat(dealyedQueue).containsExactly("2", "4", "2", "3"); - + Thread.sleep(500); assertThat(queue).containsExactly("1"); Thread.sleep(500); assertThat(queue).containsExactly("1", "2"); assertThat(dealyedQueue).containsExactly("2", "4", "3"); - + Thread.sleep(500); assertThat(queue).containsExactly("1", "2"); @@ -287,7 +304,7 @@ public class RedissonDelayedQueueTest extends RedisDockerTest { Thread.sleep(500); assertThat(queue).containsExactly("1", "2", "3", "4"); - + assertThat(dealyedQueue).containsExactly("2"); Thread.sleep(500); assertThat(queue).containsExactly("1", "2", "3", "4"); @@ -295,16 +312,16 @@ public class RedissonDelayedQueueTest extends RedisDockerTest { assertThat(queue).containsExactly("1", "2", "3", "4", "2"); assertThat(dealyedQueue).isEmpty(); - + assertThat(queue.poll()).isEqualTo("1"); assertThat(queue.poll()).isEqualTo("2"); assertThat(queue.poll()).isEqualTo("3"); assertThat(queue.poll()).isEqualTo("4"); assertThat(queue.poll()).isEqualTo("2"); - + dealyedQueue.destroy(); } - - + + }