From ccda1ee99e2c3d3a048947aba32a11d22236f75a Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 10 Mar 2017 13:10:31 +0300 Subject: [PATCH] ReadWriteLock LeaseTimeout calculation fixed --- .../java/org/redisson/RedissonReadLock.java | 105 ++++++++---------- .../redisson/RedissonReadWriteLockTest.java | 63 +++++++++++ 2 files changed, 112 insertions(+), 56 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonReadLock.java b/redisson/src/main/java/org/redisson/RedissonReadLock.java index b76174056..ebe650c3a 100644 --- a/redisson/src/main/java/org/redisson/RedissonReadLock.java +++ b/redisson/src/main/java/org/redisson/RedissonReadLock.java @@ -65,15 +65,16 @@ public class RedissonReadLock extends RedissonLock implements RLock { "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'read'); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + - "redis.call('set', KEYS[1] .. ':timeout:1', 1); " + - "redis.call('pexpire', KEYS[1] .. ':timeout:1', ARGV[1]); " + + "redis.call('set', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:1', 1); " + + "redis.call('pexpire', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:1', ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " + - "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + - "redis.call('set', KEYS[1] .. ':timeout:' .. ind, 1); " + - "redis.call('pexpire', KEYS[1] .. ':timeout:' .. ind, ARGV[1]); " + + "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + + "local key = KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:' .. ind;" + + "redis.call('set', key, 1); " + + "redis.call('pexpire', key, ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end;" + @@ -82,55 +83,48 @@ public class RedissonReadLock extends RedissonLock implements RLock { } @Override - public void unlock() { - Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "local mode = redis.call('hget', KEYS[1], 'mode'); " + - "if (mode == false) then " + - "redis.call('publish', KEYS[2], ARGV[1]); " + - "return 1; " + - "end; " + - "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " + - "if (lockExists == 0) then " + - "return nil;" + - "else " + - "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + - "redis.call('del', KEYS[1] .. ':timeout:' .. (counter+1)); " + - "if (counter > 0) then " + - "local maxRemainTime = -3; " + - "for i=counter, 1, -1 do " + - "local remainTime = redis.call('pttl', KEYS[1] .. ':timeout:' .. i); " + - "maxRemainTime = math.max(remainTime, maxRemainTime);" + - "end; " + - "if maxRemainTime > 0 then " + - "redis.call('pexpire', KEYS[1], maxRemainTime); " + - "else " + - "redis.call('hdel', KEYS[1], ARGV[2]); " + - "if (redis.call('hlen', KEYS[1]) == 1) then " + - "redis.call('del', KEYS[1]); " + - "redis.call('publish', KEYS[2], ARGV[1]); " + - "end; " + - "end;" + - "return 0; " + - "else " + - "redis.call('hdel', KEYS[1], ARGV[2]); " + - "if (redis.call('hlen', KEYS[1]) == 1) then " + - "redis.call('del', KEYS[1]); " + - "redis.call('publish', KEYS[2], ARGV[1]); " + - "end; " + - "return 1; "+ - "end; " + - "end; " + - "return nil; ", - Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, getLockName(Thread.currentThread().getId())); - if (opStatus == null) { - throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: " - + id + " thread-id: " + Thread.currentThread().getId()); - } - if (opStatus) { - cancelExpirationRenewal(); - } + protected RFuture unlockInnerAsync(long threadId) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local mode = redis.call('hget', KEYS[1], 'mode'); " + + "if (mode == false) then " + + "redis.call('publish', KEYS[2], ARGV[1]); " + + "return 1; " + + "end; " + + "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " + + "if (lockExists == 0) then " + + "return nil;" + + "end; " + + + "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + + "if (counter == 0) then " + + "redis.call('hdel', KEYS[1], ARGV[2]); " + + "end;" + + "redis.call('del', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:' .. (counter+1)); " + + "if (redis.call('hlen', KEYS[1]) > 1) then " + + "local maxRemainTime = -3; " + + "local keys = redis.call('hkeys', KEYS[1]); " + + "for n, key in ipairs(keys) do " + + "counter = tonumber(redis.call('hget', KEYS[1], key)); " + + "if type(counter) == 'number' then " + + "for i=counter, 1, -1 do " + + "local remainTime = redis.call('pttl', KEYS[1] .. ':' .. key .. ':rwlock_timeout:' .. i); " + + "maxRemainTime = math.max(remainTime, maxRemainTime);" + + "end; " + + "end; " + + "end; " + + + "if maxRemainTime > 0 then " + + "redis.call('pexpire', KEYS[1], maxRemainTime); " + + "return 0; " + + "end;" + + "end; " + + + "redis.call('del', KEYS[1]); " + + "redis.call('publish', KEYS[2], ARGV[1]); " + + "return 1; ", + Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, getLockName(threadId)); } - + @Override public Condition newCondition() { throw new UnsupportedOperationException(); @@ -143,9 +137,8 @@ public class RedissonReadLock extends RedissonLock implements RLock { "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + - "else " + - "return 0; " + - "end;", + "end; " + + "return 0; ", Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage); result.addListener(new FutureListener() { diff --git a/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java b/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java index 6965cf32c..d6d903a2d 100644 --- a/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java @@ -1,5 +1,6 @@ package org.redisson; +import static com.jayway.awaitility.Awaitility.await; import static org.assertj.core.api.Assertions.assertThat; import java.security.SecureRandom; @@ -16,6 +17,68 @@ import org.redisson.api.RReadWriteLock; public class RedissonReadWriteLockTest extends BaseConcurrentTest { + @Test + public void testReadLockLeaseTimeoutDiffThreadsWRR() throws InterruptedException { + RLock writeLock = redisson.getReadWriteLock("my_read_write_lock").writeLock(); + Assert.assertTrue(writeLock.tryLock(1, 10, TimeUnit.SECONDS)); + + final AtomicInteger executed = new AtomicInteger(); + Thread t1 = new Thread(() -> { + RLock readLock = redisson.getReadWriteLock("my_read_write_lock").readLock(); + readLock.lock(); + executed.incrementAndGet(); + }); + + Thread t2 = new Thread(() -> { + RLock readLock = redisson.getReadWriteLock("my_read_write_lock").readLock(); + readLock.lock(); + executed.incrementAndGet(); + }); + + t1.start(); + t2.start(); + + await().atMost(11, TimeUnit.SECONDS).until(() -> executed.get() == 2); + } + + @Test + public void testReadLockLeaseTimeoutDiffThreadsRRW() throws InterruptedException { + new Thread(() -> { + RLock readLock = redisson.getReadWriteLock("my_read_write_lock").readLock(); + try { + Assert.assertTrue(readLock.tryLock(1, 10, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }).start(); + + Thread.sleep(5000); + + new Thread(() -> { + RLock readLock2 = redisson.getReadWriteLock("my_read_write_lock").readLock(); + try { + Assert.assertTrue(readLock2.tryLock(1, 10, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + readLock2.unlock(); + }).start(); + + final AtomicBoolean executed = new AtomicBoolean(); + new Thread(() -> { + RLock writeLock = redisson.getReadWriteLock("my_read_write_lock").writeLock(); + try { + boolean locked = writeLock.tryLock(10, 10, TimeUnit.SECONDS); + executed.set(locked); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }).start(); + + await().atMost(6, TimeUnit.SECONDS).untilTrue(executed); + } + + @Test public void testReadLockLeaseTimeout() throws InterruptedException { RLock readLock = redisson.getReadWriteLock("my_read_write_lock").readLock();