Fixed - DelayedQueue.remove(),DelayedQueue.removeAll() don't work #792

pull/812/head
Nikita 8 years ago
parent 83e7ef26db
commit 8b49d5bec4

@ -30,6 +30,7 @@ import org.redisson.api.RTopic;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor;
@ -286,18 +287,19 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
} }
protected RFuture<Boolean> removeAsync(Object o, int count) { protected RFuture<Boolean> removeAsync(Object o, int count) {
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4), return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5),
"local s = redis.call('llen', KEYS[1]);" + "local s = redis.call('llen', KEYS[1]);" +
"for i = 0, s-1, 1 do " "for i = 0, s-1, 1 do "
+ "local v = redis.call('lindex', KEYS[1], i);" + "local v = redis.call('lindex', KEYS[1], i);"
+ "local randomId, value = struct.unpack('dLc0', v);" + "local randomId, value = struct.unpack('dLc0', v);"
+ "if ARGV[1] == value then " + "if ARGV[1] == value then "
+ "redis.call('zrem', KEYS[2], v);"
+ "redis.call('lrem', KEYS[1], 1, v);" + "redis.call('lrem', KEYS[1], 1, v);"
+ "return 1;" + "return 1;"
+ "end; " + "end; "
+ "end;" + + "end;" +
"return 0;", "return 0;",
Collections.<Object>singletonList(getQueueName()), o); Arrays.<Object>asList(getQueueName(), getTimeoutSetName()), o);
} }
@Override @Override
@ -338,7 +340,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
return newSucceededFuture(false); return newSucceededFuture(false);
} }
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5, ValueType.OBJECTS),
"local result = 0;" + "local result = 0;" +
"local s = redis.call('llen', KEYS[1]);" + "local s = redis.call('llen', KEYS[1]);" +
"local i = 0;" + "local i = 0;" +
@ -351,6 +353,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "result = 1; " + "result = 1; "
+ "i = i - 1; " + "i = i - 1; "
+ "s = s - 1; " + "s = s - 1; "
+ "redis.call('zrem', KEYS[2], v);"
+ "redis.call('lrem', KEYS[1], 0, v); " + "redis.call('lrem', KEYS[1], 0, v); "
+ "break; " + "break; "
+ "end; " + "end; "
@ -358,7 +361,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "i = i + 1;" + "i = i + 1;"
+ "end; " + "end; "
+ "return result;", + "return result;",
Collections.<Object>singletonList(getQueueName()), c.toArray()); Arrays.<Object>asList(getQueueName(), getTimeoutSetName()), c.toArray());
} }
@Override @Override

@ -1,6 +1,7 @@
package org.redisson; package org.redisson;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -11,6 +12,41 @@ import org.redisson.api.RQueue;
public class RedissonDelayedQueueTest extends BaseTest { public class RedissonDelayedQueueTest extends BaseTest {
@Test
public void testRemove() throws InterruptedException {
RBlockingFairQueue<String> blockingFairQueue = redisson.getBlockingFairQueue("delay_queue");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingFairQueue);
delayedQueue.offer("1_1_1", 3, TimeUnit.SECONDS);
delayedQueue.offer("1_1_2", 7, TimeUnit.SECONDS);
assertThat(delayedQueue.contains("1_1_1")).isTrue();
assertThat(delayedQueue.remove("1_1_1")).isTrue();
assertThat(delayedQueue.contains("1_1_1")).isFalse();
Thread.sleep(9000);
assertThat(blockingFairQueue.isEmpty()).isTrue();
}
@Test
public void testRemoveAll() throws InterruptedException {
RBlockingFairQueue<String> blockingFairQueue = redisson.getBlockingFairQueue("delay_queue");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingFairQueue);
delayedQueue.offer("1_1_1", 3, TimeUnit.SECONDS);
delayedQueue.offer("1_1_2", 7, TimeUnit.SECONDS);
assertThat(delayedQueue.contains("1_1_1")).isTrue();
assertThat(delayedQueue.contains("1_1_2")).isTrue();
assertThat(delayedQueue.removeAll(Arrays.asList("1_1_1", "1_1_2"))).isTrue();
assertThat(delayedQueue.contains("1_1_1")).isFalse();
assertThat(delayedQueue.contains("1_1_2")).isFalse();
Thread.sleep(9000);
assertThat(blockingFairQueue.isEmpty()).isTrue();
}
@Test @Test
public void testDealyedQueueRetainAll() { public void testDealyedQueueRetainAll() {
RBlockingFairQueue<Integer> queue1 = redisson.getBlockingFairQueue("test"); RBlockingFairQueue<Integer> queue1 = redisson.getBlockingFairQueue("test");
@ -28,7 +64,6 @@ public class RedissonDelayedQueueTest extends BaseTest {
dealyedQueue.destroy(); dealyedQueue.destroy();
} }
@Test @Test
public void testDealyedQueueReadAll() { public void testDealyedQueueReadAll() {

Loading…
Cancel
Save