|
|
@ -53,10 +53,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private String getSemaphoreName() {
|
|
|
|
private String getSemaphoreName() {
|
|
|
|
if (getName().contains("{")) {
|
|
|
|
return prefixName("redisson_bqs", getName());
|
|
|
|
return "redisson_bqs:" + getName();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return "redisson_bqs:{" + getName() + "}";
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
@ -310,6 +307,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
|
|
|
|
Arrays.<Object>asList(getName(), getSemaphoreName(), channelName), maxElements);
|
|
|
|
Arrays.<Object>asList(getName(), getSemaphoreName(), channelName), maxElements);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public RFuture<Boolean> trySetCapacityAsync(int capacity) {
|
|
|
|
public RFuture<Boolean> trySetCapacityAsync(int capacity) {
|
|
|
|
String channelName = RedissonSemaphore.getChannelName(getSemaphoreName());
|
|
|
|
String channelName = RedissonSemaphore.getChannelName(getSemaphoreName());
|
|
|
|
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
|
|
|
|
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
|
|
|
@ -323,6 +321,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
|
|
|
|
Arrays.<Object>asList(getSemaphoreName(), channelName), capacity);
|
|
|
|
Arrays.<Object>asList(getSemaphoreName(), channelName), capacity);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean trySetCapacity(int capacity) {
|
|
|
|
public boolean trySetCapacity(int capacity) {
|
|
|
|
return get(trySetCapacityAsync(capacity));
|
|
|
|
return get(trySetCapacityAsync(capacity));
|
|
|
|
}
|
|
|
|
}
|
|
|
|