|
|
|
@ -234,6 +234,9 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
|
|
|
|
|
if (entry == null) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.debug("subscribe: {}, entry counter: {}", remoteInterface, entry.getCounter());
|
|
|
|
|
|
|
|
|
|
RFuture<String> take = requestQueue.pollAsync(60, TimeUnit.SECONDS);
|
|
|
|
|
entry.setFuture(take);
|
|
|
|
|
take.whenComplete((requestId, e) -> {
|
|
|
|
@ -450,9 +453,15 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.debug("start execution. requestId: {}, method: {}", request.getId(), method);
|
|
|
|
|
|
|
|
|
|
invokeMethod(request, method, cancelRequestFuture, responsePromise);
|
|
|
|
|
|
|
|
|
|
log.debug("end execution. requestId: {}, method: {}", request.getId(), method);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
log.debug("task submitted. requestId: {}, method: {}", request.getId(), method);
|
|
|
|
|
|
|
|
|
|
cancelRequestFuture.thenAccept(r -> {
|
|
|
|
|
boolean res = submitFuture.cancel(r.isMayInterruptIfRunning());
|
|
|
|
|
if (res) {
|
|
|
|
@ -495,6 +504,9 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
|
|
|
|
|
private <T> void resubscribe(Class<T> remoteInterface, RBlockingQueue<String> requestQueue,
|
|
|
|
|
ExecutorService executor, Object bean) {
|
|
|
|
|
Entry entry = remoteMap.get(remoteInterface);
|
|
|
|
|
|
|
|
|
|
log.debug("resubscribe: {}, queue: {}", remoteInterface, requestQueue.getName());
|
|
|
|
|
|
|
|
|
|
if (entry != null && entry.getCounter().getAndIncrement() == 0) {
|
|
|
|
|
// re-subscribe anyways after the method invocation
|
|
|
|
|
subscribe(remoteInterface, requestQueue, executor, bean);
|
|
|
|
|