RLock.unlock not thrown IllegalMonitorStateException. BooleanReplayConvertor fixed.

pull/337/head
Nikita 9 years ago
parent 0dd60b3fd6
commit 0a577fed44

@ -56,9 +56,9 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('get', KEYS[1]) == ARGV[1] then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "return true "
+ "return 1 "
+ "else "
+ "return false end",
+ "return 0 end",
Collections.<Object>singletonList(getName()), expect, update);
}

@ -115,7 +115,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
"local v = redis.call('decr', KEYS[1]);" +
"if v <= 0 then redis.call('del', KEYS[1]) end;" +
"if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;" +
"return true",
"return 1",
Arrays.<Object>asList(getName(), getChannelName()), zeroCountMessage);
get(f);
}
@ -148,9 +148,9 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
"if redis.call('exists', KEYS[1]) == 0 then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return true "
+ "return 1 "
+ "else "
+ "return false "
+ "return 0 "
+ "end",
Arrays.<Object>asList(getName(), getChannelName()), newCountMessage, count);
return get(f);
@ -161,9 +161,9 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('del', KEYS[1]) == 1 then "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return true "
+ "return 1 "
+ "else "
+ "return false "
+ "return 0 "
+ "end",
Arrays.<Object>asList(getName(), getChannelName()), newCountMessage);
}

@ -151,7 +151,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
"end " +
"end " +
"end " +
"return table.getn(ARGV) == 0",
"return table.getn(ARGV) == 0 and 1 or 0",
Collections.<Object>singletonList(getName()), c.toArray());
}
@ -217,10 +217,10 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public Future<Boolean> removeAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_BOOLEAN_ARGS1,
"local v = false " +
"local v = 0 " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('lrem', KEYS[1], 0, ARGV[i]) == 1 "
+ "then v = true end "
+ "then v = 1 end "
+"end "
+ "return v ",
Collections.<Object>singletonList(getName()), c.toArray());
@ -239,7 +239,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public Future<Boolean> retainAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_BOOLEAN_ARGS1,
"local changed = false " +
"local changed = 0 " +
"local items = redis.call('lrange', KEYS[1], 0, -1) "
+ "local i = 1 "
+ "local s = table.getn(items) "
@ -254,7 +254,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
+ "end "
+ "if isInAgrs == false then "
+ "redis.call('LREM', KEYS[1], 0, element) "
+ "changed = true "
+ "changed = 1 "
+ "end "
+ "i = i + 1 "
+ "end "

@ -275,25 +275,25 @@ public class RedissonLock extends RedissonExpirable implements RLock {
"local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " +
" redis.call('publish', KEYS[2], ARGV[2]); " +
" return true; " +
" return 1; " +
"else " +
" local o = cjson.decode(v); " +
" if (o['o'] == ARGV[1]) then " +
" o['c'] = o['c'] - 1; " +
" if (o['c'] > 0) then " +
" redis.call('set', KEYS[1], cjson.encode(o), 'px', ARGV[3]); " +
" return false;"+
" return 0;"+
" else " +
" redis.call('del', KEYS[1]);" +
" redis.call('publish', KEYS[2], ARGV[2]); " +
" return true;"+
" return 1;"+
" end" +
" end;" +
" return nil; " +
"end",
Arrays.<Object>asList(getName(), getChannelName()), id.toString() + "-" + Thread.currentThread().getId(), unlockMessage, internalLockLeaseTime);
if (opStatus == null) {
throw new IllegalStateException("Can't unlock lock Current id: "
throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
@ -317,9 +317,9 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('del', KEYS[1]) == 1) then "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return true "
+ "return 1 "
+ "else "
+ "return false "
+ "return 0 "
+ "end",
Arrays.<Object>asList(getName(), getChannelName()), unlockMessage);
}
@ -334,13 +334,13 @@ public class RedissonLock extends RedissonExpirable implements RLock {
Boolean opStatus = commandExecutor.evalRead(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " +
" return false; " +
" return 0; " +
"else " +
" local o = cjson.decode(v); " +
" if (o['o'] == ARGV[1]) then " +
" return true; " +
" return 1; " +
" else" +
" return false; " +
" return 0; " +
" end;" +
"end",
Collections.<Object>singletonList(getName()), id.toString() + "-" + Thread.currentThread().getId());

@ -106,10 +106,10 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
"local s = redis.call('hvals', KEYS[1]);" +
"for i = 0, table.getn(s), 1 do "
+ "if ARGV[1] == s[i] then "
+ "return true "
+ "return 1 "
+ "end "
+ "end;" +
"return false",
"return 0",
Collections.<Object>singletonList(getName()), value);
}
@ -229,9 +229,9 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE_VALUE,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); "
+ "return true; "
+ "return 1; "
+ "else "
+ "return false; "
+ "return 0; "
+ "end",
Collections.<Object>singletonList(getName()), key, oldValue, newValue);
}

