Fixed - RRemoteService shouldn't allocate a new worker if requestQueue is empty.

pull/6366/head
Nikita Koksharov 1 month ago
parent 9781ca642b
commit 681d0cce4e

@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
@ -44,10 +45,10 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
public static class Entry {
RFuture<String> future;
final AtomicInteger counter;
final AtomicInteger freeWorkers;
public Entry(int workers) {
counter = new AtomicInteger(workers);
freeWorkers = new AtomicInteger(workers);
}
public void setFuture(RFuture<String> future) {
@ -58,8 +59,8 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
return future;
}
public AtomicInteger getCounter() {
return counter;
public AtomicInteger getFreeWorkers() {
return freeWorkers;
}
}
@ -137,7 +138,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
if (entry == null) {
return 0;
}
return entry.getCounter().get();
return entry.getFreeWorkers().get();
}
@Override
@ -235,7 +236,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
return;
}
log.debug("subscribe: {}, entry counter: {}", remoteInterface, entry.getCounter());
log.debug("subscribe: {}, free workers: {}", remoteInterface, entry.getFreeWorkers());
RFuture<String> take = requestQueue.pollAsync(60, TimeUnit.SECONDS);
entry.setFuture(take);
@ -258,15 +259,12 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
return;
}
// do not subscribe now, see
// https://github.com/mrniko/redisson/issues/493
// subscribe(remoteInterface, requestQueue);
if (entry.getCounter().get() == 0) {
if (entry.getFreeWorkers().get() == 0) {
return;
}
if (entry.getCounter().decrementAndGet() > 0) {
int freeWorkers = entry.getFreeWorkers().decrementAndGet();
if (freeWorkers > 0 && requestId != null) {
subscribe(remoteInterface, requestQueue, executor, bean);
}
@ -416,6 +414,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
if (request.getOptions().getExecutionTimeoutInMillis() != null) {
timeout = request.getOptions().getExecutionTimeoutInMillis();
}
long tt = timeout;
RBlockingQueueAsync<RRemoteServiceResponse> queue = getBlockingQueue(responseName, codec);
try {
@ -427,8 +426,9 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
} else {
response = result;
}
RFuture<Void> clientsFuture = queue.putAsync(response);
queue.expireAsync(timeout, TimeUnit.MILLISECONDS);
CompletionStage<?> clientsFuture = queue.putAsync(response)
.thenCompose(s -> queue.expireAsync(Duration.ofMillis(tt)));
clientsFuture.whenComplete((res, exc) -> {
if (exc != null) {
@ -507,7 +507,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
log.debug("resubscribe: {}, queue: {}", remoteInterface, requestQueue.getName());
if (entry != null && entry.getCounter().getAndIncrement() == 0) {
if (entry != null && entry.getFreeWorkers().getAndIncrement() == 0) {
// re-subscribe anyways after the method invocation
subscribe(remoteInterface, requestQueue, executor, bean);
}

Loading…
Cancel
Save