refactoring

pull/4979/head
Nikita Koksharov 2 years ago
parent a8072b8ec2
commit 45ec4b4f3b

@ -203,7 +203,7 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
}
protected final <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return commandExecutor.syncedEvalWriteAsync(key, codec, evalCommandType, script, keys, params);
return commandExecutor.syncedEval(key, codec, evalCommandType, script, keys, params);
}
protected void acquireFailed(long waitTime, TimeUnit unit, long threadId) {

@ -109,7 +109,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
long currentTime = System.currentTimeMillis();
if (command == RedisCommands.EVAL_NULL_BOOLEAN) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,
// remove stale threads
"while true do " +
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
@ -154,7 +154,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
}
if (command == RedisCommands.EVAL_LONG) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,
// remove stale threads
"while true do " +
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
@ -312,7 +312,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
@Override
public RFuture<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(null);
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove stale threads
"while true do "
+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"

@ -105,7 +105,7 @@ public class RedissonFencedLock extends RedissonLock implements RFencedLock {
}
RFuture<List<Long>> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG_LIST,
return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG_LIST,
"if (redis.call('exists', KEYS[1]) == 0 " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"local token = redis.call('incr', KEYS[2]);" +
@ -298,7 +298,7 @@ public class RedissonFencedLock extends RedissonLock implements RFencedLock {
@Override
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,
"if ((redis.call('exists', KEYS[1]) == 0) " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"redis.call('incr', KEYS[2]);" +

@ -203,7 +203,7 @@ public class RedissonLock extends RedissonBaseLock {
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,
"if ((redis.call('exists', KEYS[1]) == 0) " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
@ -315,7 +315,7 @@ public class RedissonLock extends RedissonBaseLock {
@Override
public RFuture<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(null);
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('del', KEYS[1]) == 1) then "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return 1 "

@ -357,7 +357,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
private RFuture<String> tryAcquireAsync(byte[] id, int permits, long timeoutDate) {
return commandExecutor.syncedEvalWriteAsync(getRawName(), ByteArrayCodec.INSTANCE, RedisCommands.EVAL_PERMIT_DATA,
return commandExecutor.syncedEval(getRawName(), ByteArrayCodec.INSTANCE, RedisCommands.EVAL_PERMIT_DATA,
"local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[4], 'limit', 0, ARGV[1]); " +
"if #expiredIds > 0 then " +
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
@ -567,7 +567,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
byte[] id = ByteBufUtil.decodeHexDump(permitId);
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local expire = redis.call('zscore', KEYS[3], ARGV[1]);" +
"local removed = redis.call('zrem', KEYS[3], ARGV[1]);" +
"if tonumber(removed) ~= 1 then " +
@ -707,7 +707,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
@Override
public RFuture<Void> setPermitsAsync(int permits) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local available = redis.call('get', KEYS[1]); " +
"if (available == false) then " +
"redis.call('set', KEYS[1], ARGV[1]); " +
@ -726,7 +726,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
@Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]); " +
"if (value == false) then "
+ "redis.call('set', KEYS[1], ARGV[1]); "
@ -744,7 +744,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
@Override
public RFuture<Void> addPermitsAsync(int permits) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local value = redis.call('get', KEYS[1]); " +
"if (value == false) then "
+ "value = 0;"
@ -760,7 +760,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
public RFuture<Boolean> updateLeaseTimeAsync(String permitId, long leaseTime, TimeUnit unit) {
long timeoutDate = calcTimeout(leaseTime, unit);
byte[] id = ByteBufUtil.decodeHexDump(permitId);
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[3], 'limit', 0, -1); " +
"if #expiredIds > 0 then " +
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
@ -785,8 +785,4 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return get(updateLeaseTimeAsync(permitId, leaseTime, unit));
}
private <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return commandExecutor.getServiceManager().execute(() -> commandExecutor.syncedEvalWriteAsync(key, codec, evalCommandType, script, keys, params));
}
}

@ -56,7 +56,7 @@ public class RedissonReadLock extends RedissonLock implements RLock {
@Override
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
@ -85,7 +85,7 @@ public class RedissonReadLock extends RedissonLock implements RLock {
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
@ -173,7 +173,7 @@ public class RedissonReadLock extends RedissonLock implements RLock {
@Override
public RFuture<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(null);
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hget', KEYS[1], 'mode') == 'read') then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +

@ -450,7 +450,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override
public RFuture<Integer> drainPermitsAsync() {
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local value = redis.call('get', KEYS[1]); " +
"if (value == false) then " +
"return 0; " +
@ -477,7 +477,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
RFuture<Boolean> future = commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]); " +
"if (value == false) then "
+ "redis.call('set', KEYS[1], ARGV[1]); "
@ -506,7 +506,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override
public RFuture<Void> addPermitsAsync(int permits) {
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local value = redis.call('get', KEYS[1]); " +
"if (value == false) then "
+ "value = 0;"

@ -128,7 +128,7 @@ public class RedissonSpinLock extends RedissonBaseLock {
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
@ -190,7 +190,7 @@ public class RedissonSpinLock extends RedissonBaseLock {
@Override
public RFuture<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(null);
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('del', KEYS[1]) == 1) then "
+ "return 1 "
+ "else "

@ -54,7 +54,7 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
@Override
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'write'); " +
@ -130,7 +130,7 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
@Override
public RFuture<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(null);
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hget', KEYS[1], 'mode') == 'write') then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +

@ -134,7 +134,9 @@ public interface CommandAsyncExecutor {
void setEvalShaROSupported(boolean value);
<T> RFuture<T> syncedEvalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T> RFuture<T> syncedEvalWithRetry(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T> RFuture<T> syncedEval(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T> CompletionStage<T> handleNoSync(CompletionStage<T> stage, Supplier<CompletionStage<?>> supplier);

@ -734,7 +734,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public <T> RFuture<T> syncedEvalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
public <T> RFuture<T> syncedEvalWithRetry(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return getServiceManager().execute(() -> syncedEval(key, codec, evalCommandType, script, keys, params));
}
@Override
public <T> RFuture<T> syncedEval(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
CompletionStage<Map<String, String>> replicationFuture = CompletableFuture.completedFuture(Collections.emptyMap());
if (!getServiceManager().getConfig().checkSkipSlavesInit()) {
replicationFuture = writeAsync(key, RedisCommands.INFO_REPLICATION);

Loading…
Cancel
Save