Fixed - RReadWriteLock renewal doesn't work if writeLock released before readLock then both were acquired. #4217

pull/4272/head
Nikita Koksharov 3 years ago
parent 1d796c14ad
commit 86c49d5a89

@ -137,7 +137,7 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
@ -175,7 +175,7 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
}
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +

@ -16,6 +16,7 @@
package org.redisson;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@ -36,7 +37,7 @@ import org.redisson.pubsub.LockPubSub;
*/
public class RedissonReadLock extends RedissonLock implements RLock {
public RedissonReadLock(CommandAsyncExecutor commandExecutor, String name) {
protected RedissonReadLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
@ -136,7 +137,7 @@ public class RedissonReadLock extends RedissonLock implements RLock {
}
@Override
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);

@ -16,6 +16,8 @@
package org.redisson;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@ -113,6 +115,18 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
throw new UnsupportedOperationException();
}
@Override
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
CompletionStage<Boolean> f = super.renewExpirationAsync(threadId);
return f.thenCompose(r -> {
if (!r) {
RedissonReadLock lock = new RedissonReadLock(commandExecutor, getRawName());
return lock.renewExpirationAsync(threadId);
}
return CompletableFuture.completedFuture(r);
});
}
@Override
public RFuture<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(null);

@ -20,6 +20,8 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
@ -70,10 +72,10 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
thread2.join(300);
thread3.start();
thread3.join(300);
Awaitility.await().between(8, TimeUnit.SECONDS, 10, TimeUnit.SECONDS).untilTrue(flag);
}
@Test
public void testReadLockExpirationRenewal() throws InterruptedException {
int threadCount = 50;
@ -99,12 +101,12 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
}
});
}
executorService.shutdown();
assertThat(executorService.awaitTermination(180, TimeUnit.SECONDS)).isTrue();
assertThat(exceptions.get()).isZero();
}
@Test
public void testName() throws InterruptedException, ExecutionException, TimeoutException {
ExecutorService service = Executors.newFixedThreadPool(10);
@ -134,11 +136,11 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
assertThat(service.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
}
@Test
public void testWriteLockExpiration() throws InterruptedException {
RReadWriteLock rw1 = redisson.getReadWriteLock("test2s3");
RLock l1 = rw1.writeLock();
assertThat(l1.tryLock(10000, 10000, TimeUnit.MILLISECONDS)).isTrue();
RLock l2 = rw1.writeLock();
@ -154,7 +156,7 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
}
});
}
@Test
public void testInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
@ -166,22 +168,22 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
.addNode(master2)
.addNode(master3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RReadWriteLock s = redisson.getReadWriteLock("1234");
s.writeLock().lock();
s.readLock().lock();
s.readLock().unlock();
s.writeLock().unlock();
redisson.shutdown();
process.shutdown();
}
@Test
public void testReadLockLeaseTimeoutDiffThreadsWRR() throws InterruptedException {
RLock writeLock = redisson.getReadWriteLock("my_read_write_lock").writeLock();
@ -193,7 +195,7 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
readLock.lock();
executed.incrementAndGet();
});
Thread t2 = new Thread(() -> {
RLock readLock = redisson.getReadWriteLock("my_read_write_lock").readLock();
readLock.lock();
@ -202,10 +204,10 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
t1.start();
t2.start();
await().atMost(11, TimeUnit.SECONDS).until(() -> executed.get() == 2);
}
@Test
public void testReadLockLeaseTimeoutDiffThreadsRRW() throws InterruptedException {
new Thread(() -> {
@ -218,7 +220,7 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
}).start();
Thread.sleep(5000);
new Thread(() -> {
RLock readLock2 = redisson.getReadWriteLock("my_read_write_lock").readLock();
try {
@ -239,11 +241,11 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
e.printStackTrace();
}
}).start();
await().atMost(6, TimeUnit.SECONDS).untilTrue(executed);
}
@Test
public void testReadLockLeaseTimeout() throws InterruptedException {
RLock readLock = redisson.getReadWriteLock("my_read_write_lock").readLock();
@ -259,17 +261,17 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
RLock writeLock = redisson.getReadWriteLock("my_read_write_lock").writeLock();
Assertions.assertTrue(writeLock.tryLock());
}
@Test
public void testWR() throws InterruptedException {
RReadWriteLock rw = redisson.getReadWriteLock("my_read_write_lock");
RLock writeLock = rw.writeLock();
writeLock.lock();
rw.readLock().lock();
assertThat(writeLock.isLocked()).isTrue();
rw.readLock().unlock();
assertThat(writeLock.isLocked()).isTrue();
writeLock.unlock();
assertThat(writeLock.isLocked()).isFalse();
@ -295,12 +297,12 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
t1.start();
t1.join(100);
}
readWriteLock.writeLock().unlock();
assertThat(ref.await(1, TimeUnit.SECONDS)).isTrue();
}
@Test
public void testWriteReadReentrancy() throws InterruptedException {
RReadWriteLock readWriteLock = redisson.getReadWriteLock("TEST");
@ -308,7 +310,7 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
java.util.concurrent.locks.Lock rLock = readWriteLock.readLock();
Assertions.assertTrue(rLock.tryLock());
AtomicBoolean ref = new AtomicBoolean();
Thread t1 = new Thread(() -> {
boolean success = readWriteLock.readLock().tryLock();
@ -316,9 +318,9 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
});
t1.start();
t1.join();
Assertions.assertFalse(ref.get());
readWriteLock.writeLock().unlock();
Assertions.assertFalse(readWriteLock.writeLock().tryLock());
rLock.unlock();
@ -326,7 +328,7 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
Assertions.assertTrue(readWriteLock.writeLock().tryLock());
readWriteLock.writeLock().unlock();
}
@Test
public void testWriteLock() throws InterruptedException {
final RReadWriteLock lock = redisson.getReadWriteLock("lock");
@ -445,6 +447,27 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
assertThat(lock.writeLock().isLocked()).isFalse();
}
@Test
public void testReadWriteTTL() throws InterruptedException {
RReadWriteLock rwlock = redisson.getReadWriteLock("rwlock");
rwlock.writeLock().lock();
rwlock.readLock().lock();
for (int i = 0; i < 5; i++) {
assertThat(rwlock.readLock().remainTimeToLive()).isGreaterThan(19000);
TimeUnit.SECONDS.sleep(5);
}
rwlock.writeLock().unlock();
for (int i = 0; i < 5; i++) {
assertThat(rwlock.readLock().remainTimeToLive()).isGreaterThan(19000);
TimeUnit.SECONDS.sleep(5);
}
rwlock.readLock().unlock();
}
@Test
public void testExpireRead() throws InterruptedException {
RReadWriteLock lock = redisson.getReadWriteLock("lock");

Loading…
Cancel
Save