CROSSLOT errors fixed for RReadWriteLock in cluster mode

pull/337/head
Nikita 9 years ago
parent 8c819b9fb2
commit 868d0916d0

@ -69,6 +69,10 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return "redisson_lock__channel__{" + getName() + "}"; return "redisson_lock__channel__{" + getName() + "}";
} }
String getLockName() {
return id + ":" + Thread.currentThread().getId();
}
@Override @Override
public void lock() { public void lock() {
try { try {
@ -192,7 +196,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
" end;" + " end;" +
" return redis.call('pttl', KEYS[1]); " + " return redis.call('pttl', KEYS[1]); " +
"end", "end",
Collections.<Object>singletonList(getName()), id.toString() + "-" + Thread.currentThread().getId(), internalLockLeaseTime); Collections.<Object>singletonList(getName()), getLockName(), internalLockLeaseTime);
} }
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
@ -287,7 +291,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
" end;" + " end;" +
" return nil; " + " return nil; " +
"end", "end",
Arrays.<Object>asList(getName(), getChannelName()), id.toString() + "-" + Thread.currentThread().getId(), unlockMessage, internalLockLeaseTime); Arrays.<Object>asList(getName(), getChannelName()), getLockName(), unlockMessage, internalLockLeaseTime);
if (opStatus == null) { if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: " throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId()); + id + " thread-id: " + Thread.currentThread().getId());
@ -339,7 +343,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
" return 0; " + " return 0; " +
" end;" + " end;" +
"end", "end",
Collections.<Object>singletonList(getName()), id.toString() + "-" + Thread.currentThread().getId()); Collections.<Object>singletonList(getName()), getLockName());
return opStatus; return opStatus;
} }

@ -43,10 +43,6 @@ public class RedissonReadLock extends RedissonLock implements RLock {
this.commandExecutor = commandExecutor; this.commandExecutor = commandExecutor;
} }
private String getLockName() {
return id + ":" + Thread.currentThread().getId();
}
String getChannelName() { String getChannelName() {
return "redisson_rwlock__{" + getName() + "}"; return "redisson_rwlock__{" + getName() + "}";
} }
@ -58,17 +54,17 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"local mode = redis.call('hget', KEYS[1], 'mode'); " + "local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " + "if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'read'); " + "redis.call('hset', KEYS[1], 'mode', 'read'); " +
"redis.call('hset', KEYS[1], KEYS[2], 1); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " + "return nil; " +
"end; " + "end; " +
"if (mode == 'read') then " + "if (mode == 'read') then " +
"redis.call('hincrby', KEYS[1], KEYS[2], 1); " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " + "return nil; " +
"end;" + "end;" +
"return redis.call('pttl', KEYS[1]);", "return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName(), getLockName()), internalLockLeaseTime); Arrays.<Object>asList(getName()), internalLockLeaseTime, getLockName());
} }
@Override @Override
@ -76,30 +72,30 @@ public class RedissonReadLock extends RedissonLock implements RLock {
Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " + "local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " + "if (mode == false) then " +
"redis.call('publish', KEYS[3], ARGV[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " + "return 1; " +
"end; " "end; "
+ "if (mode == 'read') then " + + "if (mode == 'read') then " +
"local lockExists = redis.call('hexists', KEYS[1], KEYS[2]); " + "local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
"if (lockExists == 0) then " + "if (lockExists == 0) then " +
"return nil;" + "return nil;" +
"else " + "else " +
"local counter = redis.call('hincrby', KEYS[1], KEYS[2], -1); " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " + "if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " + "redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " + "return 0; " +
"else " + "else " +
"redis.call('hdel', KEYS[1], KEYS[2]); " + "redis.call('hdel', KEYS[1], ARGV[3]); " +
"if (redis.call('hlen', KEYS[1]) == 1) then " + "if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " + "redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[3], ARGV[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " +
"end; " + "end; " +
"return 1; "+ "return 1; "+
"end; " + "end; " +
"end; " + "end; " +
"end; " + "end; " +
"return nil; ", "return nil; ",
Arrays.<Object>asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage, internalLockLeaseTime); Arrays.<Object>asList(getName(), getChannelName()), unlockMessage, internalLockLeaseTime, getLockName());
if (opStatus == null) { if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: " throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId()); + id + " thread-id: " + Thread.currentThread().getId());
@ -117,16 +113,16 @@ public class RedissonReadLock extends RedissonLock implements RLock {
Future<Boolean> forceUnlockAsync() { Future<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(); cancelExpirationRenewal();
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hdel', KEYS[1], KEYS[2]) == 1) then " + "if (redis.call('hdel', KEYS[1], ARGV[2]) == 1) then " +
"if (redis.call('hlen', KEYS[1]) == 1) then " + "if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " + "redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[3], ARGV[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " +
"end; " + "end; " +
"return 1; " + "return 1; " +
"else " + "else " +
"return 0; " + "return 0; " +
"end;", "end;",
Arrays.<Object>asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage); Arrays.<Object>asList(getName(), getChannelName()), unlockMessage, getLockName());
} }
@Override @Override

