Merge branch 'master' into 3.0.0

# Conflicts:
#	redisson/src/main/java/org/redisson/misc/PromiseDelegator.java
#	redisson/src/main/java/org/redisson/misc/RedissonPromise.java
pull/1303/head
Nikita 7 years ago
commit 718206170b

@ -723,9 +723,14 @@ public class RedissonExecutorService implements RScheduledExecutorService {
break; break;
} }
references.remove(r); references.remove(r);
cancelResponseHandling(r.getRequestId());
if (!r.getPromise().hasListeners()) {
cancelResponseHandling(r.getRequestId());
}
} }
RedissonExecutorFutureReference reference = new RedissonExecutorFutureReference(requestId, future, referenceDueue);
RPromise<?> promise = ((PromiseDelegator<?>) future).getInnerPromise();
RedissonExecutorFutureReference reference = new RedissonExecutorFutureReference(requestId, future, referenceDueue, promise);
references.add(reference); references.add(reference);
} }

@ -75,18 +75,19 @@ public class RedissonMultiLock implements Lock {
} }
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long baseWaitTime = locks.size() * 1500;
long waitTime = -1; long waitTime = -1;
if (leaseTime == -1) { if (leaseTime == -1) {
waitTime = 5; waitTime = baseWaitTime;
unit = TimeUnit.SECONDS; unit = TimeUnit.MILLISECONDS;
} else { } else {
waitTime = unit.toMillis(leaseTime); waitTime = unit.toMillis(leaseTime);
if (waitTime <= 2000) { if (waitTime <= 2000) {
waitTime = 2000; waitTime = 2000;
} else if (waitTime <= 5000) { } else if (waitTime <= baseWaitTime) {
waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime); waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
} else { } else {
waitTime = ThreadLocalRandom.current().nextLong(5000, waitTime); waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
} }
waitTime = unit.convert(waitTime, TimeUnit.MILLISECONDS); waitTime = unit.convert(waitTime, TimeUnit.MILLISECONDS);
} }
@ -131,7 +132,7 @@ public class RedissonMultiLock implements Lock {
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1; long newLeaseTime = -1;
if (leaseTime != -1) { if (leaseTime != -1) {
newLeaseTime = waitTime*2; newLeaseTime = unit.toMillis(waitTime)*2;
} }
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
@ -139,6 +140,8 @@ public class RedissonMultiLock implements Lock {
if (waitTime != -1) { if (waitTime != -1) {
remainTime = unit.toMillis(waitTime); remainTime = unit.toMillis(waitTime);
} }
long lockWaitTime = calcLockWaitTime(remainTime);
int failedLocksLimit = failedLocksLimit(); int failedLocksLimit = failedLocksLimit();
List<RLock> lockedLocks = new ArrayList<RLock>(locks.size()); List<RLock> lockedLocks = new ArrayList<RLock>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) { for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
@ -148,8 +151,8 @@ public class RedissonMultiLock implements Lock {
if (waitTime == -1 && leaseTime == -1) { if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock(); lockAcquired = lock.tryLock();
} else { } else {
long awaitTime = unit.convert(remainTime, TimeUnit.MILLISECONDS); long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, unit); lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
} }
} catch (Exception e) { } catch (Exception e) {
lockAcquired = false; lockAcquired = false;
@ -203,6 +206,9 @@ public class RedissonMultiLock implements Lock {
return true; return true;
} }
protected long calcLockWaitTime(long remainTime) {
return remainTime;
}
@Override @Override
public void unlock() { public void unlock() {

@ -49,6 +49,11 @@ public class RedissonRedLock extends RedissonMultiLock {
return locks.size()/2 + 1; return locks.size()/2 + 1;
} }
@Override
protected long calcLockWaitTime(long remainTime) {
return Math.max(remainTime / locks.size(), 1000);
}
@Override @Override
public void unlock() { public void unlock() {
unlockInner(locks); unlockInner(locks);

@ -19,6 +19,7 @@ import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import org.redisson.api.RExecutorFuture; import org.redisson.api.RExecutorFuture;
import org.redisson.misc.RPromise;
import org.redisson.remote.RequestId; import org.redisson.remote.RequestId;
/** /**
@ -28,11 +29,17 @@ import org.redisson.remote.RequestId;
*/ */
public class RedissonExecutorFutureReference extends WeakReference<RExecutorFuture<?>> { public class RedissonExecutorFutureReference extends WeakReference<RExecutorFuture<?>> {
private RequestId requestId; private final RPromise<?> promise;
private final RequestId requestId;
public RedissonExecutorFutureReference(RequestId requestId, RExecutorFuture<?> referent, ReferenceQueue<? super RExecutorFuture<?>> q) { public RedissonExecutorFutureReference(RequestId requestId, RExecutorFuture<?> referent, ReferenceQueue<? super RExecutorFuture<?>> q, RPromise<?> promise) {
super(referent, q); super(referent, q);
this.requestId = requestId; this.requestId = requestId;
this.promise = promise;
}
public RPromise<?> getPromise() {
return promise;
} }
public RequestId getRequestId() { public RequestId getRequestId() {

@ -157,6 +157,11 @@ public class PromiseDelegator<T> implements RPromise<T> {
return promise.thenAcceptAsync(action); return promise.thenAcceptAsync(action);
} }
@Override
public boolean hasListeners() {
return promise.hasListeners();
}
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) { public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) {
return promise.thenAcceptAsync(action, executor); return promise.thenAcceptAsync(action, executor);
} }

@ -18,7 +18,6 @@ package org.redisson.misc;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
/** /**
* *
@ -82,4 +81,6 @@ public interface RPromise<T> extends RFuture<T> {
@Override @Override
RPromise<T> syncUninterruptibly(); RPromise<T> syncUninterruptibly();
boolean hasListeners();
} }

@ -19,12 +19,15 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.lang.reflect.Field;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
@ -44,6 +47,17 @@ public class RedissonPromise<T> extends CompletableFuture<T> implements RPromise
private final int FAILED = 2; private final int FAILED = 2;
private final int CANCELED = 3; private final int CANCELED = 3;
private static final Field listenersField;
static {
try {
listenersField = DefaultPromise.class.getDeclaredField("listeners");
listenersField.setAccessible(true);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
private final Promise<T> promise = ImmediateEventExecutor.INSTANCE.newPromise(); private final Promise<T> promise = ImmediateEventExecutor.INSTANCE.newPromise();
private final AtomicInteger status = new AtomicInteger(); private final AtomicInteger status = new AtomicInteger();
@ -227,4 +241,13 @@ public class RedissonPromise<T> extends CompletableFuture<T> implements RPromise
return false; return false;
} }
@Override
public boolean hasListeners() {
try {
return listenersField.get(promise) != null || getNumberOfDependents() > 0;
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
} }

@ -170,9 +170,21 @@ public class RedissonRedLockTest {
RLock lock2 = client1.getLock("lock2"); RLock lock2 = client1.getLock("lock2");
RLock lock3 = client2.getLock("lock3"); RLock lock3 = client2.getLock("lock3");
testLock(lock1, lock2, lock3, lock1);
testLock(lock1, lock2, lock3, lock2);
testLock(lock1, lock2, lock3, lock3);
client1.shutdown();
client2.shutdown();
assertThat(redis1.stop()).isEqualTo(0);
assertThat(redis2.stop()).isEqualTo(0);
}
protected void testLock(RLock lock1, RLock lock2, RLock lock3, RLock lockFirst) throws InterruptedException {
Thread t1 = new Thread() { Thread t1 = new Thread() {
public void run() { public void run() {
lock3.lock(); lockFirst.lock();
}; };
}; };
t1.start(); t1.start();
@ -193,17 +205,11 @@ public class RedissonRedLockTest {
t.start(); t.start();
t.join(1000); t.join(1000);
lock3.delete(); lockFirst.delete();
RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3); RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3);
lock.lock(); lock.lock();
lock.unlock(); lock.unlock();
client1.shutdown();
client2.shutdown();
assertThat(redis1.stop()).isEqualTo(0);
assertThat(redis2.stop()).isEqualTo(0);
} }

Loading…
Cancel
Save