refactoring

pull/6394/head
Nikita Koksharov
parent f3cddf4beb
commit 4f5f7be3da

@ -100,12 +100,14 @@ public abstract class BaseRemoteProxy {
addCancelHandling(requestId, responseFuture); addCancelHandling(requestId, responseFuture);
Timeout responseTimeoutFuture = createResponseTimeout(timeout, requestId, responseFuture); Result res = new Result(responseFuture);
Timeout responseTimeoutFuture = createResponseTimeout(timeout, requestId, responseFuture, res);
res.setResponseTimeoutFuture(responseTimeoutFuture);
Map<String, List<Result>> entryResponses = entry.getResponses(); Map<String, List<Result>> entryResponses = entry.getResponses();
List<Result> list = entryResponses.computeIfAbsent(requestId, k -> new ArrayList<>(3)); List<Result> list = entryResponses.computeIfAbsent(requestId, k -> new ArrayList<>(3));
Result res = new Result(responseFuture, responseTimeoutFuture);
if (insertFirst) { if (insertFirst) {
list.add(0, res); list.add(0, res);
} else { } else {
@ -121,7 +123,8 @@ public abstract class BaseRemoteProxy {
return responseFuture; return responseFuture;
} }
private <T extends RRemoteServiceResponse> Timeout createResponseTimeout(long timeout, String requestId, CompletableFuture<T> responseFuture) { private <T extends RRemoteServiceResponse> Timeout createResponseTimeout(long timeout, String requestId,
CompletableFuture<T> responseFuture, Result res) {
return commandExecutor.getServiceManager().newTimeout(t -> { return commandExecutor.getServiceManager().newTimeout(t -> {
responses.computeIfPresent(responseQueueName, (k, entry) -> { responses.computeIfPresent(responseQueueName, (k, entry) -> {
RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms"); RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms");
@ -130,7 +133,7 @@ public abstract class BaseRemoteProxy {
} }
List<Result> list = entry.getResponses().get(requestId); List<Result> list = entry.getResponses().get(requestId);
list.remove(0); list.remove(res);
if (list.isEmpty()) { if (list.isEmpty()) {
entry.getResponses().remove(requestId); entry.getResponses().remove(requestId);
} }

@ -33,18 +33,21 @@ public class ResponseEntry {
public static class Result { public static class Result {
private final CompletableFuture<? extends RRemoteServiceResponse> promise; private final CompletableFuture<? extends RRemoteServiceResponse> promise;
private final Timeout responseTimeoutFuture; private Timeout responseTimeoutFuture;
public Result(CompletableFuture<? extends RRemoteServiceResponse> promise, Timeout responseTimeoutFuture) { public Result(CompletableFuture<? extends RRemoteServiceResponse> promise) {
super(); super();
this.promise = promise; this.promise = promise;
this.responseTimeoutFuture = responseTimeoutFuture;
} }
public <T extends RRemoteServiceResponse> CompletableFuture<T> getPromise() { public <T extends RRemoteServiceResponse> CompletableFuture<T> getPromise() {
return (CompletableFuture<T>) promise; return (CompletableFuture<T>) promise;
} }
public void setResponseTimeoutFuture(Timeout responseTimeoutFuture) {
this.responseTimeoutFuture = responseTimeoutFuture;
}
public void cancelResponseTimeout() { public void cancelResponseTimeout() {
responseTimeoutFuture.cancel(); responseTimeoutFuture.cancel();
} }

Loading…
Cancel
Save