diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java index 7e2a293ef..8e3d60fe8 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java @@ -28,6 +28,7 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.stream.Collectors; /** *

Distributed and concurrent implementation of {@link java.util.concurrent.BlockingQueue}. @@ -126,11 +127,12 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock @Override public RFuture>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) { + List mappedNames = Arrays.stream(queueNames).map(m -> commandExecutor.getConnectionManager().getConfig().getNameMapper().map(m)).collect(Collectors.toList()); List params = new ArrayList<>(); params.add(toSeconds(duration.getSeconds(), TimeUnit.SECONDS)); params.add(queueNames.length + 1); params.add(getRawName()); - params.addAll(Arrays.asList(queueNames)); + params.addAll(mappedNames); params.add("LEFT"); params.add("COUNT"); params.add(count); @@ -144,11 +146,12 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock @Override public RFuture>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames) { + List mappedNames = Arrays.stream(queueNames).map(m -> commandExecutor.getConnectionManager().getConfig().getNameMapper().map(m)).collect(Collectors.toList()); List params = new ArrayList<>(); params.add(toSeconds(duration.getSeconds(), TimeUnit.SECONDS)); params.add(queueNames.length + 1); params.add(getRawName()); - params.addAll(Arrays.asList(queueNames)); + params.addAll(mappedNames); params.add("RIGHT"); params.add("COUNT"); params.add(count); @@ -157,7 +160,8 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock @Override public RFuture pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) { - return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BRPOPLPUSH, getRawName(), queueName, toSeconds(timeout, unit)); + String mappedName = commandExecutor.getConnectionManager().getConfig().getNameMapper().map(queueName); + return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BRPOPLPUSH, getRawName(), mappedName, toSeconds(timeout, unit)); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java index f4ac808ab..9e6515794 100644 --- a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java @@ -42,19 +42,25 @@ import java.util.function.Consumer; public class RedissonBoundedBlockingQueue extends RedissonQueue implements RBoundedBlockingQueue { private final RedissonBlockingQueue blockingQueue; + private final RedissonQueueSemaphore semaphore; + private final String channelName; protected RedissonBoundedBlockingQueue(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { super(commandExecutor, name, redisson); blockingQueue = new RedissonBlockingQueue(commandExecutor, name, redisson); + semaphore = new RedissonQueueSemaphore(commandExecutor, getSemaphoreName()); + channelName = RedissonSemaphore.getChannelName(semaphore.getRawName()); } protected RedissonBoundedBlockingQueue(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { super(codec, commandExecutor, name, redisson); blockingQueue = new RedissonBlockingQueue(commandExecutor, name, redisson); + semaphore = new RedissonQueueSemaphore(commandExecutor, getSemaphoreName()); + channelName = RedissonSemaphore.getChannelName(semaphore.getRawName()); } private String getSemaphoreName() { - return prefixName("redisson_bqs", getRawName()); + return prefixName("redisson_bqs", getName()); } @Override @@ -142,7 +148,6 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements return new CompletableFutureWrapper<>(false); } - String channelName = RedissonSemaphore.getChannelName(getSemaphoreName()); return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, "local count = 0; " + "for i = 1, #ARGV, 1 do " @@ -156,12 +161,11 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements + "return 1;" + "end;" + "return 0 ", - Arrays.asList(getRawName(), getSemaphoreName(), channelName), encode(c).toArray()); + Arrays.asList(getRawName(), semaphore.getRawName(), channelName), encode(c).toArray()); } @Override public RFuture pollAsync() { - String channelName = RedissonSemaphore.getChannelName(getSemaphoreName()); return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT, "local res = redis.call('lpop', KEYS[1]);" + "if res ~= false then " + @@ -169,7 +173,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements "redis.call('publish', KEYS[3], value); " + "end;" + "return res;", - Arrays.asList(getRawName(), getSemaphoreName(), channelName), 1); + Arrays.asList(getRawName(), semaphore.getRawName(), channelName), 1); } /* @@ -283,8 +287,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements if (c == null) { throw new NullPointerException(); } - - String channelName = RedissonSemaphore.getChannelName(getSemaphoreName()); + return commandExecutor.evalWriteAsync(getRawName(), codec, new RedisCommand("EVAL", new ListDrainToDecoder(c)), "local vals = redis.call('lrange', KEYS[1], 0, -1); " + "redis.call('del', KEYS[1]); " + @@ -293,7 +296,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements "redis.call('publish', KEYS[3], value); " + "end; " + "return vals", - Arrays.asList(getRawName(), getSemaphoreName(), channelName)); + Arrays.asList(getRawName(), semaphore.getRawName(), channelName)); } @Override @@ -310,9 +313,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements if (c == null) { throw new NullPointerException(); } - - String channelName = RedissonSemaphore.getChannelName(getSemaphoreName()); - + return commandExecutor.evalWriteAsync(getRawName(), codec, new RedisCommand("EVAL", new ListDrainToDecoder(c)), "local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" + "local vals = redis.call('lrange', KEYS[1], 0, elemNum); " + @@ -322,12 +323,11 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements "redis.call('publish', KEYS[3], value); " + "end; " + "return vals", - Arrays.asList(getRawName(), getSemaphoreName(), channelName), maxElements); + Arrays.asList(getRawName(), semaphore.getRawName(), channelName), maxElements); } @Override public RFuture trySetCapacityAsync(int capacity) { - String channelName = RedissonSemaphore.getChannelName(getSemaphoreName()); return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('get', KEYS[1]); " + "if (value == false) then " @@ -336,7 +336,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements + "return 1;" + "end;" + "return 0;", - Arrays.asList(getSemaphoreName(), channelName), capacity); + Arrays.asList(semaphore.getRawName(), channelName), capacity); } @Override @@ -346,7 +346,6 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements @Override public void clear() { - String channelName = RedissonSemaphore.getChannelName(getSemaphoreName()); get(commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, "local len = redis.call('llen', KEYS[1]); " + "if len > 0 then " @@ -354,7 +353,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements + "local value = redis.call('incrby', KEYS[2], len); " + "redis.call('publish', KEYS[3], value); " + "end; ", - Arrays.asList(getRawName(), getSemaphoreName(), channelName))); + Arrays.asList(getRawName(), semaphore.getRawName(), channelName))); } diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 4861aeb59..57c06581d 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -659,19 +659,20 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public RFuture pollFromAnyAsync(String name, Codec codec, RedisCommand command, long secondsTimeout, String... queueNames) { + List mappedNames = Arrays.stream(queueNames).map(m -> connectionManager.getConfig().getNameMapper().map(m)).collect(Collectors.toList()); if (connectionManager.isClusterMode() && queueNames.length > 0) { AtomicReference> ref = new AtomicReference<>(); List names = new ArrayList<>(); names.add(name); - names.addAll(Arrays.asList(queueNames)); + names.addAll(mappedNames); ref.set(names.iterator()); AtomicLong counter = new AtomicLong(secondsTimeout); CompletionStage result = poll(codec, ref, names, counter, command); return new CompletableFutureWrapper<>(result); } else { - List params = new ArrayList(queueNames.length + 1); + List params = new ArrayList<>(queueNames.length + 1); params.add(name); - params.addAll(Arrays.asList(queueNames)); + params.addAll(mappedNames); params.add(secondsTimeout); return writeAsync(name, codec, command, params.toArray()); }