Fixed - RBlockingQueue.pollFromAny() method doesn't include self queue name #4156

pull/4162/head
Nikita Koksharov 3 years ago
parent bf4468b6fa
commit 1b70f17e48

@ -24,8 +24,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.ListDrainToDecoder;
import java.util.Collection;
import java.util.Collections;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -110,7 +109,10 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
*/
@Override
public RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
return commandExecutor.pollFromAnyAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, toSeconds(timeout, unit), queueNames);
List<String> names = new ArrayList<>(Arrays.asList(queueNames));
names.add(0, getRawName());
return commandExecutor.pollFromAnyAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE,
toSeconds(timeout, unit), names.toArray(new String[]{}));
}
@Override

@ -470,7 +470,7 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
final RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("queue:pollany1");
RBlockingQueue<Integer> queue3 = redisson.getBlockingQueue("queue:pollany2");
@ -483,12 +483,11 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
}
}, 3, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
int l = queue1.pollFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2");
Awaitility.await().between(Duration.ofSeconds(2), Duration.ofSeconds(4)).untilAsserted(() -> {
int value = queue1.pollFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2");
assertThat(value).isEqualTo(1);
});
Assertions.assertEquals(2, l);
Assertions.assertTrue(System.currentTimeMillis() - s > 2000);
redisson.shutdown();
process.shutdown();
}

Loading…
Cancel
Save