@ -77,30 +77,31 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[3], ARGV[1]); " +
"return true; " +
"else if (mode == 'read') then " +
"return 1; " +
"end; "
+ "if (mode == 'read') then " +
"local lockExists = redis.call('hexists', KEYS[1], KEYS[2]); " +
"if (lockExists == false) then " +
"if (lockExists == 0) then " +
"return nil;" +
"else " +
"local counter = redis.call('hincrby', KEYS[1], KEYS[2], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return false; " +
"return 0; " +
"else " +
"redis.call('hdel', KEYS[1], KEYS[2]); " +
"if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[3], ARGV[1]); " +
"end; " +
"return true; "+
"return 1; "+
"end; " +
"end; " +
"end; " +
"return nil; ",
Arrays.<Object>asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage, internalLockLeaseTime);
if (opStatus == null) {
throw new IllegalStateException("Can't unlock lock Current id: "
throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
@ -121,9 +122,9 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[3], ARGV[1]); " +
"end; " +
"return true; " +
"return 1; " +
"else " +
"return false; " +
"return 0; " +
"end;",
Arrays.<Object>asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage);
}

@ -16,11 +16,25 @@
package org.redisson;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import org.redisson.command.CommandExecutor;
import org.redisson.core.RLock;
import org.redisson.core.RReadWriteLock;
/**
* A {@code ReadWriteLock} maintains a pair of associated {@link
* Lock locks}, one for read-only operations and one for writing.
* The {@link #readLock read lock} may be held simultaneously by
* multiple reader threads, so long as there are no writers. The
* {@link #writeLock write lock} is exclusive.
*
* Works in non-fair mode. Therefore order of read and write
* locking is unspecified.
*
* @author Nikita Koksharov
*
*/
public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock {
public static final Long unlockMessage = 0L;

@ -280,17 +280,17 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
+ "then table.remove(ARGV, j) end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 0; ",
+ "return table.getn(ARGV) == 0 and 1 or 0; ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public Future<Boolean> removeAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local v = false " +
"local v = 0 " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('zrem', KEYS[1], ARGV[i]) == 1 "
+ "then v = true end "
+ "then v = 1 end "
+"end "
+ "return v ",
Collections.<Object>singletonList(getName()), c.toArray());
@ -309,7 +309,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public Future<Boolean> retainAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local changed = false " +
"local changed = 0 " +
"local s = redis.call('zrange', KEYS[1], 0, -1) "
+ "local i = 0 "
+ "while i <= table.getn(s) do "
@ -323,7 +323,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
+ "end "
+ "if isInAgrs == false then "
+ "redis.call('zrem', KEYS[1], element) "
+ "changed = true "
+ "changed = 1 "
+ "end "
+ "i = i + 1 "
+ "end "

@ -208,7 +208,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
+ "then table.remove(ARGV, j) end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 0; ",
+ "return table.getn(ARGV) == 0 and 1 or 0; ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@ -237,7 +237,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public Future<Boolean> retainAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECTS,
"local changed = false " +
"local changed = 0 " +
"local s = redis.call('smembers', KEYS[1]) "
+ "local i = 0 "
+ "while i <= table.getn(s) do "
@ -251,7 +251,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
+ "end "
+ "if isInAgrs == false then "
+ "redis.call('SREM', KEYS[1], element) "
+ "changed = true "
+ "changed = 1 "
+ "end "
+ "i = i + 1 "
+ "end "
@ -262,10 +262,10 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public Future<Boolean> removeAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECTS,
"local v = false " +
"local v = 0 " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('srem', KEYS[1], ARGV[i]) == 1 "
+ "then v = true end "
+ "then v = 1 end "
+"end "
+ "return v ",
Collections.<Object>singletonList(getName()), c.toArray());

