Fixed - RFairLock.tryLock() method doesn't apply waitTimeout parameter. #2883

pull/2954/head
Nikita Koksharov 5 years ago
parent fc668d25a9
commit a6bbb11bfa

@ -46,7 +46,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
private final String timeoutSetName;
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
this(commandExecutor, name, 5000);
this(commandExecutor, name, 60000*5);
}
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
@ -70,7 +70,12 @@ public class RedissonFairLock extends RedissonLock implements RLock {
}
@Override
protected RFuture<Void> acquireFailedAsync(long threadId) {
protected RFuture<Void> acquireFailedAsync(long waitTime, TimeUnit unit, long threadId) {
long wait = threadWaitTime;
if (waitTime != -1) {
wait = unit.toMillis(waitTime);
}
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
// get the existing timeout for the thread to remove
"local queue = redis.call('lrange', KEYS[1], 0, -1);" +
@ -90,13 +95,18 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"redis.call('zrem', KEYS[2], ARGV[1]);" +
"redis.call('lrem', KEYS[1], 0, ARGV[1]);",
Arrays.<Object>asList(threadsQueueName, timeoutSetName),
getLockName(threadId), threadWaitTime);
getLockName(threadId), wait);
}
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
long wait = threadWaitTime;
if (waitTime != -1) {
wait = unit.toMillis(waitTime);
}
long currentTime = System.currentTimeMillis();
if (command == RedisCommands.EVAL_NULL_BOOLEAN) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
@ -139,8 +149,8 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"return nil;" +
"end;" +
"return 1;",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), currentTime, threadWaitTime);
Arrays.asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), currentTime, wait);
}
if (command == RedisCommands.EVAL_LONG) {
@ -216,8 +226,8 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), threadWaitTime, currentTime);
Arrays.asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), wait, currentTime);
}
throw new IllegalArgumentException();
@ -263,7 +273,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; ",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName, getChannelName()),
Arrays.asList(getName(), threadsQueueName, timeoutSetName, getChannelName()),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
}
@ -279,7 +289,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
@Override
public RFuture<Long> sizeInMemoryAsync() {
List<Object> keys = Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName);
List<Object> keys = Arrays.asList(getName(), threadsQueueName, timeoutSetName);
return super.sizeInMemoryAsync(keys);
}
@ -289,7 +299,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"redis.call('pexpire', KEYS[2], ARGV[1]); " +
"return redis.call('pexpire', KEYS[3], ARGV[1]); ",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
Arrays.asList(getName(), threadsQueueName, timeoutSetName),
timeUnit.toMillis(timeToLive));
}

