RReadWriteLock doesn't work in cluster. #1098

pull/1124/head
Nikita 7 years ago
parent b3c3fa3884
commit 48ea96f2fe

@ -53,6 +53,10 @@ public class RedissonReadLock extends RedissonLock implements RLock {
return super.getLockName(threadId) + ":write";
}
String getReadWriteTimeoutNamePrefix(long threadId) {
return suffixName(getName(), getLockName(threadId)) + ":rwlock_timeout";
}
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
@ -62,25 +66,27 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('set', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:1', 1); " +
"redis.call('pexpire', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:1', ARGV[1]); " +
"redis.call('set', KEYS[2] .. ':1', 1); " +
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local key = KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:' .. ind;" +
"local key = KEYS[2] .. ':' .. ind;" +
"redis.call('set', key, 1); " +
"redis.call('pexpire', key, ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName()), internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)),
internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
@ -96,7 +102,7 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"if (counter == 0) then " +
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"end;" +
"redis.call('del', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:' .. (counter+1)); " +
"redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local maxRemainTime = -3; " +
"local keys = redis.call('hkeys', KEYS[1]); " +
@ -104,7 +110,7 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
"local remainTime = redis.call('pttl', KEYS[1] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
"local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
"maxRemainTime = math.max(remainTime, maxRemainTime);" +
"end; " +
"end; " +
@ -123,7 +129,8 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; ",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, getLockName(threadId));
Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, timeoutPrefix.split(":")[0]),
LockPubSub.unlockMessage, getLockName(threadId));
}
@Override

@ -12,13 +12,43 @@ import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.api.RLock;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import com.jayway.awaitility.Awaitility;
public class RedissonReadWriteLockTest extends BaseConcurrentTest {
@Test
public void testInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1)
.addNode(master2)
.addNode(master3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RReadWriteLock s = redisson.getReadWriteLock("1234");
s.writeLock().lock();
s.readLock().lock();
s.readLock().unlock();
s.writeLock().unlock();
redisson.shutdown();
process.shutdown();
}
@Test
public void testReadLockLeaseTimeoutDiffThreadsWRR() throws InterruptedException {
RLock writeLock = redisson.getReadWriteLock("my_read_write_lock").writeLock();

Loading…
Cancel
Save