Thread stuck at RedissonRedLock & RedissonMultiLock lock method fixed

pull/653/head
Nikita 8 years ago
parent afa911d9d2
commit 70ffe42dd3

@ -18,25 +18,18 @@ package org.redisson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
* Groups multiple independent locks and manages them as one lock.
@ -78,119 +71,23 @@ public class RedissonMultiLock implements Lock {
}
}
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
RPromise<Void> promise = new RedissonPromise<Void>();
long currentThreadId = Thread.currentThread().getId();
Queue<RLock> lockedLocks = new ConcurrentLinkedQueue<RLock>();
lock(promise, 0, leaseTime, unit, locks, currentThreadId, lockedLocks);
promise.sync();
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
private void lock(final RPromise<Void> promise, final long waitTime, final long leaseTime, final TimeUnit unit,
final List<RLock> locks, final long currentThreadId, final Queue<RLock> lockedLocks) throws InterruptedException {
final AtomicInteger tryLockRequestsAmount = new AtomicInteger();
final Map<Future<Boolean>, RLock> tryLockFutures = new HashMap<Future<Boolean>, RLock>(locks.size());
FutureListener<Boolean> listener = new FutureListener<Boolean>() {
AtomicReference<RLock> lockedLockHolder = new AtomicReference<RLock>();
AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
@Override
public void operationComplete(final Future<Boolean> future) throws Exception {
if (isLockFailed(future)) {
failed.compareAndSet(null, future.cause());
}
Boolean res = future.getNow();
if (res != null) {
RLock lock = tryLockFutures.get(future);
if (res) {
lockedLocks.add(lock);
} else {
lockedLockHolder.compareAndSet(null, lock);
}
}
if (tryLockRequestsAmount.decrementAndGet() == 0) {
if (isAllLocksAcquired(lockedLockHolder, failed, lockedLocks)) {
if (!promise.trySuccess(null)) {
unlockInner(lockedLocks);
}
return;
}
if (lockedLocks.isEmpty()) {
tryLockAgain(promise, waitTime, leaseTime, unit, currentThreadId, tryLockFutures);
return;
}
final AtomicInteger locksToUnlockAmount = new AtomicInteger(lockedLocks.size());
for (RLock lock : lockedLocks) {
lock.unlockAsync().addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (locksToUnlockAmount.decrementAndGet() == 0) {
tryLockAgain(promise, waitTime, leaseTime, unit, currentThreadId, tryLockFutures);
}
}
});
}
}
}
protected void tryLockAgain(final RPromise<Void> promise, final long waitTime, final long leaseTime,
final TimeUnit unit, final long currentThreadId, final Map<Future<Boolean>, RLock> tryLockFutures) throws InterruptedException {
lockedLocks.clear();
if (failed.get() != null) {
promise.tryFailure(failed.get());
} else if (lockedLockHolder.get() != null) {
final RedissonLock lockedLock = (RedissonLock) lockedLockHolder.get();
lockedLock.lockAsync(leaseTime, unit, currentThreadId).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
lockedLocks.add(lockedLock);
List<RLock> newLocks = new ArrayList<RLock>(tryLockFutures.values());
newLocks.remove(lockedLock);
lock(promise, waitTime, leaseTime, unit, newLocks, currentThreadId, lockedLocks);
}
});
} else {
lock(promise, waitTime, leaseTime, unit, locks, currentThreadId, lockedLocks);
}
}
};
for (RLock lock : locks) {
tryLockRequestsAmount.incrementAndGet();
Future<Boolean> future;
if (waitTime > 0 || leaseTime > 0) {
future = ((RedissonPromise)((RedissonLock)lock).tryLockAsync(waitTime, leaseTime, unit, currentThreadId)).getInnerPromise();
} else {
future = ((RedissonPromise)(((RedissonLock)lock).tryLockAsync(currentThreadId))).getInnerPromise();
}
if (future instanceof RedissonPromise) {
future = ((RedissonPromise<Boolean>)future).getInnerPromise();
}
tryLockFutures.put(future, lock);
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long waitTime = -1;
if (leaseTime == -1) {
waitTime = 5;
unit = TimeUnit.SECONDS;
} else {
waitTime = unit.convert(5, TimeUnit.SECONDS);
}
for (Future<Boolean> future : tryLockFutures.keySet()) {
future.addListener(listener);
while (true) {
if (tryLock(waitTime, leaseTime, unit)) {
return;
}
}
}
@ -223,7 +120,7 @@ public class RedissonMultiLock implements Lock {
protected int failedLocksLimit() {
return 0;
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
@ -240,15 +137,23 @@ public class RedissonMultiLock implements Lock {
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
lockAcquired = lock.tryLock(unit.convert(remainTime, TimeUnit.MILLISECONDS), newLeaseTime, unit);
try {
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
lockAcquired = lock.tryLock(unit.convert(remainTime, TimeUnit.MILLISECONDS), newLeaseTime, unit);
}
} catch (Exception e) {
lockAcquired = false;
}
if (lockAcquired) {
lockedLocks.add(lock);
} else {
if (locks.size() - lockedLocks.size() == failedLocksLimit()) {
break;
}
if (failedLocksLimit == 0) {
unlockInner(lockedLocks);
if (waitTime == -1 && leaseTime == -1) {
@ -256,6 +161,10 @@ public class RedissonMultiLock implements Lock {
}
failedLocksLimit = failedLocksLimit();
lockedLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
@ -264,7 +173,7 @@ public class RedissonMultiLock implements Lock {
if (remainTime != -1) {
remainTime -= (System.currentTimeMillis() - time);
time = System.currentTimeMillis();
if (remainTime < 0) {
if (remainTime <= 0) {
unlockInner(lockedLocks);
return false;
}

@ -22,6 +22,51 @@ import static org.assertj.core.api.Assertions.assertThat;
public class RedissonRedLockTest {
@Test
public void testLockLeasetime() throws IOException, InterruptedException {
RedisProcess redis1 = redisTestMultilockInstance();
RedisProcess redis2 = redisTestMultilockInstance();
RedissonClient client1 = createClient(redis1.getRedisServerAddressAndPort());
RedissonClient client2 = createClient(redis2.getRedisServerAddressAndPort());
RLock lock1 = client1.getLock("lock1");
RLock lock2 = client1.getLock("lock2");
RLock lock3 = client2.getLock("lock3");
RLock lock4 = client2.getLock("lock3");
RLock lock5 = client2.getLock("lock3");
RLock lock6 = client2.getLock("lock3");
RLock lock7 = client2.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3, lock4, lock5, lock6, lock7);
ExecutorService executor = Executors.newFixedThreadPool(10);
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
for (int j = 0; j < 5; j++) {
try {
lock.lock(2, TimeUnit.SECONDS);
int nextValue = counter.get() + 1;
Thread.sleep(1000);
counter.set(nextValue);
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
executor.shutdown();
assertThat(executor.awaitTermination(2, TimeUnit.MINUTES)).isTrue();
assertThat(counter.get()).isEqualTo(50);
assertThat(redis1.stop()).isEqualTo(0);
assertThat(redis2.stop()).isEqualTo(0);
}
@Test
public void testTryLockLeasetime() throws IOException, InterruptedException {
RedisProcess redis1 = redisTestMultilockInstance();

Loading…
Cancel
Save