Fixed - RedissonLock.tryLock method throws CancellationException. #4210

Fixed - subscription timeout calculation. #4216
pull/4239/head
Nikita Koksharov 3 years ago
parent 723928443e
commit 285ec8f1ce

@ -149,8 +149,6 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
@Override
public RFuture<Boolean> awaitAsync(long waitTime, TimeUnit unit) {
CompletableFuture<Boolean> result = new CompletableFuture<>();
AtomicLong time = new AtomicLong(unit.toMillis(waitTime));
long currentTime = System.currentTimeMillis();
CompletableFuture<Long> countFuture = getCountAsync().toCompletableFuture();
@ -164,6 +162,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
long current = System.currentTimeMillis();
CompletableFuture<RedissonCountDownLatchEntry> subscribeFuture = subscribe();
pubSub.timeout(subscribeFuture, time.get());
return subscribeFuture.thenCompose(entry -> {
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);

@ -100,6 +100,7 @@ public class RedissonLock extends RedissonBaseLock {
}
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
Timeout t = pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
@ -107,6 +108,8 @@ public class RedissonLock extends RedissonBaseLock {
entry = commandExecutor.get(future);
}
t.cancel();
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
@ -229,7 +232,7 @@ public class RedissonLock extends RedissonBaseLock {
current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
subscribeFuture.toCompletableFuture().get(time, TimeUnit.MILLISECONDS);
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.whenComplete((res, ex) -> {
@ -390,12 +393,14 @@ public class RedissonLock extends RedissonBaseLock {
}
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(currentThreadId);
Timeout t = pubSub.timeout(subscribeFuture);
subscribeFuture.whenComplete((res, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
return;
}
t.cancel();
lockAsync(leaseTime, unit, res, result, currentThreadId);
});
});
@ -502,6 +507,7 @@ public class RedissonLock extends RedissonBaseLock {
long current = System.currentTimeMillis();
AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(currentThreadId);
pubSub.timeout(subscribeFuture, time.get());
subscribeFuture.whenComplete((r, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);

@ -88,7 +88,9 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
CompletableFuture<RedissonLockEntry> future = subscribe();
Timeout t = semaphorePubSub.timeout(future);
RedissonLockEntry entry = commandExecutor.getInterrupted(future);
t.cancel();
try {
while (true) {
Long nearestTimeout;
@ -124,7 +126,9 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
CompletableFuture<String> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate).toCompletableFuture();
CompletableFuture<String> f = tryAcquireFuture.thenCompose(permitId -> {
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe();
Timeout t = semaphorePubSub.timeout(subscribeFuture);
return subscribeFuture.thenCompose(res -> {
t.cancel();
return acquireAsync(permits, res, ttl, timeUnit);
});
});

@ -80,7 +80,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
}
CompletableFuture<RedissonLockEntry> future = subscribe();
Timeout t = semaphorePubSub.timeout(future);
RedissonLockEntry entry = commandExecutor.getInterrupted(future);
t.cancel();
try {
while (true) {
if (tryAcquire(permits)) {
@ -118,12 +120,14 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
}
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe();
Timeout t = semaphorePubSub.timeout(subscribeFuture);
subscribeFuture.whenComplete((r, e1) -> {
if (e1 != null) {
result.completeExceptionally(e1);
return;
}
t.cancel();
acquireAsync(permits, r, result);
});
});
@ -372,18 +376,16 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
}
long current = System.currentTimeMillis();
AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe();
Timeout timeout = semaphorePubSub.timeout(subscribeFuture, time.get());
subscribeFuture.whenComplete((r, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
return;
}
if (futureRef.get() != null) {
futureRef.get().cancel();
}
timeout.cancel();
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
@ -395,18 +397,6 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
tryAcquireAsync(time, permits, r, result);
});
if (!subscribeFuture.isDone()) {
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (!subscribeFuture.isDone()) {
result.complete(false);
}
}
}, time.get(), TimeUnit.MILLISECONDS);
futureRef.set(scheduledFuture);
}
});
return new CompletableFutureWrapper<>(result);
}

