refactoring

pull/2452/merge
Nikita Koksharov 5 years ago
parent 2916912fe8
commit fbbf0a2ccc

@ -53,13 +53,13 @@ public abstract class BaseRemoteProxy {
final CommandAsyncExecutor commandExecutor;
private final String name;
final String responseQueueName;
private final ConcurrentMap<String, ResponseEntry> responses;
private final Map<String, ResponseEntry> responses;
final Codec codec;
final String executorId;
final BaseRemoteService remoteService;
BaseRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName,
ConcurrentMap<String, ResponseEntry> responses, Codec codec, String executorId, BaseRemoteService remoteService) {
Map<String, ResponseEntry> responses, Codec codec, String executorId, BaseRemoteService remoteService) {
super();
this.commandExecutor = commandExecutor;
this.name = name;
@ -121,14 +121,7 @@ public abstract class BaseRemoteProxy {
ResponseEntry entry;
synchronized (responses) {
entry = responses.get(responseQueueName);
if (entry == null) {
entry = new ResponseEntry();
ResponseEntry oldEntry = responses.putIfAbsent(responseQueueName, entry);
if (oldEntry != null) {
entry = oldEntry;
}
}
entry = responses.computeIfAbsent(responseQueueName, k -> new ResponseEntry());
addCancelHandling(requestId, responseFuture);
@ -160,15 +153,17 @@ public abstract class BaseRemoteProxy {
}
RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms");
if (responseFuture.tryFailure(ex)) {
List<Result> list = entry.getResponses().get(requestId);
list.remove(0);
if (list.isEmpty()) {
entry.getResponses().remove(requestId);
}
if (entry.getResponses().isEmpty()) {
responses.remove(responseQueueName, entry);
}
if (!responseFuture.tryFailure(ex)) {
return;
}
List<Result> list = entry.getResponses().get(requestId);
list.remove(0);
if (list.isEmpty()) {
entry.getResponses().remove(requestId);
}
if (entry.getResponses().isEmpty()) {
responses.remove(responseQueueName, entry);
}
}
}

Loading…
Cancel
Save