From 8b49d5bec4c152e6748f17b853f77c3a3fa0319d Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 15 Mar 2017 11:10:56 +0300 Subject: [PATCH] =?UTF-8?q?Fixed=20-=20DelayedQueue.remove()=EF=BC=8CDelay?= =?UTF-8?q?edQueue.removeAll()=20don't=20work=20#792?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redisson/RedissonDelayedQueue.java | 11 ++++-- .../redisson/RedissonDelayedQueueTest.java | 37 ++++++++++++++++++- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java index bd9caf60d..595cdc7f2 100644 --- a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java @@ -30,6 +30,7 @@ import org.redisson.api.RTopic; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor; @@ -286,18 +287,19 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay } protected RFuture removeAsync(Object o, int count) { - return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), + return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 5), "local s = redis.call('llen', KEYS[1]);" + "for i = 0, s-1, 1 do " + "local v = redis.call('lindex', KEYS[1], i);" + "local randomId, value = struct.unpack('dLc0', v);" + "if ARGV[1] == value then " + + "redis.call('zrem', KEYS[2], v);" + "redis.call('lrem', KEYS[1], 1, v);" + "return 1;" + "end; " + "end;" + "return 0;", - Collections.singletonList(getQueueName()), o); + Arrays.asList(getQueueName(), getTimeoutSetName()), o); } @Override @@ -338,7 +340,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay return newSucceededFuture(false); } - return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, + return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 5, ValueType.OBJECTS), "local result = 0;" + "local s = redis.call('llen', KEYS[1]);" + "local i = 0;" + @@ -351,6 +353,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "result = 1; " + "i = i - 1; " + "s = s - 1; " + + "redis.call('zrem', KEYS[2], v);" + "redis.call('lrem', KEYS[1], 0, v); " + "break; " + "end; " @@ -358,7 +361,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "i = i + 1;" + "end; " + "return result;", - Collections.singletonList(getQueueName()), c.toArray()); + Arrays.asList(getQueueName(), getTimeoutSetName()), c.toArray()); } @Override diff --git a/redisson/src/test/java/org/redisson/RedissonDelayedQueueTest.java b/redisson/src/test/java/org/redisson/RedissonDelayedQueueTest.java index 8ab730438..cd917a883 100644 --- a/redisson/src/test/java/org/redisson/RedissonDelayedQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonDelayedQueueTest.java @@ -1,6 +1,7 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; + import java.util.Arrays; import java.util.concurrent.TimeUnit; @@ -11,6 +12,41 @@ import org.redisson.api.RQueue; public class RedissonDelayedQueueTest extends BaseTest { + @Test + public void testRemove() throws InterruptedException { + RBlockingFairQueue blockingFairQueue = redisson.getBlockingFairQueue("delay_queue"); + RDelayedQueue delayedQueue = redisson.getDelayedQueue(blockingFairQueue); + + delayedQueue.offer("1_1_1", 3, TimeUnit.SECONDS); + delayedQueue.offer("1_1_2", 7, TimeUnit.SECONDS); + assertThat(delayedQueue.contains("1_1_1")).isTrue(); + assertThat(delayedQueue.remove("1_1_1")).isTrue(); + assertThat(delayedQueue.contains("1_1_1")).isFalse(); + + Thread.sleep(9000); + + assertThat(blockingFairQueue.isEmpty()).isTrue(); + } + + @Test + public void testRemoveAll() throws InterruptedException { + RBlockingFairQueue blockingFairQueue = redisson.getBlockingFairQueue("delay_queue"); + RDelayedQueue delayedQueue = redisson.getDelayedQueue(blockingFairQueue); + + delayedQueue.offer("1_1_1", 3, TimeUnit.SECONDS); + delayedQueue.offer("1_1_2", 7, TimeUnit.SECONDS); + assertThat(delayedQueue.contains("1_1_1")).isTrue(); + assertThat(delayedQueue.contains("1_1_2")).isTrue(); + assertThat(delayedQueue.removeAll(Arrays.asList("1_1_1", "1_1_2"))).isTrue(); + assertThat(delayedQueue.contains("1_1_1")).isFalse(); + assertThat(delayedQueue.contains("1_1_2")).isFalse(); + + Thread.sleep(9000); + + assertThat(blockingFairQueue.isEmpty()).isTrue(); + } + + @Test public void testDealyedQueueRetainAll() { RBlockingFairQueue queue1 = redisson.getBlockingFairQueue("test"); @@ -28,7 +64,6 @@ public class RedissonDelayedQueueTest extends BaseTest { dealyedQueue.destroy(); } - @Test public void testDealyedQueueReadAll() {