|
|
|
@ -380,19 +380,23 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RBlockingQueueAsync<RRemoteServiceResponse> queue = getBlockingQueue(responseName, codec);
|
|
|
|
|
RFuture<Void> clientsFuture = queue.putAsync(responseHolder.get());
|
|
|
|
|
queue.expireAsync(timeout, TimeUnit.MILLISECONDS);
|
|
|
|
|
try {
|
|
|
|
|
RFuture<Void> clientsFuture = queue.putAsync(responseHolder.get());
|
|
|
|
|
queue.expireAsync(timeout, TimeUnit.MILLISECONDS);
|
|
|
|
|
|
|
|
|
|
clientsFuture.onComplete((res, e) -> {
|
|
|
|
|
if (e != null) {
|
|
|
|
|
if (e instanceof RedissonShutdownException) {
|
|
|
|
|
return;
|
|
|
|
|
clientsFuture.onComplete((res, e) -> {
|
|
|
|
|
if (e != null) {
|
|
|
|
|
if (e instanceof RedissonShutdownException) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
log.error("Can't send response: " + responseHolder.get() + " for request: " + request, e);
|
|
|
|
|
}
|
|
|
|
|
log.error("Can't send response: " + responseHolder.get() + " for request: " + request, e);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
resubscribe(remoteInterface, requestQueue, executor);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
resubscribe(remoteInterface, requestQueue, executor);
|
|
|
|
|
});
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error("Can't send response: " + responseHolder.get() + " for request: " + request, e);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
resubscribe(remoteInterface, requestQueue, executor);
|
|
|
|
|
}
|
|
|
|
|