diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index c2c3a8cff..b46e65656 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -723,9 +723,14 @@ public class RedissonExecutorService implements RScheduledExecutorService { break; } references.remove(r); - cancelResponseHandling(r.getRequestId()); + + if (!r.getPromise().hasListeners()) { + cancelResponseHandling(r.getRequestId()); + } } - RedissonExecutorFutureReference reference = new RedissonExecutorFutureReference(requestId, future, referenceDueue); + + RPromise promise = ((PromiseDelegator) future).getInnerPromise(); + RedissonExecutorFutureReference reference = new RedissonExecutorFutureReference(requestId, future, referenceDueue, promise); references.add(reference); } diff --git a/redisson/src/main/java/org/redisson/RedissonMultiLock.java b/redisson/src/main/java/org/redisson/RedissonMultiLock.java index 32b980ab2..9b71ce63d 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultiLock.java +++ b/redisson/src/main/java/org/redisson/RedissonMultiLock.java @@ -75,18 +75,19 @@ public class RedissonMultiLock implements Lock { } public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { + long baseWaitTime = locks.size() * 1500; long waitTime = -1; if (leaseTime == -1) { - waitTime = 5; - unit = TimeUnit.SECONDS; + waitTime = baseWaitTime; + unit = TimeUnit.MILLISECONDS; } else { waitTime = unit.toMillis(leaseTime); if (waitTime <= 2000) { waitTime = 2000; - } else if (waitTime <= 5000) { + } else if (waitTime <= baseWaitTime) { waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime); } else { - waitTime = ThreadLocalRandom.current().nextLong(5000, waitTime); + waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime); } waitTime = unit.convert(waitTime, TimeUnit.MILLISECONDS); } @@ -131,7 +132,7 @@ public class RedissonMultiLock implements Lock { public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long newLeaseTime = -1; if (leaseTime != -1) { - newLeaseTime = waitTime*2; + newLeaseTime = unit.toMillis(waitTime)*2; } long time = System.currentTimeMillis(); @@ -139,6 +140,8 @@ public class RedissonMultiLock implements Lock { if (waitTime != -1) { remainTime = unit.toMillis(waitTime); } + long lockWaitTime = calcLockWaitTime(remainTime); + int failedLocksLimit = failedLocksLimit(); List lockedLocks = new ArrayList(locks.size()); for (ListIterator iterator = locks.listIterator(); iterator.hasNext();) { @@ -148,8 +151,8 @@ public class RedissonMultiLock implements Lock { if (waitTime == -1 && leaseTime == -1) { lockAcquired = lock.tryLock(); } else { - long awaitTime = unit.convert(remainTime, TimeUnit.MILLISECONDS); - lockAcquired = lock.tryLock(awaitTime, newLeaseTime, unit); + long awaitTime = Math.min(lockWaitTime, remainTime); + lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } } catch (Exception e) { lockAcquired = false; @@ -203,6 +206,9 @@ public class RedissonMultiLock implements Lock { return true; } + protected long calcLockWaitTime(long remainTime) { + return remainTime; + } @Override public void unlock() { diff --git a/redisson/src/main/java/org/redisson/RedissonRedLock.java b/redisson/src/main/java/org/redisson/RedissonRedLock.java index 4b6aa4689..c8b1ba436 100644 --- a/redisson/src/main/java/org/redisson/RedissonRedLock.java +++ b/redisson/src/main/java/org/redisson/RedissonRedLock.java @@ -49,6 +49,11 @@ public class RedissonRedLock extends RedissonMultiLock { return locks.size()/2 + 1; } + @Override + protected long calcLockWaitTime(long remainTime) { + return Math.max(remainTime / locks.size(), 1000); + } + @Override public void unlock() { unlockInner(locks); diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorFutureReference.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorFutureReference.java index 622f34ead..8b6db0e21 100644 --- a/redisson/src/main/java/org/redisson/executor/RedissonExecutorFutureReference.java +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorFutureReference.java @@ -19,6 +19,7 @@ import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; import org.redisson.api.RExecutorFuture; +import org.redisson.misc.RPromise; import org.redisson.remote.RequestId; /** @@ -28,11 +29,17 @@ import org.redisson.remote.RequestId; */ public class RedissonExecutorFutureReference extends WeakReference> { - private RequestId requestId; + private final RPromise promise; + private final RequestId requestId; - public RedissonExecutorFutureReference(RequestId requestId, RExecutorFuture referent, ReferenceQueue> q) { + public RedissonExecutorFutureReference(RequestId requestId, RExecutorFuture referent, ReferenceQueue> q, RPromise promise) { super(referent, q); this.requestId = requestId; + this.promise = promise; + } + + public RPromise getPromise() { + return promise; } public RequestId getRequestId() { diff --git a/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java b/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java index b18e6a256..1e28a398a 100644 --- a/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java +++ b/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java @@ -157,6 +157,11 @@ public class PromiseDelegator implements RPromise { return promise.thenAcceptAsync(action); } + @Override + public boolean hasListeners() { + return promise.hasListeners(); + } + public CompletionStage thenAcceptAsync(Consumer action, Executor executor) { return promise.thenAcceptAsync(action, executor); } diff --git a/redisson/src/main/java/org/redisson/misc/RPromise.java b/redisson/src/main/java/org/redisson/misc/RPromise.java index ec52b8086..1b8e4373e 100644 --- a/redisson/src/main/java/org/redisson/misc/RPromise.java +++ b/redisson/src/main/java/org/redisson/misc/RPromise.java @@ -18,7 +18,6 @@ package org.redisson.misc; import org.redisson.api.RFuture; import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.Promise; /** * @@ -82,4 +81,6 @@ public interface RPromise extends RFuture { @Override RPromise syncUninterruptibly(); + boolean hasListeners(); + } diff --git a/redisson/src/main/java/org/redisson/misc/RedissonPromise.java b/redisson/src/main/java/org/redisson/misc/RedissonPromise.java index c2b966129..f3efada95 100644 --- a/redisson/src/main/java/org/redisson/misc/RedissonPromise.java +++ b/redisson/src/main/java/org/redisson/misc/RedissonPromise.java @@ -19,12 +19,15 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import java.lang.reflect.Field; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.redisson.api.RFuture; +import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; @@ -44,6 +47,17 @@ public class RedissonPromise extends CompletableFuture implements RPromise private final int FAILED = 2; private final int CANCELED = 3; + private static final Field listenersField; + + static { + try { + listenersField = DefaultPromise.class.getDeclaredField("listeners"); + listenersField.setAccessible(true); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + private final Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); private final AtomicInteger status = new AtomicInteger(); @@ -227,4 +241,13 @@ public class RedissonPromise extends CompletableFuture implements RPromise return false; } + @Override + public boolean hasListeners() { + try { + return listenersField.get(promise) != null || getNumberOfDependents() > 0; + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + } diff --git a/redisson/src/test/java/org/redisson/RedissonRedLockTest.java b/redisson/src/test/java/org/redisson/RedissonRedLockTest.java index 8c15c20c0..4edc65c77 100644 --- a/redisson/src/test/java/org/redisson/RedissonRedLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRedLockTest.java @@ -170,9 +170,21 @@ public class RedissonRedLockTest { RLock lock2 = client1.getLock("lock2"); RLock lock3 = client2.getLock("lock3"); + testLock(lock1, lock2, lock3, lock1); + testLock(lock1, lock2, lock3, lock2); + testLock(lock1, lock2, lock3, lock3); + + client1.shutdown(); + client2.shutdown(); + + assertThat(redis1.stop()).isEqualTo(0); + assertThat(redis2.stop()).isEqualTo(0); + } + + protected void testLock(RLock lock1, RLock lock2, RLock lock3, RLock lockFirst) throws InterruptedException { Thread t1 = new Thread() { public void run() { - lock3.lock(); + lockFirst.lock(); }; }; t1.start(); @@ -193,17 +205,11 @@ public class RedissonRedLockTest { t.start(); t.join(1000); - lock3.delete(); + lockFirst.delete(); RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3); lock.lock(); lock.unlock(); - - client1.shutdown(); - client2.shutdown(); - - assertThat(redis1.stop()).isEqualTo(0); - assertThat(redis2.stop()).isEqualTo(0); }