Fixed - NameMapper applied incorrectly to RBoundedBlockingQueue object. #4521

pull/4551/head
Nikita Koksharov 2 years ago
parent b171936565
commit ce16afe0e8

@ -28,6 +28,7 @@ import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* <p>Distributed and concurrent implementation of {@link java.util.concurrent.BlockingQueue}.
@ -126,11 +127,12 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
@Override
public RFuture<Map<String, List<V>>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) {
List<String> mappedNames = Arrays.stream(queueNames).map(m -> commandExecutor.getConnectionManager().getConfig().getNameMapper().map(m)).collect(Collectors.toList());
List<Object> params = new ArrayList<>();
params.add(toSeconds(duration.getSeconds(), TimeUnit.SECONDS));
params.add(queueNames.length + 1);
params.add(getRawName());
params.addAll(Arrays.asList(queueNames));
params.addAll(mappedNames);
params.add("LEFT");
params.add("COUNT");
params.add(count);
@ -144,11 +146,12 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
@Override
public RFuture<Map<String, List<V>>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames) {
List<String> mappedNames = Arrays.stream(queueNames).map(m -> commandExecutor.getConnectionManager().getConfig().getNameMapper().map(m)).collect(Collectors.toList());
List<Object> params = new ArrayList<>();
params.add(toSeconds(duration.getSeconds(), TimeUnit.SECONDS));
params.add(queueNames.length + 1);
params.add(getRawName());
params.addAll(Arrays.asList(queueNames));
params.addAll(mappedNames);
params.add("RIGHT");
params.add("COUNT");
params.add(count);
@ -157,7 +160,8 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
@Override
public RFuture<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BRPOPLPUSH, getRawName(), queueName, toSeconds(timeout, unit));
String mappedName = commandExecutor.getConnectionManager().getConfig().getNameMapper().map(queueName);
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BRPOPLPUSH, getRawName(), mappedName, toSeconds(timeout, unit));
}
@Override

@ -42,19 +42,25 @@ import java.util.function.Consumer;
public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements RBoundedBlockingQueue<V> {
private final RedissonBlockingQueue<V> blockingQueue;
private final RedissonQueueSemaphore semaphore;
private final String channelName;
protected RedissonBoundedBlockingQueue(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
blockingQueue = new RedissonBlockingQueue<V>(commandExecutor, name, redisson);
semaphore = new RedissonQueueSemaphore(commandExecutor, getSemaphoreName());
channelName = RedissonSemaphore.getChannelName(semaphore.getRawName());
}
protected RedissonBoundedBlockingQueue(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
blockingQueue = new RedissonBlockingQueue<V>(commandExecutor, name, redisson);
semaphore = new RedissonQueueSemaphore(commandExecutor, getSemaphoreName());
channelName = RedissonSemaphore.getChannelName(semaphore.getRawName());
}
private String getSemaphoreName() {
return prefixName("redisson_bqs", getRawName());
return prefixName("redisson_bqs", getName());
}
@Override
@ -142,7 +148,6 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
return new CompletableFutureWrapper<>(false);
}
String channelName = RedissonSemaphore.getChannelName(getSemaphoreName());
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
"local count = 0; " +
"for i = 1, #ARGV, 1 do "
@ -156,12 +161,11 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
+ "return 1;"
+ "end;"
+ "return 0 ",
Arrays.<Object>asList(getRawName(), getSemaphoreName(), channelName), encode(c).toArray());
Arrays.asList(getRawName(), semaphore.getRawName(), channelName), encode(c).toArray());
}
@Override
public RFuture<V> pollAsync() {
String channelName = RedissonSemaphore.getChannelName(getSemaphoreName());
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT,
"local res = redis.call('lpop', KEYS[1]);"
+ "if res ~= false then " +
@ -169,7 +173,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
"redis.call('publish', KEYS[3], value); "
+ "end;"
+ "return res;",
Arrays.<Object>asList(getRawName(), getSemaphoreName(), channelName), 1);
Arrays.asList(getRawName(), semaphore.getRawName(), channelName), 1);
}
/*
@ -283,8 +287,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
if (c == null) {
throw new NullPointerException();
}
String channelName = RedissonSemaphore.getChannelName(getSemaphoreName());
return commandExecutor.evalWriteAsync(getRawName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
"local vals = redis.call('lrange', KEYS[1], 0, -1); " +
"redis.call('del', KEYS[1]); " +
@ -293,7 +296,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
"redis.call('publish', KEYS[3], value); "
+ "end; " +
"return vals",
Arrays.<Object>asList(getRawName(), getSemaphoreName(), channelName));
Arrays.asList(getRawName(), semaphore.getRawName(), channelName));
}
@Override
@ -310,9 +313,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
if (c == null) {
throw new NullPointerException();
}
String channelName = RedissonSemaphore.getChannelName(getSemaphoreName());
return commandExecutor.evalWriteAsync(getRawName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
"local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" +
"local vals = redis.call('lrange', KEYS[1], 0, elemNum); " +
@ -322,12 +323,11 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
"redis.call('publish', KEYS[3], value); "
+ "end; " +
"return vals",
Arrays.<Object>asList(getRawName(), getSemaphoreName(), channelName), maxElements);
Arrays.asList(getRawName(), semaphore.getRawName(), channelName), maxElements);
}
@Override
public RFuture<Boolean> trySetCapacityAsync(int capacity) {
String channelName = RedissonSemaphore.getChannelName(getSemaphoreName());
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]); " +
"if (value == false) then "
@ -336,7 +336,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(getSemaphoreName(), channelName), capacity);
Arrays.asList(semaphore.getRawName(), channelName), capacity);
}
@Override
@ -346,7 +346,6 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override
public void clear() {
String channelName = RedissonSemaphore.getChannelName(getSemaphoreName());
get(commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
"local len = redis.call('llen', KEYS[1]); " +
"if len > 0 then "
@ -354,7 +353,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
+ "local value = redis.call('incrby', KEYS[2], len); " +
"redis.call('publish', KEYS[3], value); "
+ "end; ",
Arrays.<Object>asList(getRawName(), getSemaphoreName(), channelName)));
Arrays.asList(getRawName(), semaphore.getRawName(), channelName)));
}

@ -659,19 +659,20 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public <V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<?> command, long secondsTimeout, String... queueNames) {
List<String> mappedNames = Arrays.stream(queueNames).map(m -> connectionManager.getConfig().getNameMapper().map(m)).collect(Collectors.toList());
if (connectionManager.isClusterMode() && queueNames.length > 0) {
AtomicReference<Iterator<String>> ref = new AtomicReference<>();
List<String> names = new ArrayList<>();
names.add(name);
names.addAll(Arrays.asList(queueNames));
names.addAll(mappedNames);
ref.set(names.iterator());
AtomicLong counter = new AtomicLong(secondsTimeout);
CompletionStage<V> result = poll(codec, ref, names, counter, command);
return new CompletableFutureWrapper<>(result);
} else {
List<Object> params = new ArrayList<Object>(queueNames.length + 1);
List<Object> params = new ArrayList<>(queueNames.length + 1);
params.add(name);
params.addAll(Arrays.asList(queueNames));
params.addAll(mappedNames);
params.add(secondsTimeout);
return writeAsync(name, codec, command, params.toArray());
}

Loading…
Cancel
Save