From 6dbc972b6c53958512cf4d9b3b763553a6ccbd97 Mon Sep 17 00:00:00 2001
From: Justin Corpron
Date: Thu, 9 May 2019 13:13:22 -0700
Subject: [PATCH 1/2] Fix timeout drift in RedissonFairLock

This change alters how timeouts are calculated for threads added to the queue
when the lock cannot be acquired immediately, and adds logic to reduce the
remaining timeouts when a thread leaves the queue, either by acquiring the lock
or by timing out while waiting. Tests have been added to verify that the added
and altered Lua code is necessary to provide the documented behavior of the
fair lock, and that the changes do not break existing desired behavior.

The timeout drift issue is resolved by decreasing the timeouts in the
redisson_lock_timeout sorted set whenever a thread is removed from the queue.
This logic was added to the tryLockInnerAsync Lua code (both variations) in the
branch where the lock is successfully acquired; in that case every timeout
except the one belonging to the removed thread is decreased by threadWaitTime.
Additionally, the existing Lua code in acquireFailedAsync was changed to always
decrease timeouts regardless of where the removed thread sits in the queue.
This requires traversing the queue to find the position of the thread being
removed, so that only the threads after it have their timeouts decreased. The
existing code also had a flaw where, if the 1st and 2nd threads in the queue
were removed via acquireFailedAsync, the TTL for the 3rd thread would equal the
lock TTL, so it could not acquire the lock fairly if the lock expired.

Fixing both the timeout drift and the unfair timeout decrease also requires
changing the timeout calculation itself. The existing calculation at the end of
the tryLockInnerAsync Lua code (the tryLock-with-waitTime call path) set the
timeout to the lock TTL plus 5s for the first thread in the queue, and for
every other thread essentially to the first thread's timeout plus 5s. That
second rule is correct for the 2nd thread per the documentation, but for the
3rd through Nth threads the timeout would not let them acquire the lock fairly
within 5s of the prior thread if the 1st and 2nd threads died, because their
timeouts were the same as the 2nd thread's; this contradicts the documentation,
which grants 5s per thread in the queue. The new code sets the timeout of a
newly queued thread to 5s plus the timeout of the thread at the end of the
queue. The script now also checks whether the thread that failed to acquire the
lock is already in the queue; if so, it returns an approximate TTL derived from
that thread's current timeout (its timeout minus 5s).

Note that the "remove stale threads" while loop was not altered even though it
also removes threads from the queue. This is the expected behavior and was
preserved; some of the added tests now check the timeout expiration behavior.
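To make the new bookkeeping easier to follow, here is a minimal, self-contained
model of the revised rules. It is a hypothetical sketch for illustration only,
not code from the patch, and the class and method names are invented. It
mirrors the behavior asserted by the added tests: with a 30s lease and the
default 5s threadWaitTime, successive waiters are told 30s, 35s, 40s, ..., and
every thread behind a departing waiter has its stored timeout reduced by
threadWaitTime.

    // Hypothetical in-memory model of the patched queue/timeout bookkeeping.
    // queue      ~ the redisson_lock_queue list
    // timeoutSet ~ the redisson_lock_timeout sorted set (thread -> absolute timeout score)
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;

    final class FairLockQueueModel {
        private final long threadWaitTime; // 5000 ms by default
        private final Deque<String> queue = new ArrayDeque<>();
        private final Map<String, Long> timeoutSet = new HashMap<>();

        FairLockQueueModel(long threadWaitTime) {
            this.threadWaitTime = threadWaitTime;
        }

        // Mirrors the tail of tryLockInnerAsync when the lock cannot be acquired:
        // returns the ttl reported to the caller and records the waiter's timeout.
        long reportFailureAndEnqueue(String thread, long lockPttl, long now) {
            Long existing = timeoutSet.get(thread);
            if (existing != null) {
                // already queued: return an approximate ttl, do not extend the timeout
                return existing - threadWaitTime - now;
            }
            String last = queue.peekLast();
            long ttl = (last != null) ? timeoutSet.get(last) - now : lockPttl;
            timeoutSet.put(thread, now + ttl + threadWaitTime); // tail timeout + threadWaitTime
            queue.addLast(thread);
            return ttl;
        }

        // Mirrors both the acquire branch of tryLockInnerAsync and acquireFailedAsync:
        // remove the thread and shift every waiter behind it forward by threadWaitTime.
        void leaveQueue(String thread) {
            boolean behind = false;
            for (String t : queue) {
                if (behind) {
                    timeoutSet.merge(t, -threadWaitTime, Long::sum);
                }
                if (t.equals(thread)) {
                    behind = true;
                }
            }
            queue.remove(thread);
            timeoutSet.remove(thread);
        }
    }

The patch also exposes threadWaitTime through a new constructor,
RedissonFairLock(commandExecutor, name, threadWaitTime), which the added tests
use to shrink the wait window (for example to 100 ms) so the expiration
behavior can be exercised quickly.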
--- .../java/org/redisson/RedissonFairLock.java | 367 ++++++++------ .../org/redisson/RedissonFairLockTest.java | 463 +++++++++++++++++- 2 files changed, 662 insertions(+), 168 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonFairLock.java b/redisson/src/main/java/org/redisson/RedissonFairLock.java index d8021039b..fdeeb2dbe 100644 --- a/redisson/src/main/java/org/redisson/RedissonFairLock.java +++ b/redisson/src/main/java/org/redisson/RedissonFairLock.java @@ -40,18 +40,23 @@ import org.redisson.pubsub.LockPubSub; */ public class RedissonFairLock extends RedissonLock implements RLock { - private final long threadWaitTime = 5000; + private final long threadWaitTime; private final CommandAsyncExecutor commandExecutor; private final String threadsQueueName; private final String timeoutSetName; public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) { + this(commandExecutor, name, 5000); + } + + public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) { super(commandExecutor, name); this.commandExecutor = commandExecutor; + this.threadWaitTime = threadWaitTime; threadsQueueName = prefixName("redisson_lock_queue", name); timeoutSetName = prefixName("redisson_lock_timeout", name); } - + @Override protected RedissonLockEntry getEntry(long threadId) { return pubSub.getEntry(getEntryName() + ":" + threadId); @@ -59,32 +64,40 @@ public class RedissonFairLock extends RedissonLock implements RLock { @Override protected RFuture subscribe(long threadId) { - return pubSub.subscribe(getEntryName() + ":" + threadId, + return pubSub.subscribe(getEntryName() + ":" + threadId, getChannelName() + ":" + getLockName(threadId)); } @Override protected void unsubscribe(RFuture future, long threadId) { - pubSub.unsubscribe(future.getNow(), getEntryName() + ":" + threadId, + pubSub.unsubscribe(future.getNow(), getEntryName() + ":" + threadId, getChannelName() + ":" + getLockName(threadId)); } @Override protected RFuture acquireFailedAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, - "local firstThreadId = redis.call('lindex', KEYS[1], 0); " + - "if firstThreadId == ARGV[1] then " + - "local keys = redis.call('zrange', KEYS[2], 0, -1); " + - "for i = 1, #keys, 1 do " + - "redis.call('zincrby', KEYS[2], -tonumber(ARGV[2]), keys[i]);" + - "end;" + - "end;" + - "redis.call('zrem', KEYS[2], ARGV[1]); " + - "redis.call('lrem', KEYS[1], 0, ARGV[1]); ", - Arrays.asList(threadsQueueName, timeoutSetName), - getLockName(threadId), threadWaitTime); + // get the existing timeout for the thread to remove + "local queue = redis.call('lrange', KEYS[1], 0, -1);" + + // find the location in the queue where the thread is + "local i = 1;" + + "while i <= #queue and queue[i] ~= ARGV[1] do " + + "i = i + 1;" + + "end;" + + // go to the next index which will exist after the current thread is removed + "i = i + 1;" + + // decrement the timeout for the rest of the queue after the thread being removed + "while i <= #queue do " + + "redis.call('zincrby', KEYS[2], -tonumber(ARGV[2]), queue[i]);" + + "i = i + 1;" + + "end;" + + // remove the thread from the queue and timeouts set + "redis.call('zrem', KEYS[2], ARGV[1]);" + + "redis.call('lrem', KEYS[1], 0, ARGV[1]);", + Arrays.asList(threadsQueueName, timeoutSetName), + getLockName(threadId), threadWaitTime); } - + @Override RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { internalLockLeaseTime = 
unit.toMillis(leaseTime); @@ -93,134 +106,187 @@ public class RedissonFairLock extends RedissonLock implements RLock { if (command == RedisCommands.EVAL_NULL_BOOLEAN) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // remove stale threads - "while true do " - + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" - + "if firstThreadId2 == false then " - + "break;" - + "end; " - + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" - + "if timeout <= tonumber(ARGV[3]) then " - + "redis.call('zrem', KEYS[3], firstThreadId2); " - + "redis.call('lpop', KEYS[2]); " - + "else " - + "break;" - + "end; " - + "end;" - + - - "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) " - + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " + - "redis.call('lpop', KEYS[2]); " + - "redis.call('zrem', KEYS[3], ARGV[2]); " + - "redis.call('hset', KEYS[1], ARGV[2], 1); " + - "redis.call('pexpire', KEYS[1], ARGV[1]); " + - "return nil; " + - "end; " + - "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + - "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + - "redis.call('pexpire', KEYS[1], ARGV[1]); " + - "return nil; " + - "end; " + - "return 1;", - Arrays.asList(getName(), threadsQueueName, timeoutSetName), - internalLockLeaseTime, getLockName(threadId), currentTime); + "while true do " + + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + + "if firstThreadId2 == false then " + + "break;" + + "end;" + + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + + "if timeout <= tonumber(ARGV[3]) then " + + // remove the item from the queue and timeout set + // NOTE we do not alter any other timeout + "redis.call('zrem', KEYS[3], firstThreadId2);" + + "redis.call('lpop', KEYS[2]);" + + "else " + + "break;" + + "end;" + + "end;" + + + "if (redis.call('exists', KEYS[1]) == 0) " + + "and ((redis.call('exists', KEYS[2]) == 0) " + + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " + + "redis.call('lpop', KEYS[2]);" + + "redis.call('zrem', KEYS[3], ARGV[2]);" + + + // decrease timeouts for all waiting in the queue + "local keys = redis.call('zrange', KEYS[3], 0, -1);" + + "for i = 1, #keys, 1 do " + + "redis.call('zincrby', KEYS[3], -tonumber(ARGV[4]), keys[i]);" + + "end;" + + + "redis.call('hset', KEYS[1], ARGV[2], 1);" + + "redis.call('pexpire', KEYS[1], ARGV[1]);" + + "return nil;" + + "end;" + + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + + "redis.call('hincrby', KEYS[1], ARGV[2], 1);" + + "redis.call('pexpire', KEYS[1], ARGV[1]);" + + "return nil;" + + "end;" + + "return 1;", + Arrays.asList(getName(), threadsQueueName, timeoutSetName), + internalLockLeaseTime, getLockName(threadId), currentTime, threadWaitTime); } - + if (command == RedisCommands.EVAL_LONG) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // remove stale threads - "while true do " - + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" - + "if firstThreadId2 == false then " - + "break;" - + "end; " - + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" - + "if timeout <= tonumber(ARGV[4]) then " - + "redis.call('zrem', KEYS[3], firstThreadId2); " - + "redis.call('lpop', KEYS[2]); " - + "else " - + "break;" - + "end; " - + "end;" - - + "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) " - + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " + - "redis.call('lpop', KEYS[2]); " + - 
"redis.call('zrem', KEYS[3], ARGV[2]); " + - "redis.call('hset', KEYS[1], ARGV[2], 1); " + - "redis.call('pexpire', KEYS[1], ARGV[1]); " + - "return nil; " + - "end; " + - "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + - "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + - "redis.call('pexpire', KEYS[1], ARGV[1]); " + - "return nil; " + - "end; " + - - "local firstThreadId = redis.call('lindex', KEYS[2], 0); " + - "local ttl; " + - "if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " + - "ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" + - "else " - + "ttl = redis.call('pttl', KEYS[1]);" + - "end; " + - - "local timeout = ttl + tonumber(ARGV[3]);" + - "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " + - "redis.call('rpush', KEYS[2], ARGV[2]);" + - "end; " + - "return ttl;", - Arrays.asList(getName(), threadsQueueName, timeoutSetName), - internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime); + "while true do " + + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + + "if firstThreadId2 == false then " + + "break;" + + "end;" + + + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + + "if timeout <= tonumber(ARGV[4]) then " + + // remove the item from the queue and timeout set + // NOTE we do not alter any other timeout + "redis.call('zrem', KEYS[3], firstThreadId2);" + + "redis.call('lpop', KEYS[2]);" + + "else " + + "break;" + + "end;" + + "end;" + + + // check if the lock can be acquired now + "if (redis.call('exists', KEYS[1]) == 0) " + + "and ((redis.call('exists', KEYS[2]) == 0) " + + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " + + + // remove this thread from the queue and timeout set + "redis.call('lpop', KEYS[2]);" + + "redis.call('zrem', KEYS[3], ARGV[2]);" + + + // decrease timeouts for all waiting in the queue + "local keys = redis.call('zrange', KEYS[3], 0, -1);" + + "for i = 1, #keys, 1 do " + + "redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" + + "end;" + + + // acquire the lock and set the TTL for the lease + "redis.call('hset', KEYS[1], ARGV[2], 1);" + + "redis.call('pexpire', KEYS[1], ARGV[1]);" + + "return nil;" + + "end;" + + + // check if the lock is already held, and this is a re-entry + "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " + + "redis.call('hincrby', KEYS[1], ARGV[2],1);" + + "redis.call('pexpire', KEYS[1], ARGV[1]);" + + "return nil;" + + "end;" + + + // the lock cannot be acquired + // check if the thread is already in the queue + "local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" + + "if timeout ~= false then " + + // the real timeout is the timeout of the prior thread + // in the queue, but this is approximately correct, and + // avoids having to traverse the queue + "return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" + + "end;" + + + // add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of + // the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the + // threadWaitTime + "local lastThreadId = redis.call('lindex', KEYS[2], -1);" + + "local ttl;" + + "if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " + + "ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" + + "else " + + "ttl = redis.call('pttl', KEYS[1]);" + + "end;" + + "local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" + + "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " + + 
"redis.call('rpush', KEYS[2], ARGV[2]);" + + "end;" + + "return ttl;", + Arrays.asList(getName(), threadsQueueName, timeoutSetName), + internalLockLeaseTime, getLockName(threadId), threadWaitTime, currentTime); } - + throw new IllegalArgumentException(); } - + @Override protected RFuture unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // remove stale threads - "while true do " - + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" - + "if firstThreadId2 == false then " - + "break;" - + "end; " - + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" - + "if timeout <= tonumber(ARGV[4]) then " - + "redis.call('zrem', KEYS[3], firstThreadId2); " - + "redis.call('lpop', KEYS[2]); " - + "else " - + "break;" - + "end; " - + "end;" - - + "if (redis.call('exists', KEYS[1]) == 0) then " + - "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + + "while true do " + + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + + "if firstThreadId2 == false then " + + "break;" + + "end;" + + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + + "if timeout <= tonumber(ARGV[4]) then " + + // remove the item from the queue and timeout set + // NOTE we do not alter any other timeout + "redis.call('zrem', KEYS[3], firstThreadId2);" + + "redis.call('lpop', KEYS[2]);" + + "else " + + "break;" + + "end;" + + "end;" + + + // check if the lock already doesn't exist + "if (redis.call('exists', KEYS[1]) == 0) then " + + // get the first thread in the queue + "local nextThreadId = redis.call('lindex', KEYS[2], 0);" + + // notify the first in the queue that the lock is being unlocked "if nextThreadId ~= false then " + - "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " + - "end; " + - "return 1; " + + "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]);" + + "end;" + + // the lock is already unlocked + "return 1;" + "end;" + + + // does the entry count exist for the current thread "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + + // it does not, this thread does not hold the lock "return nil;" + - "end; " + - "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + - "if (counter > 0) then " + - "redis.call('pexpire', KEYS[1], ARGV[2]); " + - "return 0; " + - "end; " + - - "redis.call('del', KEYS[1]); " + - "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + + "end;" + + + // decrement the entry count + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);" + + "if counter > 0 then " + + // update the expiration w/ the new lease time + "redis.call('pexpire', KEYS[1], ARGV[2]);" + + // the lock is still held + "return 0;" + + "end;" + + + // release the lock + "redis.call('del', KEYS[1]);" + + // notify the next thread in the queue + "local nextThreadId = redis.call('lindex', KEYS[2], 0);" + "if nextThreadId ~= false then " + - "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " + - "end; " + - "return 1; ", - Arrays.asList(getName(), threadsQueueName, timeoutSetName, getChannelName()), + "redis.call('publish', KEYS[4] .. ':' .. 
nextThreadId, ARGV[1]);" + + "end;" + + "return 1;", + Arrays.asList(getName(), threadsQueueName, timeoutSetName, getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis()); } - + @Override public Condition newCondition() { throw new UnsupportedOperationException(); @@ -230,13 +296,13 @@ public class RedissonFairLock extends RedissonLock implements RLock { public RFuture deleteAsync() { return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), threadsQueueName, timeoutSetName); } - + @Override public RFuture sizeInMemoryAsync() { List keys = Arrays.asList(getName(), threadsQueueName, timeoutSetName); return super.sizeInMemoryAsync(keys); } - + @Override public RFuture expireAsync(long timeToLive, TimeUnit timeUnit) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, @@ -266,36 +332,37 @@ public class RedissonFairLock extends RedissonLock implements RLock { Arrays.asList(getName(), threadsQueueName, timeoutSetName)); } - + @Override public RFuture forceUnlockAsync() { cancelExpirationRenewal(null); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // remove stale threads - "while true do " - + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" - + "if firstThreadId2 == false then " - + "break;" - + "end; " - + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" - + "if timeout <= tonumber(ARGV[2]) then " - + "redis.call('zrem', KEYS[3], firstThreadId2); " - + "redis.call('lpop', KEYS[2]); " - + "else " - + "break;" - + "end; " - + "end;" - + - - "if (redis.call('del', KEYS[1]) == 1) then " + - "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + + "while true do " + + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0); " + + "if firstThreadId2 == false then " + + "break; " + + "end; " + + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + + "if timeout <= tonumber(ARGV[2]) then " + + // remove the item from the queue and timeout set + // NOTE we do not alter any other timeout + "redis.call('zrem', KEYS[3], firstThreadId2); " + + "redis.call('lpop', KEYS[2]); " + + "else " + + "break;" + + "end;" + + "end;" + + + "if (redis.call('del', KEYS[1]) == 1) then " + + "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " + "redis.call('publish', KEYS[4] .. ':' .. 
nextThreadId, ARGV[1]); " + - "end; " + - "return 1; " + - "end; " + + "end; " + + "return 1; " + + "end; " + "return 0;", - Arrays.asList(getName(), threadsQueueName, timeoutSetName, getChannelName()), + Arrays.asList(getName(), threadsQueueName, timeoutSetName, getChannelName()), LockPubSub.UNLOCK_MESSAGE, System.currentTimeMillis()); } diff --git a/redisson/src/test/java/org/redisson/RedissonFairLockTest.java b/redisson/src/test/java/org/redisson/RedissonFairLockTest.java index 4e068cab1..5b5254c21 100644 --- a/redisson/src/test/java/org/redisson/RedissonFairLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonFairLockTest.java @@ -20,15 +20,16 @@ import org.junit.Test; import org.redisson.api.RLock; import org.redisson.api.RScript; import org.redisson.client.codec.LongCodec; +import org.redisson.client.protocol.RedisCommands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class RedissonFairLockTest extends BaseConcurrentTest { private final Logger log = LoggerFactory.getLogger(RedissonFairLockTest.class); - + @Test - public void testTimeoutDrift() throws Exception { + public void testWaitTimeoutDrift() throws Exception { int leaseTimeSeconds = 30; RLock lock = redisson.getFairLock("test-fair-lock"); AtomicBoolean lastThreadTryingToLock = new AtomicBoolean(false); @@ -38,12 +39,14 @@ public class RedissonFairLockTest extends BaseConcurrentTest { //to exacerbate the problem, use a very short wait time and a long lease time //this will end up setting the queue timeout score to a value far away in the future ExecutorService executor = Executors.newFixedThreadPool(3); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 50; i++) { + final int finalI = i; executor.submit(() -> { + log.info("running " + finalI + " in thread " + Thread.currentThread().getId()); try { if (lock.tryLock(500, leaseTimeSeconds * 1000, TimeUnit.MILLISECONDS)) { log.info("Lock taken by thread " + Thread.currentThread().getId()); - Thread.sleep(30000); + Thread.sleep(10000); try { //this could fail before use sleep for the same value as the lock expiry, that's fine //for the purpose of this test @@ -53,7 +56,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest { } } } catch (InterruptedException ex) { - log.warn("Interrupted"); + log.warn("Interrupted " + Thread.currentThread().getId()); } catch (Exception ex) { log.error(ex.getMessage(), ex); } @@ -66,6 +69,98 @@ public class RedissonFairLockTest extends BaseConcurrentTest { } //we now launch one more thread and kill it before it manages to fail and clean up //that thread will end up with a timeout that will prevent any others from taking the lock for a long time + executor.submit(() -> { + log.info("Final thread trying to take the lock with thread id: " + Thread.currentThread().getId()); + try { + lastThreadTryingToLock.set(true); + if (lock.tryLock(30000, 30000, TimeUnit.MILLISECONDS)) { + log.info("Lock taken by final thread " + Thread.currentThread().getId()); + Thread.sleep(1000); + lock.unlock(); + log.info("Lock released by final thread " + Thread.currentThread().getId()); + } + } catch (InterruptedException ex) { + log.warn("Interrupted " + Thread.currentThread().getId()); + } catch (Exception ex) { + log.error(ex.getMessage(), ex); + } + }); + //now we wait for all others threads to stop trying, and only the last thread is running + while (!lastThreadTryingToLock.get()) { + Thread.sleep(100); + } + //try to kill that last thread, and don't let it clean up after itself + executor.shutdownNow(); + //force the lock to 
unlock just in case + try { + lock.forceUnlock(); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + if (lock.isLocked()) { + Assert.fail("Lock should have been unlocked by now"); + } + //check the timeout scores - they should all be within a reasonable amount of time from now + List queue = redisson.getScript(LongCodec.INSTANCE).eval(RScript.Mode.READ_ONLY, + "local result = {}; " + + "local timeouts = redis.call('zrange', KEYS[1], 0, 99, 'WITHSCORES'); " + + "for i=1,#timeouts,2 do " + + "table.insert(result, timeouts[i+1]); " + + "end; " + + "return result; ", + RScript.ReturnType.MULTI, + Collections.singletonList("redisson_lock_timeout:{test-fair-lock}")); + + int i = 0; + for (Long timeout : queue) { + long epiry = ((timeout - new Date().getTime()) / 1000); + log.info("Item " + (i++) + " expires in " + epiry + " seconds"); + //the Redisson library uses this 5000ms delay in the code + if (epiry > leaseTimeSeconds + 5) { + Assert.fail("It would take more than " + leaseTimeSeconds + "s to get the lock!"); + } + } + } + + @Test + public void testLockAcquiredTimeoutDrift() throws Exception { + int leaseTimeSeconds = 30; + RLock lock = redisson.getFairLock("test-fair-lock"); + + //create a scenario where the same 3 threads keep on trying to get a lock + //to exacerbate the problem, use a very short wait time and a long lease time + //this will end up setting the queue timeout score to a value far away in the future + ExecutorService executor = Executors.newFixedThreadPool(3); + for (int i = 0; i < 3; i++) { + final int finalI = i; + executor.submit(() -> { + log.info("running " + finalI + " in thread " + Thread.currentThread().getId()); + try { + if (lock.tryLock(3000, leaseTimeSeconds * 1000, TimeUnit.MILLISECONDS)) { + log.info("Lock taken by thread " + Thread.currentThread().getId()); + Thread.sleep(100); + try { + //this could fail before use sleep for the same value as the lock expiry, that's fine + //for the purpose of this test + lock.unlock(); + log.info("Lock released by thread " + Thread.currentThread().getId()); + } catch (Exception ignored) { + } + } + } catch (InterruptedException ex) { + log.warn("Interrupted " + Thread.currentThread().getId()); + } catch (Exception ex) { + log.error(ex.getMessage(), ex); + } + }); + //for the first 3 threads, add a 50ms delay. This is to recreate the worst case scenario, where all threads + //attempting to lock do so in a staggered pattern. This delay will be carried over by the thread pool. 
+ Thread.sleep(50); + } + + AtomicBoolean lastThreadTryingToLock = new AtomicBoolean(false); + //we now launch one more thread and kill it before it manages to fail and clean up + //that thread will end up with a timeout that will prevent any others from taking the lock for a long time executor.submit(() -> { log.info("Final thread trying to take the lock with thread id: " + Thread.currentThread().getId()); try { @@ -91,7 +186,8 @@ public class RedissonFairLockTest extends BaseConcurrentTest { //force the lock to unlock just in case try { lock.forceUnlock(); - } catch (Exception ignored) { + } catch (Exception e) { + log.error(e.getMessage(), e); } if (lock.isLocked()) { Assert.fail("Lock should have been unlocked by now"); @@ -117,11 +213,342 @@ public class RedissonFairLockTest extends BaseConcurrentTest { } } } - + + @Test + public void testAcquireFailedTimeoutDrift_Descrete() throws Exception { + long leaseTime = 30_000; + + // we're testing interaction of various internal methods, so create a Redisson instance for protected access + Redisson redisson = new Redisson(createConfig()); + + RedissonFairLock lock = new RedissonFairLock( + redisson.connectionManager.getCommandExecutor(), + "testAcquireFailedTimeoutDrift_Descrete"); + + // clear out any prior state + lock.delete(); + + long threadInit = 101; + long threadFirstWaiter = 102; + long threadSecondWaiter = 103; + long threadThirdWaiter = 104; + long threadFourthWaiter = 105; + + // take the lock successfully + Long ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNull(ttl); + + // fail to get the lock, but end up in the thread queue w/ ttl + 5s timeout + Long firstTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(firstTTL); + Assert.assertTrue("Expected 30000 +/- 100 but was " + firstTTL, firstTTL >= 29900 && firstTTL <= 30100); + + // fail to get the lock again, but end up in the thread queue w/ ttl + 10s timeout + Long secondTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(secondTTL); + Assert.assertTrue("Expected 35000 +/- 100 but was " + secondTTL, secondTTL >= 34900 && secondTTL <= 35100); + + // try the third, and check the TTL + Long thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(thirdTTL); + Assert.assertTrue("Expected 40000 +/- 100 but was " + thirdTTL, thirdTTL >= 39900 && thirdTTL <= 40100); + + // try the fourth, and check the TTL + Long fourthTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(fourthTTL); + Assert.assertTrue("Expected 45000 +/- 100 but was " + fourthTTL, fourthTTL >= 44900 && fourthTTL <= 45100); + + // wait timeout the second waiter + lock.acquireFailedAsync(threadSecondWaiter).await().get(); + + // try the first, and check the TTL + firstTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(firstTTL); + Assert.assertTrue("Expected 30000 +/- 100 but was " + firstTTL, firstTTL >= 29900 && firstTTL <= 30100); + + // try the third, and check the TTL + thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, 
RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(thirdTTL); + Assert.assertTrue("Expected 35000 +/- 300 but was " + thirdTTL, thirdTTL >= 34700 && thirdTTL <= 35300); + + // try the fourth, and check the TTL + fourthTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(fourthTTL); + Assert.assertTrue("Expected 40000 +/- 100 but was " + fourthTTL, fourthTTL >= 39900 && fourthTTL <= 40100); + + // unlock the original lock holder + Boolean unlocked = lock.unlockInnerAsync(threadInit).await().getNow(); + Assert.assertNotNull(unlocked); + Assert.assertTrue(unlocked); + + // acquire the lock immediately with the 1nd + ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNull(ttl); + + // try the third, and check the TTL + thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(thirdTTL); + Assert.assertTrue("Expected 30000 +/- 300 but was " + thirdTTL, thirdTTL >= 29700 && thirdTTL <= 30300); + + fourthTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(fourthTTL); + Assert.assertTrue("Expected 35000 +/- 100 but was " + fourthTTL, fourthTTL >= 34900 && fourthTTL <= 35100); + } + + @Test + public void testLockAcquiredBooleanTimeoutDrift_Descrete() throws Exception { + long leaseTime = 500; + + // we're testing interaction of various internal methods, so create a Redisson instance for protected access + Redisson redisson = new Redisson(createConfig()); + + RedissonFairLock lock = new RedissonFairLock( + redisson.connectionManager.getCommandExecutor(), + "testLockAcquiredTimeoutDrift_Descrete", + 100); + + // clear out any prior state + lock.delete(); + + long threadInit = 101; + long threadFirstWaiter = 102; + long threadSecondWaiter = 103; + long threadThirdWaiter = 104; + + // take the lock successfully + Boolean locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + Assert.assertTrue(locked); + + // fail to get the lock, but end up in the thread queue w/ ttl + 100ms timeout + locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + Assert.assertFalse(locked); + + // fail to get the lock again, but end up in the thread queue w/ ttl + 200ms timeout + locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + Assert.assertFalse(locked); + + // unlock the original lock holder + Boolean unlocked = lock.unlockInnerAsync(threadInit).await().getNow(); + Assert.assertTrue(unlocked); + + // get the lock + locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + Assert.assertTrue(locked); + + // fail to get the lock, keeping ttl of lock ttl + 200ms + locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + Assert.assertFalse(locked); + + // fail to get the lock, keeping ttl of lock ttl + 100ms + locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + Assert.assertFalse(locked); + 
+ // fail to get the lock, keeping ttl of lock ttl + 200ms + locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + Assert.assertFalse(locked); + + Thread.sleep(490); + + locked = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + Assert.assertTrue(locked); + } + + @Test + public void testLockAcquiredTimeoutDrift_Descrete() throws Exception { + long leaseTime = 300_000; + + // we're testing interaction of various internal methods, so create a Redisson instance for protected access + Redisson redisson = new Redisson(createConfig()); + + RedissonFairLock lock = new RedissonFairLock( + redisson.connectionManager.getCommandExecutor(), + "testLockAcquiredTimeoutDrift_Descrete"); + + // clear out any prior state + lock.delete(); + + long threadInit = 101; + long threadFirstWaiter = 102; + long threadSecondWaiter = 103; + long threadThirdWaiter = 104; + + // take the lock successfully + Long ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNull(ttl); + + // fail to get the lock, but end up in the thread queue w/ ttl + 5s timeout + Long firstTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(firstTTL); + + // fail to get the lock again, but end up in the thread queue w/ ttl + 10s timeout + Long secondTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(secondTTL); + + // unlock the original lock holder + Boolean unlocked = lock.unlockInnerAsync(threadInit).await().getNow(); + Assert.assertNotNull(unlocked); + Assert.assertTrue(unlocked); + + ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNull(ttl); + + Long thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(thirdTTL); + + Long secondTTLAgain = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(secondTTLAgain); + long diff = secondTTL - secondTTLAgain; + Assert.assertTrue("Expected 5000 +/- 100 but was " + diff, diff > 4900 && diff < 5100); + diff = thirdTTL - secondTTLAgain; + Assert.assertTrue("Expected 5000 +/- 100 but was " + diff, diff > 4900 && diff < 5100); + + thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(thirdTTL); + diff = thirdTTL - secondTTLAgain; + Assert.assertTrue("Expected 5000 +/- 100 but was " + diff, diff > 4900 && diff < 5100); + } + + @Test + public void testAbandonedTimeoutDrift_Descrete() throws Exception { + long leaseTime = 500; + long threadWaitTime = 100; + + // we're testing interaction of various internal methods, so create a Redisson instance for protected access + Redisson redisson = new Redisson(createConfig()); + + RedissonFairLock lock = new RedissonFairLock( + redisson.connectionManager.getCommandExecutor(), + "testAbandonedTimeoutDrift_Descrete", + threadWaitTime); + + // clear out any prior state + lock.delete(); + + long threadInit = 101; + long threadFirstWaiter = 102; + long threadSecondWaiter = 103; + long 
threadThirdWaiter = 104; + + // take the lock successfully + Long ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNull(ttl); + + // fail to get the lock, but end up in the thread queue w/ ttl + 5s timeout + Long firstTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(firstTTL); + + // fail to get the lock again, but end up in the thread queue w/ ttl + 10s timeout + Long secondTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(secondTTL); + + Long thirdTTL = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNotNull(thirdTTL); + + long diff = thirdTTL - firstTTL; + Assert.assertTrue("Expected 200 +/- 10 but was " + diff, diff > 190 && diff < 210); + + Thread.sleep(thirdTTL + threadWaitTime); + + ttl = lock.tryLockInnerAsync(leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + Assert.assertNull(ttl); + } + + @Test + public void testFirstThreadDeathTimeoutDrift() throws Exception { + int leaseTimeSeconds = 30; + RLock lock = redisson.getFairLock("test-fair-lock"); + AtomicBoolean lastThreadTryingToLock = new AtomicBoolean(false); + + + //create a scenario where the same 3 threads keep on trying to get a lock + //to exacerbate the problem, use a very short wait time and a long lease time + //this will end up setting the queue timeout score to a value far away in the future + ExecutorService executor = Executors.newFixedThreadPool(3); + for (int i = 0; i < 10; i++) { + final int finalI = i; + executor.submit(() -> { + log.info("running " + finalI + " in thread " + Thread.currentThread().getId()); + try { + if (lock.tryLock(3000, leaseTimeSeconds * 1000, TimeUnit.MILLISECONDS)) { + log.info("Lock taken by thread " + Thread.currentThread().getId()); + Thread.sleep(100); + try { + //this could fail before use sleep for the same value as the lock expiry, that's fine + //for the purpose of this test + lock.unlock(); + log.info("Lock released by thread " + Thread.currentThread().getId()); + } catch (Exception ignored) { + } + } + } catch (InterruptedException ex) { + log.warn("Interrupted " + Thread.currentThread().getId()); + } catch (Exception ex) { + log.error(ex.getMessage(), ex); + } + }); + //for the first 3 threads, add a 50ms delay. This is to recreate the worst case scenario, where all threads + //attempting to lock do so in a staggered pattern. This delay will be carried over by the thread pool. 
+ if (i < 3) { + Thread.sleep(50); + } + } + //we now launch one more thread and kill it before it manages to fail and clean up + //that thread will end up with a timeout that will prevent any others from taking the lock for a long time + executor.submit(() -> { + log.info("Final thread trying to take the lock with thread id: " + Thread.currentThread().getId()); + try { + lastThreadTryingToLock.set(true); + if (lock.tryLock(30000, 30000, TimeUnit.MILLISECONDS)) { + log.info("Lock taken by final thread " + Thread.currentThread().getId()); + Thread.sleep(1000); + lock.unlock(); + log.info("Lock released by final thread " + Thread.currentThread().getId()); + } + } catch (InterruptedException ex) { + log.warn("Interrupted " + Thread.currentThread().getId()); + } catch (Exception ex) { + log.error(ex.getMessage(), ex); + } + }); + //now we wait for all others threads to stop trying, and only the last thread is running + while (!lastThreadTryingToLock.get()) { + Thread.sleep(100); + } + //try to kill that last thread, and don't let it clean up after itself + executor.shutdownNow(); + //force the lock to unlock just in case + try { + lock.forceUnlock(); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + if (lock.isLocked()) { + Assert.fail("Lock should have been unlocked by now"); + } + //check the timeout scores - they should all be within a reasonable amount of time from now + List queue = redisson.getScript(LongCodec.INSTANCE).eval(RScript.Mode.READ_ONLY, + "local result = {}; " + + "local timeouts = redis.call('zrange', KEYS[1], 0, 99, 'WITHSCORES'); " + + "for i=1,#timeouts,2 do " + + "table.insert(result, timeouts[i+1]); " + + "end; " + + "return result; ", + RScript.ReturnType.MULTI, + Collections.singletonList("redisson_lock_timeout:{test-fair-lock}")); + + for (int i = 0; i < queue.size(); i++) { + long timeout = queue.get(i); + long epiry = ((timeout - new Date().getTime()) / 1000); + log.info("Item " + i + " expires in " + epiry + " seconds"); + // the Redisson library uses this 5000ms delay in the code + Assert.assertFalse("It would take more than " + (leaseTimeSeconds + 5 * (i + 1)) + "s to get the lock!", epiry > leaseTimeSeconds + 5 * (i + 1)); + } + } + @Test public void testTryLockNonDelayed() throws InterruptedException { String LOCK_NAME = "SOME_LOCK"; - + Thread t1 = new Thread(() -> { RLock fairLock = redisson.getFairLock(LOCK_NAME); try { @@ -140,7 +567,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest { fairLock.unlock(); } }); - + Thread t2 = new Thread(() -> { try { Thread.sleep(200L); @@ -158,13 +585,13 @@ public class RedissonFairLockTest extends BaseConcurrentTest { fairLock.unlock(); } }); - + t1.start(); t2.start(); - + t1.join(); t2.join(); - + RLock fairLock = redisson.getFairLock(LOCK_NAME); try { if (!fairLock.tryLock(0, TimeUnit.SECONDS)) { @@ -174,7 +601,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest { fairLock.unlock(); } } - + @Test public void testTryLockWait() throws InterruptedException { testSingleInstanceConcurrency(1, r -> { @@ -183,12 +610,12 @@ public class RedissonFairLockTest extends BaseConcurrentTest { }); RLock lock = redisson.getFairLock("lock"); - + long startTime = System.currentTimeMillis(); lock.tryLock(3, TimeUnit.SECONDS); assertThat(System.currentTimeMillis() - startTime).isBetween(2990L, 3100L); } - + @Test public void testForceUnlock() { RLock lock = redisson.getFairLock("lock"); @@ -235,7 +662,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest { 
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS)); RLock lock = redisson.getFairLock("lock"); - + await().atMost(redisson.getConfig().getLockWatchdogTimeout() + 1000, TimeUnit.MILLISECONDS).until(() -> !lock.isLocked()); } @@ -443,7 +870,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest { Assert.assertEquals(iterations, lockedCounter.get()); } - + @Test public void testConcurrency_MultiInstance_Ordering() throws InterruptedException { final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); @@ -468,7 +895,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest { Thread.sleep(10); t1.start(); } - + await().atMost(30, TimeUnit.SECONDS).until(() -> lockedCounter.get() == totalThreads); } From df6cca4bcf5c50b754ee0a47276ff45a90339f7f Mon Sep 17 00:00:00 2001 From: Justin Corpron Date: Mon, 24 Jun 2019 21:05:16 -0700 Subject: [PATCH 2/2] Remove formatting of unchanged logic --- .../java/org/redisson/RedissonFairLock.java | 126 ++++++++---------- 1 file changed, 55 insertions(+), 71 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonFairLock.java b/redisson/src/main/java/org/redisson/RedissonFairLock.java index fdeeb2dbe..864e56d96 100644 --- a/redisson/src/main/java/org/redisson/RedissonFairLock.java +++ b/redisson/src/main/java/org/redisson/RedissonFairLock.java @@ -232,58 +232,43 @@ public class RedissonFairLock extends RedissonLock implements RLock { protected RFuture unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // remove stale threads - "while true do " + - "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + - "if firstThreadId2 == false then " + - "break;" + - "end;" + - "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + - "if timeout <= tonumber(ARGV[4]) then " + - // remove the item from the queue and timeout set - // NOTE we do not alter any other timeout - "redis.call('zrem', KEYS[3], firstThreadId2);" + - "redis.call('lpop', KEYS[2]);" + - "else " + - "break;" + - "end;" + - "end;" + - - // check if the lock already doesn't exist - "if (redis.call('exists', KEYS[1]) == 0) then " + - // get the first thread in the queue - "local nextThreadId = redis.call('lindex', KEYS[2], 0);" + - // notify the first in the queue that the lock is being unlocked + "while true do " + + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + + "if firstThreadId2 == false then " + + "break;" + + "end; " + + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + + "if timeout <= tonumber(ARGV[4]) then " + + "redis.call('zrem', KEYS[3], firstThreadId2); " + + "redis.call('lpop', KEYS[2]); " + + "else " + + "break;" + + "end; " + + "end;" + + + "if (redis.call('exists', KEYS[1]) == 0) then " + + "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " + - "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]);" + - "end;" + - // the lock is already unlocked - "return 1;" + + "redis.call('publish', KEYS[4] .. ':' .. 
nextThreadId, ARGV[1]); " + + "end; " + + "return 1; " + "end;" + - - // does the entry count exist for the current thread "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + - // it does not, this thread does not hold the lock "return nil;" + - "end;" + - - // decrement the entry count - "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);" + - "if counter > 0 then " + - // update the expiration w/ the new lease time - "redis.call('pexpire', KEYS[1], ARGV[2]);" + - // the lock is still held - "return 0;" + - "end;" + - - // release the lock - "redis.call('del', KEYS[1]);" + - // notify the next thread in the queue - "local nextThreadId = redis.call('lindex', KEYS[2], 0);" + + "end; " + + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + + "if (counter > 0) then " + + "redis.call('pexpire', KEYS[1], ARGV[2]); " + + "return 0; " + + "end; " + + + "redis.call('del', KEYS[1]); " + + "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " + - "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]);" + - "end;" + - "return 1;", - Arrays.asList(getName(), threadsQueueName, timeoutSetName, getChannelName()), + "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " + + "end; " + + "return 1; ", + Arrays.asList(getName(), threadsQueueName, timeoutSetName, getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis()); } @@ -332,38 +317,37 @@ public class RedissonFairLock extends RedissonLock implements RLock { Arrays.asList(getName(), threadsQueueName, timeoutSetName)); } - + @Override public RFuture forceUnlockAsync() { cancelExpirationRenewal(null); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // remove stale threads - "while true do " + - "local firstThreadId2 = redis.call('lindex', KEYS[2], 0); " + - "if firstThreadId2 == false then " + - "break; " + - "end; " + - "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + - "if timeout <= tonumber(ARGV[2]) then " + - // remove the item from the queue and timeout set - // NOTE we do not alter any other timeout - "redis.call('zrem', KEYS[3], firstThreadId2); " + - "redis.call('lpop', KEYS[2]); " + - "else " + - "break;" + - "end;" + - "end;" + - - "if (redis.call('del', KEYS[1]) == 1) then " + - "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + + "while true do " + + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + + "if firstThreadId2 == false then " + + "break;" + + "end; " + + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + + "if timeout <= tonumber(ARGV[2]) then " + + "redis.call('zrem', KEYS[3], firstThreadId2); " + + "redis.call('lpop', KEYS[2]); " + + "else " + + "break;" + + "end; " + + "end;" + + + + "if (redis.call('del', KEYS[1]) == 1) then " + + "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " + "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " + - "end; " + - "return 1; " + - "end; " + + "end; " + + "return 1; " + + "end; " + "return 0;", - Arrays.asList(getName(), threadsQueueName, timeoutSetName, getChannelName()), + Arrays.asList(getName(), threadsQueueName, timeoutSetName, getChannelName()), LockPubSub.UNLOCK_MESSAGE, System.currentTimeMillis()); } -} +} \ No newline at end of file