From 5274e762b59811997147b3b42a11fb20418c1306 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 13 Apr 2016 14:40:37 +0300 Subject: [PATCH] RedissonMultiLock deadlock fixed. #467 --- src/main/java/org/redisson/RedissonLock.java | 29 +++++-- .../org/redisson/core/RedissonMultiLock.java | 85 +++++++++++-------- .../org/redisson/RedissonMultiLockTest.java | 35 ++++++++ 3 files changed, 110 insertions(+), 39 deletions(-) diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index eaf060496..96ad9ac74 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -377,6 +377,11 @@ public class RedissonLock extends RedissonExpirable implements RLock { } public Future unlockAsync() { + long threadId = Thread.currentThread().getId(); + return unlockAsync(threadId); + } + + public Future unlockAsync(final long threadId) { final Promise result = newPromise(); Future future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + @@ -396,7 +401,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { "return 1; "+ "end; " + "return nil;", - Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId())); + Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); future.addListener(new FutureListener() { @Override @@ -409,7 +414,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { Boolean opStatus = future.getNow(); if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " - + id + " thread-id: " + Thread.currentThread().getId()); + + id + " thread-id: " + threadId); result.setFailure(cause); return; } @@ -428,8 +433,12 @@ public class RedissonLock extends RedissonExpirable implements RLock { } public Future lockAsync(final long leaseTime, final TimeUnit unit) { - final Promise result = newPromise(); final long currentThreadId = Thread.currentThread().getId(); + return lockAsync(leaseTime, unit, currentThreadId); + } + + public Future lockAsync(final long leaseTime, final TimeUnit unit, final long currentThreadId) { + final Promise result = newPromise(); Future ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId); ttlFuture.addListener(new FutureListener() { @Override @@ -525,8 +534,13 @@ public class RedissonLock extends RedissonExpirable implements RLock { } public Future tryLockAsync() { + long threadId = Thread.currentThread().getId(); + return tryLockAsync(threadId); + } + + public Future tryLockAsync(long threadId) { final Promise result = newPromise(); - Future future = tryLockInnerAsync(Thread.currentThread().getId()); + Future future = tryLockInnerAsync(threadId); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -546,10 +560,15 @@ public class RedissonLock extends RedissonExpirable implements RLock { } public Future tryLockAsync(final long waitTime, final long leaseTime, final TimeUnit unit) { + final long currentThreadId = Thread.currentThread().getId(); + return tryLockAsync(waitTime, leaseTime, unit, currentThreadId); + } + + public Future tryLockAsync(final long waitTime, final long leaseTime, final TimeUnit unit, + final long currentThreadId) { final Promise result = newPromise(); final AtomicLong time = new AtomicLong(unit.toMillis(waitTime)); - final long currentThreadId = Thread.currentThread().getId(); Future ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId); ttlFuture.addListener(new FutureListener() { @Override diff --git a/src/main/java/org/redisson/core/RedissonMultiLock.java b/src/main/java/org/redisson/core/RedissonMultiLock.java index 83ee1a785..9da348738 100644 --- a/src/main/java/org/redisson/core/RedissonMultiLock.java +++ b/src/main/java/org/redisson/core/RedissonMultiLock.java @@ -21,11 +21,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; +import org.redisson.RedissonLock; + import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; @@ -74,7 +76,8 @@ public class RedissonMultiLock implements Lock { public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); - lock(promise, 0, leaseTime, unit); + long currentThreadId = Thread.currentThread().getId(); + lock(promise, 0, leaseTime, unit, locks, currentThreadId); promise.sync(); } @@ -84,50 +87,62 @@ public class RedissonMultiLock implements Lock { lockInterruptibly(-1, null); } - private void lock(final Promise promise, final long waitTime, final long leaseTime, final TimeUnit unit) throws InterruptedException { + private void lock(final Promise promise, final long waitTime, final long leaseTime, final TimeUnit unit, final List locks, final long currentThreadId) throws InterruptedException { final AtomicInteger tryLockRequestsAmount = new AtomicInteger(); final Map, RLock> tryLockFutures = new HashMap, RLock>(locks.size()); FutureListener listener = new FutureListener() { - AtomicBoolean unlock = new AtomicBoolean(); + AtomicReference lockedLockHolder = new AtomicReference(); + AtomicReference failed = new AtomicReference(); @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(final Future future) throws Exception { if (!future.isSuccess()) { - // unlock once - if (unlock.compareAndSet(false, true)) { - for (RLock lock : locks) { - lock.unlockAsync(); - } - - promise.setFailure(future.cause()); - } - return; + failed.compareAndSet(null, future.cause()); } Boolean res = future.getNow(); - // unlock once - if (!res && unlock.compareAndSet(false, true)) { - for (RLock lock : locks) { - lock.unlockAsync(); + if (res != null && !res) { + RLock lock = tryLockFutures.get(future); + lockedLockHolder.compareAndSet(null, lock); + } + + if (tryLockRequestsAmount.decrementAndGet() == 0) { + if (lockedLockHolder.get() == null && failed.get() == null) { + promise.setSuccess(null); + return; } - RLock lock = tryLockFutures.get(future); - lock.lockAsync().addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.setFailure(future.cause()); - return; + tryLockRequestsAmount.set(tryLockFutures.size()); + for (RLock lock : tryLockFutures.values()) { + lock.unlockAsync().addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (tryLockRequestsAmount.decrementAndGet() == 0) { + if (failed.get() != null) { + promise.setFailure(failed.get()); + } + if (lockedLockHolder.get() != null) { + final RedissonLock lockedLock = (RedissonLock) lockedLockHolder.get(); + lockedLock.lockAsync(leaseTime, unit, currentThreadId).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.setFailure(future.cause()); + return; + } + + List newLocks = new ArrayList(tryLockFutures.values()); + newLocks.remove(lockedLock); + lock(promise, waitTime, leaseTime, unit, newLocks, currentThreadId); + } + }); + } + } } - - lock(promise, waitTime, leaseTime, unit); - } - }); - } - if (!unlock.get() && tryLockRequestsAmount.decrementAndGet() == 0) { - promise.setSuccess(null); + }); + } } } }; @@ -138,11 +153,13 @@ public class RedissonMultiLock implements Lock { } tryLockRequestsAmount.incrementAndGet(); + Future future; if (waitTime > 0 || leaseTime > 0) { - tryLockFutures.put(lock.tryLockAsync(waitTime, leaseTime, unit), lock); + future = ((RedissonLock)lock).tryLockAsync(waitTime, leaseTime, unit, currentThreadId); } else { - tryLockFutures.put(lock.tryLockAsync(), lock); + future = ((RedissonLock)lock).tryLockAsync(currentThreadId); } + tryLockFutures.put(future, lock); } for (Future future : tryLockFutures.keySet()) { diff --git a/src/test/java/org/redisson/RedissonMultiLockTest.java b/src/test/java/org/redisson/RedissonMultiLockTest.java index 9b5446a80..9d8c1399a 100644 --- a/src/test/java/org/redisson/RedissonMultiLockTest.java +++ b/src/test/java/org/redisson/RedissonMultiLockTest.java @@ -16,6 +16,41 @@ import org.redisson.RedisRunner.RedisProcess; public class RedissonMultiLockTest { + @Test + public void testMultiThreads() throws IOException, InterruptedException { + RedisProcess redis1 = redisTestMultilockInstance1(); + + Config config1 = new Config(); + config1.useSingleServer().setAddress("127.0.0.1:6320"); + RedissonClient client = Redisson.create(config1); + + RLock lock1 = client.getLock("lock1"); + RLock lock2 = client.getLock("lock2"); + RLock lock3 = client.getLock("lock3"); + + Thread t = new Thread() { + public void run() { + RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); + lock.lock(); + + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + } + + lock.unlock(); + }; + }; + t.start(); + t.join(1000); + + RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); + lock.lock(); + lock.unlock(); + + assertThat(redis1.stop()).isEqualTo(0); + } + @Test public void test() throws IOException, InterruptedException { RedisProcess redis1 = redisTestMultilockInstance1();