From 7eaf465f577e5fb38a964f0dff9bf89ff1f29384 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 20 Sep 2016 17:10:30 +0300 Subject: [PATCH] Fixed - Incorrect RedissonRedLock.tryLock behaviour. #624 --- .../java/org/redisson/RedissonMultiLock.java | 98 ++++++++++++------- .../java/org/redisson/RedissonRedLock.java | 33 +------ .../org/redisson/RedissonRedLockTest.java | 47 +++++++++ 3 files changed, 117 insertions(+), 61 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonMultiLock.java b/redisson/src/main/java/org/redisson/RedissonMultiLock.java index 5b9e7613c..8d4e7abfc 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultiLock.java +++ b/redisson/src/main/java/org/redisson/RedissonMultiLock.java @@ -20,8 +20,8 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; +import java.util.ListIterator; import java.util.Map; -import java.util.Map.Entry; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -196,36 +196,12 @@ public class RedissonMultiLock implements Lock { @Override public boolean tryLock() { - Map> tryLockFutures = new HashMap>(locks.size()); - for (RLock lock : locks) { - tryLockFutures.put(lock, lock.tryLockAsync()); - } - - return sync(tryLockFutures); - } - - protected boolean sync(Map> tryLockFutures) { - List lockedLocks = new ArrayList(tryLockFutures.size()); - RuntimeException latestException = null; - for (Entry> entry : tryLockFutures.entrySet()) { - try { - if (entry.getValue().syncUninterruptibly().getNow()) { - lockedLocks.add(entry.getKey()); - } - } catch (RuntimeException e) { - latestException = e; - } - } - - if (lockedLocks.size() < tryLockFutures.size()) { - unlockInner(lockedLocks); - if (latestException != null) { - throw latestException; - } + try { + return tryLock(-1, -1, null); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); return false; } - - return true; } protected void unlockInner(Collection locks) { @@ -243,14 +219,70 @@ public class RedissonMultiLock implements Lock { public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException { return tryLock(waitTime, -1, unit); } + + protected int failedLocksLimit() { + return 1; + } public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { - Map> tryLockFutures = new HashMap>(locks.size()); - for (RLock lock : locks) { - tryLockFutures.put(lock, lock.tryLockAsync(waitTime, leaseTime, unit)); + long newLeaseTime = -1; + if (leaseTime != -1) { + newLeaseTime = waitTime*2; + } + + long time = System.currentTimeMillis(); + long remainTime = -1; + if (waitTime != -1) { + remainTime = unit.toMillis(waitTime); + } + int failedLocksLimit = failedLocksLimit(); + List lockedLocks = new ArrayList(locks.size()); + for (ListIterator iterator = locks.listIterator(); iterator.hasNext();) { + RLock lock = iterator.next(); + boolean lockAcquired; + if (waitTime == -1 && leaseTime == -1) { + lockAcquired = lock.tryLock(); + } else { + lockAcquired = lock.tryLock(unit.convert(remainTime, TimeUnit.MILLISECONDS), newLeaseTime, unit); + } + + if (lockAcquired) { + lockedLocks.add(lock); + } else { + failedLocksLimit--; + if (failedLocksLimit == 0) { + unlockInner(lockedLocks); + if (waitTime == -1 && leaseTime == -1) { + return false; + } + failedLocksLimit = failedLocksLimit(); + lockedLocks.clear(); + } + } + + if (remainTime != -1) { + remainTime -= (System.currentTimeMillis() - time); + time = System.currentTimeMillis(); + if (remainTime < 0) { + unlockInner(lockedLocks); + return false; + } + } } - return sync(tryLockFutures); + if (leaseTime != -1) { + List> futures = new ArrayList>(lockedLocks.size()); + for (RLock rLock : lockedLocks) { + RFuture future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS); + futures.add(future); + } + + for (RFuture rFuture : futures) { + rFuture.syncUninterruptibly(); + } + } + + return true; } diff --git a/redisson/src/main/java/org/redisson/RedissonRedLock.java b/redisson/src/main/java/org/redisson/RedissonRedLock.java index 25b296d01..de09713e8 100644 --- a/redisson/src/main/java/org/redisson/RedissonRedLock.java +++ b/redisson/src/main/java/org/redisson/RedissonRedLock.java @@ -15,14 +15,10 @@ */ package org.redisson; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.Queue; import java.util.concurrent.atomic.AtomicReference; -import org.redisson.api.RFuture; import org.redisson.api.RLock; import io.netty.util.concurrent.Future; @@ -47,31 +43,12 @@ public class RedissonRedLock extends RedissonMultiLock { public RedissonRedLock(RLock... locks) { super(locks); } - - protected boolean sync(Map> tryLockFutures) { - List lockedLocks = new ArrayList(tryLockFutures.size()); - RuntimeException latestException = null; - for (Entry> entry : tryLockFutures.entrySet()) { - try { - if (entry.getValue().syncUninterruptibly().getNow()) { - lockedLocks.add(entry.getKey()); - } - } catch (RuntimeException e) { - latestException = e; - } - } - - if (lockedLocks.size() < minLocksAmount(locks)) { - unlockInner(lockedLocks); - if (latestException != null) { - throw latestException; - } - return false; - } - - return true; - } + @Override + protected int failedLocksLimit() { + return locks.size() - minLocksAmount(locks); + } + protected int minLocksAmount(final List locks) { return locks.size()/2 + 1; } diff --git a/redisson/src/test/java/org/redisson/RedissonRedLockTest.java b/redisson/src/test/java/org/redisson/RedissonRedLockTest.java index bb2d76e44..0e29d6c55 100644 --- a/redisson/src/test/java/org/redisson/RedissonRedLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRedLockTest.java @@ -1,8 +1,11 @@ package org.redisson; import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; import org.junit.Test; @@ -19,6 +22,50 @@ import static org.assertj.core.api.Assertions.assertThat; public class RedissonRedLockTest { + @Test + public void testTryLockLeasetime() throws IOException, InterruptedException { + RedisProcess redis1 = redisTestMultilockInstance(); + RedisProcess redis2 = redisTestMultilockInstance(); + + RedissonClient client1 = createClient(redis1.getRedisServerAddressAndPort()); + RedissonClient client2 = createClient(redis2.getRedisServerAddressAndPort()); + + RLock lock1 = client1.getLock("lock1"); + RLock lock2 = client1.getLock("lock2"); + RLock lock3 = client2.getLock("lock3"); + + RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); + + ExecutorService executor = Executors.newFixedThreadPool(10); + AtomicInteger counter = new AtomicInteger(); + for (int i = 0; i < 10; i++) { + executor.submit(() -> { + for (int j = 0; j < 5; j++) { + try { + if (lock.tryLock(4, 2, TimeUnit.SECONDS)) { + int nextValue = counter.get() + 1; + Thread.sleep(1000); + counter.set(nextValue); + lock.unlock(); + } else { + j--; + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + } + + executor.shutdown(); + assertThat(executor.awaitTermination(2, TimeUnit.MINUTES)).isTrue(); + assertThat(counter.get()).isEqualTo(50); + + assertThat(redis1.stop()).isEqualTo(0); + assertThat(redis2.stop()).isEqualTo(0); + } + + @Test public void testLockFailed() throws IOException, InterruptedException { RedisProcess redis1 = redisTestMultilockInstance();