|
|
|
@ -384,17 +384,11 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
|
|
|
|
|
queue.expireAsync(timeout, TimeUnit.MILLISECONDS);
|
|
|
|
|
|
|
|
|
|
clientsFuture.onComplete((res, e) -> {
|
|
|
|
|
// interface has been deregistered
|
|
|
|
|
if (!remoteMap.containsKey(remoteInterface)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (e != null) {
|
|
|
|
|
if (e instanceof RedissonShutdownException) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
log.error("Can't send response: " + responseHolder.get() + " for request: " + request,
|
|
|
|
|
e);
|
|
|
|
|
log.error("Can't send response: " + responseHolder.get() + " for request: " + request, e);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
resubscribe(remoteInterface, requestQueue, executor);
|
|
|
|
@ -406,7 +400,8 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
|
|
|
|
|
|
|
|
|
|
private <T> void resubscribe(Class<T> remoteInterface, RBlockingQueue<String> requestQueue,
|
|
|
|
|
ExecutorService executor) {
|
|
|
|
|
if (remoteMap.get(remoteInterface).getCounter().getAndIncrement() == 0) {
|
|
|
|
|
Entry entry = remoteMap.get(remoteInterface);
|
|
|
|
|
if (entry != null && entry.getCounter().getAndIncrement() == 0) {
|
|
|
|
|
// re-subscribe anyways after the method invocation
|
|
|
|
|
subscribe(remoteInterface, requestQueue, executor);
|
|
|
|
|
}
|
|
|
|
|