Merge branch 'master' of https://github.com/mrniko/redisson into feature/travis-ci

pull/509/head
jackygurui 9 years ago
commit 5b1591818e

@ -377,6 +377,11 @@ public class RedissonLock extends RedissonExpirable implements RLock {
} }
public Future<Void> unlockAsync() { public Future<Void> unlockAsync() {
long threadId = Thread.currentThread().getId();
return unlockAsync(threadId);
}
public Future<Void> unlockAsync(final long threadId) {
final Promise<Void> result = newPromise(); final Promise<Void> result = newPromise();
Future<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, Future<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " + "if (redis.call('exists', KEYS[1]) == 0) then " +
@ -396,7 +401,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
"return 1; "+ "return 1; "+
"end; " + "end; " +
"return nil;", "return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId())); Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() { future.addListener(new FutureListener<Boolean>() {
@Override @Override
@ -409,7 +414,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
Boolean opStatus = future.getNow(); Boolean opStatus = future.getNow();
if (opStatus == null) { if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId()); + id + " thread-id: " + threadId);
result.setFailure(cause); result.setFailure(cause);
return; return;
} }
@ -428,8 +433,12 @@ public class RedissonLock extends RedissonExpirable implements RLock {
} }
public Future<Void> lockAsync(final long leaseTime, final TimeUnit unit) { public Future<Void> lockAsync(final long leaseTime, final TimeUnit unit) {
final Promise<Void> result = newPromise();
final long currentThreadId = Thread.currentThread().getId(); final long currentThreadId = Thread.currentThread().getId();
return lockAsync(leaseTime, unit, currentThreadId);
}
public Future<Void> lockAsync(final long leaseTime, final TimeUnit unit, final long currentThreadId) {
final Promise<Void> result = newPromise();
Future<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId); Future<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);
ttlFuture.addListener(new FutureListener<Long>() { ttlFuture.addListener(new FutureListener<Long>() {
@Override @Override
@ -525,8 +534,13 @@ public class RedissonLock extends RedissonExpirable implements RLock {
} }
public Future<Boolean> tryLockAsync() { public Future<Boolean> tryLockAsync() {
long threadId = Thread.currentThread().getId();
return tryLockAsync(threadId);
}
public Future<Boolean> tryLockAsync(long threadId) {
final Promise<Boolean> result = newPromise(); final Promise<Boolean> result = newPromise();
Future<Long> future = tryLockInnerAsync(Thread.currentThread().getId()); Future<Long> future = tryLockInnerAsync(threadId);
future.addListener(new FutureListener<Long>() { future.addListener(new FutureListener<Long>() {
@Override @Override
public void operationComplete(Future<Long> future) throws Exception { public void operationComplete(Future<Long> future) throws Exception {
@ -546,10 +560,15 @@ public class RedissonLock extends RedissonExpirable implements RLock {
} }
public Future<Boolean> tryLockAsync(final long waitTime, final long leaseTime, final TimeUnit unit) { public Future<Boolean> tryLockAsync(final long waitTime, final long leaseTime, final TimeUnit unit) {
final long currentThreadId = Thread.currentThread().getId();
return tryLockAsync(waitTime, leaseTime, unit, currentThreadId);
}
public Future<Boolean> tryLockAsync(final long waitTime, final long leaseTime, final TimeUnit unit,
final long currentThreadId) {
final Promise<Boolean> result = newPromise(); final Promise<Boolean> result = newPromise();
final AtomicLong time = new AtomicLong(unit.toMillis(waitTime)); final AtomicLong time = new AtomicLong(unit.toMillis(waitTime));
final long currentThreadId = Thread.currentThread().getId();
Future<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId); Future<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);
ttlFuture.addListener(new FutureListener<Long>() { ttlFuture.addListener(new FutureListener<Long>() {
@Override @Override

@ -21,11 +21,13 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import org.redisson.RedissonLock;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor;
@ -74,7 +76,8 @@ public class RedissonMultiLock implements Lock {
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
Promise<Void> promise = ImmediateEventExecutor.INSTANCE.newPromise(); Promise<Void> promise = ImmediateEventExecutor.INSTANCE.newPromise();
lock(promise, 0, leaseTime, unit); long currentThreadId = Thread.currentThread().getId();
lock(promise, 0, leaseTime, unit, locks, currentThreadId);
promise.sync(); promise.sync();
} }
@ -84,50 +87,61 @@ public class RedissonMultiLock implements Lock {
lockInterruptibly(-1, null); lockInterruptibly(-1, null);
} }
private void lock(final Promise<Void> promise, final long waitTime, final long leaseTime, final TimeUnit unit) throws InterruptedException { private void lock(final Promise<Void> promise, final long waitTime, final long leaseTime, final TimeUnit unit, final List<RLock> locks, final long currentThreadId) throws InterruptedException {
final AtomicInteger tryLockRequestsAmount = new AtomicInteger(); final AtomicInteger tryLockRequestsAmount = new AtomicInteger();
final Map<Future<Boolean>, RLock> tryLockFutures = new HashMap<Future<Boolean>, RLock>(locks.size()); final Map<Future<Boolean>, RLock> tryLockFutures = new HashMap<Future<Boolean>, RLock>(locks.size());
FutureListener<Boolean> listener = new FutureListener<Boolean>() { FutureListener<Boolean> listener = new FutureListener<Boolean>() {
AtomicBoolean unlock = new AtomicBoolean(); AtomicReference<RLock> lockedLockHolder = new AtomicReference<RLock>();
AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
@Override @Override
public void operationComplete(Future<Boolean> future) throws Exception { public void operationComplete(final Future<Boolean> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
// unlock once failed.compareAndSet(null, future.cause());
if (unlock.compareAndSet(false, true)) {
for (RLock lock : locks) {
lock.unlockAsync();
}
promise.setFailure(future.cause());
}
return;
} }
Boolean res = future.getNow(); Boolean res = future.getNow();
// unlock once if (res != null && !res) {
if (!res && unlock.compareAndSet(false, true)) { RLock lock = tryLockFutures.get(future);
for (RLock lock : locks) { lockedLockHolder.compareAndSet(null, lock);
lock.unlockAsync(); }
if (tryLockRequestsAmount.decrementAndGet() == 0) {
if (lockedLockHolder.get() == null && failed.get() == null) {
promise.setSuccess(null);
return;
} }
RLock lock = tryLockFutures.get(future); tryLockRequestsAmount.set(tryLockFutures.size());
lock.lockAsync().addListener(new FutureListener<Void>() { for (RLock lock : tryLockFutures.values()) {
@Override lock.unlockAsync().addListener(new FutureListener<Void>() {
public void operationComplete(Future<Void> future) throws Exception { @Override
if (!future.isSuccess()) { public void operationComplete(Future<Void> future) throws Exception {
promise.setFailure(future.cause()); if (tryLockRequestsAmount.decrementAndGet() == 0) {
return; if (failed.get() != null) {
promise.setFailure(failed.get());
} else if (lockedLockHolder.get() != null) {
final RedissonLock lockedLock = (RedissonLock) lockedLockHolder.get();
lockedLock.lockAsync(leaseTime, unit, currentThreadId).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
List<RLock> newLocks = new ArrayList<RLock>(tryLockFutures.values());
newLocks.remove(lockedLock);
lock(promise, waitTime, leaseTime, unit, newLocks, currentThreadId);
}
});
}
}
} }
});
lock(promise, waitTime, leaseTime, unit); }
}
});
}
if (!unlock.get() && tryLockRequestsAmount.decrementAndGet() == 0) {
promise.setSuccess(null);
} }
} }
}; };
@ -138,11 +152,13 @@ public class RedissonMultiLock implements Lock {
} }
tryLockRequestsAmount.incrementAndGet(); tryLockRequestsAmount.incrementAndGet();
Future<Boolean> future;
if (waitTime > 0 || leaseTime > 0) { if (waitTime > 0 || leaseTime > 0) {
tryLockFutures.put(lock.tryLockAsync(waitTime, leaseTime, unit), lock); future = ((RedissonLock)lock).tryLockAsync(waitTime, leaseTime, unit, currentThreadId);
} else { } else {
tryLockFutures.put(lock.tryLockAsync(), lock); future = ((RedissonLock)lock).tryLockAsync(currentThreadId);
} }
tryLockFutures.put(future, lock);
} }
for (Future<Boolean> future : tryLockFutures.keySet()) { for (Future<Boolean> future : tryLockFutures.keySet()) {

@ -22,6 +22,41 @@ import static org.assertj.core.api.Assertions.assertThat;
public class RedissonMultiLockTest { public class RedissonMultiLockTest {
@Test
public void testMultiThreads() throws IOException, InterruptedException {
RedisProcess redis1 = redisTestMultilockInstance1();
Config config1 = new Config();
config1.useSingleServer().setAddress("127.0.0.1:6320");
RedissonClient client = Redisson.create(config1);
RLock lock1 = client.getLock("lock1");
RLock lock2 = client.getLock("lock2");
RLock lock3 = client.getLock("lock3");
Thread t = new Thread() {
public void run() {
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
lock.lock();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
lock.unlock();
};
};
t.start();
t.join(1000);
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
lock.lock();
lock.unlock();
assertThat(redis1.stop()).isEqualTo(0);
}
@Test @Test
public void test() throws IOException, InterruptedException { public void test() throws IOException, InterruptedException {
RedisProcess redis1 = redisTestMultilockInstance1(); RedisProcess redis1 = redisTestMultilockInstance1();

Loading…
Cancel
Save