@ -172,7 +172,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
@ -187,7 +187,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
@ -217,15 +217,17 @@ public class RedissonLock extends RedissonExpirable implements RLock {
// get(lockAsync(leaseTime, unit));
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
@ -239,11 +241,13 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return ttlRemainingFuture;
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
@ -348,7 +352,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return result;
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
@ -378,11 +382,11 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return new CommandBatchService(commandExecutor.getConnectionManager(), options);
}
private void acquireFailed(long threadId) {
get(acquireFailedAsync(threadId));
private void acquireFailed(long waitTime, TimeUnit unit, long threadId) {
get(acquireFailedAsync(waitTime, unit, threadId));
}
protected RFuture<Void> acquireFailedAsync(long threadId) {
protected RFuture<Void> acquireFailedAsync(long waitTime, TimeUnit unit, long threadId) {
return RedissonPromise.newSucceededFuture(null);
}
@ -391,7 +395,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
@ -399,7 +403,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
acquireFailed(waitTime, unit, threadId);
return false;
}
@ -413,20 +417,20 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
});
}
acquireFailed(threadId);
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
acquireFailed(waitTime, unit, threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
@ -434,7 +438,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
acquireFailed(waitTime, unit, threadId);
return false;
}
@ -448,7 +452,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
acquireFailed(waitTime, unit, threadId);
return false;
}
}
@ -629,7 +633,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public RFuture<Void> lockAsync(long leaseTime, TimeUnit unit, long currentThreadId) {
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);
RFuture<Long> ttlFuture = tryAcquireAsync(-1, leaseTime, unit, currentThreadId);
ttlFuture.onComplete((ttl, e) -> {
if (e != null) {
result.tryFailure(e);
@ -660,7 +664,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
private void lockAsync(long leaseTime, TimeUnit unit,
RFuture<RedissonLockEntry> subscribeFuture, RPromise<Void> result, long currentThreadId) {
RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);
RFuture<Long> ttlFuture = tryAcquireAsync(-1, leaseTime, unit, currentThreadId);
ttlFuture.onComplete((ttl, e) -> {
if (e != null) {
unsubscribe(subscribeFuture, currentThreadId);
@ -714,7 +718,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
return tryAcquireOnceAsync(-1, null, threadId);
return tryAcquireOnceAsync(-1,-1, null, threadId);
}
@Override
@ -735,7 +739,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
AtomicLong time = new AtomicLong(unit.toMillis(waitTime));
long currentTime = System.currentTimeMillis();
RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);
RFuture<Long> ttlFuture = tryAcquireAsync(waitTime, leaseTime, unit, currentThreadId);
ttlFuture.onComplete((ttl, e) -> {
if (e != null) {
result.tryFailure(e);
@ -774,7 +778,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
tryLockAsync(time, waitTime, leaseTime, unit, subscribeFuture, result, currentThreadId);
});
if (!subscribeFuture.isDone()) {
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@ -795,7 +799,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
private void trySuccessFalse(long currentThreadId, RPromise<Boolean> result) {
acquireFailedAsync(currentThreadId).onComplete((res, e) -> {
acquireFailedAsync(-1, null, currentThreadId).onComplete((res, e) -> {
if (e == null) {
result.trySuccess(false);
} else {
@ -804,7 +808,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
});
}
private void tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit,
private void tryLockAsync(AtomicLong time, long waitTime, long leaseTime, TimeUnit unit,
RFuture<RedissonLockEntry> subscribeFuture, RPromise<Boolean> result, long currentThreadId) {
if (result.isDone()) {
unsubscribe(subscribeFuture, currentThreadId);
@ -818,7 +822,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
long curr = System.currentTimeMillis();
RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);
RFuture<Long> ttlFuture = tryAcquireAsync(waitTime, leaseTime, unit, currentThreadId);
ttlFuture.onComplete((ttl, e) -> {
if (e != null) {
unsubscribe(subscribeFuture, currentThreadId);
@ -848,7 +852,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
long current = System.currentTimeMillis();
RedissonLockEntry entry = subscribeFuture.getNow();
if (entry.getLatch().tryAcquire()) {
tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
tryLockAsync(time, waitTime, leaseTime, unit, subscribeFuture, result, currentThreadId);
} else {
AtomicBoolean executed = new AtomicBoolean();
AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
@ -861,7 +865,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
tryLockAsync(time, waitTime, leaseTime, unit, subscribeFuture, result, currentThreadId);
};
entry.addListener(listener);
@ -877,7 +881,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
tryLockAsync(time, waitTime, leaseTime, unit, subscribeFuture, result, currentThreadId);
}
}
}, t, TimeUnit.MILLISECONDS);

@ -54,7 +54,7 @@ public class RedissonReadLock extends RedissonLock implements RLock {
}
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,

@ -51,7 +51,7 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
}
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,

@ -1,33 +1,67 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RLock;
import org.redisson.api.RScript;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
public class RedissonFairLockTest extends BaseConcurrentTest {
private final Logger log = LoggerFactory.getLogger(RedissonFairLockTest.class);
@Test
public void testMultipleLocks() throws InterruptedException {
ExecutorService executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
1000L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
Config cfg = createConfig();
cfg.useSingleServer().setSubscriptionsPerConnection(100);
RedissonClient redisson = Redisson.create(cfg);
AtomicInteger acquiredLocks = new AtomicInteger();
for (int i = 0; i < 500; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
RLock test = redisson.getFairLock("lock");
try {
test.lock(5, TimeUnit.SECONDS);
try {
Thread.sleep(200); // 200ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
acquiredLocks.incrementAndGet();
} finally {
test.unlock();
}
}
});
}
executorService.shutdown();
assertThat(executorService.awaitTermination(3, TimeUnit.MINUTES)).isTrue();
assertThat(acquiredLocks.get()).isEqualTo(500);
redisson.shutdown();
}
@Test
public void testWaitTimeoutDrift() throws Exception {
int leaseTimeSeconds = 30;
@ -208,7 +242,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
long epiry = ((timeout - new Date().getTime()) / 1000);
log.info("Item " + (i++) + " expires in " + epiry + " seconds");
//the Redisson library uses this 5000ms delay in the code
if (epiry > leaseTimeSeconds + 5) {
if (epiry > leaseTimeSeconds + 60*5) {
Assert.fail("It would take more than " + leaseTimeSeconds + "s to get the lock!");
}
}
@ -235,44 +269,44 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
long threadFourthWaiter = 105;
// take the lock successfully
Long ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get();
Long ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get();
Assert.assertNull(ttl);
// fail to get the lock, but end up in the thread queue w/ ttl + 5s timeout
Long firstTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
Long firstTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNotNull(firstTTL);
Assert.assertTrue("Expected 30000 +/- 100 but was " + firstTTL, firstTTL >= 29900 && firstTTL <= 30100);
// fail to get the lock again, but end up in the thread queue w/ ttl + 10s timeout
Long secondTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get();
Long secondTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNotNull(secondTTL);
Assert.assertTrue("Expected 35000 +/- 100 but was " + secondTTL, secondTTL >= 34900 && secondTTL <= 35100);
// try the third, and check the TTL
Long thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
Long thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNotNull(thirdTTL);
Assert.assertTrue("Expected 40000 +/- 100 but was " + thirdTTL, thirdTTL >= 39900 && thirdTTL <= 40100);
// try the fourth, and check the TTL
Long fourthTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get();
Long fourthTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNotNull(fourthTTL);
Assert.assertTrue("Expected 45000 +/- 100 but was " + fourthTTL, fourthTTL >= 44900 && fourthTTL <= 45100);
// wait timeout the second waiter
lock.acquireFailedAsync(threadSecondWaiter).await().get();
lock.acquireFailedAsync(5000, TimeUnit.MILLISECONDS, threadSecondWaiter).await().get();
// try the first, and check the TTL
firstTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
firstTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNotNull(firstTTL);
Assert.assertTrue("Expected 30000 +/- 100 but was " + firstTTL, firstTTL >= 29900 && firstTTL <= 30100);
// try the third, and check the TTL
thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNotNull(thirdTTL);
Assert.assertTrue("Expected 35000 +/- 300 but was " + thirdTTL, thirdTTL >= 34700 && thirdTTL <= 35300);
// try the fourth, and check the TTL
fourthTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get();
fourthTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNotNull(fourthTTL);
Assert.assertTrue("Expected 40000 +/- 100 but was " + fourthTTL, fourthTTL >= 39900 && fourthTTL <= 40100);
@ -282,15 +316,15 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
Assert.assertTrue(unlocked);
// acquire the lock immediately with the 1nd
ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNull(ttl);
// try the third, and check the TTL
thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNotNull(thirdTTL);
Assert.assertTrue("Expected 30000 +/- 300 but was " + thirdTTL, thirdTTL >= 29700 && thirdTTL <= 30300);
fourthTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get();
fourthTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNotNull(fourthTTL);
Assert.assertTrue("Expected 35000 +/- 100 but was " + fourthTTL, fourthTTL >= 34900 && fourthTTL <= 35100);
}
@ -316,15 +350,15 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
long threadThirdWaiter = 104;
// take the lock successfully
Boolean locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
Boolean locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
Assert.assertTrue(locked);
// fail to get the lock, but end up in the thread queue w/ ttl + 100ms timeout
locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
Assert.assertFalse(locked);
// fail to get the lock again, but end up in the thread queue w/ ttl + 200ms timeout
locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
Assert.assertFalse(locked);
// unlock the original lock holder
@ -332,24 +366,24 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
Assert.assertTrue(unlocked);
// get the lock
locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
Assert.assertTrue(locked);
// fail to get the lock, keeping ttl of lock ttl + 200ms
locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
Assert.assertFalse(locked);
// fail to get the lock, keeping ttl of lock ttl + 100ms
locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
Assert.assertFalse(locked);
// fail to get the lock, keeping ttl of lock ttl + 200ms
locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
Assert.assertFalse(locked);
Thread.sleep(500);
locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
Assert.assertTrue(locked);
}
@ -373,15 +407,15 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
long threadThirdWaiter = 104;
// take the lock successfully
Long ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get();
Long ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get();
Assert.assertNull(ttl);
// fail to get the lock, but end up in the thread queue w/ ttl + 5s timeout
Long firstTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
Long firstTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNotNull(firstTTL);
// fail to get the lock again, but end up in the thread queue w/ ttl + 10s timeout
Long secondTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get();
Long secondTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNotNull(secondTTL);
// unlock the original lock holder
@ -389,20 +423,20 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
Assert.assertNotNull(unlocked);
Assert.assertTrue(unlocked);
ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNull(ttl);
Long thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
Long thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNotNull(thirdTTL);
Long secondTTLAgain = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get();
Long secondTTLAgain = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNotNull(secondTTLAgain);
long diff = secondTTL - secondTTLAgain;
Assert.assertTrue("Expected 5000 +/- 100 but was " + diff, diff > 4900 && diff < 5100);
diff = thirdTTL - secondTTLAgain;
Assert.assertTrue("Expected 5000 +/- 100 but was " + diff, diff > 4900 && diff < 5100);
thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNotNull(thirdTTL);
diff = thirdTTL - secondTTLAgain;
Assert.assertTrue("Expected 5000 +/- 100 but was " + diff, diff > 4900 && diff < 5100);
@ -430,18 +464,18 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
long threadThirdWaiter = 104;
// take the lock successfully
Long ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get();
Long ttl = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get();
Assert.assertNull(ttl);
// fail to get the lock, but end up in the thread queue w/ ttl + 5s timeout
Long firstTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
Long firstTTL = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNotNull(firstTTL);
// fail to get the lock again, but end up in the thread queue w/ ttl + 10s timeout
Long secondTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get();
Long secondTTL = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNotNull(secondTTL);
Long thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
Long thirdTTL = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNotNull(thirdTTL);
long diff = thirdTTL - firstTTL;
@ -449,7 +483,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
Thread.sleep(thirdTTL + threadWaitTime);
ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
ttl = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
Assert.assertNull(ttl);
}

Loading…
Cancel
Save