refactoring

pull/2452/merge
Nikita Koksharov 5 years ago
parent c0f980bf57
commit 387fb465f0

@ -56,7 +56,7 @@ import java.util.concurrent.TimeUnit;
* *
* @see RRemoteService#get(Class, RemoteInvocationOptions) * @see RRemoteService#get(Class, RemoteInvocationOptions)
*/ */
public class RemoteInvocationOptions implements Serializable { public final class RemoteInvocationOptions implements Serializable {
private static final long serialVersionUID = -7715968073286484802L; private static final long serialVersionUID = -7715968073286484802L;

@ -115,7 +115,7 @@ public abstract class BaseRemoteProxy {
return promise; return promise;
} }
protected <T extends RRemoteServiceResponse> RPromise<T> pollResponse(long timeout, protected final <T extends RRemoteServiceResponse> RPromise<T> pollResponse(long timeout,
RequestId requestId, boolean insertFirst) { RequestId requestId, boolean insertFirst) {
RPromise<T> responseFuture = new RedissonPromise<T>(); RPromise<T> responseFuture = new RedissonPromise<T>();
@ -129,65 +129,14 @@ public abstract class BaseRemoteProxy {
entry = oldEntry; entry = oldEntry;
} }
} }
responseFuture.onComplete((res, ex) -> {
if (responseFuture.isCancelled()) {
synchronized (responses) {
ResponseEntry e = responses.get(responseQueueName);
List<Result> list = e.getResponses().get(requestId);
if (list == null) {
return;
}
for (Iterator<Result> iterator = list.iterator(); iterator.hasNext();) {
Result result = iterator.next();
if (result.getPromise() == responseFuture) {
result.getResponseTimeoutFuture().cancel(true);
iterator.remove();
}
}
if (list.isEmpty()) {
e.getResponses().remove(requestId);
}
if (e.getResponses().isEmpty()) { addCancelHandling(requestId, responseFuture);
responses.remove(responseQueueName, e);
} ScheduledFuture<?> responseTimeoutFuture = createResponseTimeout(timeout, requestId, responseFuture);
}
}
});
ScheduledFuture<?> responseTimeoutFuture = commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
@Override
public void run() {
synchronized (responses) {
ResponseEntry entry = responses.get(responseQueueName);
if (entry == null) {
return;
}
RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms");
if (responseFuture.tryFailure(ex)) {
List<Result> list = entry.getResponses().get(requestId);
list.remove(0);
if (list.isEmpty()) {
entry.getResponses().remove(requestId);
}
if (entry.getResponses().isEmpty()) {
responses.remove(responseQueueName, entry);
}
}
}
}
}, timeout, TimeUnit.MILLISECONDS);
Map<RequestId, List<Result>> entryResponses = entry.getResponses(); Map<RequestId, List<Result>> entryResponses = entry.getResponses();
List<Result> list = entryResponses.get(requestId); List<Result> list = entryResponses.computeIfAbsent(requestId, k -> new ArrayList<>(3));
if (list == null) {
list = new ArrayList<>(3);
entryResponses.put(requestId, list);
}
Result res = new Result(responseFuture, responseTimeoutFuture); Result res = new Result(responseFuture, responseTimeoutFuture);
if (insertFirst) { if (insertFirst) {
list.add(0, res); list.add(0, res);
@ -196,11 +145,67 @@ public abstract class BaseRemoteProxy {
} }
} }
pollResponse(entry); pollResponse(entry);
return responseFuture; return responseFuture;
} }
private <T extends RRemoteServiceResponse> ScheduledFuture<?> createResponseTimeout(long timeout, RequestId requestId, RPromise<T> responseFuture) {
return commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
@Override
public void run() {
synchronized (responses) {
ResponseEntry entry = responses.get(responseQueueName);
if (entry == null) {
return;
}
RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms");
if (responseFuture.tryFailure(ex)) {
List<Result> list = entry.getResponses().get(requestId);
list.remove(0);
if (list.isEmpty()) {
entry.getResponses().remove(requestId);
}
if (entry.getResponses().isEmpty()) {
responses.remove(responseQueueName, entry);
}
}
}
}
}, timeout, TimeUnit.MILLISECONDS);
}
private <T extends RRemoteServiceResponse> void addCancelHandling(RequestId requestId, RPromise<T> responseFuture) {
responseFuture.onComplete((res, ex) -> {
if (!responseFuture.isCancelled()) {
return;
}
synchronized (responses) {
ResponseEntry e = responses.get(responseQueueName);
List<Result> list = e.getResponses().get(requestId);
if (list == null) {
return;
}
for (Iterator<Result> iterator = list.iterator(); iterator.hasNext();) {
Result result = iterator.next();
if (result.getPromise() == responseFuture) {
result.getResponseTimeoutFuture().cancel(true);
iterator.remove();
}
}
if (list.isEmpty()) {
e.getResponses().remove(requestId);
}
if (e.getResponses().isEmpty()) {
responses.remove(responseQueueName, e);
}
}
});
}
private <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) { private <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueue<V>(codec, commandExecutor, name, null); return new RedissonBlockingQueue<V>(codec, commandExecutor, name, null);
} }

Loading…
Cancel
Save