Fixed - destroy() method doesn't work if called immediately after creation of RDelayedQueue object. #5537

pull/6356/head
Nikita Koksharov 2 months ago
parent a932de8013
commit 59d756f221

@ -58,7 +58,7 @@ public abstract class QueueTransferTask {
} }
private int usage = 1; private volatile int usage = 1;
private final AtomicReference<TimeoutTask> lastTimeout = new AtomicReference<TimeoutTask>(); private final AtomicReference<TimeoutTask> lastTimeout = new AtomicReference<TimeoutTask>();
private final ServiceManager serviceManager; private final ServiceManager serviceManager;
@ -99,14 +99,23 @@ public abstract class QueueTransferTask {
public void stop() { public void stop() {
RTopic schedulerTopic = getTopic(); RTopic schedulerTopic = getTopic();
schedulerTopic.removeListener(messageListenerId, statusListenerId); schedulerTopic.removeListener(messageListenerId, statusListenerId);
TimeoutTask oldTimeout = lastTimeout.get();
if (oldTimeout != null) {
oldTimeout.getTask().cancel();
}
} }
private void scheduleTask(final Long startTime) { private void scheduleTask(final Long startTime) {
TimeoutTask oldTimeout = lastTimeout.get(); if (usage == 0) {
return;
}
if (startTime == null) { if (startTime == null) {
return; return;
} }
TimeoutTask oldTimeout = lastTimeout.get();
if (oldTimeout != null) { if (oldTimeout != null) {
oldTimeout.getTask().cancel(); oldTimeout.getTask().cancel();
} }
@ -137,6 +146,10 @@ public abstract class QueueTransferTask {
protected abstract RFuture<Long> pushTaskAsync(); protected abstract RFuture<Long> pushTaskAsync();
private void pushTask() { private void pushTask() {
if (usage == 0) {
return;
}
RFuture<Long> startTimeFuture = pushTaskAsync(); RFuture<Long> startTimeFuture = pushTaskAsync();
startTimeFuture.whenComplete((res, e) -> { startTimeFuture.whenComplete((res, e) -> {
if (e != null) { if (e != null) {

@ -1,6 +1,7 @@
package org.redisson; package org.redisson;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue; import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue; import org.redisson.api.RDelayedQueue;
import org.redisson.api.RQueue; import org.redisson.api.RQueue;
@ -12,11 +13,27 @@ import static org.assertj.core.api.Assertions.assertThat;
public class RedissonDelayedQueueTest extends RedisDockerTest { public class RedissonDelayedQueueTest extends RedisDockerTest {
@Test
public void testDestroy() throws InterruptedException {
RBlockingDeque<Integer> blockingDeque = redisson.getBlockingDeque("test");
RDelayedQueue<Integer> delayedQueue = redisson.getDelayedQueue(blockingDeque);
delayedQueue.offer(1, 2, TimeUnit.SECONDS);
delayedQueue.destroy();
RDelayedQueue<Integer> delayedQueue2 = redisson.getDelayedQueue(blockingDeque);
delayedQueue2.destroy();
Thread.sleep(1000);
Object s = blockingDeque.poll(3, TimeUnit.SECONDS);
assertThat(s).isNull();
}
@Test @Test
public void testRemove() throws InterruptedException { public void testRemove() throws InterruptedException {
RBlockingQueue<String> blockingFairQueue = redisson.getBlockingQueue("delay_queue"); RBlockingQueue<String> blockingFairQueue = redisson.getBlockingQueue("delay_queue");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingFairQueue); RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingFairQueue);
delayedQueue.offer("1_1_1", 3, TimeUnit.SECONDS); delayedQueue.offer("1_1_1", 3, TimeUnit.SECONDS);
delayedQueue.offer("1_1_2", 7, TimeUnit.SECONDS); delayedQueue.offer("1_1_2", 7, TimeUnit.SECONDS);
assertThat(delayedQueue.contains("1_1_1")).isTrue(); assertThat(delayedQueue.contains("1_1_1")).isTrue();
@ -24,15 +41,15 @@ public class RedissonDelayedQueueTest extends RedisDockerTest {
assertThat(delayedQueue.contains("1_1_1")).isFalse(); assertThat(delayedQueue.contains("1_1_1")).isFalse();
Thread.sleep(9000); Thread.sleep(9000);
assertThat(blockingFairQueue).containsOnly("1_1_2"); assertThat(blockingFairQueue).containsOnly("1_1_2");
} }
@Test @Test
public void testRemoveAll() throws InterruptedException { public void testRemoveAll() throws InterruptedException {
RBlockingQueue<String> blockingFairQueue = redisson.getBlockingQueue("delay_queue"); RBlockingQueue<String> blockingFairQueue = redisson.getBlockingQueue("delay_queue");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingFairQueue); RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingFairQueue);
delayedQueue.offer("1_1_1", 3, TimeUnit.SECONDS); delayedQueue.offer("1_1_1", 3, TimeUnit.SECONDS);
delayedQueue.offer("1_1_2", 7, TimeUnit.SECONDS); delayedQueue.offer("1_1_2", 7, TimeUnit.SECONDS);
assertThat(delayedQueue.contains("1_1_1")).isTrue(); assertThat(delayedQueue.contains("1_1_1")).isTrue();
@ -40,13 +57,13 @@ public class RedissonDelayedQueueTest extends RedisDockerTest {
assertThat(delayedQueue.removeAll(Arrays.asList("1_1_1", "1_1_2"))).isTrue(); assertThat(delayedQueue.removeAll(Arrays.asList("1_1_1", "1_1_2"))).isTrue();
assertThat(delayedQueue.contains("1_1_1")).isFalse(); assertThat(delayedQueue.contains("1_1_1")).isFalse();
assertThat(delayedQueue.contains("1_1_2")).isFalse(); assertThat(delayedQueue.contains("1_1_2")).isFalse();
Thread.sleep(9000); Thread.sleep(9000);
assertThat(blockingFairQueue.isEmpty()).isTrue(); assertThat(blockingFairQueue.isEmpty()).isTrue();
} }
@Test @Test
public void testDealyedQueueRetainAll() { public void testDealyedQueueRetainAll() {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test"); RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test");
@ -58,13 +75,13 @@ public class RedissonDelayedQueueTest extends RedisDockerTest {
assertThat(dealyedQueue.retainAll(Arrays.asList(1, 2, 3))).isFalse(); assertThat(dealyedQueue.retainAll(Arrays.asList(1, 2, 3))).isFalse();
assertThat(dealyedQueue.retainAll(Arrays.asList(3, 1, 2, 8))).isFalse(); assertThat(dealyedQueue.retainAll(Arrays.asList(3, 1, 2, 8))).isFalse();
assertThat(dealyedQueue.readAll()).containsExactly(3, 1, 2); assertThat(dealyedQueue.readAll()).containsExactly(3, 1, 2);
assertThat(dealyedQueue.retainAll(Arrays.asList(1, 2))).isTrue(); assertThat(dealyedQueue.retainAll(Arrays.asList(1, 2))).isTrue();
assertThat(dealyedQueue.readAll()).containsExactly(1, 2); assertThat(dealyedQueue.readAll()).containsExactly(1, 2);
dealyedQueue.destroy(); dealyedQueue.destroy();
} }
@Test @Test
public void testDealyedQueueReadAll() { public void testDealyedQueueReadAll() {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test"); RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test");
@ -74,10 +91,10 @@ public class RedissonDelayedQueueTest extends RedisDockerTest {
dealyedQueue.offer(2, 1, TimeUnit.SECONDS); dealyedQueue.offer(2, 1, TimeUnit.SECONDS);
assertThat(dealyedQueue.readAll()).containsExactly(3, 1, 2); assertThat(dealyedQueue.readAll()).containsExactly(3, 1, 2);
dealyedQueue.destroy(); dealyedQueue.destroy();
} }
@Test @Test
public void testDealyedQueueRemoveAll() { public void testDealyedQueueRemoveAll() {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test"); RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test");
@ -85,24 +102,24 @@ public class RedissonDelayedQueueTest extends RedisDockerTest {
dealyedQueue.offer(3, 5, TimeUnit.SECONDS); dealyedQueue.offer(3, 5, TimeUnit.SECONDS);
dealyedQueue.offer(1, 2, TimeUnit.SECONDS); dealyedQueue.offer(1, 2, TimeUnit.SECONDS);
dealyedQueue.offer(2, 1, TimeUnit.SECONDS); dealyedQueue.offer(2, 1, TimeUnit.SECONDS);
assertThat(dealyedQueue.removeAll(Arrays.asList(1, 2))).isTrue(); assertThat(dealyedQueue.removeAll(Arrays.asList(1, 2))).isTrue();
assertThat(dealyedQueue).containsExactly(3); assertThat(dealyedQueue).containsExactly(3);
assertThat(dealyedQueue.removeAll(Arrays.asList(3, 4))).isTrue(); assertThat(dealyedQueue.removeAll(Arrays.asList(3, 4))).isTrue();
assertThat(dealyedQueue).isEmpty(); assertThat(dealyedQueue).isEmpty();
dealyedQueue.destroy(); dealyedQueue.destroy();
} }
@Test @Test
public void testDealyedQueueContainsAll() { public void testDealyedQueueContainsAll() {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test"); RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test");
RDelayedQueue<Integer> dealyedQueue = redisson.getDelayedQueue(queue1); RDelayedQueue<Integer> dealyedQueue = redisson.getDelayedQueue(queue1);
dealyedQueue.offer(3, 5, TimeUnit.SECONDS); dealyedQueue.offer(3, 5, TimeUnit.SECONDS);
dealyedQueue.offer(1, 2, TimeUnit.SECONDS); dealyedQueue.offer(1, 2, TimeUnit.SECONDS);
dealyedQueue.offer(2, 1, TimeUnit.SECONDS); dealyedQueue.offer(2, 1, TimeUnit.SECONDS);
assertThat(dealyedQueue.containsAll(Arrays.asList(1, 2))).isTrue(); assertThat(dealyedQueue.containsAll(Arrays.asList(1, 2))).isTrue();
assertThat(dealyedQueue.containsAll(Arrays.asList(1, 2, 4))).isFalse(); assertThat(dealyedQueue.containsAll(Arrays.asList(1, 2, 4))).isFalse();
@ -111,19 +128,19 @@ public class RedissonDelayedQueueTest extends RedisDockerTest {
dealyedQueue.destroy(); dealyedQueue.destroy();
} }
@Test @Test
public void testDealyedQueueContains() { public void testDealyedQueueContains() {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test"); RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test");
RDelayedQueue<Integer> dealyedQueue = redisson.getDelayedQueue(queue1); RDelayedQueue<Integer> dealyedQueue = redisson.getDelayedQueue(queue1);
dealyedQueue.offer(3, 5, TimeUnit.SECONDS); dealyedQueue.offer(3, 5, TimeUnit.SECONDS);
dealyedQueue.offer(1, 2, TimeUnit.SECONDS); dealyedQueue.offer(1, 2, TimeUnit.SECONDS);
dealyedQueue.offer(2, 1, TimeUnit.SECONDS); dealyedQueue.offer(2, 1, TimeUnit.SECONDS);
assertThat(dealyedQueue.contains(1)).isTrue(); assertThat(dealyedQueue.contains(1)).isTrue();
assertThat(dealyedQueue.contains(4)).isFalse(); assertThat(dealyedQueue.contains(4)).isFalse();
dealyedQueue.destroy(); dealyedQueue.destroy();
} }
@ -131,37 +148,37 @@ public class RedissonDelayedQueueTest extends RedisDockerTest {
public void testDealyedQueueRemove() { public void testDealyedQueueRemove() {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test"); RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test");
RDelayedQueue<Integer> dealyedQueue = redisson.getDelayedQueue(queue1); RDelayedQueue<Integer> dealyedQueue = redisson.getDelayedQueue(queue1);
dealyedQueue.offer(3, 5, TimeUnit.SECONDS); dealyedQueue.offer(3, 5, TimeUnit.SECONDS);
dealyedQueue.offer(1, 2, TimeUnit.SECONDS); dealyedQueue.offer(1, 2, TimeUnit.SECONDS);
dealyedQueue.offer(2, 1, TimeUnit.SECONDS); dealyedQueue.offer(2, 1, TimeUnit.SECONDS);
assertThat(dealyedQueue.remove(4)).isFalse(); assertThat(dealyedQueue.remove(4)).isFalse();
assertThat(dealyedQueue.remove(3)).isTrue(); assertThat(dealyedQueue.remove(3)).isTrue();
assertThat(dealyedQueue).containsExactly(1, 2); assertThat(dealyedQueue).containsExactly(1, 2);
dealyedQueue.destroy(); dealyedQueue.destroy();
} }
@Test @Test
public void testDealyedQueuePeek() { public void testDealyedQueuePeek() {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test"); RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test");
RDelayedQueue<Integer> dealyedQueue = redisson.getDelayedQueue(queue1); RDelayedQueue<Integer> dealyedQueue = redisson.getDelayedQueue(queue1);
dealyedQueue.offer(3, 5, TimeUnit.SECONDS); dealyedQueue.offer(3, 5, TimeUnit.SECONDS);
dealyedQueue.offer(1, 2, TimeUnit.SECONDS); dealyedQueue.offer(1, 2, TimeUnit.SECONDS);
dealyedQueue.offer(2, 1, TimeUnit.SECONDS); dealyedQueue.offer(2, 1, TimeUnit.SECONDS);
assertThat(dealyedQueue.peek()).isEqualTo(3); assertThat(dealyedQueue.peek()).isEqualTo(3);
dealyedQueue.destroy(); dealyedQueue.destroy();
} }
@Test @Test
public void testDealyedQueuePollLastAndOfferFirstTo() { public void testDealyedQueuePollLastAndOfferFirstTo() {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test"); RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test");
RDelayedQueue<Integer> dealyedQueue = redisson.getDelayedQueue(queue1); RDelayedQueue<Integer> dealyedQueue = redisson.getDelayedQueue(queue1);
dealyedQueue.offer(3, 5, TimeUnit.SECONDS); dealyedQueue.offer(3, 5, TimeUnit.SECONDS);
dealyedQueue.offer(2, 2, TimeUnit.SECONDS); dealyedQueue.offer(2, 2, TimeUnit.SECONDS);
dealyedQueue.offer(1, 1, TimeUnit.SECONDS); dealyedQueue.offer(1, 1, TimeUnit.SECONDS);
@ -173,31 +190,31 @@ public class RedissonDelayedQueueTest extends RedisDockerTest {
assertThat(dealyedQueue.pollLastAndOfferFirstTo(queue2.getName())).isEqualTo(1); assertThat(dealyedQueue.pollLastAndOfferFirstTo(queue2.getName())).isEqualTo(1);
assertThat(queue2).containsExactly(1, 6, 5, 4); assertThat(queue2).containsExactly(1, 6, 5, 4);
dealyedQueue.destroy(); dealyedQueue.destroy();
} }
@Test @Test
public void testDelayedQueueOrder() { public void testDelayedQueueOrder() {
RBlockingQueue<String> queue = redisson.getBlockingQueue("test"); RBlockingQueue<String> queue = redisson.getBlockingQueue("test");
RDelayedQueue<String> dealyedQueue = redisson.getDelayedQueue(queue); RDelayedQueue<String> dealyedQueue = redisson.getDelayedQueue(queue);
dealyedQueue.offer("1", 1, TimeUnit.SECONDS); dealyedQueue.offer("1", 1, TimeUnit.SECONDS);
dealyedQueue.offer("4", 4, TimeUnit.SECONDS); dealyedQueue.offer("4", 4, TimeUnit.SECONDS);
dealyedQueue.offer("3", 3, TimeUnit.SECONDS); dealyedQueue.offer("3", 3, TimeUnit.SECONDS);
dealyedQueue.offer("2", 2, TimeUnit.SECONDS); dealyedQueue.offer("2", 2, TimeUnit.SECONDS);
assertThat(dealyedQueue).containsExactly("1", "4", "3", "2"); assertThat(dealyedQueue).containsExactly("1", "4", "3", "2");
assertThat(dealyedQueue.poll()).isEqualTo("1"); assertThat(dealyedQueue.poll()).isEqualTo("1");
assertThat(dealyedQueue.poll()).isEqualTo("4"); assertThat(dealyedQueue.poll()).isEqualTo("4");
assertThat(dealyedQueue.poll()).isEqualTo("3"); assertThat(dealyedQueue.poll()).isEqualTo("3");
assertThat(dealyedQueue.poll()).isEqualTo("2"); assertThat(dealyedQueue.poll()).isEqualTo("2");
assertThat(queue.isEmpty()).isTrue(); assertThat(queue.isEmpty()).isTrue();
assertThat(queue.poll()).isNull(); assertThat(queue.poll()).isNull();
dealyedQueue.destroy(); dealyedQueue.destroy();
} }
@ -224,57 +241,57 @@ public class RedissonDelayedQueueTest extends RedisDockerTest {
dealyedQueue.destroy(); dealyedQueue.destroy();
} }
@Test @Test
public void testPoll() throws InterruptedException { public void testPoll() throws InterruptedException {
RBlockingQueue<String> queue = redisson.getBlockingQueue("test"); RBlockingQueue<String> queue = redisson.getBlockingQueue("test");
RDelayedQueue<String> dealyedQueue = redisson.getDelayedQueue(queue); RDelayedQueue<String> dealyedQueue = redisson.getDelayedQueue(queue);
dealyedQueue.offer("1", 1, TimeUnit.SECONDS); dealyedQueue.offer("1", 1, TimeUnit.SECONDS);
dealyedQueue.offer("2", 2, TimeUnit.SECONDS); dealyedQueue.offer("2", 2, TimeUnit.SECONDS);
dealyedQueue.offer("3", 3, TimeUnit.SECONDS); dealyedQueue.offer("3", 3, TimeUnit.SECONDS);
dealyedQueue.offer("4", 4, TimeUnit.SECONDS); dealyedQueue.offer("4", 4, TimeUnit.SECONDS);
assertThat(dealyedQueue.poll()).isEqualTo("1"); assertThat(dealyedQueue.poll()).isEqualTo("1");
assertThat(dealyedQueue.poll()).isEqualTo("2"); assertThat(dealyedQueue.poll()).isEqualTo("2");
assertThat(dealyedQueue.poll()).isEqualTo("3"); assertThat(dealyedQueue.poll()).isEqualTo("3");
assertThat(dealyedQueue.poll()).isEqualTo("4"); assertThat(dealyedQueue.poll()).isEqualTo("4");
Thread.sleep(3000); Thread.sleep(3000);
assertThat(queue.isEmpty()).isTrue(); assertThat(queue.isEmpty()).isTrue();
assertThat(queue.poll()).isNull(); assertThat(queue.poll()).isNull();
assertThat(queue.poll()).isNull(); assertThat(queue.poll()).isNull();
dealyedQueue.destroy(); dealyedQueue.destroy();
} }
@Test @Test
public void testDealyedQueue() throws InterruptedException { public void testDealyedQueue() throws InterruptedException {
RBlockingQueue<String> queue = redisson.getBlockingQueue("test"); RBlockingQueue<String> queue = redisson.getBlockingQueue("test");
RDelayedQueue<String> dealyedQueue = redisson.getDelayedQueue(queue); RDelayedQueue<String> dealyedQueue = redisson.getDelayedQueue(queue);
dealyedQueue.offer("1", 1, TimeUnit.SECONDS); dealyedQueue.offer("1", 1, TimeUnit.SECONDS);
dealyedQueue.offer("2", 5, TimeUnit.SECONDS); dealyedQueue.offer("2", 5, TimeUnit.SECONDS);
dealyedQueue.offer("4", 4, TimeUnit.SECONDS); dealyedQueue.offer("4", 4, TimeUnit.SECONDS);
dealyedQueue.offer("2", 2, TimeUnit.SECONDS); dealyedQueue.offer("2", 2, TimeUnit.SECONDS);
dealyedQueue.offer("3", 3, TimeUnit.SECONDS); dealyedQueue.offer("3", 3, TimeUnit.SECONDS);
assertThat(dealyedQueue).containsExactly("1", "2", "4", "2", "3"); assertThat(dealyedQueue).containsExactly("1", "2", "4", "2", "3");
Thread.sleep(500); Thread.sleep(500);
assertThat(queue.isEmpty()).isTrue(); assertThat(queue.isEmpty()).isTrue();
Thread.sleep(600); Thread.sleep(600);
assertThat(queue).containsExactly("1"); assertThat(queue).containsExactly("1");
assertThat(dealyedQueue).containsExactly("2", "4", "2", "3"); assertThat(dealyedQueue).containsExactly("2", "4", "2", "3");
Thread.sleep(500); Thread.sleep(500);
assertThat(queue).containsExactly("1"); assertThat(queue).containsExactly("1");
Thread.sleep(500); Thread.sleep(500);
assertThat(queue).containsExactly("1", "2"); assertThat(queue).containsExactly("1", "2");
assertThat(dealyedQueue).containsExactly("2", "4", "3"); assertThat(dealyedQueue).containsExactly("2", "4", "3");
Thread.sleep(500); Thread.sleep(500);
assertThat(queue).containsExactly("1", "2"); assertThat(queue).containsExactly("1", "2");
@ -287,7 +304,7 @@ public class RedissonDelayedQueueTest extends RedisDockerTest {
Thread.sleep(500); Thread.sleep(500);
assertThat(queue).containsExactly("1", "2", "3", "4"); assertThat(queue).containsExactly("1", "2", "3", "4");
assertThat(dealyedQueue).containsExactly("2"); assertThat(dealyedQueue).containsExactly("2");
Thread.sleep(500); Thread.sleep(500);
assertThat(queue).containsExactly("1", "2", "3", "4"); assertThat(queue).containsExactly("1", "2", "3", "4");
@ -295,16 +312,16 @@ public class RedissonDelayedQueueTest extends RedisDockerTest {
assertThat(queue).containsExactly("1", "2", "3", "4", "2"); assertThat(queue).containsExactly("1", "2", "3", "4", "2");
assertThat(dealyedQueue).isEmpty(); assertThat(dealyedQueue).isEmpty();
assertThat(queue.poll()).isEqualTo("1"); assertThat(queue.poll()).isEqualTo("1");
assertThat(queue.poll()).isEqualTo("2"); assertThat(queue.poll()).isEqualTo("2");
assertThat(queue.poll()).isEqualTo("3"); assertThat(queue.poll()).isEqualTo("3");
assertThat(queue.poll()).isEqualTo("4"); assertThat(queue.poll()).isEqualTo("4");
assertThat(queue.poll()).isEqualTo("2"); assertThat(queue.poll()).isEqualTo("2");
dealyedQueue.destroy(); dealyedQueue.destroy();
} }
} }

Loading…
Cancel
Save