Fixed - Lock watchdog does not monitor read locks #1604

pull/1639/head
Nikita 6 years ago
parent f5e3618b33
commit 387dd93071

@ -231,13 +231,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
@ -255,6 +249,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
@ -262,9 +257,21 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = expirationRenewalMap.remove(getEntryName());
ExpirationEntry task = expirationRenewalMap.get(getEntryName());
if (task != null && (threadId == null || task.getThreadId() == threadId)) {
expirationRenewalMap.remove(getEntryName());
task.getTimeout().cancel();
}
}

@ -83,7 +83,7 @@ public class RedissonReadLock extends RedissonLock implements RLock {
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
String keyPrefix = timeoutPrefix.split(":" + getLockName(threadId))[0];
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
@ -101,6 +101,7 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"end;" +
"redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local maxRemainTime = -3; " +
"local keys = redis.call('hkeys', KEYS[1]); " +
@ -130,6 +131,39 @@ public class RedissonReadLock extends RedissonLock implements RLock {
Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix),
LockPubSub.unlockMessage, getLockName(threadId));
}
protected String getKeyPrefix(long threadId, String timeoutPrefix) {
return timeoutPrefix.split(":" + getLockName(threadId))[0];
}
@Override
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local counter = redis.call('hget', KEYS[1], ARGV[2]); " +
"if (counter ~= false) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local keys = redis.call('hkeys', KEYS[1]); " +
"for n, key in ipairs(keys) do " +
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
"redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " +
"end; " +
"end; " +
"end; " +
"end; " +
"return 1; " +
"end; " +
"return 0;",
Arrays.<Object>asList(getName(), keyPrefix),
internalLockLeaseTime, getLockName(threadId));
}
@Override
public Condition newCondition() {

@ -30,6 +30,37 @@ import org.redisson.config.Config;
public class RedissonReadWriteLockTest extends BaseConcurrentTest {
@Test
public void testReadLockExpirationRenewal() throws InterruptedException {
int threadCount = 50;
ExecutorService executorService = Executors.newFixedThreadPool(threadCount/5);
AtomicInteger exceptions = new AtomicInteger();
for (int i=0; i<threadCount; i++) {
executorService.submit(()-> {
try {
RReadWriteLock rw1 = redisson.getReadWriteLock("mytestlock");
RLock readLock = rw1.readLock();
readLock.lock();
try {
Thread.sleep(redisson.getConfig().getLockWatchdogTimeout() + 5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
readLock.unlock();
} catch (Exception e) {
exceptions.incrementAndGet();
e.printStackTrace();
}
});
}
executorService.shutdown();
assertThat(executorService.awaitTermination(180, TimeUnit.SECONDS)).isTrue();
assertThat(exceptions.get()).isZero();
}
@Test
public void testName() throws InterruptedException, ExecutionException, TimeoutException {
ExecutorService service = Executors.newFixedThreadPool(10);

Loading…
Cancel
Save