fixed concurrency problems

Signed-off-by: 小乌龟不会写代码 <41791762+coding-tortoise@users.noreply.github.com>
pull/3490/head
小乌龟不会写代码 4 years ago
parent fac18462b0
commit 13b83686a0

@ -100,8 +100,6 @@ public class RedissonFairLock extends RedissonLock implements RLock {
@Override @Override
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
long wait = threadWaitTime; long wait = threadWaitTime;
if (waitTime != -1) { if (waitTime != -1) {
wait = unit.toMillis(waitTime); wait = unit.toMillis(waitTime);
@ -150,7 +148,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"end;" + "end;" +
"return 1;", "return 1;",
Arrays.asList(getName(), threadsQueueName, timeoutSetName), Arrays.asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), currentTime, wait); unit.toMillis(leaseTime), getLockName(threadId), currentTime, wait);
} }
if (command == RedisCommands.EVAL_LONG) { if (command == RedisCommands.EVAL_LONG) {
@ -227,7 +225,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"end;" + "end;" +
"return ttl;", "return ttl;",
Arrays.asList(getName(), threadsQueueName, timeoutSetName), Arrays.asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), wait, currentTime); unit.toMillis(leaseTime), getLockName(threadId), wait, currentTime);
} }
throw new IllegalArgumentException(); throw new IllegalArgumentException();

@ -144,11 +144,14 @@ public class RedissonLock extends RedissonBaseLock {
} }
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) { RFuture<Boolean> ttlRemainingFuture;
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); if (-1 != leaseTime) {
} ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, } else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> { ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) { if (e != null) {
return; return;
@ -156,6 +159,7 @@ public class RedissonLock extends RedissonBaseLock {
// lock acquired // lock acquired
if (ttlRemaining) { if (ttlRemaining) {
internalLockLeaseTime = -1 == leaseTime ? internalLockLeaseTime : unit.toMillis(leaseTime);
scheduleExpirationRenewal(threadId); scheduleExpirationRenewal(threadId);
} }
}); });
@ -163,11 +167,13 @@ public class RedissonLock extends RedissonBaseLock {
} }
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) { if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} } else {
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> { ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) { if (e != null) {
return; return;
@ -175,6 +181,7 @@ public class RedissonLock extends RedissonBaseLock {
// lock acquired // lock acquired
if (ttlRemaining == null) { if (ttlRemaining == null) {
internalLockLeaseTime = -1 == leaseTime ? internalLockLeaseTime : unit.toMillis(leaseTime);
scheduleExpirationRenewal(threadId); scheduleExpirationRenewal(threadId);
} }
}); });
@ -187,8 +194,6 @@ public class RedissonLock extends RedissonBaseLock {
} }
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command, return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " + "if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
@ -201,7 +206,7 @@ public class RedissonLock extends RedissonBaseLock {
"return nil; " + "return nil; " +
"end; " + "end; " +
"return redis.call('pttl', KEYS[1]);", "return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); Collections.singletonList(getName()), unit.toMillis(leaseTime), getLockName(threadId));
} }
@Override @Override

@ -55,8 +55,6 @@ public class RedissonReadLock extends RedissonLock implements RLock {
@Override @Override
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command, return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"local mode = redis.call('hget', KEYS[1], 'mode'); " + "local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " + "if (mode == false) then " +
@ -78,7 +76,7 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"end;" + "end;" +
"return redis.call('pttl', KEYS[1]);", "return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)),
internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId)); unit.toMillis(leaseTime), getLockName(threadId), getWriteLockName(threadId));
} }
@Override @Override

@ -52,8 +52,6 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
@Override @Override
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command, return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"local mode = redis.call('hget', KEYS[1], 'mode'); " + "local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " + "if (mode == false) then " +
@ -72,7 +70,7 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
"end;" + "end;" +
"return redis.call('pttl', KEYS[1]);", "return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName()), Arrays.<Object>asList(getName()),
internalLockLeaseTime, getLockName(threadId)); unit.toMillis(leaseTime), getLockName(threadId));
} }
@Override @Override

Loading…
Cancel
Save