diff --git a/redisson/src/main/java/org/redisson/RedissonMultiLock.java b/redisson/src/main/java/org/redisson/RedissonMultiLock.java index e77f7df3f..17df31b21 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultiLock.java +++ b/redisson/src/main/java/org/redisson/RedissonMultiLock.java @@ -18,25 +18,18 @@ package org.redisson; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.ListIterator; -import java.util.Map; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.redisson.api.RFuture; import org.redisson.api.RLock; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; /** * Groups multiple independent locks and manages them as one lock. @@ -78,119 +71,23 @@ public class RedissonMultiLock implements Lock { } } - public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { - RPromise promise = new RedissonPromise(); - - long currentThreadId = Thread.currentThread().getId(); - Queue lockedLocks = new ConcurrentLinkedQueue(); - lock(promise, 0, leaseTime, unit, locks, currentThreadId, lockedLocks); - - promise.sync(); - } - @Override public void lockInterruptibly() throws InterruptedException { lockInterruptibly(-1, null); } - private void lock(final RPromise promise, final long waitTime, final long leaseTime, final TimeUnit unit, - final List locks, final long currentThreadId, final Queue lockedLocks) throws InterruptedException { - final AtomicInteger tryLockRequestsAmount = new AtomicInteger(); - final Map, RLock> tryLockFutures = new HashMap, RLock>(locks.size()); - - FutureListener listener = new FutureListener() { - - AtomicReference lockedLockHolder = new AtomicReference(); - AtomicReference failed = new AtomicReference(); - - @Override - public void operationComplete(final Future future) throws Exception { - if (isLockFailed(future)) { - failed.compareAndSet(null, future.cause()); - } - - Boolean res = future.getNow(); - if (res != null) { - RLock lock = tryLockFutures.get(future); - if (res) { - lockedLocks.add(lock); - } else { - lockedLockHolder.compareAndSet(null, lock); - } - } - - if (tryLockRequestsAmount.decrementAndGet() == 0) { - if (isAllLocksAcquired(lockedLockHolder, failed, lockedLocks)) { - if (!promise.trySuccess(null)) { - unlockInner(lockedLocks); - } - return; - } - - if (lockedLocks.isEmpty()) { - tryLockAgain(promise, waitTime, leaseTime, unit, currentThreadId, tryLockFutures); - return; - } - - final AtomicInteger locksToUnlockAmount = new AtomicInteger(lockedLocks.size()); - for (RLock lock : lockedLocks) { - lock.unlockAsync().addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (locksToUnlockAmount.decrementAndGet() == 0) { - tryLockAgain(promise, waitTime, leaseTime, unit, currentThreadId, tryLockFutures); - } - } - }); - } - } - } - - protected void tryLockAgain(final RPromise promise, final long waitTime, final long leaseTime, - final TimeUnit unit, final long currentThreadId, final Map, RLock> tryLockFutures) throws InterruptedException { - lockedLocks.clear(); - if (failed.get() != null) { - promise.tryFailure(failed.get()); - } else if (lockedLockHolder.get() != null) { - final RedissonLock lockedLock = (RedissonLock) lockedLockHolder.get(); - lockedLock.lockAsync(leaseTime, unit, currentThreadId).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.tryFailure(future.cause()); - return; - } - - lockedLocks.add(lockedLock); - List newLocks = new ArrayList(tryLockFutures.values()); - newLocks.remove(lockedLock); - lock(promise, waitTime, leaseTime, unit, newLocks, currentThreadId, lockedLocks); - } - }); - } else { - lock(promise, waitTime, leaseTime, unit, locks, currentThreadId, lockedLocks); - } - } - }; - - for (RLock lock : locks) { - tryLockRequestsAmount.incrementAndGet(); - Future future; - if (waitTime > 0 || leaseTime > 0) { - future = ((RedissonPromise)((RedissonLock)lock).tryLockAsync(waitTime, leaseTime, unit, currentThreadId)).getInnerPromise(); - } else { - future = ((RedissonPromise)(((RedissonLock)lock).tryLockAsync(currentThreadId))).getInnerPromise(); - } - - if (future instanceof RedissonPromise) { - future = ((RedissonPromise)future).getInnerPromise(); - } - - tryLockFutures.put(future, lock); + public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { + long waitTime = -1; + if (leaseTime == -1) { + waitTime = 5; + unit = TimeUnit.SECONDS; + } else { + waitTime = unit.convert(5, TimeUnit.SECONDS); } - - for (Future future : tryLockFutures.keySet()) { - future.addListener(listener); + while (true) { + if (tryLock(waitTime, leaseTime, unit)) { + return; + } } } @@ -223,7 +120,7 @@ public class RedissonMultiLock implements Lock { protected int failedLocksLimit() { return 0; } - + public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long newLeaseTime = -1; if (leaseTime != -1) { @@ -240,15 +137,23 @@ public class RedissonMultiLock implements Lock { for (ListIterator iterator = locks.listIterator(); iterator.hasNext();) { RLock lock = iterator.next(); boolean lockAcquired; - if (waitTime == -1 && leaseTime == -1) { - lockAcquired = lock.tryLock(); - } else { - lockAcquired = lock.tryLock(unit.convert(remainTime, TimeUnit.MILLISECONDS), newLeaseTime, unit); + try { + if (waitTime == -1 && leaseTime == -1) { + lockAcquired = lock.tryLock(); + } else { + lockAcquired = lock.tryLock(unit.convert(remainTime, TimeUnit.MILLISECONDS), newLeaseTime, unit); + } + } catch (Exception e) { + lockAcquired = false; } if (lockAcquired) { lockedLocks.add(lock); } else { + if (locks.size() - lockedLocks.size() == failedLocksLimit()) { + break; + } + if (failedLocksLimit == 0) { unlockInner(lockedLocks); if (waitTime == -1 && leaseTime == -1) { @@ -256,6 +161,10 @@ public class RedissonMultiLock implements Lock { } failedLocksLimit = failedLocksLimit(); lockedLocks.clear(); + // reset iterator + while (iterator.hasPrevious()) { + iterator.previous(); + } } else { failedLocksLimit--; } @@ -264,7 +173,7 @@ public class RedissonMultiLock implements Lock { if (remainTime != -1) { remainTime -= (System.currentTimeMillis() - time); time = System.currentTimeMillis(); - if (remainTime < 0) { + if (remainTime <= 0) { unlockInner(lockedLocks); return false; } diff --git a/redisson/src/test/java/org/redisson/RedissonRedLockTest.java b/redisson/src/test/java/org/redisson/RedissonRedLockTest.java index 0e29d6c55..9c7d8d612 100644 --- a/redisson/src/test/java/org/redisson/RedissonRedLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRedLockTest.java @@ -22,6 +22,51 @@ import static org.assertj.core.api.Assertions.assertThat; public class RedissonRedLockTest { + @Test + public void testLockLeasetime() throws IOException, InterruptedException { + RedisProcess redis1 = redisTestMultilockInstance(); + RedisProcess redis2 = redisTestMultilockInstance(); + + RedissonClient client1 = createClient(redis1.getRedisServerAddressAndPort()); + RedissonClient client2 = createClient(redis2.getRedisServerAddressAndPort()); + + RLock lock1 = client1.getLock("lock1"); + RLock lock2 = client1.getLock("lock2"); + RLock lock3 = client2.getLock("lock3"); + RLock lock4 = client2.getLock("lock3"); + RLock lock5 = client2.getLock("lock3"); + RLock lock6 = client2.getLock("lock3"); + RLock lock7 = client2.getLock("lock3"); + + + RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3, lock4, lock5, lock6, lock7); + + ExecutorService executor = Executors.newFixedThreadPool(10); + AtomicInteger counter = new AtomicInteger(); + for (int i = 0; i < 10; i++) { + executor.submit(() -> { + for (int j = 0; j < 5; j++) { + try { + lock.lock(2, TimeUnit.SECONDS); + int nextValue = counter.get() + 1; + Thread.sleep(1000); + counter.set(nextValue); + lock.unlock(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + } + + executor.shutdown(); + assertThat(executor.awaitTermination(2, TimeUnit.MINUTES)).isTrue(); + assertThat(counter.get()).isEqualTo(50); + + assertThat(redis1.stop()).isEqualTo(0); + assertThat(redis2.stop()).isEqualTo(0); + } + @Test public void testTryLockLeasetime() throws IOException, InterruptedException { RedisProcess redis1 = redisTestMultilockInstance();