Merge pull request #2100 from jncorpron/timeout_drift

Fix #2099 timeout drift in RedissonFairLock
pull/2247/head^2
Nikita Koksharov 6 years ago committed by GitHub
commit d5a69754bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -40,18 +40,23 @@ import org.redisson.pubsub.LockPubSub;
*/
public class RedissonFairLock extends RedissonLock implements RLock {
private final long threadWaitTime = 5000;
private final long threadWaitTime;
private final CommandAsyncExecutor commandExecutor;
private final String threadsQueueName;
private final String timeoutSetName;
/**
 * Creates a fair lock that uses the default per-waiter wait time of 5000.
 *
 * @param commandExecutor executor used to run the Redis commands and scripts
 * @param name            name of the lock
 */
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
    this(commandExecutor, name, 5000);
}

/**
 * Creates a fair lock with an explicit per-waiter wait time.
 *
 * @param commandExecutor executor used to run the Redis commands and scripts
 * @param name            name of the lock; also used to derive the names of the
 *                        waiter queue and the waiter timeout set
 * @param threadWaitTime  wait time passed to the lock scripts, where it is added to
 *                        millisecond TTL values (presumably milliseconds)
 */
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
    super(commandExecutor, name);
    this.threadWaitTime = threadWaitTime;
    this.commandExecutor = commandExecutor;
    this.threadsQueueName = prefixName("redisson_lock_queue", name);
    this.timeoutSetName = prefixName("redisson_lock_timeout", name);
}
@Override
protected RedissonLockEntry getEntry(long threadId) {
return pubSub.getEntry(getEntryName() + ":" + threadId);
@ -59,32 +64,40 @@ public class RedissonFairLock extends RedissonLock implements RLock {
@Override
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return pubSub.subscribe(getEntryName() + ":" + threadId,
return pubSub.subscribe(getEntryName() + ":" + threadId,
getChannelName() + ":" + getLockName(threadId));
}
@Override
protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
pubSub.unsubscribe(future.getNow(), getEntryName() + ":" + threadId,
pubSub.unsubscribe(future.getNow(), getEntryName() + ":" + threadId,
getChannelName() + ":" + getLockName(threadId));
}
@Override
protected RFuture<Void> acquireFailedAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local firstThreadId = redis.call('lindex', KEYS[1], 0); " +
"if firstThreadId == ARGV[1] then " +
"local keys = redis.call('zrange', KEYS[2], 0, -1); " +
"for i = 1, #keys, 1 do " +
"redis.call('zincrby', KEYS[2], -tonumber(ARGV[2]), keys[i]);" +
"end;" +
"end;" +
"redis.call('zrem', KEYS[2], ARGV[1]); " +
"redis.call('lrem', KEYS[1], 0, ARGV[1]); ",
Arrays.<Object>asList(threadsQueueName, timeoutSetName),
getLockName(threadId), threadWaitTime);
// get the existing timeout for the thread to remove
"local queue = redis.call('lrange', KEYS[1], 0, -1);" +
// find the location in the queue where the thread is
"local i = 1;" +
"while i <= #queue and queue[i] ~= ARGV[1] do " +
"i = i + 1;" +
"end;" +
// go to the next index which will exist after the current thread is removed
"i = i + 1;" +
// decrement the timeout for the rest of the queue after the thread being removed
"while i <= #queue do " +
"redis.call('zincrby', KEYS[2], -tonumber(ARGV[2]), queue[i]);" +
"i = i + 1;" +
"end;" +
// remove the thread from the queue and timeouts set
"redis.call('zrem', KEYS[2], ARGV[1]);" +
"redis.call('lrem', KEYS[1], 0, ARGV[1]);",
Arrays.<Object>asList(threadsQueueName, timeoutSetName),
getLockName(threadId), threadWaitTime);
}
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
@ -93,90 +106,128 @@ public class RedissonFairLock extends RedissonLock implements RLock {
if (command == RedisCommands.EVAL_NULL_BOOLEAN) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// remove stale threads
"while true do "
+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
+ "if timeout <= tonumber(ARGV[3]) then "
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
+ "end; "
+ "end;"
+
"if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
+ "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
"redis.call('lpop', KEYS[2]); " +
"redis.call('zrem', KEYS[3], ARGV[2]); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return 1;",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), currentTime);
"while true do " +
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
"if timeout <= tonumber(ARGV[3]) then " +
// remove the item from the queue and timeout set
// NOTE we do not alter any other timeout
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" +
// decrease timeouts for all waiting in the queue
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[4]), keys[i]);" +
"end;" +
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
"return 1;",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), currentTime, threadWaitTime);
}
if (command == RedisCommands.EVAL_LONG) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// remove stale threads
"while true do "
+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
+ "if timeout <= tonumber(ARGV[4]) then "
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
+ "end; "
+ "end;"
+ "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
+ "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
"redis.call('lpop', KEYS[2]); " +
"redis.call('zrem', KEYS[3], ARGV[2]); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"local firstThreadId = redis.call('lindex', KEYS[2], 0); " +
"local ttl; " +
"if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " +
"ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" +
"else "
+ "ttl = redis.call('pttl', KEYS[1]);" +
"end; " +
"local timeout = ttl + tonumber(ARGV[3]);" +
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end; " +
"return ttl;",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime);
"while true do " +
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
"if timeout <= tonumber(ARGV[4]) then " +
// remove the item from the queue and timeout set
// NOTE we do not alter any other timeout
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
// check if the lock can be acquired now
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
// remove this thread from the queue and timeout set
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" +
// decrease timeouts for all waiting in the queue
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +
// acquire the lock and set the TTL for the lease
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
// check if the lock is already held, and this is a re-entry
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2],1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
// the lock cannot be acquired
// check if the thread is already in the queue
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
// the real timeout is the timeout of the prior thread
// in the queue, but this is approximately correct, and
// avoids having to traverse the queue
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +
// add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of
// the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the
// threadWaitTime
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), threadWaitTime, currentTime);
}
throw new IllegalArgumentException();
}
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
@ -220,7 +271,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName, getChannelName()),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
@ -230,13 +281,13 @@ public class RedissonFairLock extends RedissonLock implements RLock {
public RFuture<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), threadsQueueName, timeoutSetName);
}
@Override
public RFuture<Long> sizeInMemoryAsync() {
    // delegate to the parent with all three keys this lock uses:
    // the lock itself, the waiter queue and the waiter timeout set
    return super.sizeInMemoryAsync(
            Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName));
}
@Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
@ -299,4 +350,4 @@ public class RedissonFairLock extends RedissonLock implements RLock {
LockPubSub.UNLOCK_MESSAGE, System.currentTimeMillis());
}
}
}

