From ec153b891c9a4a72c9736742ab47444d8f500522 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Sat, 11 Jan 2020 08:52:21 +0300 Subject: [PATCH] refactoring --- .../org/redisson/RedissonRemoteService.java | 87 ++++++++++--------- .../java/org/redisson/api/RRemoteService.java | 33 ++++++- .../RedissonExecutorRemoteService.java | 7 +- 3 files changed, 83 insertions(+), 44 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index 3f41df128..15a0795c3 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -167,6 +167,11 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS @Override public boolean tryExecute(Class remoteInterface, T object, long timeout, TimeUnit timeUnit) throws InterruptedException { + return tryExecute(remoteInterface, object, commandExecutor.getConnectionManager().getExecutor(), timeout, timeUnit); + } + + @Override + public boolean tryExecute(Class remoteInterface, T object, ExecutorService executorService, long timeout, TimeUnit timeUnit) throws InterruptedException { String requestQueueName = getRequestQueueName(remoteInterface); RBlockingQueue requestQueue = getBlockingQueue(requestQueueName, StringCodec.INSTANCE); @@ -184,13 +189,18 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS throw new IllegalStateException("Task can't be found for request: " + requestId); } - RFuture r = executeMethod(remoteInterface, requestQueue, commandExecutor.getConnectionManager().getExecutor(), request, object); + RFuture r = executeMethod(remoteInterface, requestQueue, executorService, request, object); commandExecutor.getInterrupted(r); return true; } @Override public RFuture tryExecuteAsync(Class remoteInterface, T object, long timeout, TimeUnit timeUnit) { + return tryExecuteAsync(remoteInterface, object, commandExecutor.getConnectionManager().getExecutor(), timeout, timeUnit); + } + + @Override + public RFuture tryExecuteAsync(Class remoteInterface, T object, ExecutorService executor, long timeout, TimeUnit timeUnit) { RPromise result = new RedissonPromise<>(); result.setUncancellable(); String requestQueueName = getRequestQueueName(remoteInterface); @@ -226,7 +236,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS return; } - RFuture future = executeMethod(remoteInterface, requestQueue, commandExecutor.getConnectionManager().getExecutor(), request, object); + RFuture future = executeMethod(remoteInterface, requestQueue, executor, request, object); future.onComplete((r, ex) -> { if (ex != null) { result.tryFailure(ex); @@ -394,11 +404,42 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS RPromise cancelRequestFuture = new RedissonPromise<>(); scheduleCheck(cancelRequestMapName, new RequestId(request.getId()), cancelRequestFuture); + responsePromise.onComplete((result, e) -> { + if (request.getOptions().isResultExpected() + || result instanceof RemoteServiceCancelResponse) { + + long timeout = 60 * 1000; + if (request.getOptions().getExecutionTimeoutInMillis() != null) { + timeout = request.getOptions().getExecutionTimeoutInMillis(); + } + + RBlockingQueueAsync queue = getBlockingQueue(responseName, codec); + try { + RFuture clientsFuture = queue.putAsync(result); + queue.expireAsync(timeout, TimeUnit.MILLISECONDS); + + clientsFuture.onComplete((res, exc) -> { + if (exc != null) { + if (exc instanceof RedissonShutdownException) { + return; + } + log.error("Can't send response: " + result + " for request: " + request, exc); + } + + resubscribe(remoteInterface, requestQueue, executor, method.getBean()); + }); + } catch (Exception ex) { + log.error("Can't send response: " + result + " for request: " + request, e); + } + } else { + resubscribe(remoteInterface, requestQueue, executor, method.getBean()); + } + }); + java.util.concurrent.Future submitFuture = executor.submit(() -> { - invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, - cancelRequestFuture, responsePromise); + invokeMethod(request, method, cancelRequestFuture, responsePromise); }); - + cancelRequestFuture.onComplete((r, e) -> { if (e != null) { return; @@ -423,10 +464,8 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS return responsePromise; } - protected void invokeMethod(Class remoteInterface, - RBlockingQueue requestQueue, RemoteServiceRequest request, - RemoteServiceMethod method, String responseName, ExecutorService executor, - RFuture cancelRequestFuture, RPromise responsePromise) { + protected void invokeMethod(RemoteServiceRequest request, RemoteServiceMethod method, + RFuture cancelRequestFuture, RPromise responsePromise) { try { Object result = method.getMethod().invoke(method.getBean(), request.getArgs()); @@ -441,36 +480,6 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS if (cancelRequestFuture != null) { cancelRequestFuture.cancel(false); } - - // send the response only if expected or task was canceled - if (request.getOptions().isResultExpected() - || responsePromise.getNow() instanceof RemoteServiceCancelResponse) { - long timeout = 60 * 1000; - if (request.getOptions().getExecutionTimeoutInMillis() != null) { - timeout = request.getOptions().getExecutionTimeoutInMillis(); - } - - RBlockingQueueAsync queue = getBlockingQueue(responseName, codec); - try { - RFuture clientsFuture = queue.putAsync(responsePromise.getNow()); - queue.expireAsync(timeout, TimeUnit.MILLISECONDS); - - clientsFuture.onComplete((res, e) -> { - if (e != null) { - if (e instanceof RedissonShutdownException) { - return; - } - log.error("Can't send response: " + responsePromise.getNow() + " for request: " + request, e); - } - - resubscribe(remoteInterface, requestQueue, executor, method.getBean()); - }); - } catch (Exception e) { - log.error("Can't send response: " + responsePromise.getNow() + " for request: " + request, e); - } - } else { - resubscribe(remoteInterface, requestQueue, executor, method.getBean()); - } } private void resubscribe(Class remoteInterface, RBlockingQueue requestQueue, diff --git a/redisson/src/main/java/org/redisson/api/RRemoteService.java b/redisson/src/main/java/org/redisson/api/RRemoteService.java index d13588906..d05a82627 100644 --- a/redisson/src/main/java/org/redisson/api/RRemoteService.java +++ b/redisson/src/main/java/org/redisson/api/RRemoteService.java @@ -108,7 +108,7 @@ public interface RRemoteService { * @param remoteInterface - remote service interface * @param object - remote service object * @param workers - workers amount - * @param executor - executor service + * @param executor - executor service used to invoke methods */ void register(Class remoteInterface, T object, int workers, ExecutorService executor); @@ -135,6 +135,22 @@ public interface RRemoteService { */ boolean tryExecute(Class remoteInterface, T object, long timeout, TimeUnit timeUnit) throws InterruptedException; + /** + * Tries to execute one awaiting remote request. + * Waits up to timeout if necessary until remote request became available. + * + * @param remoteInterface - remote service interface + * @param object - remote service object + * @param timeout - maximum wait time until remote request became available + * @param timeUnit - time unit + * @param executorService - executor service used to invoke methods + * @param - type of remote service + * @return true if method was successfully executed and + * false if timeout reached before execution + * @throws InterruptedException - if the thread is interrupted + */ + boolean tryExecute(Class remoteInterface, T object, ExecutorService executorService, long timeout, TimeUnit timeUnit) throws InterruptedException; + /** * Tries to execute one awaiting remote request. * @@ -160,6 +176,21 @@ public interface RRemoteService { */ RFuture tryExecuteAsync(Class remoteInterface, T object, long timeout, TimeUnit timeUnit); + /** + * Tries to execute one awaiting remote request. + * Waits up to timeout if necessary until remote request became available. + * + * @param remoteInterface - remote service interface + * @param object - remote service object + * @param timeout - maximum wait time until remote request became available + * @param timeUnit - time unit + * @param executorService - executor service used to invoke methods + * @param - type of remote service + * @return true if method was successfully executed and + * false if timeout reached before execution + */ + RFuture tryExecuteAsync(Class remoteInterface, T object, ExecutorService executorService, long timeout, TimeUnit timeUnit); + /** * Get remote service object for remote invocations. *

diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java index c138c43c9..bda5e0339 100644 --- a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java @@ -87,9 +87,8 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { } @Override - protected void invokeMethod(Class remoteInterface, RBlockingQueue requestQueue, RemoteServiceRequest request, - RemoteServiceMethod method, String responseName, ExecutorService executor, RFuture cancelRequestFuture, - RPromise responsePromise) { + protected void invokeMethod(RemoteServiceRequest request, RemoteServiceMethod method, + RFuture cancelRequestFuture, RPromise responsePromise) { startedListeners.stream().forEach(l -> l.onStarted(request.getId())); if (taskTimeout > 0) { @@ -97,7 +96,7 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { ((RPromise) cancelRequestFuture).trySuccess(new RemoteServiceCancelRequest(true, false)); }, taskTimeout, TimeUnit.MILLISECONDS); } - super.invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, cancelRequestFuture, responsePromise); + super.invokeMethod(request, method, cancelRequestFuture, responsePromise); if (responsePromise.getNow() instanceof RemoteServiceResponse) { RemoteServiceResponse response = (RemoteServiceResponse) responsePromise.getNow();