From 868d0916d0be53bb66627e4378d6baa196e80262 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 16 Dec 2015 12:16:41 +0300 Subject: [PATCH] CROSSLOT errors fixed for RReadWriteLock in cluster mode --- src/main/java/org/redisson/RedissonLock.java | 10 +++++-- .../java/org/redisson/RedissonReadLock.java | 28 ++++++++--------- .../org/redisson/RedissonReadWriteLock.java | 2 -- .../java/org/redisson/RedissonWriteLock.java | 30 ++++++++----------- 4 files changed, 32 insertions(+), 38 deletions(-) diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index a43f7b08f..b040c18e8 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -69,6 +69,10 @@ public class RedissonLock extends RedissonExpirable implements RLock { return "redisson_lock__channel__{" + getName() + "}"; } + String getLockName() { + return id + ":" + Thread.currentThread().getId(); + } + @Override public void lock() { try { @@ -192,7 +196,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { " end;" + " return redis.call('pttl', KEYS[1]); " + "end", - Collections.singletonList(getName()), id.toString() + "-" + Thread.currentThread().getId(), internalLockLeaseTime); + Collections.singletonList(getName()), getLockName(), internalLockLeaseTime); } public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { @@ -287,7 +291,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { " end;" + " return nil; " + "end", - Arrays.asList(getName(), getChannelName()), id.toString() + "-" + Thread.currentThread().getId(), unlockMessage, internalLockLeaseTime); + Arrays.asList(getName(), getChannelName()), getLockName(), unlockMessage, internalLockLeaseTime); if (opStatus == null) { throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); @@ -339,7 +343,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { " return 0; " + " end;" + "end", - Collections.singletonList(getName()), id.toString() + "-" + Thread.currentThread().getId()); + Collections.singletonList(getName()), getLockName()); return opStatus; } diff --git a/src/main/java/org/redisson/RedissonReadLock.java b/src/main/java/org/redisson/RedissonReadLock.java index fb1d8bd6e..0f203779f 100644 --- a/src/main/java/org/redisson/RedissonReadLock.java +++ b/src/main/java/org/redisson/RedissonReadLock.java @@ -43,10 +43,6 @@ public class RedissonReadLock extends RedissonLock implements RLock { this.commandExecutor = commandExecutor; } - private String getLockName() { - return id + ":" + Thread.currentThread().getId(); - } - String getChannelName() { return "redisson_rwlock__{" + getName() + "}"; } @@ -58,17 +54,17 @@ public class RedissonReadLock extends RedissonLock implements RLock { "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'read'); " + - "redis.call('hset', KEYS[1], KEYS[2], 1); " + + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (mode == 'read') then " + - "redis.call('hincrby', KEYS[1], KEYS[2], 1); " + + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end;" + "return redis.call('pttl', KEYS[1]);", - Arrays.asList(getName(), getLockName()), internalLockLeaseTime); + Arrays.asList(getName()), internalLockLeaseTime, getLockName()); } @Override @@ -76,30 +72,30 @@ public class RedissonReadLock extends RedissonLock implements RLock { Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + - "redis.call('publish', KEYS[3], ARGV[1]); " + + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "if (mode == 'read') then " + - "local lockExists = redis.call('hexists', KEYS[1], KEYS[2]); " + + "local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " + "if (lockExists == 0) then " + "return nil;" + "else " + - "local counter = redis.call('hincrby', KEYS[1], KEYS[2], -1); " + + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + - "redis.call('hdel', KEYS[1], KEYS[2]); " + + "redis.call('hdel', KEYS[1], ARGV[3]); " + "if (redis.call('hlen', KEYS[1]) == 1) then " + "redis.call('del', KEYS[1]); " + - "redis.call('publish', KEYS[3], ARGV[1]); " + + "redis.call('publish', KEYS[2], ARGV[1]); " + "end; " + "return 1; "+ "end; " + "end; " + "end; " + "return nil; ", - Arrays.asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage, internalLockLeaseTime); + Arrays.asList(getName(), getChannelName()), unlockMessage, internalLockLeaseTime, getLockName()); if (opStatus == null) { throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); @@ -117,16 +113,16 @@ public class RedissonReadLock extends RedissonLock implements RLock { Future forceUnlockAsync() { cancelExpirationRenewal(); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "if (redis.call('hdel', KEYS[1], KEYS[2]) == 1) then " + + "if (redis.call('hdel', KEYS[1], ARGV[2]) == 1) then " + "if (redis.call('hlen', KEYS[1]) == 1) then " + "redis.call('del', KEYS[1]); " + - "redis.call('publish', KEYS[3], ARGV[1]); " + + "redis.call('publish', KEYS[2], ARGV[1]); " + "end; " + "return 1; " + "else " + "return 0; " + "end;", - Arrays.asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage); + Arrays.asList(getName(), getChannelName()), unlockMessage, getLockName()); } @Override diff --git a/src/main/java/org/redisson/RedissonReadWriteLock.java b/src/main/java/org/redisson/RedissonReadWriteLock.java index a49d15aff..78aac273a 100644 --- a/src/main/java/org/redisson/RedissonReadWriteLock.java +++ b/src/main/java/org/redisson/RedissonReadWriteLock.java @@ -37,8 +37,6 @@ import org.redisson.core.RReadWriteLock; */ public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock { - public static final Long unlockMessage = 0L; - private final UUID id; private final CommandExecutor commandExecutor; diff --git a/src/main/java/org/redisson/RedissonWriteLock.java b/src/main/java/org/redisson/RedissonWriteLock.java index 474f6915c..9edecf8f6 100644 --- a/src/main/java/org/redisson/RedissonWriteLock.java +++ b/src/main/java/org/redisson/RedissonWriteLock.java @@ -43,10 +43,6 @@ public class RedissonWriteLock extends RedissonLock implements RLock { this.commandExecutor = commandExecutor; } - private String getLockName() { - return id + ":" + Thread.currentThread().getId(); - } - String getChannelName() { return "redisson_rwlock__{" + getName() + "}"; } @@ -58,19 +54,19 @@ public class RedissonWriteLock extends RedissonLock implements RLock { "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'write'); " + - "redis.call('hset', KEYS[1], KEYS[2], 1); " + + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (mode == 'write') then " + - "if (redis.call('hexists', KEYS[1], KEYS[2]) == 1) then " + - "redis.call('hincrby', KEYS[1], KEYS[2], 1); " + + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "end;" + "return redis.call('pttl', KEYS[1]);", - Arrays.asList(getName(), getLockName()), internalLockLeaseTime); + Arrays.asList(getName()), internalLockLeaseTime, getLockName()); } @Override @@ -78,30 +74,30 @@ public class RedissonWriteLock extends RedissonLock implements RLock { Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + - "redis.call('publish', KEYS[3], ARGV[1]); " + + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (mode == 'write') then " + - "local lockExists = redis.call('hexists', KEYS[1], KEYS[2]); " + + "local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " + "if (lockExists == 0) then " + "return nil;" + "else " + - "local counter = redis.call('hincrby', KEYS[1], KEYS[2], -1); " + + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + - "redis.call('hdel', KEYS[1], KEYS[2]); " + + "redis.call('hdel', KEYS[1], ARGV[3]); " + "if (redis.call('hlen', KEYS[1]) == 1) then " + "redis.call('del', KEYS[1]); " + - "redis.call('publish', KEYS[3], ARGV[1]); " + + "redis.call('publish', KEYS[2], ARGV[1]); " + "end; " + "return 1; "+ "end; " + "end; " + "end; " + "return nil;", - Arrays.asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage, internalLockLeaseTime); + Arrays.asList(getName(), getChannelName()), unlockMessage, internalLockLeaseTime, getLockName()); if (opStatus == null) { throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); @@ -119,16 +115,16 @@ public class RedissonWriteLock extends RedissonLock implements RLock { Future forceUnlockAsync() { cancelExpirationRenewal(); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "if (redis.call('hdel', KEYS[1], KEYS[2]) == 1) then " + + "if (redis.call('hdel', KEYS[1], ARGV[2]) == 1) then " + "if (redis.call('hlen', KEYS[1]) == 1) then " + "redis.call('del', KEYS[1]); " + - "redis.call('publish', KEYS[3], ARGV[1]); " + + "redis.call('publish', KEYS[2], ARGV[1]); " + "end; " + "return 1; " + "else " + "return 0; " + "end;", - Arrays.asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage); + Arrays.asList(getName(), getChannelName()), unlockMessage, getLockName()); } @Override