From 86c49d5a89321f0d899f7611d407b8e20a6993f9 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 21 Apr 2022 08:06:21 +0300 Subject: [PATCH] Fixed - RReadWriteLock renewal doesn't work if writeLock released before readLock then both were acquired. #4217 --- .../java/org/redisson/RedissonBaseLock.java | 4 +- .../java/org/redisson/RedissonReadLock.java | 5 +- .../java/org/redisson/RedissonWriteLock.java | 14 ++++ .../redisson/RedissonReadWriteLockTest.java | 79 ++++++++++++------- 4 files changed, 70 insertions(+), 32 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBaseLock.java b/redisson/src/main/java/org/redisson/RedissonBaseLock.java index 6acbed526..0f1f53687 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseLock.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseLock.java @@ -137,7 +137,7 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc return; } - RFuture future = renewExpirationAsync(threadId); + CompletionStage future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); @@ -175,7 +175,7 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc } } - protected RFuture renewExpirationAsync(long threadId) { + protected CompletionStage renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + diff --git a/redisson/src/main/java/org/redisson/RedissonReadLock.java b/redisson/src/main/java/org/redisson/RedissonReadLock.java index aa710fc29..8212774a7 100644 --- a/redisson/src/main/java/org/redisson/RedissonReadLock.java +++ b/redisson/src/main/java/org/redisson/RedissonReadLock.java @@ -16,6 +16,7 @@ package org.redisson; import java.util.Arrays; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; @@ -36,7 +37,7 @@ import org.redisson.pubsub.LockPubSub; */ public class RedissonReadLock extends RedissonLock implements RLock { - public RedissonReadLock(CommandAsyncExecutor commandExecutor, String name) { + protected RedissonReadLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); } @@ -136,7 +137,7 @@ public class RedissonReadLock extends RedissonLock implements RLock { } @Override - protected RFuture renewExpirationAsync(long threadId) { + protected CompletionStage renewExpirationAsync(long threadId) { String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); String keyPrefix = getKeyPrefix(threadId, timeoutPrefix); diff --git a/redisson/src/main/java/org/redisson/RedissonWriteLock.java b/redisson/src/main/java/org/redisson/RedissonWriteLock.java index 85268a9ed..a33b4a117 100644 --- a/redisson/src/main/java/org/redisson/RedissonWriteLock.java +++ b/redisson/src/main/java/org/redisson/RedissonWriteLock.java @@ -16,6 +16,8 @@ package org.redisson; import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; @@ -113,6 +115,18 @@ public class RedissonWriteLock extends RedissonLock implements RLock { throw new UnsupportedOperationException(); } + @Override + protected CompletionStage renewExpirationAsync(long threadId) { + CompletionStage f = super.renewExpirationAsync(threadId); + return f.thenCompose(r -> { + if (!r) { + RedissonReadLock lock = new RedissonReadLock(commandExecutor, getRawName()); + return lock.renewExpirationAsync(threadId); + } + return CompletableFuture.completedFuture(r); + }); + } + @Override public RFuture forceUnlockAsync() { cancelExpirationRenewal(null); diff --git a/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java b/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java index e2d1cb59f..cf4d73e96 100644 --- a/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java @@ -20,6 +20,8 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; @@ -70,10 +72,10 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest { thread2.join(300); thread3.start(); thread3.join(300); - + Awaitility.await().between(8, TimeUnit.SECONDS, 10, TimeUnit.SECONDS).untilTrue(flag); } - + @Test public void testReadLockExpirationRenewal() throws InterruptedException { int threadCount = 50; @@ -99,12 +101,12 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest { } }); } - + executorService.shutdown(); assertThat(executorService.awaitTermination(180, TimeUnit.SECONDS)).isTrue(); assertThat(exceptions.get()).isZero(); } - + @Test public void testName() throws InterruptedException, ExecutionException, TimeoutException { ExecutorService service = Executors.newFixedThreadPool(10); @@ -134,11 +136,11 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest { assertThat(service.awaitTermination(1, TimeUnit.MINUTES)).isTrue(); } - + @Test public void testWriteLockExpiration() throws InterruptedException { RReadWriteLock rw1 = redisson.getReadWriteLock("test2s3"); - + RLock l1 = rw1.writeLock(); assertThat(l1.tryLock(10000, 10000, TimeUnit.MILLISECONDS)).isTrue(); RLock l2 = rw1.writeLock(); @@ -154,7 +156,7 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest { } }); } - + @Test public void testInCluster() throws Exception { RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); @@ -166,22 +168,22 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest { .addNode(master2) .addNode(master3); ClusterProcesses process = clusterRunner.run(); - + Config config = new Config(); config.useClusterServers() .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); RedissonClient redisson = Redisson.create(config); - + RReadWriteLock s = redisson.getReadWriteLock("1234"); s.writeLock().lock(); s.readLock().lock(); s.readLock().unlock(); s.writeLock().unlock(); - + redisson.shutdown(); process.shutdown(); } - + @Test public void testReadLockLeaseTimeoutDiffThreadsWRR() throws InterruptedException { RLock writeLock = redisson.getReadWriteLock("my_read_write_lock").writeLock(); @@ -193,7 +195,7 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest { readLock.lock(); executed.incrementAndGet(); }); - + Thread t2 = new Thread(() -> { RLock readLock = redisson.getReadWriteLock("my_read_write_lock").readLock(); readLock.lock(); @@ -202,10 +204,10 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest { t1.start(); t2.start(); - + await().atMost(11, TimeUnit.SECONDS).until(() -> executed.get() == 2); } - + @Test public void testReadLockLeaseTimeoutDiffThreadsRRW() throws InterruptedException { new Thread(() -> { @@ -218,7 +220,7 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest { }).start(); Thread.sleep(5000); - + new Thread(() -> { RLock readLock2 = redisson.getReadWriteLock("my_read_write_lock").readLock(); try { @@ -239,11 +241,11 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest { e.printStackTrace(); } }).start(); - + await().atMost(6, TimeUnit.SECONDS).untilTrue(executed); } - - + + @Test public void testReadLockLeaseTimeout() throws InterruptedException { RLock readLock = redisson.getReadWriteLock("my_read_write_lock").readLock(); @@ -259,17 +261,17 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest { RLock writeLock = redisson.getReadWriteLock("my_read_write_lock").writeLock(); Assertions.assertTrue(writeLock.tryLock()); } - + @Test public void testWR() throws InterruptedException { RReadWriteLock rw = redisson.getReadWriteLock("my_read_write_lock"); RLock writeLock = rw.writeLock(); writeLock.lock(); - + rw.readLock().lock(); assertThat(writeLock.isLocked()).isTrue(); rw.readLock().unlock(); - + assertThat(writeLock.isLocked()).isTrue(); writeLock.unlock(); assertThat(writeLock.isLocked()).isFalse(); @@ -295,12 +297,12 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest { t1.start(); t1.join(100); } - + readWriteLock.writeLock().unlock(); - + assertThat(ref.await(1, TimeUnit.SECONDS)).isTrue(); } - + @Test public void testWriteReadReentrancy() throws InterruptedException { RReadWriteLock readWriteLock = redisson.getReadWriteLock("TEST"); @@ -308,7 +310,7 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest { java.util.concurrent.locks.Lock rLock = readWriteLock.readLock(); Assertions.assertTrue(rLock.tryLock()); - + AtomicBoolean ref = new AtomicBoolean(); Thread t1 = new Thread(() -> { boolean success = readWriteLock.readLock().tryLock(); @@ -316,9 +318,9 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest { }); t1.start(); t1.join(); - + Assertions.assertFalse(ref.get()); - + readWriteLock.writeLock().unlock(); Assertions.assertFalse(readWriteLock.writeLock().tryLock()); rLock.unlock(); @@ -326,7 +328,7 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest { Assertions.assertTrue(readWriteLock.writeLock().tryLock()); readWriteLock.writeLock().unlock(); } - + @Test public void testWriteLock() throws InterruptedException { final RReadWriteLock lock = redisson.getReadWriteLock("lock"); @@ -445,6 +447,27 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest { assertThat(lock.writeLock().isLocked()).isFalse(); } + @Test + public void testReadWriteTTL() throws InterruptedException { + RReadWriteLock rwlock = redisson.getReadWriteLock("rwlock"); + rwlock.writeLock().lock(); + rwlock.readLock().lock(); + + for (int i = 0; i < 5; i++) { + assertThat(rwlock.readLock().remainTimeToLive()).isGreaterThan(19000); + TimeUnit.SECONDS.sleep(5); + } + + rwlock.writeLock().unlock(); + + for (int i = 0; i < 5; i++) { + assertThat(rwlock.readLock().remainTimeToLive()).isGreaterThan(19000); + TimeUnit.SECONDS.sleep(5); + } + + rwlock.readLock().unlock(); + } + @Test public void testExpireRead() throws InterruptedException { RReadWriteLock lock = redisson.getReadWriteLock("lock");