diff --git a/redisson/src/main/java/org/redisson/RedissonReadLock.java b/redisson/src/main/java/org/redisson/RedissonReadLock.java index 4bf5f770b..096a335ac 100644 --- a/redisson/src/main/java/org/redisson/RedissonReadLock.java +++ b/redisson/src/main/java/org/redisson/RedissonReadLock.java @@ -53,6 +53,10 @@ public class RedissonReadLock extends RedissonLock implements RLock { return super.getLockName(threadId) + ":write"; } + String getReadWriteTimeoutNamePrefix(long threadId) { + return suffixName(getName(), getLockName(threadId)) + ":rwlock_timeout"; + } + @Override <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); @@ -62,25 +66,27 @@ public class RedissonReadLock extends RedissonLock implements RLock { "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'read'); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + - "redis.call('set', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:1', 1); " + - "redis.call('pexpire', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:1', ARGV[1]); " + + "redis.call('set', KEYS[2] .. ':1', 1); " + + "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " + "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + - "local key = KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:' .. ind;" + + "local key = KEYS[2] .. ':' .. ind;" + "redis.call('set', key, 1); " + "redis.call('pexpire', key, ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end;" + "return redis.call('pttl', KEYS[1]);", - Arrays.<Object>asList(getName()), internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId)); + Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), + internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId)); } @Override protected RFuture<Boolean> unlockInnerAsync(long threadId) { + String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + @@ -96,7 +102,7 @@ public class RedissonReadLock extends RedissonLock implements RLock { "if (counter == 0) then " + "redis.call('hdel', KEYS[1], ARGV[2]); " + "end;" + - "redis.call('del', KEYS[1] .. ':' .. ARGV[2] .. ':rwlock_timeout:' .. (counter+1)); " + + "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " + "if (redis.call('hlen', KEYS[1]) > 1) then " + "local maxRemainTime = -3; " + "local keys = redis.call('hkeys', KEYS[1]); " + @@ -104,7 +110,7 @@ public class RedissonReadLock extends RedissonLock implements RLock { "counter = tonumber(redis.call('hget', KEYS[1], key)); " + "if type(counter) == 'number' then " + "for i=counter, 1, -1 do " + - "local remainTime = redis.call('pttl', KEYS[1] .. ':' .. key .. ':rwlock_timeout:' .. i); " + + "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + "maxRemainTime = math.max(remainTime, maxRemainTime);" + "end; " + "end; " + @@ -123,7 +129,8 @@ public class RedissonReadLock extends RedissonLock implements RLock { "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; ", - Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, getLockName(threadId)); + Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, timeoutPrefix.split(":")[0]), + LockPubSub.unlockMessage, getLockName(threadId)); } @Override diff --git a/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java b/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java index b4563eee6..1747afb78 100644 --- a/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java @@ -12,13 +12,43 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; import org.junit.Test; +import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.api.RLock; import org.redisson.api.RReadWriteLock; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; import com.jayway.awaitility.Awaitility; public class RedissonReadWriteLockTest extends BaseConcurrentTest { + @Test + public void testInCluster() throws Exception { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1) + .addNode(master2) + .addNode(master3); + ClusterProcesses process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + RReadWriteLock s = redisson.getReadWriteLock("1234"); + s.writeLock().lock(); + s.readLock().lock(); + s.readLock().unlock(); + s.writeLock().unlock(); + + redisson.shutdown(); + process.shutdown(); + } + @Test public void testReadLockLeaseTimeoutDiffThreadsWRR() throws InterruptedException { RLock writeLock = redisson.getReadWriteLock("my_read_write_lock").writeLock();