Merge branch 'redisson:master' into master

pull/5100/head
LearningBot 2 years ago committed by GitHub
commit 6e56ee3788
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -27,7 +27,6 @@ import java.util.Collections;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* Distributed implementation of {@link java.util.concurrent.locks.Lock} * Distributed implementation of {@link java.util.concurrent.locks.Lock}
@ -145,31 +144,28 @@ public class RedissonSpinLock extends RedissonBaseLock {
@Override @Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime); final long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis(); final long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId(); final long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId); Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired // lock acquired
if (ttl == null) { if (ttl == null) {
return true; return true;
} }
time -= System.currentTimeMillis() - current; if (System.currentTimeMillis() - current >= time) {
if (time <= 0) {
acquireFailed(waitTime, unit, threadId); acquireFailed(waitTime, unit, threadId);
return false; return false;
} }
LockOptions.BackOffPolicy backOffPolicy = backOff.create(); LockOptions.BackOffPolicy backOffPolicy = backOff.create();
while (true) { while (true) {
current = System.currentTimeMillis();
Thread.sleep(backOffPolicy.getNextSleepPeriod()); Thread.sleep(backOffPolicy.getNextSleepPeriod());
ttl = tryAcquire(leaseTime, unit, threadId); ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) { if (ttl == null) {
return true; return true;
} }
time -= System.currentTimeMillis() - current; if (System.currentTimeMillis() - current >= time) {
if (time <= 0) {
acquireFailed(waitTime, unit, threadId); acquireFailed(waitTime, unit, threadId);
return false; return false;
} }
@ -262,16 +258,14 @@ public class RedissonSpinLock extends RedissonBaseLock {
long currentThreadId) { long currentThreadId) {
CompletableFuture<Boolean> result = new CompletableFuture<>(); CompletableFuture<Boolean> result = new CompletableFuture<>();
AtomicLong time = new AtomicLong(unit.toMillis(waitTime));
LockOptions.BackOffPolicy backOffPolicy = backOff.create(); LockOptions.BackOffPolicy backOffPolicy = backOff.create();
tryLock(leaseTime, unit, currentThreadId, result, time, backOffPolicy); tryLock(System.currentTimeMillis(), leaseTime, unit, currentThreadId, result, unit.toMillis(waitTime), backOffPolicy);
return new CompletableFutureWrapper<>(result); return new CompletableFutureWrapper<>(result);
} }
private void tryLock(long leaseTime, TimeUnit unit, long currentThreadId, CompletableFuture<Boolean> result, private void tryLock(long startTime, long leaseTime, TimeUnit unit, long currentThreadId, CompletableFuture<Boolean> result,
AtomicLong time, LockOptions.BackOffPolicy backOffPolicy) { long waitTime, LockOptions.BackOffPolicy backOffPolicy) {
long startTime = System.currentTimeMillis();
RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId); RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);
ttlFuture.whenComplete((ttl, e) -> { ttlFuture.whenComplete((ttl, e) -> {
if (e != null) { if (e != null) {
@ -287,17 +281,14 @@ public class RedissonSpinLock extends RedissonBaseLock {
return; return;
} }
long el = System.currentTimeMillis() - startTime; if (System.currentTimeMillis() - startTime >= waitTime) {
time.addAndGet(-el);
if (time.get() <= 0) {
trySuccessFalse(currentThreadId, result); trySuccessFalse(currentThreadId, result);
return; return;
} }
long nextSleepPeriod = backOffPolicy.getNextSleepPeriod(); long nextSleepPeriod = backOffPolicy.getNextSleepPeriod();
getServiceManager().newTimeout( getServiceManager().newTimeout(
timeout -> tryLock(leaseTime, unit, currentThreadId, result, time, backOffPolicy), timeout -> tryLock(startTime, leaseTime, unit, currentThreadId, result, waitTime, backOffPolicy),
nextSleepPeriod, TimeUnit.MILLISECONDS); nextSleepPeriod, TimeUnit.MILLISECONDS);
}); });
} }

@ -473,6 +473,27 @@ public class RedissonSpinLockTest extends BaseConcurrentTest {
} }
@Test
public void testTryLockAsyncWaitTime() throws InterruptedException {
RLock lock = redisson.getSpinLock("lock");
lock.lock();
AtomicBoolean lockAsyncSucceed = new AtomicBoolean(true);
Thread thread = new Thread(() -> {
RFuture<Boolean> booleanRFuture = lock.tryLockAsync(1, 30, TimeUnit.SECONDS);
booleanRFuture.whenComplete((res, e) -> {
if (e != null) {
Assertions.fail("Lock aquire failed for some reason");
}
lockAsyncSucceed.set(res);
});
});
thread.start();
Thread.sleep(1500);
assertThat(lockAsyncSucceed.get()).isFalse();
lock.forceUnlock();
}
@Test @Test
public void testTryLockAsyncFailed() throws InterruptedException { public void testTryLockAsyncFailed() throws InterruptedException {
RLock lock = redisson.getSpinLock("lock"); RLock lock = redisson.getSpinLock("lock");

Loading…
Cancel
Save