From 387dd9307182f17ce785accf325ed8eb27909438 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 10 Sep 2018 11:33:15 +0300 Subject: [PATCH] Fixed - Lock watchdog does not monitor read locks #1604 --- .../main/java/org/redisson/RedissonLock.java | 23 +++++++----- .../java/org/redisson/RedissonReadLock.java | 36 ++++++++++++++++++- .../redisson/RedissonReadWriteLockTest.java | 31 ++++++++++++++++ 3 files changed, 81 insertions(+), 9 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java index 168d51757..5a56b4a41 100644 --- a/redisson/src/main/java/org/redisson/RedissonLock.java +++ b/redisson/src/main/java/org/redisson/RedissonLock.java @@ -231,13 +231,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { @Override public void run(Timeout timeout) throws Exception { - RFuture future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + - "redis.call('pexpire', KEYS[1], ARGV[1]); " + - "return 1; " + - "end; " + - "return 0;", - Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); + RFuture future = renewExpirationAsync(threadId); future.addListener(new FutureListener() { @Override @@ -255,6 +249,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { } }); } + }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) { @@ -262,9 +257,21 @@ public class RedissonLock extends RedissonExpirable implements RLock { } } + protected RFuture renewExpirationAsync(long threadId) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + + "redis.call('pexpire', KEYS[1], ARGV[1]); " + + "return 1; " + + "end; " + + "return 0;", + Collections.singletonList(getName()), + internalLockLeaseTime, getLockName(threadId)); + } + void cancelExpirationRenewal(Long threadId) { - ExpirationEntry task = expirationRenewalMap.remove(getEntryName()); + ExpirationEntry task = expirationRenewalMap.get(getEntryName()); if (task != null && (threadId == null || task.getThreadId() == threadId)) { + expirationRenewalMap.remove(getEntryName()); task.getTimeout().cancel(); } } diff --git a/redisson/src/main/java/org/redisson/RedissonReadLock.java b/redisson/src/main/java/org/redisson/RedissonReadLock.java index d7907d01d..83205aebd 100644 --- a/redisson/src/main/java/org/redisson/RedissonReadLock.java +++ b/redisson/src/main/java/org/redisson/RedissonReadLock.java @@ -83,7 +83,7 @@ public class RedissonReadLock extends RedissonLock implements RLock { @Override protected RFuture unlockInnerAsync(long threadId) { String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); - String keyPrefix = timeoutPrefix.split(":" + getLockName(threadId))[0]; + String keyPrefix = getKeyPrefix(threadId, timeoutPrefix); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local mode = redis.call('hget', KEYS[1], 'mode'); " + @@ -101,6 +101,7 @@ public class RedissonReadLock extends RedissonLock implements RLock { "redis.call('hdel', KEYS[1], ARGV[2]); " + "end;" + "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " + + "if (redis.call('hlen', KEYS[1]) > 1) then " + "local maxRemainTime = -3; " + "local keys = redis.call('hkeys', KEYS[1]); " + @@ -130,6 +131,39 @@ public class RedissonReadLock extends RedissonLock implements RLock { Arrays.asList(getName(), getChannelName(), timeoutPrefix, keyPrefix), LockPubSub.unlockMessage, getLockName(threadId)); } + + protected String getKeyPrefix(long threadId, String timeoutPrefix) { + return timeoutPrefix.split(":" + getLockName(threadId))[0]; + } + + @Override + protected RFuture renewExpirationAsync(long threadId) { + String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); + String keyPrefix = getKeyPrefix(threadId, timeoutPrefix); + + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local counter = redis.call('hget', KEYS[1], ARGV[2]); " + + "if (counter ~= false) then " + + "redis.call('pexpire', KEYS[1], ARGV[1]); " + + + "if (redis.call('hlen', KEYS[1]) > 1) then " + + "local keys = redis.call('hkeys', KEYS[1]); " + + "for n, key in ipairs(keys) do " + + "counter = tonumber(redis.call('hget', KEYS[1], key)); " + + "if type(counter) == 'number' then " + + "for i=counter, 1, -1 do " + + "redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " + + "end; " + + "end; " + + "end; " + + "end; " + + + "return 1; " + + "end; " + + "return 0;", + Arrays.asList(getName(), keyPrefix), + internalLockLeaseTime, getLockName(threadId)); + } @Override public Condition newCondition() { diff --git a/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java b/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java index 02009eb9c..fe9aa96ca 100644 --- a/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java @@ -30,6 +30,37 @@ import org.redisson.config.Config; public class RedissonReadWriteLockTest extends BaseConcurrentTest { + @Test + public void testReadLockExpirationRenewal() throws InterruptedException { + int threadCount = 50; + + ExecutorService executorService = Executors.newFixedThreadPool(threadCount/5); + + AtomicInteger exceptions = new AtomicInteger(); + for (int i=0; i { + try { + RReadWriteLock rw1 = redisson.getReadWriteLock("mytestlock"); + RLock readLock = rw1.readLock(); + readLock.lock(); + try { + Thread.sleep(redisson.getConfig().getLockWatchdogTimeout() + 5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + readLock.unlock(); + } catch (Exception e) { + exceptions.incrementAndGet(); + e.printStackTrace(); + } + }); + } + + executorService.shutdown(); + assertThat(executorService.awaitTermination(180, TimeUnit.SECONDS)).isTrue(); + assertThat(exceptions.get()).isZero(); + } + @Test public void testName() throws InterruptedException, ExecutionException, TimeoutException { ExecutorService service = Executors.newFixedThreadPool(10);