From 45ec4b4f3bac3dcaa57ae2f0075cd12aa0df81e1 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 20 Apr 2023 14:57:03 +0300 Subject: [PATCH] refactoring --- .../main/java/org/redisson/RedissonBaseLock.java | 2 +- .../main/java/org/redisson/RedissonFairLock.java | 6 +++--- .../java/org/redisson/RedissonFencedLock.java | 4 ++-- .../src/main/java/org/redisson/RedissonLock.java | 4 ++-- .../RedissonPermitExpirableSemaphore.java | 16 ++++++---------- .../main/java/org/redisson/RedissonReadLock.java | 6 +++--- .../java/org/redisson/RedissonSemaphore.java | 6 +++--- .../main/java/org/redisson/RedissonSpinLock.java | 4 ++-- .../java/org/redisson/RedissonWriteLock.java | 4 ++-- .../redisson/command/CommandAsyncExecutor.java | 4 +++- .../redisson/command/CommandAsyncService.java | 7 ++++++- 11 files changed, 33 insertions(+), 30 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBaseLock.java b/redisson/src/main/java/org/redisson/RedissonBaseLock.java index a680c2cb8..172e72dbf 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseLock.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseLock.java @@ -203,7 +203,7 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc } protected final RFuture evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { - return commandExecutor.syncedEvalWriteAsync(key, codec, evalCommandType, script, keys, params); + return commandExecutor.syncedEval(key, codec, evalCommandType, script, keys, params); } protected void acquireFailed(long waitTime, TimeUnit unit, long threadId) { diff --git a/redisson/src/main/java/org/redisson/RedissonFairLock.java b/redisson/src/main/java/org/redisson/RedissonFairLock.java index 330dae45e..3b6fa0ac2 100644 --- a/redisson/src/main/java/org/redisson/RedissonFairLock.java +++ b/redisson/src/main/java/org/redisson/RedissonFairLock.java @@ -109,7 +109,7 @@ public class RedissonFairLock extends RedissonLock implements RLock { long currentTime = System.currentTimeMillis(); if (command == RedisCommands.EVAL_NULL_BOOLEAN) { - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, + return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command, // remove stale threads "while true do " + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + @@ -154,7 +154,7 @@ public class RedissonFairLock extends RedissonLock implements RLock { } if (command == RedisCommands.EVAL_LONG) { - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, + return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command, // remove stale threads "while true do " + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + @@ -312,7 +312,7 @@ public class RedissonFairLock extends RedissonLock implements RLock { @Override public RFuture forceUnlockAsync() { cancelExpirationRenewal(null); - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // remove stale threads "while true do " + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" diff --git a/redisson/src/main/java/org/redisson/RedissonFencedLock.java b/redisson/src/main/java/org/redisson/RedissonFencedLock.java index 519fdb32a..243198bd6 100644 --- a/redisson/src/main/java/org/redisson/RedissonFencedLock.java +++ b/redisson/src/main/java/org/redisson/RedissonFencedLock.java @@ -105,7 +105,7 @@ public class RedissonFencedLock extends RedissonLock implements RFencedLock { } RFuture> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId) { - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG_LIST, + return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG_LIST, "if (redis.call('exists', KEYS[1]) == 0 " + "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " + "local token = redis.call('incr', KEYS[2]);" + @@ -298,7 +298,7 @@ public class RedissonFencedLock extends RedissonLock implements RFencedLock { @Override RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, + return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command, "if ((redis.call('exists', KEYS[1]) == 0) " + "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " + "redis.call('incr', KEYS[2]);" + diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java index bfe69f965..2c204da5c 100644 --- a/redisson/src/main/java/org/redisson/RedissonLock.java +++ b/redisson/src/main/java/org/redisson/RedissonLock.java @@ -203,7 +203,7 @@ public class RedissonLock extends RedissonBaseLock { } RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, + return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command, "if ((redis.call('exists', KEYS[1]) == 0) " + "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + @@ -315,7 +315,7 @@ public class RedissonLock extends RedissonBaseLock { @Override public RFuture forceUnlockAsync() { cancelExpirationRenewal(null); - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('del', KEYS[1]) == 1) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1 " diff --git a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java index 387a57a2a..a689b0760 100644 --- a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java @@ -357,7 +357,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen } private RFuture tryAcquireAsync(byte[] id, int permits, long timeoutDate) { - return commandExecutor.syncedEvalWriteAsync(getRawName(), ByteArrayCodec.INSTANCE, RedisCommands.EVAL_PERMIT_DATA, + return commandExecutor.syncedEval(getRawName(), ByteArrayCodec.INSTANCE, RedisCommands.EVAL_PERMIT_DATA, "local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[4], 'limit', 0, ARGV[1]); " + "if #expiredIds > 0 then " + "redis.call('zrem', KEYS[2], unpack(expiredIds)); " + @@ -567,7 +567,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen } byte[] id = ByteBufUtil.decodeHexDump(permitId); - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local expire = redis.call('zscore', KEYS[3], ARGV[1]);" + "local removed = redis.call('zrem', KEYS[3], ARGV[1]);" + "if tonumber(removed) ~= 1 then " + @@ -707,7 +707,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen @Override public RFuture setPermitsAsync(int permits) { - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, + return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, "local available = redis.call('get', KEYS[1]); " + "if (available == false) then " + "redis.call('set', KEYS[1], ARGV[1]); " + @@ -726,7 +726,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen @Override public RFuture trySetPermitsAsync(int permits) { - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('get', KEYS[1]); " + "if (value == false) then " + "redis.call('set', KEYS[1], ARGV[1]); " @@ -744,7 +744,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen @Override public RFuture addPermitsAsync(int permits) { - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, + return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, "local value = redis.call('get', KEYS[1]); " + "if (value == false) then " + "value = 0;" @@ -760,7 +760,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen public RFuture updateLeaseTimeAsync(String permitId, long leaseTime, TimeUnit unit) { long timeoutDate = calcTimeout(leaseTime, unit); byte[] id = ByteBufUtil.decodeHexDump(permitId); - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[3], 'limit', 0, -1); " + "if #expiredIds > 0 then " + "redis.call('zrem', KEYS[2], unpack(expiredIds)); " + @@ -785,8 +785,4 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen return get(updateLeaseTimeAsync(permitId, leaseTime, unit)); } - private RFuture evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { - return commandExecutor.getServiceManager().execute(() -> commandExecutor.syncedEvalWriteAsync(key, codec, evalCommandType, script, keys, params)); - } - } diff --git a/redisson/src/main/java/org/redisson/RedissonReadLock.java b/redisson/src/main/java/org/redisson/RedissonReadLock.java index 5e3f7911d..f34a1f50b 100644 --- a/redisson/src/main/java/org/redisson/RedissonReadLock.java +++ b/redisson/src/main/java/org/redisson/RedissonReadLock.java @@ -56,7 +56,7 @@ public class RedissonReadLock extends RedissonLock implements RLock { @Override RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, + return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'read'); " + @@ -85,7 +85,7 @@ public class RedissonReadLock extends RedissonLock implements RLock { String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); String keyPrefix = getKeyPrefix(threadId, timeoutPrefix); - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + @@ -173,7 +173,7 @@ public class RedissonReadLock extends RedissonLock implements RLock { @Override public RFuture forceUnlockAsync() { cancelExpirationRenewal(null); - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hget', KEYS[1], 'mode') == 'read') then " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + diff --git a/redisson/src/main/java/org/redisson/RedissonSemaphore.java b/redisson/src/main/java/org/redisson/RedissonSemaphore.java index 25dfc3388..976d5d764 100644 --- a/redisson/src/main/java/org/redisson/RedissonSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonSemaphore.java @@ -450,7 +450,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { @Override public RFuture drainPermitsAsync() { - return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER, + return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER, "local value = redis.call('get', KEYS[1]); " + "if (value == false) then " + "return 0; " + @@ -477,7 +477,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { @Override public RFuture trySetPermitsAsync(int permits) { - RFuture future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + RFuture future = commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('get', KEYS[1]); " + "if (value == false) then " + "redis.call('set', KEYS[1], ARGV[1]); " @@ -506,7 +506,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { @Override public RFuture addPermitsAsync(int permits) { - return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, + return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, "local value = redis.call('get', KEYS[1]); " + "if (value == false) then " + "value = 0;" diff --git a/redisson/src/main/java/org/redisson/RedissonSpinLock.java b/redisson/src/main/java/org/redisson/RedissonSpinLock.java index cea5bbd34..9e043fdbe 100644 --- a/redisson/src/main/java/org/redisson/RedissonSpinLock.java +++ b/redisson/src/main/java/org/redisson/RedissonSpinLock.java @@ -128,7 +128,7 @@ public class RedissonSpinLock extends RedissonBaseLock { RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { internalLockLeaseTime = unit.toMillis(leaseTime); - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, + return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + @@ -190,7 +190,7 @@ public class RedissonSpinLock extends RedissonBaseLock { @Override public RFuture forceUnlockAsync() { cancelExpirationRenewal(null); - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('del', KEYS[1]) == 1) then " + "return 1 " + "else " diff --git a/redisson/src/main/java/org/redisson/RedissonWriteLock.java b/redisson/src/main/java/org/redisson/RedissonWriteLock.java index 3289f07db..c02d2d25b 100644 --- a/redisson/src/main/java/org/redisson/RedissonWriteLock.java +++ b/redisson/src/main/java/org/redisson/RedissonWriteLock.java @@ -54,7 +54,7 @@ public class RedissonWriteLock extends RedissonLock implements RLock { @Override RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, + return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'write'); " + @@ -130,7 +130,7 @@ public class RedissonWriteLock extends RedissonLock implements RLock { @Override public RFuture forceUnlockAsync() { cancelExpirationRenewal(null); - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hget', KEYS[1], 'mode') == 'write') then " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java index 7ec64bc47..21fea73a6 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java @@ -134,7 +134,9 @@ public interface CommandAsyncExecutor { void setEvalShaROSupported(boolean value); - RFuture syncedEvalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params); + RFuture syncedEvalWithRetry(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params); + + RFuture syncedEval(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params); CompletionStage handleNoSync(CompletionStage stage, Supplier> supplier); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 50b6cbc24..043ed5aff 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -734,7 +734,12 @@ public class CommandAsyncService implements CommandAsyncExecutor { } @Override - public RFuture syncedEvalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { + public RFuture syncedEvalWithRetry(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { + return getServiceManager().execute(() -> syncedEval(key, codec, evalCommandType, script, keys, params)); + } + + @Override + public RFuture syncedEval(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { CompletionStage> replicationFuture = CompletableFuture.completedFuture(Collections.emptyMap()); if (!getServiceManager().getConfig().checkSkipSlavesInit()) { replicationFuture = writeAsync(key, RedisCommands.INFO_REPLICATION);