@ -20,15 +20,16 @@ import org.junit.Test;
import org.redisson.api.RLock;
import org.redisson.api.RScript;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RedissonFairLockTest extends BaseConcurrentTest {
private final Logger log = LoggerFactory.getLogger(RedissonFairLockTest.class);
@Test
public void testTimeoutDrift() throws Exception {
public void testWaitTimeoutDrift() throws Exception {
int leaseTimeSeconds = 30;
RLock lock = redisson.getFairLock("test-fair-lock");
AtomicBoolean lastThreadTryingToLock = new AtomicBoolean(false);
@ -38,12 +39,14 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
//to exacerbate the problem, use a very short wait time and a long lease time
//this will end up setting the queue timeout score to a value far away in the future
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 100; i++) {
for (int i = 0; i < 50; i++) {
final int finalI = i;
executor.submit(() -> {
log.info("running " + finalI + " in thread " + Thread.currentThread().getId());
try {
if (lock.tryLock(500, leaseTimeSeconds * 1000, TimeUnit.MILLISECONDS)) {
log.info("Lock taken by thread " + Thread.currentThread().getId());
Thread.sleep(30000);
Thread.sleep(10000);
try {
//this could fail before use sleep for the same value as the lock expiry, that's fine
//for the purpose of this test
@ -53,7 +56,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
}
}
} catch (InterruptedException ex) {
log.warn("Interrupted");
log.warn("Interrupted " + Thread.currentThread().getId());
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
}
@ -66,6 +69,98 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
}
//we now launch one more thread and kill it before it manages to fail and clean up
//that thread will end up with a timeout that will prevent any others from taking the lock for a long time
executor.submit(() -> {
log.info("Final thread trying to take the lock with thread id: " + Thread.currentThread().getId());
try {
lastThreadTryingToLock.set(true);
if (lock.tryLock(30000, 30000, TimeUnit.MILLISECONDS)) {
log.info("Lock taken by final thread " + Thread.currentThread().getId());
Thread.sleep(1000);
lock.unlock();
log.info("Lock released by final thread " + Thread.currentThread().getId());
}
} catch (InterruptedException ex) {
log.warn("Interrupted " + Thread.currentThread().getId());
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
}
});
//now we wait for all others threads to stop trying, and only the last thread is running
while (!lastThreadTryingToLock.get()) {
Thread.sleep(100);
}
//try to kill that last thread, and don't let it clean up after itself
executor.shutdownNow();
//force the lock to unlock just in case
try {
lock.forceUnlock();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
if (lock.isLocked()) {
Assert.fail("Lock should have been unlocked by now");
}
//check the timeout scores - they should all be within a reasonable amount of time from now
List<Long> queue = redisson.getScript(LongCodec.INSTANCE).eval(RScript.Mode.READ_ONLY,
"local result = {}; " +
"local timeouts = redis.call('zrange', KEYS[1], 0, 99, 'WITHSCORES'); " +
"for i=1,#timeouts,2 do " +
"table.insert(result, timeouts[i+1]); " +
"end; " +
"return result; ",
RScript.ReturnType.MULTI,
Collections.singletonList("redisson_lock_timeout:{test-fair-lock}"));
int i = 0;
for (Long timeout : queue) {
long epiry = ((timeout - new Date().getTime()) / 1000);
log.info("Item " + (i++) + " expires in " + epiry + " seconds");
//the Redisson library uses this 5000ms delay in the code
if (epiry > leaseTimeSeconds + 5) {
Assert.fail("It would take more than " + leaseTimeSeconds + "s to get the lock!");
}
}
}
@Test
public void testLockAcquiredTimeoutDrift() throws Exception {
int leaseTimeSeconds = 30;
RLock lock = redisson.getFairLock("test-fair-lock");
//create a scenario where the same 3 threads keep on trying to get a lock
//to exacerbate the problem, use a very short wait time and a long lease time
//this will end up setting the queue timeout score to a value far away in the future
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
final int finalI = i;
executor.submit(() -> {
log.info("running " + finalI + " in thread " + Thread.currentThread().getId());
try {
if (lock.tryLock(3000, leaseTimeSeconds * 1000, TimeUnit.MILLISECONDS)) {
log.info("Lock taken by thread " + Thread.currentThread().getId());
Thread.sleep(100);
try {
//this could fail before use sleep for the same value as the lock expiry, that's fine
//for the purpose of this test
lock.unlock();
log.info("Lock released by thread " + Thread.currentThread().getId());
} catch (Exception ignored) {
}
}
} catch (InterruptedException ex) {
log.warn("Interrupted " + Thread.currentThread().getId());
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
}
});
//for the first 3 threads, add a 50ms delay. This is to recreate the worst case scenario, where all threads
//attempting to lock do so in a staggered pattern. This delay will be carried over by the thread pool.
Thread.sleep(50);
}
AtomicBoolean lastThreadTryingToLock = new AtomicBoolean(false);
//we now launch one more thread and kill it before it manages to fail and clean up
//that thread will end up with a timeout that will prevent any others from taking the lock for a long time
executor.submit(() -> {
log.info("Final thread trying to take the lock with thread id: " + Thread.currentThread().getId());
try {
@ -91,7 +186,8 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
//force the lock to unlock just in case
try {
lock.forceUnlock();
} catch (Exception ignored) {
} catch (Exception e) {
log.error(e.getMessage(), e);
}
if (lock.isLocked()) {
Assert.fail("Lock should have been unlocked by now");
@ -117,11 +213,342 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
}
}
}
/**
 * Drives the lock scripts directly with fixed thread ids and checks that the TTLs
 * reported to queued waiters stay 5000 apart (the default thread wait time) both
 * after a waiter gives up via {@code acquireFailedAsync} and after the lock holder
 * unlocks.
 */
@Test
public void testAcquireFailedTimeoutDrift_Descrete() throws Exception {
    long leaseTime = 30_000;

    // we're testing interaction of various internal methods, so create a Redisson instance for protected access
    Redisson redisson = new Redisson(createConfig());
    try {
        RedissonFairLock lock = new RedissonFairLock(
                redisson.connectionManager.getCommandExecutor(),
                "testAcquireFailedTimeoutDrift_Descrete");

        // clear out any prior state
        lock.delete();

        long threadInit = 101;
        long threadFirstWaiter = 102;
        long threadSecondWaiter = 103;
        long threadThirdWaiter = 104;
        long threadFourthWaiter = 105;

        // take the lock successfully
        Long ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNull(ttl);

        // fail to get the lock, but end up in the thread queue w/ ttl + 5s timeout
        Long firstTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNotNull(firstTTL);
        Assert.assertTrue("Expected 30000 +/- 100 but was " + firstTTL, firstTTL >= 29900 && firstTTL <= 30100);

        // fail to get the lock again, but end up in the thread queue w/ ttl + 10s timeout
        Long secondTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNotNull(secondTTL);
        Assert.assertTrue("Expected 35000 +/- 100 but was " + secondTTL, secondTTL >= 34900 && secondTTL <= 35100);

        // try the third, and check the TTL
        Long thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNotNull(thirdTTL);
        Assert.assertTrue("Expected 40000 +/- 100 but was " + thirdTTL, thirdTTL >= 39900 && thirdTTL <= 40100);

        // try the fourth, and check the TTL
        Long fourthTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNotNull(fourthTTL);
        Assert.assertTrue("Expected 45000 +/- 100 but was " + fourthTTL, fourthTTL >= 44900 && fourthTTL <= 45100);

        // wait timeout the second waiter
        lock.acquireFailedAsync(threadSecondWaiter).await().get();

        // try the first, and check the TTL
        firstTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNotNull(firstTTL);
        Assert.assertTrue("Expected 30000 +/- 100 but was " + firstTTL, firstTTL >= 29900 && firstTTL <= 30100);

        // try the third, and check the TTL
        thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNotNull(thirdTTL);
        Assert.assertTrue("Expected 35000 +/- 300 but was " + thirdTTL, thirdTTL >= 34700 && thirdTTL <= 35300);

        // try the fourth, and check the TTL
        fourthTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNotNull(fourthTTL);
        Assert.assertTrue("Expected 40000 +/- 100 but was " + fourthTTL, fourthTTL >= 39900 && fourthTTL <= 40100);

        // unlock the original lock holder
        Boolean unlocked = lock.unlockInnerAsync(threadInit).await().getNow();
        Assert.assertNotNull(unlocked);
        Assert.assertTrue(unlocked);

        // acquire the lock immediately with the 1st waiter
        ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNull(ttl);

        // try the third, and check the TTL
        thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNotNull(thirdTTL);
        Assert.assertTrue("Expected 30000 +/- 300 but was " + thirdTTL, thirdTTL >= 29700 && thirdTTL <= 30300);

        // try the fourth, and check the TTL
        fourthTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNotNull(fourthTTL);
        Assert.assertTrue("Expected 35000 +/- 100 but was " + fourthTTL, fourthTTL >= 34900 && fourthTTL <= 35100);
    } finally {
        // the dedicated client is created by this test, so this test must shut it down
        redisson.shutdown();
    }
}
/**
 * Exercises the {@code EVAL_NULL_BOOLEAN} variant of the lock script with a 500 ms
 * lease and a 100 ms thread wait time, checking that a queued waiter can take the
 * lock once the holder's lease and the earlier waiter's timeout have elapsed.
 */
@Test
public void testLockAcquiredBooleanTimeoutDrift_Descrete() throws Exception {
    long leaseTime = 500;

    // we're testing interaction of various internal methods, so create a Redisson instance for protected access
    Redisson redisson = new Redisson(createConfig());
    try {
        // use a lock name unique to this test so it cannot share Redis keys with
        // testLockAcquiredTimeoutDrift_Descrete
        RedissonFairLock lock = new RedissonFairLock(
                redisson.connectionManager.getCommandExecutor(),
                "testLockAcquiredBooleanTimeoutDrift_Descrete",
                100);

        // clear out any prior state
        lock.delete();

        long threadInit = 101;
        long threadFirstWaiter = 102;
        long threadSecondWaiter = 103;
        long threadThirdWaiter = 104;

        // take the lock successfully
        Boolean locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
        Assert.assertTrue(locked);

        // fail to get the lock, but end up in the thread queue w/ ttl + 100ms timeout
        locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
        Assert.assertFalse(locked);

        // fail to get the lock again, but end up in the thread queue w/ ttl + 200ms timeout
        locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
        Assert.assertFalse(locked);

        // unlock the original lock holder
        Boolean unlocked = lock.unlockInnerAsync(threadInit).await().getNow();
        Assert.assertTrue(unlocked);

        // get the lock
        locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
        Assert.assertTrue(locked);

        // fail to get the lock, keeping ttl of lock ttl + 200ms
        locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
        Assert.assertFalse(locked);

        // fail to get the lock, keeping ttl of lock ttl + 100ms
        locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
        Assert.assertFalse(locked);

        // fail to get the lock, keeping ttl of lock ttl + 200ms
        locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
        Assert.assertFalse(locked);

        // wait until the current lease is about to expire before the final attempt
        Thread.sleep(490);

        locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get();
        Assert.assertTrue(locked);
    } finally {
        // the dedicated client is created by this test, so this test must shut it down
        redisson.shutdown();
    }
}
/**
 * Checks that when a queued waiter acquires the lock, the TTLs reported to the
 * remaining waiters drop by one thread wait time (5000 by default) and stay 5000
 * apart from each other.
 */
@Test
public void testLockAcquiredTimeoutDrift_Descrete() throws Exception {
    long leaseTime = 300_000;

    // we're testing interaction of various internal methods, so create a Redisson instance for protected access
    Redisson redisson = new Redisson(createConfig());
    try {
        RedissonFairLock lock = new RedissonFairLock(
                redisson.connectionManager.getCommandExecutor(),
                "testLockAcquiredTimeoutDrift_Descrete");

        // clear out any prior state
        lock.delete();

        long threadInit = 101;
        long threadFirstWaiter = 102;
        long threadSecondWaiter = 103;
        long threadThirdWaiter = 104;

        // take the lock successfully
        Long ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNull(ttl);

        // fail to get the lock, but end up in the thread queue w/ ttl + 5s timeout
        Long firstTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNotNull(firstTTL);

        // fail to get the lock again, but end up in the thread queue w/ ttl + 10s timeout
        Long secondTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNotNull(secondTTL);

        // unlock the original lock holder
        Boolean unlocked = lock.unlockInnerAsync(threadInit).await().getNow();
        Assert.assertNotNull(unlocked);
        Assert.assertTrue(unlocked);

        // the first waiter is at the head of the queue, so it acquires the lock now
        ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNull(ttl);

        Long thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNotNull(thirdTTL);

        Long secondTTLAgain = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNotNull(secondTTLAgain);

        // the second waiter's TTL must have dropped by one wait time after the acquisition
        long diff = secondTTL - secondTTLAgain;
        Assert.assertTrue("Expected 5000 +/- 100 but was " + diff, diff > 4900 && diff < 5100);

        // the third waiter is queued one wait time behind the second
        diff = thirdTTL - secondTTLAgain;
        Assert.assertTrue("Expected 5000 +/- 100 but was " + diff, diff > 4900 && diff < 5100);

        // asking again must not move the third waiter's timeout
        thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNotNull(thirdTTL);
        diff = thirdTTL - secondTTLAgain;
        Assert.assertTrue("Expected 5000 +/- 100 but was " + diff, diff > 4900 && diff < 5100);
    } finally {
        // the dedicated client is created by this test, so this test must shut it down
        redisson.shutdown();
    }
}
/**
 * Queues three waiters behind a lock holder and then abandons the first two (they
 * never retry). After sleeping for the third waiter's reported TTL plus one thread
 * wait time, the third waiter must be able to take the lock.
 */
@Test
public void testAbandonedTimeoutDrift_Descrete() throws Exception {
    long leaseTime = 500;
    long threadWaitTime = 100;

    // we're testing interaction of various internal methods, so create a Redisson instance for protected access
    Redisson redisson = new Redisson(createConfig());
    try {
        RedissonFairLock lock = new RedissonFairLock(
                redisson.connectionManager.getCommandExecutor(),
                "testAbandonedTimeoutDrift_Descrete",
                threadWaitTime);

        // clear out any prior state
        lock.delete();

        long threadInit = 101;
        long threadFirstWaiter = 102;
        long threadSecondWaiter = 103;
        long threadThirdWaiter = 104;

        // take the lock successfully
        Long ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNull(ttl);

        // fail to get the lock, but end up in the thread queue w/ ttl + 100ms timeout
        Long firstTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNotNull(firstTTL);

        // fail to get the lock again, but end up in the thread queue w/ ttl + 200ms timeout
        Long secondTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNotNull(secondTTL);

        Long thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNotNull(thirdTTL);

        // the third waiter sits two wait times behind the first
        long diff = thirdTTL - firstTTL;
        Assert.assertTrue("Expected 200 +/- 10 but was " + diff, diff > 190 && diff < 210);

        // let the lease and the abandoned waiters' timeouts elapse
        Thread.sleep(thirdTTL + threadWaitTime);

        ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get();
        Assert.assertNull(ttl);
    } finally {
        // the dedicated client is created by this test, so this test must shut it down
        redisson.shutdown();
    }
}
/**
 * Runs ten short lock/unlock tasks on a three-thread pool, then starts one final
 * waiter and kills the pool before that waiter can clean up. Afterwards every score
 * left in the waiter timeout set must lie no further in the future than the lease
 * time plus 5 seconds per queue position.
 */
@Test
public void testFirstThreadDeathTimeoutDrift() throws Exception {
    int leaseTimeSeconds = 30;
    RLock lock = redisson.getFairLock("test-fair-lock");
    AtomicBoolean lastThreadTryingToLock = new AtomicBoolean(false);

    //create a scenario where the same 3 threads keep on trying to get a lock
    //to exacerbate the problem, use a very short wait time and a long lease time
    //this will end up setting the queue timeout score to a value far away in the future
    ExecutorService executor = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 10; i++) {
        final int finalI = i;
        executor.submit(() -> {
            log.info("running " + finalI + " in thread " + Thread.currentThread().getId());
            try {
                if (lock.tryLock(3000, leaseTimeSeconds * 1000, TimeUnit.MILLISECONDS)) {
                    log.info("Lock taken by thread " + Thread.currentThread().getId());
                    Thread.sleep(100);
                    try {
                        //this could fail before use sleep for the same value as the lock expiry, that's fine
                        //for the purpose of this test
                        lock.unlock();
                        log.info("Lock released by thread " + Thread.currentThread().getId());
                    } catch (Exception ignored) {
                    }
                }
            } catch (InterruptedException ex) {
                log.warn("Interrupted " + Thread.currentThread().getId());
                // keep the interrupt visible to the pool instead of swallowing it
                Thread.currentThread().interrupt();
            } catch (Exception ex) {
                log.error(ex.getMessage(), ex);
            }
        });
        //for the first 3 threads, add a 50ms delay. This is to recreate the worst case scenario, where all threads
        //attempting to lock do so in a staggered pattern. This delay will be carried over by the thread pool.
        if (i < 3) {
            Thread.sleep(50);
        }
    }

    //we now launch one more thread and kill it before it manages to fail and clean up
    //that thread will end up with a timeout that will prevent any others from taking the lock for a long time
    executor.submit(() -> {
        log.info("Final thread trying to take the lock with thread id: " + Thread.currentThread().getId());
        try {
            lastThreadTryingToLock.set(true);
            if (lock.tryLock(30000, 30000, TimeUnit.MILLISECONDS)) {
                log.info("Lock taken by final thread " + Thread.currentThread().getId());
                Thread.sleep(1000);
                lock.unlock();
                log.info("Lock released by final thread " + Thread.currentThread().getId());
            }
        } catch (InterruptedException ex) {
            log.warn("Interrupted " + Thread.currentThread().getId());
            // keep the interrupt visible to the pool instead of swallowing it
            Thread.currentThread().interrupt();
        } catch (Exception ex) {
            log.error(ex.getMessage(), ex);
        }
    });

    //now we wait for all others threads to stop trying, and only the last thread is running
    while (!lastThreadTryingToLock.get()) {
        Thread.sleep(100);
    }

    //try to kill that last thread, and don't let it clean up after itself
    executor.shutdownNow();

    //force the lock to unlock just in case
    try {
        lock.forceUnlock();
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    }
    if (lock.isLocked()) {
        Assert.fail("Lock should have been unlocked by now");
    }

    //check the timeout scores - they should all be within a reasonable amount of time from now
    List<Long> queue = redisson.getScript(LongCodec.INSTANCE).eval(RScript.Mode.READ_ONLY,
            "local result = {}; " +
            "local timeouts = redis.call('zrange', KEYS[1], 0, 99, 'WITHSCORES'); " +
            "for i=1,#timeouts,2 do " +
                "table.insert(result, timeouts[i+1]); " +
            "end; " +
            "return result; ",
            RScript.ReturnType.MULTI,
            Collections.singletonList("redisson_lock_timeout:{test-fair-lock}"));

    for (int i = 0; i < queue.size(); i++) {
        long timeout = queue.get(i);
        // seconds from now until this waiter's timeout score
        long expiry = ((timeout - System.currentTimeMillis()) / 1000);
        log.info("Item " + i + " expires in " + expiry + " seconds");
        // the Redisson library uses this 5000ms delay in the code
        Assert.assertFalse("It would take more than " + (leaseTimeSeconds + 5 * (i + 1)) + "s to get the lock!", expiry > leaseTimeSeconds + 5 * (i + 1));
    }
}
@Test
public void testTryLockNonDelayed() throws InterruptedException {
String LOCK_NAME = "SOME_LOCK";
Thread t1 = new Thread(() -> {
RLock fairLock = redisson.getFairLock(LOCK_NAME);
try {
@ -140,7 +567,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
fairLock.unlock();
}
});
Thread t2 = new Thread(() -> {
try {
Thread.sleep(200L);
@ -158,13 +585,13 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
fairLock.unlock();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
RLock fairLock = redisson.getFairLock(LOCK_NAME);
try {
if (!fairLock.tryLock(0, TimeUnit.SECONDS)) {
@ -174,7 +601,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
fairLock.unlock();
}
}
@Test
public void testTryLockWait() throws InterruptedException {
testSingleInstanceConcurrency(1, r -> {
@ -183,12 +610,12 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
});
RLock lock = redisson.getFairLock("lock");
long startTime = System.currentTimeMillis();
lock.tryLock(3, TimeUnit.SECONDS);
assertThat(System.currentTimeMillis() - startTime).isBetween(2990L, 3100L);
}
@Test
public void testForceUnlock() {
RLock lock = redisson.getFairLock("lock");
@ -235,7 +662,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
RLock lock = redisson.getFairLock("lock");
await().atMost(redisson.getConfig().getLockWatchdogTimeout() + 1000, TimeUnit.MILLISECONDS).until(() -> !lock.isLocked());
}
@ -443,7 +870,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
Assert.assertEquals(iterations, lockedCounter.get());
}
@Test
public void testConcurrency_MultiInstance_Ordering() throws InterruptedException {
final ConcurrentLinkedQueue<Thread> queue = new ConcurrentLinkedQueue<>();
@ -468,7 +895,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
Thread.sleep(10);
t1.start();
}
await().atMost(30, TimeUnit.SECONDS).until(() -> lockedCounter.get() == totalThreads);
}

Loading…
Cancel
Save