readLock couldn't be acquired by same thread which has already acquired writeLock #758

pull/762/head
Nikita 8 years ago
parent 6f7f43716a
commit 0ffc2ca1e7

@ -52,6 +52,10 @@ public class RedissonReadLock extends RedissonLock implements RLock {
return "redisson_rwlock__{" + getName() + "}";
}
String getWriteLockName(long threadId) {
return super.getLockName(threadId) + ":write";
}
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
@ -64,13 +68,13 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (mode == 'read') then " +
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName()), internalLockLeaseTime, getLockName(threadId));
Arrays.<Object>asList(getName()), internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}
@Override
@ -80,8 +84,8 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; "
+ "if (mode == 'read') then " +
"end; " +
// "if (mode == 'read') then " +
"local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
"if (lockExists == 0) then " +
"return nil;" +
@ -99,7 +103,7 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"return 1; "+
"end; " +
"end; " +
"end; " +
// "end; " +
"return nil; ",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId()));
if (opStatus == null) {
@ -146,18 +150,4 @@ public class RedissonReadLock extends RedissonLock implements RLock {
return "read".equals(res);
}
@Override
public boolean isHeldByCurrentThread() {
return commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(Thread.currentThread().getId()));
}
@Override
public int getHoldCount() {
Long res = commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.HGET, getName(), getLockName(Thread.currentThread().getId()));
if (res == null) {
return 0;
}
return res.intValue();
}
}

@ -52,6 +52,11 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
return "redisson_rwlock__{" + getName() + "}";
}
@Override
String getLockName(long threadId) {
return super.getLockName(threadId) + ":write";
}
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
@ -97,6 +102,9 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
"if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"else " +
// has unlocked read-locks
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"end; " +
"return 1; "+
"end; " +
@ -148,18 +156,4 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
return "write".equals(res);
}
@Override
public boolean isHeldByCurrentThread() {
return commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(Thread.currentThread().getId()));
}
@Override
public int getHoldCount() {
Long res = commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.HGET, getName(), getLockName(Thread.currentThread().getId()));
if (res == null) {
return 0;
}
return res.intValue();
}
}

@ -1,9 +1,11 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.security.SecureRandom;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -12,10 +14,34 @@ import org.junit.Test;
import org.redisson.api.RLock;
import org.redisson.api.RReadWriteLock;
import static org.assertj.core.api.Assertions.*;
public class RedissonReadWriteLockTest extends BaseConcurrentTest {
@Test
public void testWriteReadReentrancy() throws InterruptedException {
RReadWriteLock readWriteLock = redisson.getReadWriteLock("TEST");
readWriteLock.writeLock().lock();
java.util.concurrent.locks.Lock rLock = readWriteLock.readLock();
Assert.assertTrue(rLock.tryLock());
AtomicBoolean ref = new AtomicBoolean();
Thread t1 = new Thread(() -> {
boolean success = readWriteLock.readLock().tryLock();
ref.set(success);
});
t1.start();
t1.join();
Assert.assertFalse(ref.get());
readWriteLock.writeLock().unlock();
Assert.assertFalse(readWriteLock.writeLock().tryLock());
rLock.unlock();
Assert.assertTrue(readWriteLock.writeLock().tryLock());
readWriteLock.writeLock().unlock();
}
@Test
public void testWriteLock() throws InterruptedException {
final RReadWriteLock lock = redisson.getReadWriteLock("lock");
@ -48,7 +74,7 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
t.join(50);
writeLock.unlock();
Assert.assertFalse(lock.readLock().tryLock());
Assert.assertTrue(lock.readLock().tryLock());
Assert.assertTrue(writeLock.isHeldByCurrentThread());
writeLock.unlock();
Thread.sleep(1000);

Loading…
Cancel
Save