Fixed - don't retry non-idempotent operations which were successfully sent. #3850

pull/3917/head
Nikita Koksharov 3 years ago
parent 134a34a7a1
commit f09de802e6

@ -177,7 +177,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override
public RFuture<V> pollAsync() {
String channelName = RedissonSemaphore.getChannelName(getSemaphoreName());
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT,
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT,
"local res = redis.call('lpop', KEYS[1]);"
+ "if res ~= false then " +
"local value = redis.call('incrby', KEYS[2], ARGV[1]); " +

@ -275,7 +275,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
@Override
public RFuture<List<V>> pollAsync(int limit) {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
"local result = {};"
+ "for i = 1, ARGV[1], 1 do " +
"local v = redis.call('lpop', KEYS[1]);" +
@ -463,7 +463,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
@Override
public RFuture<V> pollAsync() {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT,
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT,
"local v = redis.call('lpop', KEYS[1]); "
+ "if v ~= false then "
+ "redis.call('zrem', KEYS[2], v); "
@ -481,7 +481,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
@Override
public RFuture<V> pollLastAndOfferFirstToAsync(String queueName) {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT,
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT,
"local v = redis.call('rpop', KEYS[1]); "
+ "if v ~= false then "
+ "redis.call('zrem', KEYS[2], v); "

@ -256,7 +256,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
@Override
public RFuture<List<V>> pollLastAsync(int limit) {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
"local result = {};"
+ "for i = 1, ARGV[1], 1 do " +
"local value = redis.call('rpop', KEYS[1]);" +

@ -221,7 +221,7 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
@Override
public RFuture<List<V>> pollAsync(int limit) {
return wrapLockedAsync(() -> {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
"local result = {};"
+ "for i = 1, ARGV[1], 1 do " +
"local value = redis.call('lpop', KEYS[1]);" +

@ -304,7 +304,7 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
@Override
public RFuture<List<V>> pollLastAsync(int limit) {
return wrapLockedAsync(() -> {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
"local result = {};"
+ "for i = 1, ARGV[1], 1 do " +
"local value = redis.call('rpop', KEYS[1]);" +

@ -456,7 +456,7 @@ public class RedissonPriorityQueue<V> extends RedissonList<V> implements RPriori
@Override
public RFuture<List<V>> pollAsync(int limit) {
return wrapLockedAsync(() -> {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
"local result = {};"
+ "for i = 1, ARGV[1], 1 do " +
"local value = redis.call('lpop', KEYS[1]);" +

@ -86,7 +86,7 @@ public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
@Override
public RFuture<List<V>> pollAsync(int limit) {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
"local result = {};"
+ "for i = 1, ARGV[1], 1 do " +
"local value = redis.call('lpop', KEYS[1]);" +

@ -79,7 +79,7 @@ public class RedissonRingBuffer<V> extends RedissonQueue<V> implements RRingBuff
@Override
public RFuture<Boolean> addAsync(V e) {
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local limit = redis.call('get', KEYS[2]); "
+ "assert(limit ~= false, 'RingBuffer capacity is not defined'); "
+ "local size = redis.call('rpush', KEYS[1], ARGV[1]); "

@ -522,7 +522,7 @@ public class RedissonTransferQueue<V> extends RedissonExpirable implements RTran
@Override
public V remove(int index) {
if (index == 0) {
RFuture<V> future = commandExecutor.evalWriteAsync(queueName, codec, EVAL_REQUEST,
RFuture<V> future = commandExecutor.evalWriteNoRetryAsync(queueName, codec, EVAL_REQUEST,
"local id = redis.call('lpop', KEYS[1]); "
+ "if id ~= false then "
+ "return redis.call('hget', KEYS[2], id); "

Loading…
Cancel
Save