From b4cc6f994149a5072a18ed94b65a7a99ec4fe76a Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 14 Dec 2017 18:03:45 +0300 Subject: [PATCH 1/3] Fixed - RedissonRedLock couldn't be locked in some cases. #1175 --- .../java/org/redisson/RedissonMultiLock.java | 20 +++++++++++------ .../org/redisson/RedissonRedLockTest.java | 22 ++++++++++++------- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonMultiLock.java b/redisson/src/main/java/org/redisson/RedissonMultiLock.java index 32b980ab2..9b71ce63d 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultiLock.java +++ b/redisson/src/main/java/org/redisson/RedissonMultiLock.java @@ -75,18 +75,19 @@ public class RedissonMultiLock implements Lock { } public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { + long baseWaitTime = locks.size() * 1500; long waitTime = -1; if (leaseTime == -1) { - waitTime = 5; - unit = TimeUnit.SECONDS; + waitTime = baseWaitTime; + unit = TimeUnit.MILLISECONDS; } else { waitTime = unit.toMillis(leaseTime); if (waitTime <= 2000) { waitTime = 2000; - } else if (waitTime <= 5000) { + } else if (waitTime <= baseWaitTime) { waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime); } else { - waitTime = ThreadLocalRandom.current().nextLong(5000, waitTime); + waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime); } waitTime = unit.convert(waitTime, TimeUnit.MILLISECONDS); } @@ -131,7 +132,7 @@ public class RedissonMultiLock implements Lock { public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long newLeaseTime = -1; if (leaseTime != -1) { - newLeaseTime = waitTime*2; + newLeaseTime = unit.toMillis(waitTime)*2; } long time = System.currentTimeMillis(); @@ -139,6 +140,8 @@ public class RedissonMultiLock implements Lock { if (waitTime != -1) { remainTime = unit.toMillis(waitTime); } + long lockWaitTime = calcLockWaitTime(remainTime); + int failedLocksLimit = failedLocksLimit(); List lockedLocks = new ArrayList(locks.size()); for (ListIterator iterator = locks.listIterator(); iterator.hasNext();) { @@ -148,8 +151,8 @@ public class RedissonMultiLock implements Lock { if (waitTime == -1 && leaseTime == -1) { lockAcquired = lock.tryLock(); } else { - long awaitTime = unit.convert(remainTime, TimeUnit.MILLISECONDS); - lockAcquired = lock.tryLock(awaitTime, newLeaseTime, unit); + long awaitTime = Math.min(lockWaitTime, remainTime); + lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } } catch (Exception e) { lockAcquired = false; @@ -203,6 +206,9 @@ public class RedissonMultiLock implements Lock { return true; } + protected long calcLockWaitTime(long remainTime) { + return remainTime; + } @Override public void unlock() { diff --git a/redisson/src/test/java/org/redisson/RedissonRedLockTest.java b/redisson/src/test/java/org/redisson/RedissonRedLockTest.java index 8c15c20c0..4edc65c77 100644 --- a/redisson/src/test/java/org/redisson/RedissonRedLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRedLockTest.java @@ -170,9 +170,21 @@ public class RedissonRedLockTest { RLock lock2 = client1.getLock("lock2"); RLock lock3 = client2.getLock("lock3"); + testLock(lock1, lock2, lock3, lock1); + testLock(lock1, lock2, lock3, lock2); + testLock(lock1, lock2, lock3, lock3); + + client1.shutdown(); + client2.shutdown(); + + assertThat(redis1.stop()).isEqualTo(0); + assertThat(redis2.stop()).isEqualTo(0); + } + + protected void testLock(RLock lock1, RLock lock2, RLock lock3, RLock lockFirst) throws InterruptedException { Thread t1 = new Thread() { public void run() { - lock3.lock(); + lockFirst.lock(); }; }; t1.start(); @@ -193,17 +205,11 @@ public class RedissonRedLockTest { t.start(); t.join(1000); - lock3.delete(); + lockFirst.delete(); RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3); lock.lock(); lock.unlock(); - - client1.shutdown(); - client2.shutdown(); - - assertThat(redis1.stop()).isEqualTo(0); - assertThat(redis2.stop()).isEqualTo(0); } From 3c63cbbd42feb3e4735b9b82326afea8211cb154 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 15 Dec 2017 13:05:47 +0300 Subject: [PATCH 2/3] Promises with listeners should not be removed during weak reference collection process. #1185 --- .../org/redisson/RedissonExecutorService.java | 9 ++++++-- .../RedissonExecutorFutureReference.java | 11 ++++++++-- .../org/redisson/misc/PromiseDelegator.java | 5 +++++ .../main/java/org/redisson/misc/RPromise.java | 3 ++- .../org/redisson/misc/RedissonPromise.java | 22 +++++++++++++++++++ 5 files changed, 45 insertions(+), 5 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index c2c3a8cff..b46e65656 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -723,9 +723,14 @@ public class RedissonExecutorService implements RScheduledExecutorService { break; } references.remove(r); - cancelResponseHandling(r.getRequestId()); + + if (!r.getPromise().hasListeners()) { + cancelResponseHandling(r.getRequestId()); + } } - RedissonExecutorFutureReference reference = new RedissonExecutorFutureReference(requestId, future, referenceDueue); + + RPromise promise = ((PromiseDelegator) future).getInnerPromise(); + RedissonExecutorFutureReference reference = new RedissonExecutorFutureReference(requestId, future, referenceDueue, promise); references.add(reference); } diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorFutureReference.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorFutureReference.java index 622f34ead..8b6db0e21 100644 --- a/redisson/src/main/java/org/redisson/executor/RedissonExecutorFutureReference.java +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorFutureReference.java @@ -19,6 +19,7 @@ import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; import org.redisson.api.RExecutorFuture; +import org.redisson.misc.RPromise; import org.redisson.remote.RequestId; /** @@ -28,11 +29,17 @@ import org.redisson.remote.RequestId; */ public class RedissonExecutorFutureReference extends WeakReference> { - private RequestId requestId; + private final RPromise promise; + private final RequestId requestId; - public RedissonExecutorFutureReference(RequestId requestId, RExecutorFuture referent, ReferenceQueue> q) { + public RedissonExecutorFutureReference(RequestId requestId, RExecutorFuture referent, ReferenceQueue> q, RPromise promise) { super(referent, q); this.requestId = requestId; + this.promise = promise; + } + + public RPromise getPromise() { + return promise; } public RequestId getRequestId() { diff --git a/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java b/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java index 92efb057f..d2a2bf5a9 100644 --- a/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java +++ b/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java @@ -148,6 +148,11 @@ public class PromiseDelegator implements RPromise { public boolean cancel(boolean mayInterruptIfRunning) { return promise.cancel(mayInterruptIfRunning); } + + @Override + public boolean hasListeners() { + return promise.hasListeners(); + } } diff --git a/redisson/src/main/java/org/redisson/misc/RPromise.java b/redisson/src/main/java/org/redisson/misc/RPromise.java index ec52b8086..1b8e4373e 100644 --- a/redisson/src/main/java/org/redisson/misc/RPromise.java +++ b/redisson/src/main/java/org/redisson/misc/RPromise.java @@ -18,7 +18,6 @@ package org.redisson.misc; import org.redisson.api.RFuture; import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.Promise; /** * @@ -82,4 +81,6 @@ public interface RPromise extends RFuture { @Override RPromise syncUninterruptibly(); + boolean hasListeners(); + } diff --git a/redisson/src/main/java/org/redisson/misc/RedissonPromise.java b/redisson/src/main/java/org/redisson/misc/RedissonPromise.java index 2f28b5532..69fe93c9b 100644 --- a/redisson/src/main/java/org/redisson/misc/RedissonPromise.java +++ b/redisson/src/main/java/org/redisson/misc/RedissonPromise.java @@ -15,12 +15,14 @@ */ package org.redisson.misc; +import java.lang.reflect.Field; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.redisson.api.RFuture; +import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; @@ -33,6 +35,17 @@ import io.netty.util.concurrent.Promise; */ public class RedissonPromise implements RPromise { + private static final Field listenersField; + + static { + try { + listenersField = DefaultPromise.class.getDeclaredField("listeners"); + listenersField.setAccessible(true); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + private final Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); public RedissonPromise() { @@ -172,5 +185,14 @@ public class RedissonPromise implements RPromise { public boolean cancel(boolean mayInterruptIfRunning) { return promise.cancel(mayInterruptIfRunning); } + + @Override + public boolean hasListeners() { + try { + return listenersField.get(promise) != null; + } catch (Exception e) { + throw new IllegalStateException(e); + } + } } From 27ade052942420f7374e5a8fe1a7e7c95e338ae7 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 15 Dec 2017 13:06:35 +0300 Subject: [PATCH 3/3] RedissonRedLock couldn't be locked in some cases. #1175 --- redisson/src/main/java/org/redisson/RedissonRedLock.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/redisson/src/main/java/org/redisson/RedissonRedLock.java b/redisson/src/main/java/org/redisson/RedissonRedLock.java index 4b6aa4689..c8b1ba436 100644 --- a/redisson/src/main/java/org/redisson/RedissonRedLock.java +++ b/redisson/src/main/java/org/redisson/RedissonRedLock.java @@ -49,6 +49,11 @@ public class RedissonRedLock extends RedissonMultiLock { return locks.size()/2 + 1; } + @Override + protected long calcLockWaitTime(long remainTime) { + return Math.max(remainTime / locks.size(), 1000); + } + @Override public void unlock() { unlockInner(locks);