@ -20,14 +20,12 @@ import org.redisson.PubSubEntry;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/**
*
@ -60,19 +58,20 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
}
public Timeout timeout(CompletableFuture<?> promise) {
return service.timeout(promise);
}
public Timeout timeout(CompletableFuture<?> promise, long timeout) {
return service.timeout(promise, timeout);
}
public CompletableFuture<E> subscribe(String entryName, String channelName) {
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
CompletableFuture<E> newPromise = new CompletableFuture<>();
int timeout = service.getConnectionManager().getConfig().getTimeout();
Timeout lockTimeout = service.getConnectionManager().newTimeout(t -> {
newPromise.completeExceptionally(new RedisTimeoutException(
"Unable to acquire subscription lock after " + timeout + "ms. " +
"Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
}, timeout, TimeUnit.MILLISECONDS);
semaphore.acquire(() -> {
if (!lockTimeout.cancel()) {
if (newPromise.isDone()) {
semaphore.release();
return;
}
@ -109,7 +108,12 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
}
RedisPubSubListener<Object> listener = createListener(channelName, value);
CompletableFuture<PubSubConnectionEntry> s = service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
CompletableFuture<PubSubConnectionEntry> s = service.subscribeNoTimeout(LongCodec.INSTANCE, channelName, semaphore, listener);
newPromise.whenComplete((r, e) -> {
if (e != null) {
s.completeExceptionally(e);
}
});
s.whenComplete((r, e) -> {
if (e != null) {
value.getPromise().completeExceptionally(e);

@ -189,11 +189,12 @@ public class PublishSubscribeService {
MasterSlaveEntry entry, RedisPubSubListener<?>... listeners) {
CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>();
AsyncSemaphore lock = getSemaphore(channelName);
Timeout lockTimeout = connectionManager.newTimeout(timeout -> {
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
Timeout lockTimeout = connectionManager.newTimeout(t -> {
promise.completeExceptionally(new RedisTimeoutException(
"Unable to acquire subscription lock after " + config.getTimeout() + "ms. " +
"Unable to acquire subscription lock after " + timeout + "ms. " +
"Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
}, config.getTimeout(), TimeUnit.MILLISECONDS);
}, timeout, TimeUnit.MILLISECONDS);
lock.acquire(() -> {
if (!lockTimeout.cancel() || promise.isDone()) {
lock.release();
@ -205,10 +206,10 @@ public class PublishSubscribeService {
return promise;
}
public CompletableFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName,
public CompletableFuture<PubSubConnectionEntry> subscribeNoTimeout(Codec codec, String channelName,
AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>();
subscribe(codec, new ChannelName(channelName), getEntry(new ChannelName(channelName)), promise,
subscribeNoTimeout(codec, new ChannelName(channelName), getEntry(new ChannelName(channelName)), promise,
PubSubType.SUBSCRIBE, semaphore, new AtomicInteger(), listeners);
return promise;
}
@ -223,6 +224,31 @@ public class PublishSubscribeService {
}
private void subscribe(Codec codec, ChannelName channelName, MasterSlaveEntry entry,
CompletableFuture<PubSubConnectionEntry> promise, PubSubType type,
AsyncSemaphore lock, AtomicInteger attempts, RedisPubSubListener<?>... listeners) {
subscribeNoTimeout(codec, channelName, entry, promise, type, lock, attempts, listeners);
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
Timeout lockTimeout = timeout(promise, timeout);
promise.whenComplete((e, r) -> {
lockTimeout.cancel();
});
}
public Timeout timeout(CompletableFuture<?> promise) {
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
return timeout(promise, timeout);
}
public Timeout timeout(CompletableFuture<?> promise, long timeout) {
return connectionManager.newTimeout(t -> {
promise.completeExceptionally(new RedisTimeoutException(
"Unable to acquire subscription lock after " + timeout + "ms. " +
"Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
}, timeout, TimeUnit.MILLISECONDS);
}
private void subscribeNoTimeout(Codec codec, ChannelName channelName, MasterSlaveEntry entry,
CompletableFuture<PubSubConnectionEntry> promise, PubSubType type,
AsyncSemaphore lock, AtomicInteger attempts, RedisPubSubListener<?>... listeners) {
PubSubConnectionEntry connEntry = name2PubSubConnection.get(new PubSubKey(channelName, entry));
@ -231,14 +257,8 @@ public class PublishSubscribeService {
return;
}
Timeout lockTimeout = connectionManager.newTimeout(timeout -> {
promise.completeExceptionally(new RedisTimeoutException(
"Unable to acquire subscription lock after " + config.getTimeout() + "ms. " +
"Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
}, config.getTimeout(), TimeUnit.MILLISECONDS);
freePubSubLock.acquire(() -> {
if (!lockTimeout.cancel() || promise.isDone()) {
if (promise.isDone()) {
lock.release();
freePubSubLock.release();
return;
@ -305,7 +325,7 @@ public class PublishSubscribeService {
return;
}
connectionManager.newTimeout(timeout -> {
connectionManager.newTimeout(t -> {
if (subscribeFuture.completeExceptionally(new RedisTimeoutException(
"Subscription timeout after " + config.getTimeout() + "ms. " +
"Check network and/or increase 'timeout' parameter."))) {

Loading…
Cancel
Save