ReadWriteLock LeaseTimeout calculation fixed

pull/812/head
Nikita 8 years ago
parent 22cd446d85
commit ccda1ee99e

@ -65,15 +65,16 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('set', KEYS[1] .. ':timeout:1', 1); " +
"redis.call('pexpire', KEYS[1] .. ':timeout:1', ARGV[1]); " +
"redis.call('set', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:1', 1); " +
"redis.call('pexpire', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:1', ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('set', KEYS[1] .. ':timeout:' .. ind, 1); " +
"redis.call('pexpire', KEYS[1] .. ':timeout:' .. ind, ARGV[1]); " +
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local key = KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:' .. ind;" +
"redis.call('set', key, 1); " +
"redis.call('pexpire', key, ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end;" +
@ -82,55 +83,48 @@ public class RedissonReadLock extends RedissonLock implements RLock {
}
@Override
public void unlock() {
Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
"if (lockExists == 0) then " +
"return nil;" +
"else " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
"redis.call('del', KEYS[1] .. ':timeout:' .. (counter+1)); " +
"if (counter > 0) then " +
"local maxRemainTime = -3; " +
"for i=counter, 1, -1 do " +
"local remainTime = redis.call('pttl', KEYS[1] .. ':timeout:' .. i); " +
"maxRemainTime = math.max(remainTime, maxRemainTime);" +
"end; " +
"if maxRemainTime > 0 then " +
"redis.call('pexpire', KEYS[1], maxRemainTime); " +
"else " +
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"end; " +
"end;" +
"return 0; " +
"else " +
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"end; " +
"return 1; "+
"end; " +
"end; " +
"return nil; ",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, getLockName(Thread.currentThread().getId()));
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
cancelExpirationRenewal();
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
"if (lockExists == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
"if (counter == 0) then " +
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"end;" +
"redis.call('del', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:' .. (counter+1)); " +
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local maxRemainTime = -3; " +
"local keys = redis.call('hkeys', KEYS[1]); " +
"for n, key in ipairs(keys) do " +
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
"local remainTime = redis.call('pttl', KEYS[1] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
"maxRemainTime = math.max(remainTime, maxRemainTime);" +
"end; " +
"end; " +
"end; " +
"if maxRemainTime > 0 then " +
"redis.call('pexpire', KEYS[1], maxRemainTime); " +
"return 0; " +
"end;" +
"end; " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; ",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, getLockName(threadId));
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
@ -143,9 +137,8 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"else " +
"return 0; " +
"end;",
"end; " +
"return 0; ",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage);
result.addListener(new FutureListener<Boolean>() {

@ -1,5 +1,6 @@
package org.redisson;
import static com.jayway.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import java.security.SecureRandom;
@ -16,6 +17,68 @@ import org.redisson.api.RReadWriteLock;
public class RedissonReadWriteLockTest extends BaseConcurrentTest {
@Test
public void testReadLockLeaseTimeoutDiffThreadsWRR() throws InterruptedException {
RLock writeLock = redisson.getReadWriteLock("my_read_write_lock").writeLock();
Assert.assertTrue(writeLock.tryLock(1, 10, TimeUnit.SECONDS));
final AtomicInteger executed = new AtomicInteger();
Thread t1 = new Thread(() -> {
RLock readLock = redisson.getReadWriteLock("my_read_write_lock").readLock();
readLock.lock();
executed.incrementAndGet();
});
Thread t2 = new Thread(() -> {
RLock readLock = redisson.getReadWriteLock("my_read_write_lock").readLock();
readLock.lock();
executed.incrementAndGet();
});
t1.start();
t2.start();
await().atMost(11, TimeUnit.SECONDS).until(() -> executed.get() == 2);
}
@Test
public void testReadLockLeaseTimeoutDiffThreadsRRW() throws InterruptedException {
new Thread(() -> {
RLock readLock = redisson.getReadWriteLock("my_read_write_lock").readLock();
try {
Assert.assertTrue(readLock.tryLock(1, 10, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(5000);
new Thread(() -> {
RLock readLock2 = redisson.getReadWriteLock("my_read_write_lock").readLock();
try {
Assert.assertTrue(readLock2.tryLock(1, 10, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
readLock2.unlock();
}).start();
final AtomicBoolean executed = new AtomicBoolean();
new Thread(() -> {
RLock writeLock = redisson.getReadWriteLock("my_read_write_lock").writeLock();
try {
boolean locked = writeLock.tryLock(10, 10, TimeUnit.SECONDS);
executed.set(locked);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
await().atMost(6, TimeUnit.SECONDS).untilTrue(executed);
}
@Test
public void testReadLockLeaseTimeout() throws InterruptedException {
RLock readLock = redisson.getReadWriteLock("my_read_write_lock").readLock();

Loading…
Cancel
Save