@ -615,8 +615,12 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
final String comparatorSign = className + ":" + calcClassSign(className);
Boolean res = commandExecutor.evalWrite(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('llen', KEYS[1]) == 0 then redis.call('set', KEYS[2], ARGV[1]); return true; "
+ "else return false; end",
"if redis.call('llen', KEYS[1]) == 0 then "
+ "redis.call('set', KEYS[2], ARGV[1]); "
+ "return 1; "
+ "else "
+ "return 0; "
+ "end",
Arrays.<Object>asList(getName(), getComparatorKeyName()), comparatorSign);
if (res) {
this.comparator = comparator;

@ -55,8 +55,8 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'write'); " +
"redis.call('hset', KEYS[1], KEYS[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
@ -79,30 +79,31 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[3], ARGV[1]); " +
"return true; " +
"else if (mode == 'write') then " +
"return 1; " +
"end;" +
"if (mode == 'write') then " +
"local lockExists = redis.call('hexists', KEYS[1], KEYS[2]); " +
"if (lockExists == false) then " +
"if (lockExists == 0) then " +
"return nil;" +
"else " +
"local counter = redis.call('hincrby', KEYS[1], KEYS[2], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return false; " +
"return 0; " +
"else " +
"redis.call('hdel', KEYS[1], KEYS[2]); " +
"if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[3], ARGV[1]); " +
"end; " +
"return true; "+
"return 1; "+
"end; " +
"end; " +
"end; "
+ "return nil;",
Arrays.<Object>asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage, internalLockLeaseTime);
if (opStatus == null) {
throw new IllegalStateException("Can't unlock lock Current id: "
throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
@ -123,9 +124,9 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[3], ARGV[1]); " +
"end; " +
"return true; " +
"return 1; " +
"else " +
"return false; " +
"return 0; " +
"end;",
Arrays.<Object>asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage);
}

@ -19,6 +19,9 @@ public class BooleanReplayConvertor extends SingleConvertor<Boolean> {
@Override
public Boolean convert(Object obj) {
if (obj == null) {
return null;
}
return Long.valueOf(1).equals(obj) || "OK".equals(obj);
}

@ -15,21 +15,36 @@
*/
package org.redisson.core;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
/**
* Distributed implementation of {@link java.util.concurrent.locks.Lock}
* Implements reentrant lock.
* Use {@link RReadWriteLock#getHoldCount()} to get a holds count.
* A {@code ReadWriteLock} maintains a pair of associated {@link
* Lock locks}, one for read-only operations and one for writing.
* The {@link #readLock read lock} may be held simultaneously by
* multiple reader threads, so long as there are no writers. The
* {@link #writeLock write lock} is exclusive.
*
* Works in non-fair mode. Therefore order of read and write
* locking is unspecified.
*
* @author Nikita Koksharov
*
*/
public interface RReadWriteLock extends ReadWriteLock, RExpirable {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
RLock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
RLock writeLock();
}

@ -47,9 +47,9 @@ public class RedissonAtomicLongReactive extends RedissonExpirableReactive implem
return commandExecutor.evalWriteReactive(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('get', KEYS[1]) == ARGV[1] then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "return true "
+ "return 1 "
+ "else "
+ "return false end",
+ "return 0 end",
Collections.<Object>singletonList(getName()), expect, update);
}

