refactoring

pull/4239/head
Nikita Koksharov
parent 285ec8f1ce
commit 59ff8e2245

@ -100,7 +100,7 @@ public class RedissonLock extends RedissonBaseLock {
}
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
Timeout t = pubSub.timeout(future);
pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
@ -108,8 +108,6 @@ public class RedissonLock extends RedissonBaseLock {
entry = commandExecutor.get(future);
}
t.cancel();
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
@ -393,14 +391,13 @@ public class RedissonLock extends RedissonBaseLock {
}
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(currentThreadId);
Timeout t = pubSub.timeout(subscribeFuture);
pubSub.timeout(subscribeFuture);
subscribeFuture.whenComplete((res, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
return;
}
t.cancel();
lockAsync(leaseTime, unit, res, result, currentThreadId);
});
});

@ -88,9 +88,8 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
CompletableFuture<RedissonLockEntry> future = subscribe();
Timeout t = semaphorePubSub.timeout(future);
semaphorePubSub.timeout(future);
RedissonLockEntry entry = commandExecutor.getInterrupted(future);
t.cancel();
try {
while (true) {
Long nearestTimeout;
@ -126,9 +125,8 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
CompletableFuture<String> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate).toCompletableFuture();
CompletableFuture<String> f = tryAcquireFuture.thenCompose(permitId -> {
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe();
Timeout t = semaphorePubSub.timeout(subscribeFuture);
semaphorePubSub.timeout(subscribeFuture);
return subscribeFuture.thenCompose(res -> {
t.cancel();
return acquireAsync(permits, res, ttl, timeUnit);
});
});

@ -80,9 +80,8 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
}
CompletableFuture<RedissonLockEntry> future = subscribe();
Timeout t = semaphorePubSub.timeout(future);
semaphorePubSub.timeout(future);
RedissonLockEntry entry = commandExecutor.getInterrupted(future);
t.cancel();
try {
while (true) {
if (tryAcquire(permits)) {
@ -120,14 +119,13 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
}
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe();
Timeout t = semaphorePubSub.timeout(subscribeFuture);
semaphorePubSub.timeout(subscribeFuture);
subscribeFuture.whenComplete((r, e1) -> {
if (e1 != null) {
result.completeExceptionally(e1);
return;
}
t.cancel();
acquireAsync(permits, r, result);
});
});
@ -377,15 +375,13 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
long current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe();
Timeout timeout = semaphorePubSub.timeout(subscribeFuture, time.get());
semaphorePubSub.timeout(subscribeFuture, time.get());
subscribeFuture.whenComplete((r, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
return;
}
timeout.cancel();
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);

@ -15,7 +15,6 @@
*/
package org.redisson.pubsub;
import io.netty.util.Timeout;
import org.redisson.PubSubEntry;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
@ -58,12 +57,12 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
}
public Timeout timeout(CompletableFuture<?> promise) {
return service.timeout(promise);
public void timeout(CompletableFuture<?> promise) {
service.timeout(promise);
}
public Timeout timeout(CompletableFuture<?> promise, long timeout) {
return service.timeout(promise, timeout);
public void timeout(CompletableFuture<?> promise, long timeout) {
service.timeout(promise, timeout);
}
public CompletableFuture<E> subscribe(String entryName, String channelName) {

@ -229,23 +229,23 @@ public class PublishSubscribeService {
subscribeNoTimeout(codec, channelName, entry, promise, type, lock, attempts, listeners);
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
Timeout lockTimeout = timeout(promise, timeout);
promise.whenComplete((e, r) -> {
lockTimeout.cancel();
});
timeout(promise, timeout);
}
public Timeout timeout(CompletableFuture<?> promise) {
public void timeout(CompletableFuture<?> promise) {
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
return timeout(promise, timeout);
timeout(promise, timeout);
}
public Timeout timeout(CompletableFuture<?> promise, long timeout) {
return connectionManager.newTimeout(t -> {
promise.completeExceptionally(new RedisTimeoutException(
"Unable to acquire subscription lock after " + timeout + "ms. " +
"Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
}, timeout, TimeUnit.MILLISECONDS);
public void timeout(CompletableFuture<?> promise, long timeout) {
Timeout task = connectionManager.newTimeout(t -> {
promise.completeExceptionally(new RedisTimeoutException(
"Unable to acquire subscription lock after " + timeout + "ms. " +
"Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
}, timeout, TimeUnit.MILLISECONDS);
promise.whenComplete((r, e) -> {
task.cancel();
});
}
private void subscribeNoTimeout(Codec codec, ChannelName channelName, MasterSlaveEntry entry,

Loading…
Cancel
Save