@ -37,8 +37,6 @@ import org.redisson.core.RReadWriteLock;
*/ */
public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock { public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock {
public static final Long unlockMessage = 0L;
private final UUID id; private final UUID id;
private final CommandExecutor commandExecutor; private final CommandExecutor commandExecutor;

@ -43,10 +43,6 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
this.commandExecutor = commandExecutor; this.commandExecutor = commandExecutor;
} }
private String getLockName() {
return id + ":" + Thread.currentThread().getId();
}
String getChannelName() { String getChannelName() {
return "redisson_rwlock__{" + getName() + "}"; return "redisson_rwlock__{" + getName() + "}";
} }
@ -58,19 +54,19 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
"local mode = redis.call('hget', KEYS[1], 'mode'); " + "local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " + "if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'write'); " + "redis.call('hset', KEYS[1], 'mode', 'write'); " +
"redis.call('hset', KEYS[1], KEYS[2], 1); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " + "return nil; " +
"end; " + "end; " +
"if (mode == 'write') then " + "if (mode == 'write') then " +
"if (redis.call('hexists', KEYS[1], KEYS[2]) == 1) then " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], KEYS[2], 1); " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " + "return nil; " +
"end; " + "end; " +
"end;" + "end;" +
"return redis.call('pttl', KEYS[1]);", "return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName(), getLockName()), internalLockLeaseTime); Arrays.<Object>asList(getName()), internalLockLeaseTime, getLockName());
} }
@Override @Override
@ -78,30 +74,30 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " + "local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " + "if (mode == false) then " +
"redis.call('publish', KEYS[3], ARGV[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " + "return 1; " +
"end;" + "end;" +
"if (mode == 'write') then " + "if (mode == 'write') then " +
"local lockExists = redis.call('hexists', KEYS[1], KEYS[2]); " + "local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
"if (lockExists == 0) then " + "if (lockExists == 0) then " +
"return nil;" + "return nil;" +
"else " + "else " +
"local counter = redis.call('hincrby', KEYS[1], KEYS[2], -1); " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " + "if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " + "redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " + "return 0; " +
"else " + "else " +
"redis.call('hdel', KEYS[1], KEYS[2]); " + "redis.call('hdel', KEYS[1], ARGV[3]); " +
"if (redis.call('hlen', KEYS[1]) == 1) then " + "if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " + "redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[3], ARGV[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " +
"end; " + "end; " +
"return 1; "+ "return 1; "+
"end; " + "end; " +
"end; " + "end; " +
"end; " "end; "
+ "return nil;", + "return nil;",
Arrays.<Object>asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage, internalLockLeaseTime); Arrays.<Object>asList(getName(), getChannelName()), unlockMessage, internalLockLeaseTime, getLockName());
if (opStatus == null) { if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: " throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId()); + id + " thread-id: " + Thread.currentThread().getId());
@ -119,16 +115,16 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
Future<Boolean> forceUnlockAsync() { Future<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(); cancelExpirationRenewal();
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hdel', KEYS[1], KEYS[2]) == 1) then " + "if (redis.call('hdel', KEYS[1], ARGV[2]) == 1) then " +
"if (redis.call('hlen', KEYS[1]) == 1) then " + "if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " + "redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[3], ARGV[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " +
"end; " + "end; " +
"return 1; " + "return 1; " +
"else " + "else " +
"return 0; " + "return 0; " +
"end;", "end;",
Arrays.<Object>asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage); Arrays.<Object>asList(getName(), getChannelName()), unlockMessage, getLockName());
} }
@Override @Override

Loading…
Cancel
Save