@ -169,7 +169,7 @@ public class RedissonListReactive<V> extends RedissonCollectionReactive<V> imple
"end " +
"end " +
"end " +
"return table.getn(ARGV) == 0",
"return table.getn(ARGV) == 0 and 1 or 0",
Collections.<Object>singletonList(getName()), c.toArray());
}
@ -236,10 +236,10 @@ public class RedissonListReactive<V> extends RedissonCollectionReactive<V> imple
@Override
public Publisher<Boolean> removeAll(Collection<?> c) {
return commandExecutor.evalWriteReactive(getName(), codec, RedissonList.EVAL_BOOLEAN_ARGS1,
"local v = false " +
"local v = 0 " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('lrem', KEYS[1], 0, ARGV[i]) == 1 "
+ "then v = true end "
+ "then v = 1 end "
+"end "
+ "return v ",
Collections.<Object>singletonList(getName()), c.toArray());
@ -248,7 +248,7 @@ public class RedissonListReactive<V> extends RedissonCollectionReactive<V> imple
@Override
public Publisher<Boolean> retainAll(Collection<?> c) {
return commandExecutor.evalWriteReactive(getName(), codec, RedissonList.EVAL_BOOLEAN_ARGS1,
"local changed = false " +
"local changed = 0 " +
"local items = redis.call('lrange', KEYS[1], 0, -1) "
+ "local i = 1 "
+ "local s = table.getn(items) "
@ -263,7 +263,7 @@ public class RedissonListReactive<V> extends RedissonCollectionReactive<V> imple
+ "end "
+ "if isInAgrs == false then "
+ "redis.call('LREM', KEYS[1], 0, element) "
+ "changed = true "
+ "changed = 1 "
+ "end "
+ "i = i + 1 "
+ "end "

@ -85,10 +85,10 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
"local s = redis.call('hvals', KEYS[1]);" +
"for i = 0, table.getn(s), 1 do "
+ "if ARGV[1] == s[i] then "
+ "return true "
+ "return 1 "
+ "end "
+ "end;" +
"return false",
"return 0",
Collections.<Object>singletonList(getName()), value);
}
@ -146,9 +146,9 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REPLACE_VALUE,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); "
+ "return true; "
+ "return 1; "
+ "else "
+ "return false; "
+ "return 0; "
+ "end",
Collections.<Object>singletonList(getName()), key, oldValue, newValue);
}

