From fcbb2fa08e3e62ad097a919dc8b6ce80684012df Mon Sep 17 00:00:00 2001 From: seakider Date: Tue, 3 Sep 2024 21:28:21 +0800 Subject: [PATCH] Fixed - The drainToAsync method returns an incorrect value Signed-off-by: seakider --- .../org/redisson/RedissonBoundedBlockingQueue.java | 9 ++++----- .../redisson/RedissonBoundedBlockingQueueTest.java | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java index 78d412b3b..7baec34f8 100644 --- a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java @@ -324,10 +324,6 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements @Override public int drainTo(Collection c, int maxElements) { - if (maxElements <= 0) { - return 0; - } - return get(drainToAsync(c, maxElements)); } @@ -336,7 +332,10 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements if (c == null) { throw new NullPointerException(); } - + if (maxElements <= 0) { + return new CompletableFutureWrapper<>(0); + } + return commandExecutor.evalWriteAsync(getRawName(), codec, new RedisCommand("EVAL", new ListDrainToDecoder(c)), "local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" + "local vals = redis.call('lrange', KEYS[1], 0, elemNum); " + diff --git a/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java index 2c201355f..b3eb51e00 100644 --- a/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java @@ -621,6 +621,20 @@ public class RedissonBoundedBlockingQueueTest extends RedisDockerTest { assertThat(queue.remainingCapacity()).isEqualTo(100); Assertions.assertEquals(0, queue.size()); } + + @Test + public void testDrainToAsync() throws ExecutionException, InterruptedException { + RBoundedBlockingQueue queue = redisson.getBoundedBlockingQueue("queue"); + queue.trySetCapacity(100); + for (int i = 0 ; i < 100; i++) { + assertThat(queue.offer(i)).isTrue(); + } + Assertions.assertEquals(100, queue.size()); + Set batch = new HashSet(); + RFuture future = queue.drainToAsync(batch, 0); + Assertions.assertEquals(future.get(), 0); + Assertions.assertEquals(batch.size(), 0); + } @Test public void testBlockingQueue() {