From 387fb465f00eaa36b8f7369bca304bdb9d28e530 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 19 Dec 2019 13:26:36 +0300 Subject: [PATCH] refactoring --- .../redisson/api/RemoteInvocationOptions.java | 2 +- .../org/redisson/remote/BaseRemoteProxy.java | 121 +++++++++--------- 2 files changed, 64 insertions(+), 59 deletions(-) diff --git a/redisson/src/main/java/org/redisson/api/RemoteInvocationOptions.java b/redisson/src/main/java/org/redisson/api/RemoteInvocationOptions.java index 0590dd2c9..ed3193db4 100644 --- a/redisson/src/main/java/org/redisson/api/RemoteInvocationOptions.java +++ b/redisson/src/main/java/org/redisson/api/RemoteInvocationOptions.java @@ -56,7 +56,7 @@ import java.util.concurrent.TimeUnit; * * @see RRemoteService#get(Class, RemoteInvocationOptions) */ -public class RemoteInvocationOptions implements Serializable { +public final class RemoteInvocationOptions implements Serializable { private static final long serialVersionUID = -7715968073286484802L; diff --git a/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java b/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java index 6a6f1ebcb..c69cd96d8 100644 --- a/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java +++ b/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java @@ -115,7 +115,7 @@ public abstract class BaseRemoteProxy { return promise; } - protected RPromise pollResponse(long timeout, + protected final RPromise pollResponse(long timeout, RequestId requestId, boolean insertFirst) { RPromise responseFuture = new RedissonPromise(); @@ -129,65 +129,14 @@ public abstract class BaseRemoteProxy { entry = oldEntry; } } - - responseFuture.onComplete((res, ex) -> { - if (responseFuture.isCancelled()) { - synchronized (responses) { - ResponseEntry e = responses.get(responseQueueName); - List list = e.getResponses().get(requestId); - if (list == null) { - return; - } - - for (Iterator iterator = list.iterator(); iterator.hasNext();) { - Result result = iterator.next(); - if (result.getPromise() == responseFuture) { - result.getResponseTimeoutFuture().cancel(true); - iterator.remove(); - } - } - if (list.isEmpty()) { - e.getResponses().remove(requestId); - } - if (e.getResponses().isEmpty()) { - responses.remove(responseQueueName, e); - } - } - } - }); - - ScheduledFuture responseTimeoutFuture = commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() { - @Override - public void run() { - synchronized (responses) { - ResponseEntry entry = responses.get(responseQueueName); - if (entry == null) { - return; - } - - RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms"); - if (responseFuture.tryFailure(ex)) { - List list = entry.getResponses().get(requestId); - list.remove(0); - if (list.isEmpty()) { - entry.getResponses().remove(requestId); - } - if (entry.getResponses().isEmpty()) { - responses.remove(responseQueueName, entry); - } - } - } - } - }, timeout, TimeUnit.MILLISECONDS); + addCancelHandling(requestId, responseFuture); + + ScheduledFuture responseTimeoutFuture = createResponseTimeout(timeout, requestId, responseFuture); Map> entryResponses = entry.getResponses(); - List list = entryResponses.get(requestId); - if (list == null) { - list = new ArrayList<>(3); - entryResponses.put(requestId, list); - } - + List list = entryResponses.computeIfAbsent(requestId, k -> new ArrayList<>(3)); + Result res = new Result(responseFuture, responseTimeoutFuture); if (insertFirst) { list.add(0, res); @@ -196,11 +145,67 @@ public abstract class BaseRemoteProxy { } } - pollResponse(entry); return responseFuture; } + private ScheduledFuture createResponseTimeout(long timeout, RequestId requestId, RPromise responseFuture) { + return commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() { + @Override + public void run() { + synchronized (responses) { + ResponseEntry entry = responses.get(responseQueueName); + if (entry == null) { + return; + } + + RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms"); + if (responseFuture.tryFailure(ex)) { + List list = entry.getResponses().get(requestId); + list.remove(0); + if (list.isEmpty()) { + entry.getResponses().remove(requestId); + } + if (entry.getResponses().isEmpty()) { + responses.remove(responseQueueName, entry); + } + } + } + } + }, timeout, TimeUnit.MILLISECONDS); + } + + private void addCancelHandling(RequestId requestId, RPromise responseFuture) { + responseFuture.onComplete((res, ex) -> { + if (!responseFuture.isCancelled()) { + return; + } + + synchronized (responses) { + ResponseEntry e = responses.get(responseQueueName); + List list = e.getResponses().get(requestId); + if (list == null) { + return; + } + + for (Iterator iterator = list.iterator(); iterator.hasNext();) { + Result result = iterator.next(); + if (result.getPromise() == responseFuture) { + result.getResponseTimeoutFuture().cancel(true); + iterator.remove(); + } + } + if (list.isEmpty()) { + e.getResponses().remove(requestId); + } + + if (e.getResponses().isEmpty()) { + responses.remove(responseQueueName, e); + } + } + }); + } + private RBlockingQueue getBlockingQueue(String name, Codec codec) { return new RedissonBlockingQueue(codec, commandExecutor, name, null); }