diff --git a/redisson/src/main/java/org/redisson/RedissonFairLock.java b/redisson/src/main/java/org/redisson/RedissonFairLock.java index d8021039b..864e56d96 100644 --- a/redisson/src/main/java/org/redisson/RedissonFairLock.java +++ b/redisson/src/main/java/org/redisson/RedissonFairLock.java @@ -40,18 +40,23 @@ import org.redisson.pubsub.LockPubSub; */ public class RedissonFairLock extends RedissonLock implements RLock { - private final long threadWaitTime = 5000; + private final long threadWaitTime; private final CommandAsyncExecutor commandExecutor; private final String threadsQueueName; private final String timeoutSetName; public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) { + this(commandExecutor, name, 5000); + } + + public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) { super(commandExecutor, name); this.commandExecutor = commandExecutor; + this.threadWaitTime = threadWaitTime; threadsQueueName = prefixName("redisson_lock_queue", name); timeoutSetName = prefixName("redisson_lock_timeout", name); } - + @Override protected RedissonLockEntry getEntry(long threadId) { return pubSub.getEntry(getEntryName() + ":" + threadId); @@ -59,32 +64,40 @@ public class RedissonFairLock extends RedissonLock implements RLock { @Override protected RFuture subscribe(long threadId) { - return pubSub.subscribe(getEntryName() + ":" + threadId, + return pubSub.subscribe(getEntryName() + ":" + threadId, getChannelName() + ":" + getLockName(threadId)); } @Override protected void unsubscribe(RFuture future, long threadId) { - pubSub.unsubscribe(future.getNow(), getEntryName() + ":" + threadId, + pubSub.unsubscribe(future.getNow(), getEntryName() + ":" + threadId, getChannelName() + ":" + getLockName(threadId)); } @Override protected RFuture acquireFailedAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, - "local firstThreadId = redis.call('lindex', KEYS[1], 0); " + - "if firstThreadId == ARGV[1] then " + - "local keys = redis.call('zrange', KEYS[2], 0, -1); " + - "for i = 1, #keys, 1 do " + - "redis.call('zincrby', KEYS[2], -tonumber(ARGV[2]), keys[i]);" + - "end;" + - "end;" + - "redis.call('zrem', KEYS[2], ARGV[1]); " + - "redis.call('lrem', KEYS[1], 0, ARGV[1]); ", - Arrays.asList(threadsQueueName, timeoutSetName), - getLockName(threadId), threadWaitTime); + // get the existing timeout for the thread to remove + "local queue = redis.call('lrange', KEYS[1], 0, -1);" + + // find the location in the queue where the thread is + "local i = 1;" + + "while i <= #queue and queue[i] ~= ARGV[1] do " + + "i = i + 1;" + + "end;" + + // go to the next index which will exist after the current thread is removed + "i = i + 1;" + + // decrement the timeout for the rest of the queue after the thread being removed + "while i <= #queue do " + + "redis.call('zincrby', KEYS[2], -tonumber(ARGV[2]), queue[i]);" + + "i = i + 1;" + + "end;" + + // remove the thread from the queue and timeouts set + "redis.call('zrem', KEYS[2], ARGV[1]);" + + "redis.call('lrem', KEYS[1], 0, ARGV[1]);", + Arrays.asList(threadsQueueName, timeoutSetName), + getLockName(threadId), threadWaitTime); } - + @Override RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { internalLockLeaseTime = unit.toMillis(leaseTime); @@ -93,90 +106,128 @@ public class RedissonFairLock extends RedissonLock implements RLock { if (command == RedisCommands.EVAL_NULL_BOOLEAN) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // remove stale threads - "while true do " - + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" - + "if firstThreadId2 == false then " - + "break;" - + "end; " - + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" - + "if timeout <= tonumber(ARGV[3]) then " - + "redis.call('zrem', KEYS[3], firstThreadId2); " - + "redis.call('lpop', KEYS[2]); " - + "else " - + "break;" - + "end; " - + "end;" - + - - "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) " - + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " + - "redis.call('lpop', KEYS[2]); " + - "redis.call('zrem', KEYS[3], ARGV[2]); " + - "redis.call('hset', KEYS[1], ARGV[2], 1); " + - "redis.call('pexpire', KEYS[1], ARGV[1]); " + - "return nil; " + - "end; " + - "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + - "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + - "redis.call('pexpire', KEYS[1], ARGV[1]); " + - "return nil; " + - "end; " + - "return 1;", - Arrays.asList(getName(), threadsQueueName, timeoutSetName), - internalLockLeaseTime, getLockName(threadId), currentTime); + "while true do " + + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + + "if firstThreadId2 == false then " + + "break;" + + "end;" + + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + + "if timeout <= tonumber(ARGV[3]) then " + + // remove the item from the queue and timeout set + // NOTE we do not alter any other timeout + "redis.call('zrem', KEYS[3], firstThreadId2);" + + "redis.call('lpop', KEYS[2]);" + + "else " + + "break;" + + "end;" + + "end;" + + + "if (redis.call('exists', KEYS[1]) == 0) " + + "and ((redis.call('exists', KEYS[2]) == 0) " + + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " + + "redis.call('lpop', KEYS[2]);" + + "redis.call('zrem', KEYS[3], ARGV[2]);" + + + // decrease timeouts for all waiting in the queue + "local keys = redis.call('zrange', KEYS[3], 0, -1);" + + "for i = 1, #keys, 1 do " + + "redis.call('zincrby', KEYS[3], -tonumber(ARGV[4]), keys[i]);" + + "end;" + + + "redis.call('hset', KEYS[1], ARGV[2], 1);" + + "redis.call('pexpire', KEYS[1], ARGV[1]);" + + "return nil;" + + "end;" + + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + + "redis.call('hincrby', KEYS[1], ARGV[2], 1);" + + "redis.call('pexpire', KEYS[1], ARGV[1]);" + + "return nil;" + + "end;" + + "return 1;", + Arrays.asList(getName(), threadsQueueName, timeoutSetName), + internalLockLeaseTime, getLockName(threadId), currentTime, threadWaitTime); } - + if (command == RedisCommands.EVAL_LONG) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // remove stale threads - "while true do " - + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" - + "if firstThreadId2 == false then " - + "break;" - + "end; " - + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" - + "if timeout <= tonumber(ARGV[4]) then " - + "redis.call('zrem', KEYS[3], firstThreadId2); " - + "redis.call('lpop', KEYS[2]); " - + "else " - + "break;" - + "end; " - + "end;" - - + "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) " - + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " + - "redis.call('lpop', KEYS[2]); " + - "redis.call('zrem', KEYS[3], ARGV[2]); " + - "redis.call('hset', KEYS[1], ARGV[2], 1); " + - "redis.call('pexpire', KEYS[1], ARGV[1]); " + - "return nil; " + - "end; " + - "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + - "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + - "redis.call('pexpire', KEYS[1], ARGV[1]); " + - "return nil; " + - "end; " + - - "local firstThreadId = redis.call('lindex', KEYS[2], 0); " + - "local ttl; " + - "if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " + - "ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" + - "else " - + "ttl = redis.call('pttl', KEYS[1]);" + - "end; " + - - "local timeout = ttl + tonumber(ARGV[3]);" + - "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " + - "redis.call('rpush', KEYS[2], ARGV[2]);" + - "end; " + - "return ttl;", - Arrays.asList(getName(), threadsQueueName, timeoutSetName), - internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime); + "while true do " + + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + + "if firstThreadId2 == false then " + + "break;" + + "end;" + + + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + + "if timeout <= tonumber(ARGV[4]) then " + + // remove the item from the queue and timeout set + // NOTE we do not alter any other timeout + "redis.call('zrem', KEYS[3], firstThreadId2);" + + "redis.call('lpop', KEYS[2]);" + + "else " + + "break;" + + "end;" + + "end;" + + + // check if the lock can be acquired now + "if (redis.call('exists', KEYS[1]) == 0) " + + "and ((redis.call('exists', KEYS[2]) == 0) " + + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " + + + // remove this thread from the queue and timeout set + "redis.call('lpop', KEYS[2]);" + + "redis.call('zrem', KEYS[3], ARGV[2]);" + + + // decrease timeouts for all waiting in the queue + "local keys = redis.call('zrange', KEYS[3], 0, -1);" + + "for i = 1, #keys, 1 do " + + "redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" + + "end;" + + + // acquire the lock and set the TTL for the lease + "redis.call('hset', KEYS[1], ARGV[2], 1);" + + "redis.call('pexpire', KEYS[1], ARGV[1]);" + + "return nil;" + + "end;" + + + // check if the lock is already held, and this is a re-entry + "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " + + "redis.call('hincrby', KEYS[1], ARGV[2],1);" + + "redis.call('pexpire', KEYS[1], ARGV[1]);" + + "return nil;" + + "end;" + + + // the lock cannot be acquired + // check if the thread is already in the queue + "local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" + + "if timeout ~= false then " + + // the real timeout is the timeout of the prior thread + // in the queue, but this is approximately correct, and + // avoids having to traverse the queue + "return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" + + "end;" + + + // add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of + // the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the + // threadWaitTime + "local lastThreadId = redis.call('lindex', KEYS[2], -1);" + + "local ttl;" + + "if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " + + "ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" + + "else " + + "ttl = redis.call('pttl', KEYS[1]);" + + "end;" + + "local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" + + "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " + + "redis.call('rpush', KEYS[2], ARGV[2]);" + + "end;" + + "return ttl;", + Arrays.asList(getName(), threadsQueueName, timeoutSetName), + internalLockLeaseTime, getLockName(threadId), threadWaitTime, currentTime); } - + throw new IllegalArgumentException(); } - + @Override protected RFuture unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, @@ -220,7 +271,7 @@ public class RedissonFairLock extends RedissonLock implements RLock { Arrays.asList(getName(), threadsQueueName, timeoutSetName, getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis()); } - + @Override public Condition newCondition() { throw new UnsupportedOperationException(); @@ -230,13 +281,13 @@ public class RedissonFairLock extends RedissonLock implements RLock { public RFuture deleteAsync() { return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), threadsQueueName, timeoutSetName); } - + @Override public RFuture sizeInMemoryAsync() { List keys = Arrays.asList(getName(), threadsQueueName, timeoutSetName); return super.sizeInMemoryAsync(keys); } - + @Override public RFuture expireAsync(long timeToLive, TimeUnit timeUnit) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, @@ -299,4 +350,4 @@ public class RedissonFairLock extends RedissonLock implements RLock { LockPubSub.UNLOCK_MESSAGE, System.currentTimeMillis()); } -} +} \ No newline at end of file diff --git a/redisson/src/test/java/org/redisson/RedissonFairLockTest.java b/redisson/src/test/java/org/redisson/RedissonFairLockTest.java index 4e068cab1..5b5254c21 100644 --- a/redisson/src/test/java/org/redisson/RedissonFairLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonFairLockTest.java @@ -20,15 +20,16 @@ import org.junit.Test; import org.redisson.api.RLock; import org.redisson.api.RScript; import org.redisson.client.codec.LongCodec; +import org.redisson.client.protocol.RedisCommands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class RedissonFairLockTest extends BaseConcurrentTest { private final Logger log = LoggerFactory.getLogger(RedissonFairLockTest.class); - + @Test - public void testTimeoutDrift() throws Exception { + public void testWaitTimeoutDrift() throws Exception { int leaseTimeSeconds = 30; RLock lock = redisson.getFairLock("test-fair-lock"); AtomicBoolean lastThreadTryingToLock = new AtomicBoolean(false); @@ -38,12 +39,14 @@ public class RedissonFairLockTest extends BaseConcurrentTest { //to exacerbate the problem, use a very short wait time and a long lease time //this will end up setting the queue timeout score to a value far away in the future ExecutorService executor = Executors.newFixedThreadPool(3); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 50; i++) { + final int finalI = i; executor.submit(() -> { + log.info("running " + finalI + " in thread " + Thread.currentThread().getId()); try { if (lock.tryLock(500, leaseTimeSeconds * 1000, TimeUnit.MILLISECONDS)) { log.info("Lock taken by thread " + Thread.currentThread().getId()); - Thread.sleep(30000); + Thread.sleep(10000); try { //this could fail before use sleep for the same value as the lock expiry, that's fine //for the purpose of this test @@ -53,7 +56,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest { } } } catch (InterruptedException ex) { - log.warn("Interrupted"); + log.warn("Interrupted " + Thread.currentThread().getId()); } catch (Exception ex) { log.error(ex.getMessage(), ex); } @@ -66,6 +69,98 @@ public class RedissonFairLockTest extends BaseConcurrentTest { } //we now launch one more thread and kill it before it manages to fail and clean up //that thread will end up with a timeout that will prevent any others from taking the lock for a long time + executor.submit(() -> { + log.info("Final thread trying to take the lock with thread id: " + Thread.currentThread().getId()); + try { + lastThreadTryingToLock.set(true); + if (lock.tryLock(30000, 30000, TimeUnit.MILLISECONDS)) { + log.info("Lock taken by final thread " + Thread.currentThread().getId()); + Thread.sleep(1000); + lock.unlock(); + log.info("Lock released by final thread " + Thread.currentThread().getId()); + } + } catch (InterruptedException ex) { + log.warn("Interrupted " + Thread.currentThread().getId()); + } catch (Exception ex) { + log.error(ex.getMessage(), ex); + } + }); + //now we wait for all others threads to stop trying, and only the last thread is running + while (!lastThreadTryingToLock.get()) { + Thread.sleep(100); + } + //try to kill that last thread, and don't let it clean up after itself + executor.shutdownNow(); + //force the lock to unlock just in case + try { + lock.forceUnlock(); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + if (lock.isLocked()) { + Assert.fail("Lock should have been unlocked by now"); + } + //check the timeout scores - they should all be within a reasonable amount of time from now + List queue = redisson.getScript(LongCodec.INSTANCE).eval(RScript.Mode.READ_ONLY, + "local result = {}; " + + "local timeouts = redis.call('zrange', KEYS[1], 0, 99, 'WITHSCORES'); " + + "for i=1,#timeouts,2 do " + + "table.insert(result, timeouts[i+1]); " + + "end; " + + "return result; ", + RScript.ReturnType.MULTI, + Collections.singletonList("redisson_lock_timeout:{test-fair-lock}")); + + int i = 0; + for (Long timeout : queue) { + long epiry = ((timeout - new Date().getTime()) / 1000); + log.info("Item " + (i++) + " expires in " + epiry + " seconds"); + //the Redisson library uses this 5000ms delay in the code + if (epiry > leaseTimeSeconds + 5) { + Assert.fail("It would take more than " + leaseTimeSeconds + "s to get the lock!"); + } + } + } + + @Test + public void testLockAcquiredTimeoutDrift() throws Exception { + int leaseTimeSeconds = 30; + RLock lock = redisson.getFairLock("test-fair-lock"); + + //create a scenario where the same 3 threads keep on trying to get a lock + //to exacerbate the problem, use a very short wait time and a long lease time + //this will end up setting the queue timeout score to a value far away in the future + ExecutorService executor = Executors.newFixedThreadPool(3); + for (int i = 0; i < 3; i++) { + final int finalI = i; + executor.submit(() -> { + log.info("running " + finalI + " in thread " + Thread.currentThread().getId()); + try { + if (lock.tryLock(3000, leaseTimeSeconds * 1000, TimeUnit.MILLISECONDS)) { + log.info("Lock taken by thread " + Thread.currentThread().getId()); + Thread.sleep(100); + try { + //this could fail before use sleep for the same value as the lock expiry, that's fine + //for the purpose of this test + lock.unlock(); + log.info("Lock released by thread " + Thread.currentThread().getId()); + } catch (Exception ignored) { + } + } + } catch (InterruptedException ex) { + log.warn("Interrupted " + Thread.currentThread().getId()); + } catch (Exception ex) { + log.error(ex.getMessage(), ex); + } + }); + //for the first 3 threads, add a 50ms delay. This is to recreate the worst case scenario, where all threads + //attempting to lock do so in a staggered pattern. This delay will be carried over by the thread pool. + Thread.sleep(50); + } + + AtomicBoolean lastThreadTryingToLock = new AtomicBoolean(false); + //we now launch one more thread and kill it before it manages to fail and clean up + //that thread will end up with a timeout that will prevent any others from taking the lock for a long time executor.submit(() -> { log.info("Final thread trying to take the lock with thread id: " + Thread.currentThread().getId()); try { @@ -91,7 +186,8 @@ public class RedissonFairLockTest extends BaseConcurrentTest { //force the lock to unlock just in case try { lock.forceUnlock(); - } catch (Exception ignored) { + } catch (Exception e) { + log.error(e.getMessage(), e); } if (lock.isLocked()) { Assert.fail("Lock should have been unlocked by now"); @@ -117,11 +213,342 @@ public class RedissonFairLockTest extends BaseConcurrentTest { } } } - + + @Test + public void testAcquireFailedTimeoutDrift_Descrete() throws Exception { + long leaseTime = 30_000; + + // we're testing interaction of various internal methods, so create a Redisson instance for protected access + Redisson redisson = new Redisson(createConfig()); + + RedissonFairLock lock = new RedissonFairLock( + redisson.connectionManager.getCommandExecutor(), + "testAcquireFailedTimeoutDrift_Descrete"); + + // clear out any prior state + lock.delete(); + + long threadInit = 101; + long threadFirstWaiter = 102; + long threadSecondWaiter = 103; + long threadThirdWaiter = 104; + long threadFourthWaiter = 105; + + // take the lock successfully + Long ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNull(ttl); + + // fail to get the lock, but end up in the thread queue w/ ttl + 5s timeout + Long firstTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(firstTTL); + Assert.assertTrue("Expected 30000 +/- 100 but was " + firstTTL, firstTTL >= 29900 && firstTTL <= 30100); + + // fail to get the lock again, but end up in the thread queue w/ ttl + 10s timeout + Long secondTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(secondTTL); + Assert.assertTrue("Expected 35000 +/- 100 but was " + secondTTL, secondTTL >= 34900 && secondTTL <= 35100); + + // try the third, and check the TTL + Long thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(thirdTTL); + Assert.assertTrue("Expected 40000 +/- 100 but was " + thirdTTL, thirdTTL >= 39900 && thirdTTL <= 40100); + + // try the fourth, and check the TTL + Long fourthTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(fourthTTL); + Assert.assertTrue("Expected 45000 +/- 100 but was " + fourthTTL, fourthTTL >= 44900 && fourthTTL <= 45100); + + // wait timeout the second waiter + lock.acquireFailedAsync(threadSecondWaiter).await().get(); + + // try the first, and check the TTL + firstTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(firstTTL); + Assert.assertTrue("Expected 30000 +/- 100 but was " + firstTTL, firstTTL >= 29900 && firstTTL <= 30100); + + // try the third, and check the TTL + thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(thirdTTL); + Assert.assertTrue("Expected 35000 +/- 300 but was " + thirdTTL, thirdTTL >= 34700 && thirdTTL <= 35300); + + // try the fourth, and check the TTL + fourthTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(fourthTTL); + Assert.assertTrue("Expected 40000 +/- 100 but was " + fourthTTL, fourthTTL >= 39900 && fourthTTL <= 40100); + + // unlock the original lock holder + Boolean unlocked = lock.unlockInnerAsync(threadInit).await().getNow(); + Assert.assertNotNull(unlocked); + Assert.assertTrue(unlocked); + + // acquire the lock immediately with the 1nd + ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNull(ttl); + + // try the third, and check the TTL + thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(thirdTTL); + Assert.assertTrue("Expected 30000 +/- 300 but was " + thirdTTL, thirdTTL >= 29700 && thirdTTL <= 30300); + + fourthTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(fourthTTL); + Assert.assertTrue("Expected 35000 +/- 100 but was " + fourthTTL, fourthTTL >= 34900 && fourthTTL <= 35100); + } + + @Test + public void testLockAcquiredBooleanTimeoutDrift_Descrete() throws Exception { + long leaseTime = 500; + + // we're testing interaction of various internal methods, so create a Redisson instance for protected access + Redisson redisson = new Redisson(createConfig()); + + RedissonFairLock lock = new RedissonFairLock( + redisson.connectionManager.getCommandExecutor(), + "testLockAcquiredTimeoutDrift_Descrete", + 100); + + // clear out any prior state + lock.delete(); + + long threadInit = 101; + long threadFirstWaiter = 102; + long threadSecondWaiter = 103; + long threadThirdWaiter = 104; + + // take the lock successfully + Boolean locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + Assert.assertTrue(locked); + + // fail to get the lock, but end up in the thread queue w/ ttl + 100ms timeout + locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + Assert.assertFalse(locked); + + // fail to get the lock again, but end up in the thread queue w/ ttl + 200ms timeout + locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + Assert.assertFalse(locked); + + // unlock the original lock holder + Boolean unlocked = lock.unlockInnerAsync(threadInit).await().getNow(); + Assert.assertTrue(unlocked); + + // get the lock + locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + Assert.assertTrue(locked); + + // fail to get the lock, keeping ttl of lock ttl + 200ms + locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + Assert.assertFalse(locked); + + // fail to get the lock, keeping ttl of lock ttl + 100ms + locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + Assert.assertFalse(locked); + + // fail to get the lock, keeping ttl of lock ttl + 200ms + locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + Assert.assertFalse(locked); + + Thread.sleep(490); + + locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + Assert.assertTrue(locked); + } + + @Test + public void testLockAcquiredTimeoutDrift_Descrete() throws Exception { + long leaseTime = 300_000; + + // we're testing interaction of various internal methods, so create a Redisson instance for protected access + Redisson redisson = new Redisson(createConfig()); + + RedissonFairLock lock = new RedissonFairLock( + redisson.connectionManager.getCommandExecutor(), + "testLockAcquiredTimeoutDrift_Descrete"); + + // clear out any prior state + lock.delete(); + + long threadInit = 101; + long threadFirstWaiter = 102; + long threadSecondWaiter = 103; + long threadThirdWaiter = 104; + + // take the lock successfully + Long ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNull(ttl); + + // fail to get the lock, but end up in the thread queue w/ ttl + 5s timeout + Long firstTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(firstTTL); + + // fail to get the lock again, but end up in the thread queue w/ ttl + 10s timeout + Long secondTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(secondTTL); + + // unlock the original lock holder + Boolean unlocked = lock.unlockInnerAsync(threadInit).await().getNow(); + Assert.assertNotNull(unlocked); + Assert.assertTrue(unlocked); + + ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNull(ttl); + + Long thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(thirdTTL); + + Long secondTTLAgain = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(secondTTLAgain); + long diff = secondTTL - secondTTLAgain; + Assert.assertTrue("Expected 5000 +/- 100 but was " + diff, diff > 4900 && diff < 5100); + diff = thirdTTL - secondTTLAgain; + Assert.assertTrue("Expected 5000 +/- 100 but was " + diff, diff > 4900 && diff < 5100); + + thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(thirdTTL); + diff = thirdTTL - secondTTLAgain; + Assert.assertTrue("Expected 5000 +/- 100 but was " + diff, diff > 4900 && diff < 5100); + } + + @Test + public void testAbandonedTimeoutDrift_Descrete() throws Exception { + long leaseTime = 500; + long threadWaitTime = 100; + + // we're testing interaction of various internal methods, so create a Redisson instance for protected access + Redisson redisson = new Redisson(createConfig()); + + RedissonFairLock lock = new RedissonFairLock( + redisson.connectionManager.getCommandExecutor(), + "testAbandonedTimeoutDrift_Descrete", + threadWaitTime); + + // clear out any prior state + lock.delete(); + + long threadInit = 101; + long threadFirstWaiter = 102; + long threadSecondWaiter = 103; + long threadThirdWaiter = 104; + + // take the lock successfully + Long ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNull(ttl); + + // fail to get the lock, but end up in the thread queue w/ ttl + 5s timeout + Long firstTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(firstTTL); + + // fail to get the lock again, but end up in the thread queue w/ ttl + 10s timeout + Long secondTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(secondTTL); + + Long thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(thirdTTL); + + long diff = thirdTTL - firstTTL; + Assert.assertTrue("Expected 200 +/- 10 but was " + diff, diff > 190 && diff < 210); + + Thread.sleep(thirdTTL + threadWaitTime); + + ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNull(ttl); + } + + @Test + public void testFirstThreadDeathTimeoutDrift() throws Exception { + int leaseTimeSeconds = 30; + RLock lock = redisson.getFairLock("test-fair-lock"); + AtomicBoolean lastThreadTryingToLock = new AtomicBoolean(false); + + + //create a scenario where the same 3 threads keep on trying to get a lock + //to exacerbate the problem, use a very short wait time and a long lease time + //this will end up setting the queue timeout score to a value far away in the future + ExecutorService executor = Executors.newFixedThreadPool(3); + for (int i = 0; i < 10; i++) { + final int finalI = i; + executor.submit(() -> { + log.info("running " + finalI + " in thread " + Thread.currentThread().getId()); + try { + if (lock.tryLock(3000, leaseTimeSeconds * 1000, TimeUnit.MILLISECONDS)) { + log.info("Lock taken by thread " + Thread.currentThread().getId()); + Thread.sleep(100); + try { + //this could fail before use sleep for the same value as the lock expiry, that's fine + //for the purpose of this test + lock.unlock(); + log.info("Lock released by thread " + Thread.currentThread().getId()); + } catch (Exception ignored) { + } + } + } catch (InterruptedException ex) { + log.warn("Interrupted " + Thread.currentThread().getId()); + } catch (Exception ex) { + log.error(ex.getMessage(), ex); + } + }); + //for the first 3 threads, add a 50ms delay. This is to recreate the worst case scenario, where all threads + //attempting to lock do so in a staggered pattern. This delay will be carried over by the thread pool. + if (i < 3) { + Thread.sleep(50); + } + } + //we now launch one more thread and kill it before it manages to fail and clean up + //that thread will end up with a timeout that will prevent any others from taking the lock for a long time + executor.submit(() -> { + log.info("Final thread trying to take the lock with thread id: " + Thread.currentThread().getId()); + try { + lastThreadTryingToLock.set(true); + if (lock.tryLock(30000, 30000, TimeUnit.MILLISECONDS)) { + log.info("Lock taken by final thread " + Thread.currentThread().getId()); + Thread.sleep(1000); + lock.unlock(); + log.info("Lock released by final thread " + Thread.currentThread().getId()); + } + } catch (InterruptedException ex) { + log.warn("Interrupted " + Thread.currentThread().getId()); + } catch (Exception ex) { + log.error(ex.getMessage(), ex); + } + }); + //now we wait for all others threads to stop trying, and only the last thread is running + while (!lastThreadTryingToLock.get()) { + Thread.sleep(100); + } + //try to kill that last thread, and don't let it clean up after itself + executor.shutdownNow(); + //force the lock to unlock just in case + try { + lock.forceUnlock(); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + if (lock.isLocked()) { + Assert.fail("Lock should have been unlocked by now"); + } + //check the timeout scores - they should all be within a reasonable amount of time from now + List queue = redisson.getScript(LongCodec.INSTANCE).eval(RScript.Mode.READ_ONLY, + "local result = {}; " + + "local timeouts = redis.call('zrange', KEYS[1], 0, 99, 'WITHSCORES'); " + + "for i=1,#timeouts,2 do " + + "table.insert(result, timeouts[i+1]); " + + "end; " + + "return result; ", + RScript.ReturnType.MULTI, + Collections.singletonList("redisson_lock_timeout:{test-fair-lock}")); + + for (int i = 0; i < queue.size(); i++) { + long timeout = queue.get(i); + long epiry = ((timeout - new Date().getTime()) / 1000); + log.info("Item " + i + " expires in " + epiry + " seconds"); + // the Redisson library uses this 5000ms delay in the code + Assert.assertFalse("It would take more than " + (leaseTimeSeconds + 5 * (i + 1)) + "s to get the lock!", epiry > leaseTimeSeconds + 5 * (i + 1)); + } + } + @Test public void testTryLockNonDelayed() throws InterruptedException { String LOCK_NAME = "SOME_LOCK"; - + Thread t1 = new Thread(() -> { RLock fairLock = redisson.getFairLock(LOCK_NAME); try { @@ -140,7 +567,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest { fairLock.unlock(); } }); - + Thread t2 = new Thread(() -> { try { Thread.sleep(200L); @@ -158,13 +585,13 @@ public class RedissonFairLockTest extends BaseConcurrentTest { fairLock.unlock(); } }); - + t1.start(); t2.start(); - + t1.join(); t2.join(); - + RLock fairLock = redisson.getFairLock(LOCK_NAME); try { if (!fairLock.tryLock(0, TimeUnit.SECONDS)) { @@ -174,7 +601,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest { fairLock.unlock(); } } - + @Test public void testTryLockWait() throws InterruptedException { testSingleInstanceConcurrency(1, r -> { @@ -183,12 +610,12 @@ public class RedissonFairLockTest extends BaseConcurrentTest { }); RLock lock = redisson.getFairLock("lock"); - + long startTime = System.currentTimeMillis(); lock.tryLock(3, TimeUnit.SECONDS); assertThat(System.currentTimeMillis() - startTime).isBetween(2990L, 3100L); } - + @Test public void testForceUnlock() { RLock lock = redisson.getFairLock("lock"); @@ -235,7 +662,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest { Assert.assertTrue(latch.await(1, TimeUnit.SECONDS)); RLock lock = redisson.getFairLock("lock"); - + await().atMost(redisson.getConfig().getLockWatchdogTimeout() + 1000, TimeUnit.MILLISECONDS).until(() -> !lock.isLocked()); } @@ -443,7 +870,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest { Assert.assertEquals(iterations, lockedCounter.get()); } - + @Test public void testConcurrency_MultiInstance_Ordering() throws InterruptedException { final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); @@ -468,7 +895,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest { Thread.sleep(10); t1.start(); } - + await().atMost(30, TimeUnit.SECONDS).until(() -> lockedCounter.get() == totalThreads); }