diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index f8f5c7557..21fea8e0a 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.Method; +import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.Optional; @@ -44,10 +45,10 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS public static class Entry { RFuture future; - final AtomicInteger counter; + final AtomicInteger freeWorkers; public Entry(int workers) { - counter = new AtomicInteger(workers); + freeWorkers = new AtomicInteger(workers); } public void setFuture(RFuture future) { @@ -58,8 +59,8 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS return future; } - public AtomicInteger getCounter() { - return counter; + public AtomicInteger getFreeWorkers() { + return freeWorkers; } } @@ -137,7 +138,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS if (entry == null) { return 0; } - return entry.getCounter().get(); + return entry.getFreeWorkers().get(); } @Override @@ -235,7 +236,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS return; } - log.debug("subscribe: {}, entry counter: {}", remoteInterface, entry.getCounter()); + log.debug("subscribe: {}, free workers: {}", remoteInterface, entry.getFreeWorkers()); RFuture take = requestQueue.pollAsync(60, TimeUnit.SECONDS); entry.setFuture(take); @@ -258,15 +259,12 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS return; } - // do not subscribe now, see - // https://github.com/mrniko/redisson/issues/493 - // subscribe(remoteInterface, requestQueue); - - if (entry.getCounter().get() == 0) { + if (entry.getFreeWorkers().get() == 0) { return; } - - if (entry.getCounter().decrementAndGet() > 0) { + + int freeWorkers = entry.getFreeWorkers().decrementAndGet(); + if (freeWorkers > 0 && requestId != null) { subscribe(remoteInterface, requestQueue, executor, bean); } @@ -416,6 +414,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS if (request.getOptions().getExecutionTimeoutInMillis() != null) { timeout = request.getOptions().getExecutionTimeoutInMillis(); } + long tt = timeout; RBlockingQueueAsync queue = getBlockingQueue(responseName, codec); try { @@ -427,8 +426,9 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS } else { response = result; } - RFuture clientsFuture = queue.putAsync(response); - queue.expireAsync(timeout, TimeUnit.MILLISECONDS); + + CompletionStage clientsFuture = queue.putAsync(response) + .thenCompose(s -> queue.expireAsync(Duration.ofMillis(tt))); clientsFuture.whenComplete((res, exc) -> { if (exc != null) { @@ -507,7 +507,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS log.debug("resubscribe: {}, queue: {}", remoteInterface, requestQueue.getName()); - if (entry != null && entry.getCounter().getAndIncrement() == 0) { + if (entry != null && entry.getFreeWorkers().getAndIncrement() == 0) { // re-subscribe anyways after the method invocation subscribe(remoteInterface, requestQueue, executor, bean); }