Fixed - Incorrect RedissonRedLock.tryLock behaviour. #624

pull/626/head
Nikita 8 years ago
parent 5731b9eb0b
commit 7eaf465f57

@ -20,8 +20,8 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@ -196,36 +196,12 @@ public class RedissonMultiLock implements Lock {
@Override
public boolean tryLock() {
Map<RLock, RFuture<Boolean>> tryLockFutures = new HashMap<RLock, RFuture<Boolean>>(locks.size());
for (RLock lock : locks) {
tryLockFutures.put(lock, lock.tryLockAsync());
}
return sync(tryLockFutures);
}
protected boolean sync(Map<RLock, RFuture<Boolean>> tryLockFutures) {
List<RLock> lockedLocks = new ArrayList<RLock>(tryLockFutures.size());
RuntimeException latestException = null;
for (Entry<RLock, RFuture<Boolean>> entry : tryLockFutures.entrySet()) {
try {
if (entry.getValue().syncUninterruptibly().getNow()) {
lockedLocks.add(entry.getKey());
}
} catch (RuntimeException e) {
latestException = e;
}
}
if (lockedLocks.size() < tryLockFutures.size()) {
unlockInner(lockedLocks);
if (latestException != null) {
throw latestException;
}
return tryLock(-1, -1, null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
return true;
}
protected void unlockInner(Collection<RLock> locks) {
@ -244,13 +220,69 @@ public class RedissonMultiLock implements Lock {
return tryLock(waitTime, -1, unit);
}
protected int failedLocksLimit() {
return 1;
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
Map<RLock, RFuture<Boolean>> tryLockFutures = new HashMap<RLock, RFuture<Boolean>>(locks.size());
for (RLock lock : locks) {
tryLockFutures.put(lock, lock.tryLockAsync(waitTime, leaseTime, unit));
long newLeaseTime = -1;
if (leaseTime != -1) {
newLeaseTime = waitTime*2;
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
int failedLocksLimit = failedLocksLimit();
List<RLock> lockedLocks = new ArrayList<RLock>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
lockAcquired = lock.tryLock(unit.convert(remainTime, TimeUnit.MILLISECONDS), newLeaseTime, unit);
}
if (lockAcquired) {
lockedLocks.add(lock);
} else {
failedLocksLimit--;
if (failedLocksLimit == 0) {
unlockInner(lockedLocks);
if (waitTime == -1 && leaseTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
lockedLocks.clear();
}
}
if (remainTime != -1) {
remainTime -= (System.currentTimeMillis() - time);
time = System.currentTimeMillis();
if (remainTime < 0) {
unlockInner(lockedLocks);
return false;
}
}
}
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(lockedLocks.size());
for (RLock rLock : lockedLocks) {
RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
return sync(tryLockFutures);
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
return true;
}

@ -15,14 +15,10 @@
*/
package org.redisson;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import io.netty.util.concurrent.Future;
@ -48,28 +44,9 @@ public class RedissonRedLock extends RedissonMultiLock {
super(locks);
}
protected boolean sync(Map<RLock, RFuture<Boolean>> tryLockFutures) {
List<RLock> lockedLocks = new ArrayList<RLock>(tryLockFutures.size());
RuntimeException latestException = null;
for (Entry<RLock, RFuture<Boolean>> entry : tryLockFutures.entrySet()) {
try {
if (entry.getValue().syncUninterruptibly().getNow()) {
lockedLocks.add(entry.getKey());
}
} catch (RuntimeException e) {
latestException = e;
}
}
if (lockedLocks.size() < minLocksAmount(locks)) {
unlockInner(lockedLocks);
if (latestException != null) {
throw latestException;
}
return false;
}
return true;
@Override
protected int failedLocksLimit() {
return locks.size() - minLocksAmount(locks);
}
protected int minLocksAmount(final List<RLock> locks) {

@ -1,8 +1,11 @@
package org.redisson;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
@ -19,6 +22,50 @@ import static org.assertj.core.api.Assertions.assertThat;
public class RedissonRedLockTest {
@Test
public void testTryLockLeasetime() throws IOException, InterruptedException {
RedisProcess redis1 = redisTestMultilockInstance();
RedisProcess redis2 = redisTestMultilockInstance();
RedissonClient client1 = createClient(redis1.getRedisServerAddressAndPort());
RedissonClient client2 = createClient(redis2.getRedisServerAddressAndPort());
RLock lock1 = client1.getLock("lock1");
RLock lock2 = client1.getLock("lock2");
RLock lock3 = client2.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
ExecutorService executor = Executors.newFixedThreadPool(10);
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
for (int j = 0; j < 5; j++) {
try {
if (lock.tryLock(4, 2, TimeUnit.SECONDS)) {
int nextValue = counter.get() + 1;
Thread.sleep(1000);
counter.set(nextValue);
lock.unlock();
} else {
j--;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
executor.shutdown();
assertThat(executor.awaitTermination(2, TimeUnit.MINUTES)).isTrue();
assertThat(counter.get()).isEqualTo(50);
assertThat(redis1.stop()).isEqualTo(0);
assertThat(redis2.stop()).isEqualTo(0);
}
@Test
public void testLockFailed() throws IOException, InterruptedException {
RedisProcess redis1 = redisTestMultilockInstance();

Loading…
Cancel
Save