@ -248,17 +248,17 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonCollectionReacti
+ "then table.remove(ARGV, j) end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 0; ",
+ "return table.getn(ARGV) == 0 and 1 or 0; ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public Publisher<Boolean> removeAll(Collection<?> c) {
return commandExecutor.evalWriteReactive(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local v = false " +
"local v = 0 " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('zrem', KEYS[1], ARGV[i]) == 1 "
+ "then v = true end "
+ "then v = 1 end "
+"end "
+ "return v ",
Collections.<Object>singletonList(getName()), c.toArray());
@ -267,7 +267,7 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonCollectionReacti
@Override
public Publisher<Boolean> retainAll(Collection<?> c) {
return commandExecutor.evalWriteReactive(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local changed = false " +
"local changed = 0 " +
"local s = redis.call('zrange', KEYS[1], 0, -1) "
+ "local i = 0 "
+ "while i <= table.getn(s) do "
@ -281,7 +281,7 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonCollectionReacti
+ "end "
+ "if isInAgrs == false then "
+ "redis.call('zrem', KEYS[1], element) "
+ "changed = true "
+ "changed = 1 "
+ "end "
+ "i = i + 1 "
+ "end "

@ -110,7 +110,7 @@ public class RedissonSetReactive<V> extends RedissonCollectionReactive<V> implem
+ "then table.remove(ARGV, j) end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 0; ",
+ "return table.getn(ARGV) == 0 and 1 or 0; ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@ -125,7 +125,7 @@ public class RedissonSetReactive<V> extends RedissonCollectionReactive<V> implem
@Override
public Publisher<Boolean> retainAll(Collection<?> c) {
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_OBJECTS,
"local changed = false " +
"local changed = 0 " +
"local s = redis.call('smembers', KEYS[1]) "
+ "local i = 0 "
+ "while i <= table.getn(s) do "
@ -139,7 +139,7 @@ public class RedissonSetReactive<V> extends RedissonCollectionReactive<V> implem
+ "end "
+ "if isInAgrs == false then "
+ "redis.call('SREM', KEYS[1], element) "
+ "changed = true "
+ "changed = 1 "
+ "end "
+ "i = i + 1 "
+ "end "
@ -150,10 +150,10 @@ public class RedissonSetReactive<V> extends RedissonCollectionReactive<V> implem
@Override
public Publisher<Boolean> removeAll(Collection<?> c) {
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_OBJECTS,
"local v = false " +
"local v = 0 " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('srem', KEYS[1], ARGV[i]) == 1 "
+ "then v = true end "
+ "then v = 1 end "
+"end "
+ "return v ",
Collections.<Object>singletonList(getName()), c.toArray());

@ -37,19 +37,18 @@ public class RedissonLockTest extends BaseConcurrentTest {
lock.lock(2, TimeUnit.SECONDS);
final long startTime = System.currentTimeMillis();
final CountDownLatch latch = new CountDownLatch(1);
new Thread() {
Thread t = new Thread() {
public void run() {
RLock lock1 = redisson.getLock("lock");
lock1.lock();
long spendTime = System.currentTimeMillis() - startTime;
Assert.assertTrue(spendTime < 2020);
lock1.unlock();
latch.countDown();
};
}.start();
};
latch.await();
t.start();
t.join();
lock.unlock();
}
@ -95,28 +94,26 @@ public class RedissonLockTest extends BaseConcurrentTest {
RLock lock = redisson.getLock("lock");
lock.lock();
final CountDownLatch latch = new CountDownLatch(1);
new Thread() {
Thread t = new Thread() {
public void run() {
RLock lock = redisson.getLock("lock");
Assert.assertFalse(lock.isHeldByCurrentThread());
latch.countDown();
};
}.start();
};
latch.await();
t.start();
t.join();
lock.unlock();
final CountDownLatch latch2 = new CountDownLatch(1);
new Thread() {
Thread t2 = new Thread() {
public void run() {
RLock lock = redisson.getLock("lock");
Assert.assertFalse(lock.isHeldByCurrentThread());
latch2.countDown();
};
}.start();
};
latch2.await();
t2.start();
t2.join();
}
@Test
@ -134,28 +131,26 @@ public class RedissonLockTest extends BaseConcurrentTest {
RLock lock = redisson.getLock("lock");
lock.lock();
final CountDownLatch latch = new CountDownLatch(1);
new Thread() {
Thread t = new Thread() {
public void run() {
RLock lock = redisson.getLock("lock");
Assert.assertTrue(lock.isLocked());
latch.countDown();
};
}.start();
};
latch.await();
t.start();
t.join();
lock.unlock();
final CountDownLatch latch2 = new CountDownLatch(1);
new Thread() {
Thread t2 = new Thread() {
public void run() {
RLock lock = redisson.getLock("lock");
Assert.assertFalse(lock.isLocked());
latch2.countDown();
};
}.start();
};
latch2.await();
t2.start();
t2.join();
}
@Test
@ -168,13 +163,23 @@ public class RedissonLockTest extends BaseConcurrentTest {
Assert.assertFalse(lock.isLocked());
}
// @Test(expected = IllegalMonitorStateException.class)
// public void testUnlockFail() {
// Lock lock = redisson.getLock("lock1");
// lock.unlock();
// }
//
//
@Test(expected = IllegalMonitorStateException.class)
public void testUnlockFail() throws InterruptedException {
RLock lock = redisson.getLock("lock");
Thread t = new Thread() {
public void run() {
RLock lock = redisson.getLock("lock");
lock.lock();
};
};
t.start();
t.join();
lock.unlock();
}
@Test
public void testLockUnlock() {
Lock lock = redisson.getLock("lock1");
@ -196,7 +201,7 @@ public class RedissonLockTest extends BaseConcurrentTest {
Thread thread1 = new Thread() {
@Override
public void run() {
RLock lock1 = (RedissonLock) redisson.getLock("lock1");
RLock lock1 = redisson.getLock("lock1");
Assert.assertFalse(lock1.tryLock());
}
};
@ -215,14 +220,9 @@ public class RedissonLockTest extends BaseConcurrentTest {
@Override
public void run(RedissonClient redisson) {
Lock lock = redisson.getLock("testConcurrency_SingleInstance");
System.out.println("lock1 " + Thread.currentThread().getId());
lock.lock();
System.out.println("lock2 "+ Thread.currentThread().getId());
lockedCounter.incrementAndGet();
System.out.println("lockedCounter " + lockedCounter);
System.out.println("unlock1 "+ Thread.currentThread().getId());
lock.unlock();
System.out.println("unlock2 "+ Thread.currentThread().getId());
}
});

Loading…
Cancel
Save